Kafka Study Notes (1) -------- Quickstart
Reference (official site): http://kafka.apache.org/quickstart
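1. Download Kafka
Official download page: http://kafka.apache.org/downloads
As of July 8, 2019, the latest release is 2.3.0. In the archive name below, 2.12 is the Scala version the binaries were built with and 2.3.0 is the Kafka version.
Scala 2.12 - kafka_2.12-2.3.0.tgz (asc, sha512)
Extract the archive and change into the new directory:
> tar -xzf kafka_2.12-2.3.0.tgz
> cd kafka_2.12-2.3.0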
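The naming convention described above (Scala version first, Kafka version second) can be made concrete with a small shell sketch. The function name parse_kafka_archive is hypothetical and only illustrates the pattern kafka_<scala>-<kafka>.tgz; it is not part of the Kafka distribution.

```shell
# Split a Kafka binary archive name of the form kafka_<scala>-<kafka>.tgz.
# parse_kafka_archive is a hypothetical helper, not a Kafka tool.
parse_kafka_archive() (
  name="${1##*/}"            # drop any leading directory
  name="${name%.tgz}"        # drop the extension, e.g. kafka_2.12-2.3.0
  versions="${name#kafka_}"  # drop the prefix, e.g. 2.12-2.3.0
  scala="${versions%%-*}"    # text before the first hyphen
  kafka="${versions#*-}"     # text after the first hyphen
  echo "scala=$scala kafka=$kafka"
)

parse_kafka_archive kafka_2.12-2.3.0.tgz
```

For the archive used in this article, the call prints scala=2.12 kafka=2.3.0.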
2. Start the servers
ZooKeeper has to be started first. Kafka ships with a bundled ZooKeeper, although you can use a separate installation instead.
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
Then start the Kafka broker:
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
3. Create a topic
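Create a topic named test with a replication factor of 1 and a single partition:
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
List the topics:
> bin/kafka-topics.sh --list --bootstrap-server localhost:9092
test
Alternatively, instead of creating topics manually, you can configure the brokers to create a topic automatically when a message is published to a topic that does not exist yet.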
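Automatic topic creation is controlled by the broker setting auto.create.topics.enable, which defaults to true when it is absent from the config file. The sketch below reports the effective value for a given properties file. The helper name auto_create_setting is hypothetical, and the parsing is deliberately simple (it only looks at uncommented key=value lines).

```shell
# Print the effective auto.create.topics.enable value for a broker config file.
# auto_create_setting is a hypothetical helper, not a Kafka tool.
auto_create_setting() (
  file="$1"
  # Take the last uncommented assignment, since later entries override earlier ones.
  value="$(grep -E '^[[:space:]]*auto\.create\.topics\.enable[[:space:]]*=' "$file" \
    | tail -n 1 | cut -d= -f2- | tr -d '[:space:]')"
  # Fall back to the broker default (true) when the key is not set.
  echo "${value:-true}"
)

# Only runs inside an extracted Kafka directory.
if [ -f config/server.properties ]; then
  auto_create_setting config/server.properties
fi
```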
4. Send messages
Test with the command-line client; each line you type is sent as a separate message.
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
5. Consumer
The command-line consumer reads the messages back:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
6. Set up a multi-broker cluster
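A single broker is not very interesting, so let's expand the cluster to three brokers.
First, copy the config file for each new broker:
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
Then edit the copies:
config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs-1
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dirs=/tmp/kafka-logs-2
broker.id is the unique name of each node in the cluster. Because all three brokers run on the same machine, listeners and log.dirs must be overridden as well, so that the brokers do not compete for the same port or overwrite each other's data.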
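The copy-and-edit step can also be scripted. The following is a minimal sketch, assuming the three keys appear as plain key=value lines in the base file; make_broker_config is a hypothetical helper name, and the port and log directory scheme simply mirror the values used above.

```shell
# Write a per-broker config derived from a base server.properties.
# Arguments: base file, broker id, listener port, output file.
# make_broker_config is a hypothetical helper, not a Kafka tool.
make_broker_config() (
  base="$1"; id="$2"; port="$3"; out="$4"
  {
    # Keep every line except existing assignments of the three overridden keys.
    # "|| true" keeps going when grep selects no lines at all.
    grep -Ev '^(broker\.id|listeners|log\.dirs)=' "$base" || true
    echo "broker.id=$id"
    echo "listeners=PLAINTEXT://:$port"
    echo "log.dirs=/tmp/kafka-logs-$id"
  } > "$out"
)

# Only runs inside an extracted Kafka directory.
if [ -f config/server.properties ]; then
  make_broker_config config/server.properties 1 9093 config/server-1.properties
  make_broker_config config/server.properties 2 9094 config/server-2.properties
fi
```

Dropping the old assignments before appending the new ones leaves each key defined exactly once, which makes the generated files easier to check by eye.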
With the two new brokers started (run bin/kafka-server-start.sh once for each new properties file), create a topic with one partition and a replication factor of three:
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Use --describe to see how the topic was laid out:
> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
A few concepts appear in this output:
"leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
"replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
"isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.
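To make the relationship between "replicas" and "isr" concrete, here is a small sketch that reads one partition line of --describe output and lists the replicas that are assigned but not currently in sync. The helper names describe_field and out_of_sync_replicas are hypothetical, and the sample line is the post-failure output shown later in this article.

```shell
# Print the value that follows a "Label:" token in a partition line from --describe.
# Both helpers are hypothetical, not Kafka tools.
describe_field() (
  line="$1"; label="$2"
  echo "$line" | awk -v label="$label:" \
    '{ for (i = 1; i < NF; i++) if ($i == label) { print $(i + 1); exit } }'
)

# Print the replicas that are assigned to the partition but missing from the ISR.
out_of_sync_replicas() (
  replicas="$(describe_field "$1" Replicas)"
  isr="$(describe_field "$1" Isr)"
  for r in $(echo "$replicas" | tr ',' ' '); do
    case ",$isr," in
      *",$r,"*) ;;        # replica is in sync
      *) echo "$r" ;;     # replica is assigned but not in the ISR
    esac
  done
)

line='Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0'
echo "leader=$(describe_field "$line" Leader)"
echo "out of sync: $(out_of_sync_replicas "$line")"
```

For the sample line this reports leader 2 and replica 1 as out of sync, which matches a cluster in which broker 1 is down.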
Publish and consume a few messages:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
Now test fault tolerance. Broker 1 is the current leader, so kill it:
> ps aux | grep server-1.properties
7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564
Check what changed: leadership has moved to another broker because broker 1 was killed, and broker 1 is no longer in the in-sync replica set.
> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
The messages can still be consumed:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
7. Use Kafka Connect to import/export data
So far all the data has come from the console. What about data from other sources, or data that has to go to other systems? That is what Kafka Connect is for.
Create some seed data:
> echo -e "foo\nbar" > test.txt
Start the connectors in standalone mode, passing the config files:
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
Verify the result:
> more test.sink.txt
foo
bar
On the consumer side:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...
You can keep appending data to the file:
> echo Another line>> test.txt
8. Use Kafka Streams
http://kafka.apache.org/22/documentation/streams/quickstart
WordCountDemo
https://github.com/apache/kafka/blob/2.2/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
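As a rough, Kafka-free illustration of what WordCountDemo computes, the sketch below imitates its running count in plain shell: every word read from standard input is lowercased and printed together with its updated count. This only mimics the shape of the output. It is an assumption-laden stand-in (the function name running_word_count is hypothetical) and does not use Kafka Streams at all.

```shell
# Emit "word<TAB>count" each time a word is seen, mimicking the stream of
# updates a running word count produces. Hypothetical helper, no Kafka involved.
running_word_count() {
  tr '[:upper:]' '[:lower:]' \
    | tr -cs '[:alnum:]_' '\n' \
    | awk 'NF { count[$0]++; print $0 "\t" count[$0] }'
}

printf 'all streams lead to kafka\nhello kafka streams\n' | running_word_count
```

Note that a word seen for the second time is emitted again with its new count rather than replacing the earlier line, which is how an update stream differs from a final table of totals.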
Code snippet from WordCountDemo:
    import java.util.Arrays;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Produced;

    // Serializers/deserializers (serde) for String and Long types
    final Serde<String> stringSerde = Serdes.String();
    final Serde<Long> longSerde = Serdes.Long();

    // Builder that the topology below is defined on
    final StreamsBuilder builder = new StreamsBuilder();

    // Construct a `KStream` from the input topic "streams-plaintext-input", where message values
    // represent lines of text (for the sake of this example, we ignore whatever may be stored
    // in the message keys).
    KStream<String, String> textLines =
        builder.stream("streams-plaintext-input", Consumed.with(stringSerde, stringSerde));

    KTable<String, Long> wordCounts = textLines
        // Split each text line into lowercase words on runs of non-word characters.
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
        // Group the text words as message keys
        .groupBy((key, value) -> value)
        // Count the occurrences of each word (message key).
        .count();

    // Store the running counts as a changelog stream to the output topic.
    wordCounts.toStream().to("streams-wordcount-output", Produced.with(stringSerde, longSerde));
Next, prepare the input topic and the output topic. The command below creates the output topic with log compaction enabled:
> bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-wordcount-output \
    --config cleanup.policy=compact
Created topic "streams-wordcount-output".
Start the WordCount demo application:
> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
Start a producer to write data:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
hello kafka streams
Start a consumer to read the results:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
all 1
streams 1
lead 1
to 1
kafka 1
hello 1
kafka 2
streams 2
kafka 1
Reposted from: https://www.cnblogs.com/tree1123/p/11150927.html