日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

2021年大数据Spark(四十九):Structured Streaming 整合 Kafka

發(fā)布時間:2023/11/28 生活经验 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 2021年大数据Spark(四十九):Structured Streaming 整合 Kafka 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

目錄

整合?Kafka

說明

Kafka特定配置

???????KafkaSoure

1.消費一個Topic數(shù)據(jù)

2.消費多個Topic數(shù)據(jù)

3.消費通配符匹配Topic數(shù)據(jù)

???????KafkaSink


???????整合?Kafka

說明

http://spark.apache.org/docs/2.4.5/structured-streaming-kafka-integration.html

Apache Kafka 是目前最流行的一個分布式的實時流消息系統(tǒng),給下游訂閱消費系統(tǒng)提供了并行處理和可靠容錯機制,現(xiàn)在大公司在流式數(shù)據(jù)的處理場景,Kafka基本是標配。

Structured Streaming很好的集成Kafka,可以從Kafka拉取消息,然后就可以把流數(shù)據(jù)看做一個DataFrame, 一張無限增長的大表,在這個大表上做查詢,Structured Streaming保證了端到端的 exactly-once,用戶只需要關心業(yè)務即可,不用費心去關心底層是怎么做的StructuredStreaming既可以從Kafka讀取數(shù)據(jù),又可以向Kafka 寫入數(shù)據(jù)

添加Maven依賴:

dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql-kafka-0-10_2.11</artifactId><version>${spark.version}</version></dependency>

?

  • 注意:

目前僅支持Kafka 0.10.+版本及以上,底層使用Kafka New Consumer API拉取數(shù)據(jù) ???

  • 消費位置

Kafka把生產(chǎn)者發(fā)送的數(shù)據(jù)放在不同的分區(qū)里面,這樣就可以并行進行消費了。每個分區(qū)里面的數(shù)據(jù)都是遞增有序的,跟structured commit log類似,生產(chǎn)者和消費者使用Kafka 進行解耦,消費者不管你生產(chǎn)者發(fā)送的速率如何,只要按照一定的節(jié)奏進行消費就可以了。每條消息在一個分區(qū)里面都有一個唯一的序列號offset(偏移量),Kafka 會對內(nèi)部存儲的消息設置一個過期時間,如果過期了,就會標記刪除,不管這條消息有沒有被消費。

Kafka 可以被看成一個無限的流,里面的流數(shù)據(jù)是短暫存在的,如果不消費,消息就過期滾動沒了。如果開始消費,就要定一下從什么位置開始。

?

1.earliest:從最起始位置開始消費,當然不一定是從0開始,因為如果數(shù)據(jù)過期就清掉了,所以可以理解為從現(xiàn)存的數(shù)據(jù)里最小位置開始消費;

2.latest:從最末位置開始消費;

3.per-partition assignment:對每個分區(qū)都指定一個offset,然后從offset位置開始消費;

當?shù)谝淮伍_始消費一個Kafka 流的時候,上述策略任選其一,如果之前已經(jīng)消費了,而且做了 checkpoint ,這時候就會從上次結束的位置開始繼續(xù)消費。目前StructuredStreaming和Flink框架從Kafka消費數(shù)據(jù)時,都支持上述的策略。

?

???????Kafka特定配置

從Kafka消費數(shù)據(jù)時,相關配置屬性可以通過帶有kafka.prefix的DataStreamReader.option進行設置,例如前面設置Kafka Brokers地址屬性:stream.option("kafka.bootstrap.servers", "host:port"),更多關于Kafka 生產(chǎn)者Producer Config配置屬和消費者Consumer Config配置屬性,參考文檔:

?生產(chǎn)者配置(Producer Configs):

http://kafka.apache.org/20/documentation.html#producerconfigs

?消費者配置(New Consumer Configs):

http://kafka.apache.org/20/documentation.html#newconsumerconfigs

注意以下Kafka參數(shù)屬性可以不設置,如果設置的話,Kafka source或者sink可能會拋出錯誤:

?

1)、group.id:Kafka source將會自動為每次查詢創(chuàng)建唯一的分組ID;

2)、auto.offset.reset在將source選項startingOffsets設置為指定從哪里開始。結構化流管理內(nèi)部消費的偏移量,而不是依賴Kafka消費者來完成。這將確保在topic/partitons動態(tài)訂閱時不會遺漏任何數(shù)據(jù)。注意,只有在啟動新的流式查詢時才會應用startingOffsets,并且恢復操作始終會從查詢停止的位置啟動

3)、key.deserializer/value.deserializer:Keys/Values總是被反序列化為ByteArrayDeserializer的字節(jié)數(shù)組,使用DataFrame操作顯式反序列化keys/values;

4)、key.serializer/value.serializer:keys/values總是使用ByteArraySerializer或StringSerializer進行序列化,使用DataFrame操作將keysvalues/顯示序列化為字符串或字節(jié)數(shù)組;

5)、enable.auto.commit:Kafka source不提交任何offset;

6)、interceptor.classes:Kafka source總是以字節(jié)數(shù)組的形式讀取key和value。使用ConsumerInterceptor是不安全的,因為它可能會打斷查詢;

?

???????KafkaSoure

Structured Streaming消費Kafka數(shù)據(jù),采用的是poll方式拉取數(shù)據(jù),與Spark Streaming中New Consumer API集成方式一致。從Kafka Topics中讀取消息,需要指定數(shù)據(jù)源(kafka)、Kafka集群的連接地址(kafka.bootstrap.servers)、消費的topic(subscribe或subscribePattern), 指定topic 的時候,可以使用正則來指定,也可以指定一個 topic 的集合。

官方提供三種方式從Kafka topic中消費數(shù)據(jù),主要區(qū)別在于每次消費Topic名稱指定,

1.消費一個Topic數(shù)據(jù)

?

?

2.消費多個Topic數(shù)據(jù)

?

3.消費通配符匹配Topic數(shù)據(jù)

?

從Kafka 獲取數(shù)據(jù)后Schema字段信息如下,既包含數(shù)據(jù)信息有包含元數(shù)據(jù)信息:

?

在實際開發(fā)時,往往需要獲取每條數(shù)據(jù)的消息,存儲在value字段中,由于是binary類型,需要轉換為字符串String類型;此外了方便數(shù)據(jù)操作,通常將獲取的key和value的DataFrame轉換為Dataset強類型,偽代碼如下:

?

從Kafka數(shù)據(jù)源讀取數(shù)據(jù)時,可以設置相關參數(shù),包含必須參數(shù)和可選參數(shù):

  • ?必須參數(shù):kafka.bootstrap.servers和subscribe,可以指定開始消費偏移量assign。

?

?

  • ?可選參數(shù):

?

?

???????KafkaSink

往Kafka里面寫數(shù)據(jù)類似讀取數(shù)據(jù),可以在DataFrame上調(diào)用writeStream來寫入Kafka,設置參數(shù)指定value,其中key是可選的,如果不指定就是null。

  • 配置說明

將DataFrame寫入Kafka時,Schema信息中所需的字段:

?

需要寫入哪個topic,可以像上述所示在操作DataFrame 的時候在每條record上加一列topic字段指定,也可以在DataStreamWriter上指定option配置

寫入數(shù)據(jù)至Kafka,需要設置Kafka Brokers地址信息及可選配置:

1.kafka.bootstrap.servers,使用逗號隔開【host:port】字符;

2.topic,如果DataFrame中沒有topic列,此處指定topic表示寫入Kafka Topic。

官方提供示例代碼如下:

?

總結

以上是生活随笔為你收集整理的2021年大数据Spark(四十九):Structured Streaming 整合 Kafka的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。