Kafka Learning (Part 1): Quickstart

Published: 2023/12/2

Reference: the official quickstart, http://kafka.apache.org/quickstart

1. Download Kafka

Official download page: http://kafka.apache.org/downloads

As of July 8, 2019, the latest release was 2.3.0. In the archive name, 2.12 is the Scala version the broker was compiled against and 2.3.0 is the Kafka version.

  • Scala 2.12 - kafka_2.12-2.3.0.tgz (asc, sha512)

Download and unpack it:

> tar -xzf kafka_2.12-2.3.0.tgz
> cd kafka_2.12-2.3.0

2. Start the Servers

Kafka uses ZooKeeper, so start a ZooKeeper server first. The distribution ships with a convenience script and configuration for a single-node instance, though you can point Kafka at an existing ZooKeeper cluster instead.

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

Now start the Kafka server:

> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

3. Create a Topic

Create a topic named "test" with a replication factor of 1 and a single partition:

> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

List topics to check that it exists:

> bin/kafka-topics.sh --list --bootstrap-server localhost:9092
test

Alternatively, you can skip manual creation and configure brokers to auto-create topics the first time a message is published to a nonexistent topic.
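For reference, that behavior is governed by broker settings in config/server.properties (auto-creation is enabled by default; auto-created topics receive the broker-default partition count and replication factor):

```properties
# Broker setting controlling topic auto-creation (default: true)
auto.create.topics.enable=true

# Defaults applied to auto-created topics
num.partitions=1
default.replication.factor=1
```

Auto-creation is convenient for experiments, but explicit creation gives you control over partitioning and replication per topic.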

4. Send Messages

Test with the command-line producer client; each line of input is sent as a separate message.

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

5. Start a Consumer

The command-line consumer can receive the messages:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

6. Set Up a Multi-Broker Cluster

A single broker isn't much of a cluster, so let's expand to three brokers.

First, copy the configuration file for each additional broker:

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

Then edit the copies as follows:

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs-1

config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dirs=/tmp/kafka-logs-2

broker.id is the unique, permanent name of each node in the cluster. Because we are running all three brokers on the same machine, we also have to override listeners and log.dirs so the brokers don't collide on ports or data directories.

Create a topic with one partition and a replication factor of 3:

> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic

Use describe to see how it is laid out:

> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1   ReplicationFactor:3   Configs:
    Topic: my-replicated-topic   Partition: 0   Leader: 1   Replicas: 1,2,0   Isr: 1,2,0
  • A few concepts from that output:

  • "leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.

  • "replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.

  • "isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.

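As an illustration (not part of the Kafka distribution), a partition line from the describe output can be picked apart with a few lines of Python; the helper below is hypothetical, and the field layout matches the sample output shown above:

```python
def parse_partition_line(line):
    """Parse one partition line from `kafka-topics.sh --describe` output
    into leader, replica list, and in-sync replica (ISR) list."""
    fields = {}
    for part in line.split("\t"):
        part = part.strip()
        if ": " in part:
            key, _, value = part.partition(": ")
            fields[key] = value
    return {
        "partition": int(fields["Partition"]),
        "leader": int(fields["Leader"]),
        "replicas": [int(x) for x in fields["Replicas"].split(",")],
        "isr": [int(x) for x in fields["Isr"].split(",")],
    }

info = parse_partition_line(
    "\tTopic: my-replicated-topic\tPartition: 0\tLeader: 1\tReplicas: 1,2,0\tIsr: 1,2,0"
)
print(info["leader"])    # 1
print(info["replicas"])  # [1, 2, 0]
# Replicas that have fallen out of sync (none here):
print(sorted(set(info["replicas"]) - set(info["isr"])))  # []
```

Comparing replicas against isr like this is a quick way to spot lagging or dead brokers.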

For comparison, here is the original "test" topic:

> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
Topic:test   PartitionCount:1   ReplicationFactor:1   Configs:
    Topic: test   Partition: 0   Leader: 0   Replicas: 0   Isr: 0

Produce and consume against the new topic:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

Now let's test fault tolerance. Broker 1 is the leader, so kill it:

> ps aux | grep server-1.properties
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564

Describe the topic again. The leader has switched to broker 2, because broker 1 was killed:

> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1   ReplicationFactor:3   Configs:
    Topic: my-replicated-topic   Partition: 0   Leader: 2   Replicas: 1,2,0   Isr: 2,0

The messages are still available for consumption:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
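A simplified way to think about what happened: when a leader dies, Kafka promotes a surviving in-sync replica. The Python sketch below is a hypothetical model of that rule, not the real controller logic (which handles many more cases), but it reproduces the leader choice seen in the describe output above:

```python
def elect_leader(replicas, isr):
    """Simplified leader election: pick the first replica, in replica-list
    order, that is still in the in-sync replica set (ISR)."""
    for broker in replicas:
        if broker in isr:
            return broker
    return None  # no in-sync replica is available

# Before the kill: Replicas: 1,2,0  Isr: 1,2,0  ->  leader is broker 1
print(elect_leader([1, 2, 0], [1, 2, 0]))  # 1
# After killing broker 1: Isr shrinks to 2,0  ->  leader moves to broker 2
print(elect_leader([1, 2, 0], [2, 0]))     # 2
```

This also shows why the replica list order matters: it acts as a preference order for leadership.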

7. Import/Export Data with Kafka Connect

So far all the data has come from the console. What about other sources and sinks, i.e. other systems? Use Kafka Connect.

Create some seed data:

> echo -e "foo\nbar" > test.txt

Start two connectors in standalone mode, passing their configuration files:

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

Verify that the sink file received the data:

> more test.sink.txt
foo
bar

The data also landed in the Kafka topic:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...

The connectors keep running, so you can append to the file and watch the new line flow through the pipeline:

> echo Another line>> test.txt
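The two connector configs referenced above ship with the distribution; as a rough guide (check your copies for the exact contents), the source side looks like this:

```properties
# config/connect-file-source.properties (as shipped, approximately)
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
```

The sink config is the mirror image: a FileStreamSink connector reading topic connect-test and writing to test.sink.txt.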

8. Use Kafka Streams

http://kafka.apache.org/22/documentation/streams/quickstart

WordCountDemo

https://github.com/apache/kafka/blob/2.2/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java

A code excerpt:

// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream(
    "streams-plaintext-input",
    Consumed.with(stringSerde, stringSerde)
);

KTable<String, Long> wordCounts = textLines
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    // Group the words as message keys.
    .groupBy((key, value) -> value)
    // Count the occurrences of each word (message key).
    .count();

// Store the running counts as a changelog stream to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

Create the output topic. Because the word counts form a changelog stream, it is created with log compaction enabled:

> bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-wordcount-output \
    --config cleanup.policy=compact
Created topic "streams-wordcount-output".

Start the WordCount demo application:

> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

Start a producer to write some input data:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
hello kafka streams

Start a consumer to read the output:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

all	1
streams	1
lead	1
to	1
kafka	1
hello	1
kafka	2
streams	2
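The output is a changelog: each record carries the updated count for a word. A small Python sketch (no Kafka involved, just mirroring the topology's per-word counting) reproduces the same sequence of updates for the two input lines; note that a real Streams application may batch updates per commit interval rather than emitting every intermediate count:

```python
import re
from collections import defaultdict

def word_count_changelog(lines):
    """Mimic the WordCountDemo topology: split lines into lowercase words,
    count occurrences, and emit (word, running_count) updates in order."""
    counts = defaultdict(int)
    updates = []
    for line in lines:
        for word in re.split(r"\W+", line.lower()):
            if word:  # skip empty tokens from leading/trailing separators
                counts[word] += 1
                updates.append((word, counts[word]))
    return updates

for word, count in word_count_changelog(["all streams lead to kafka", "hello kafka streams"]):
    print(f"{word}\t{count}")
```

Running this prints the same word/count pairs as the console consumer above, including kafka reaching 2 on the second line.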

Originally published at: https://www.cnblogs.com/tree1123/p/11150927.html
