kafka消费者和生产者为内/外网映射情况的配置
IP:??(請修改自己的ip地址)
????內網:172.18.10.10??????????
????外網:120.78.22.22
1.?先在/etc/hosts文件中添加解析記錄
????????????172.18.10.10????HostName
????????????????
2.?修改 kafka_2.11-2.0.0/config????server.properties
????????listeners=PLAINTEXT://0.0.0.0:9092
????????advertised.listeners=PLAINTEXT://120.78.22.22:9092
????????advertised.host.name=HostName??
有用戶密碼驗證的情況下
IP:??(請修改自己的ip地址)
????內網:172.18.10.10??????????
????外網:120.78.22.22
1.?先在/etc/hosts文件中添加解析記錄
????????????172.18.10.10????HostName
????????????????
2.?修改 kafka_2.11-2.0.0/config????server.properties
????????listeners=SASL_PLAINTEXT:://0.0.0.0:9092
????????advertised.listeners=SASL_PLAINTEXT:://120.78.22.22:9092
????????advertised.host.name=HostName????
?
代用戶密碼驗證的java客戶端連接
Properties props = new Properties();Map map = new Gson().fromJson(kafkaServer,Map.class);//集群地址,多個地址用","分隔props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,map.get("bootstrap.servers"));//設置消費者的group idprops.put("group.id", taskid+"");props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); // props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule \ // "username="admin\" password=\"<api-secret>\";");props.put(SaslConfigs.SASL_JAAS_CONFIG,"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"chinaums@000000\";");//如果為真,consumer所消費消息的offset將會自動的同步到zookeeper。如果消費者死掉時,由新的consumer使用繼續接替props.put("enable.auto.commit", "false");props.put("max.poll.records",3000);//consumer向zookeeper提交offset的頻率 // props.put("auto.commit.interval.ms", "1000");props.put("session.timeout.ms", "30000");props.put("auto.offset.reset","earliest");props.put("fetch.max.bytes",524288000);//反序列化props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//創建消費者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // TopicPartition partition0 = new TopicPartition(topic, 0);// 訂閱topic,可以為多個用,隔開,此處訂閱了"test-partition-1", "test"這兩個主題consumer.subscribe(Arrays.asList(topic));總結
以上是生活随笔為你收集整理的kafka消费者和生产者为内/外网映射情况的配置的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: redis配置全解
- 下一篇: nginx生成自定义证书