2021年大数据Spark(四十九):Structured Streaming 整合 Kafka
目錄
整合?Kafka
說明
Kafka特定配置
???????KafkaSoure
1.消費一個Topic數(shù)據(jù)
2.消費多個Topic數(shù)據(jù)
3.消費通配符匹配Topic數(shù)據(jù)
???????KafkaSink
???????整合?Kafka
說明
http://spark.apache.org/docs/2.4.5/structured-streaming-kafka-integration.html
Apache Kafka 是目前最流行的一個分布式的實時流消息系統(tǒng),給下游訂閱消費系統(tǒng)提供了并行處理和可靠容錯機制,現(xiàn)在大公司在流式數(shù)據(jù)的處理場景,Kafka基本是標配。
Structured Streaming很好的集成Kafka,可以從Kafka拉取消息,然后就可以把流數(shù)據(jù)看做一個DataFrame, 一張無限增長的大表,在這個大表上做查詢,Structured Streaming保證了端到端的 exactly-once,用戶只需要關心業(yè)務即可,不用費心去關心底層是怎么做的StructuredStreaming既可以從Kafka讀取數(shù)據(jù),又可以向Kafka 寫入數(shù)據(jù)
添加Maven依賴:
dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql-kafka-0-10_2.11</artifactId><version>${spark.version}</version></dependency>
?
- 注意:
目前僅支持Kafka 0.10.+版本及以上,底層使用Kafka New Consumer API拉取數(shù)據(jù) ???
- 消費位置
Kafka把生產(chǎn)者發(fā)送的數(shù)據(jù)放在不同的分區(qū)里面,這樣就可以并行進行消費了。每個分區(qū)里面的數(shù)據(jù)都是遞增有序的,跟structured commit log類似,生產(chǎn)者和消費者使用Kafka 進行解耦,消費者不管你生產(chǎn)者發(fā)送的速率如何,只要按照一定的節(jié)奏進行消費就可以了。每條消息在一個分區(qū)里面都有一個唯一的序列號offset(偏移量),Kafka 會對內(nèi)部存儲的消息設置一個過期時間,如果過期了,就會標記刪除,不管這條消息有沒有被消費。
Kafka 可以被看成一個無限的流,里面的流數(shù)據(jù)是短暫存在的,如果不消費,消息就過期滾動沒了。如果開始消費,就要定一下從什么位置開始。
?
1.earliest:從最起始位置開始消費,當然不一定是從0開始,因為如果數(shù)據(jù)過期就清掉了,所以可以理解為從現(xiàn)存的數(shù)據(jù)里最小位置開始消費;
2.latest:從最末位置開始消費;
3.per-partition assignment:對每個分區(qū)都指定一個offset,然后從offset位置開始消費;
當?shù)谝淮伍_始消費一個Kafka 流的時候,上述策略任選其一,如果之前已經(jīng)消費了,而且做了 checkpoint ,這時候就會從上次結束的位置開始繼續(xù)消費。目前StructuredStreaming和Flink框架從Kafka消費數(shù)據(jù)時,都支持上述的策略。
?
???????Kafka特定配置
從Kafka消費數(shù)據(jù)時,相關配置屬性可以通過帶有kafka.prefix的DataStreamReader.option進行設置,例如前面設置Kafka Brokers地址屬性:stream.option("kafka.bootstrap.servers", "host:port"),更多關于Kafka 生產(chǎn)者Producer Config配置屬和消費者Consumer Config配置屬性,參考文檔:
?生產(chǎn)者配置(Producer Configs):
http://kafka.apache.org/20/documentation.html#producerconfigs
?消費者配置(New Consumer Configs):
http://kafka.apache.org/20/documentation.html#newconsumerconfigs
注意以下Kafka參數(shù)屬性可以不設置,如果設置的話,Kafka source或者sink可能會拋出錯誤:
?
1)、group.id:Kafka source將會自動為每次查詢創(chuàng)建唯一的分組ID;
2)、auto.offset.reset:在將source選項startingOffsets設置為指定從哪里開始。結構化流管理內(nèi)部消費的偏移量,而不是依賴Kafka消費者來完成。這將確保在topic/partitons動態(tài)訂閱時不會遺漏任何數(shù)據(jù)。注意,只有在啟動新的流式查詢時才會應用startingOffsets,并且恢復操作始終會從查詢停止的位置啟動;
3)、key.deserializer/value.deserializer:Keys/Values總是被反序列化為ByteArrayDeserializer的字節(jié)數(shù)組,使用DataFrame操作顯式反序列化keys/values;
4)、key.serializer/value.serializer:keys/values總是使用ByteArraySerializer或StringSerializer進行序列化,使用DataFrame操作將keysvalues/顯示序列化為字符串或字節(jié)數(shù)組;
5)、enable.auto.commit:Kafka source不提交任何offset;
6)、interceptor.classes:Kafka source總是以字節(jié)數(shù)組的形式讀取key和value。使用ConsumerInterceptor是不安全的,因為它可能會打斷查詢;
?
???????KafkaSoure
Structured Streaming消費Kafka數(shù)據(jù),采用的是poll方式拉取數(shù)據(jù),與Spark Streaming中New Consumer API集成方式一致。從Kafka Topics中讀取消息,需要指定數(shù)據(jù)源(kafka)、Kafka集群的連接地址(kafka.bootstrap.servers)、消費的topic(subscribe或subscribePattern), 指定topic 的時候,可以使用正則來指定,也可以指定一個 topic 的集合。
官方提供三種方式從Kafka topic中消費數(shù)據(jù),主要區(qū)別在于每次消費Topic名稱指定,
1.消費一個Topic數(shù)據(jù)
?
?
2.消費多個Topic數(shù)據(jù)
?
3.消費通配符匹配Topic數(shù)據(jù)
?
從Kafka 獲取數(shù)據(jù)后Schema字段信息如下,既包含數(shù)據(jù)信息有包含元數(shù)據(jù)信息:
?
在實際開發(fā)時,往往需要獲取每條數(shù)據(jù)的消息,存儲在value字段中,由于是binary類型,需要轉換為字符串String類型;此外了方便數(shù)據(jù)操作,通常將獲取的key和value的DataFrame轉換為Dataset強類型,偽代碼如下:
?
從Kafka數(shù)據(jù)源讀取數(shù)據(jù)時,可以設置相關參數(shù),包含必須參數(shù)和可選參數(shù):
- ?必須參數(shù):kafka.bootstrap.servers和subscribe,可以指定開始消費偏移量assign。
?
?
- ?可選參數(shù):
?
?
???????KafkaSink
往Kafka里面寫數(shù)據(jù)類似讀取數(shù)據(jù),可以在DataFrame上調(diào)用writeStream來寫入Kafka,設置參數(shù)指定value,其中key是可選的,如果不指定就是null。
- 配置說明
將DataFrame寫入Kafka時,Schema信息中所需的字段:
?
需要寫入哪個topic,可以像上述所示在操作DataFrame 的時候在每條record上加一列topic字段指定,也可以在DataStreamWriter上指定option配置。
寫入數(shù)據(jù)至Kafka,需要設置Kafka Brokers地址信息及可選配置:
1.kafka.bootstrap.servers,使用逗號隔開【host:port】字符;
2.topic,如果DataFrame中沒有topic列,此處指定topic表示寫入Kafka Topic。
官方提供示例代碼如下:
?
總結
以上是生活随笔為你收集整理的2021年大数据Spark(四十九):Structured Streaming 整合 Kafka的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Spark(四十八):S
- 下一篇: 2021年大数据Spark(五十):St