日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Apache Flink和Kafka入门

發布時間:2023/12/3 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Apache Flink和Kafka入门 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

介紹

Apache Flink是用于分布式流和批處理數據處理的開源平臺。 Flink是具有多個API的流數據流引擎,用于創建面向數據流的應用程序。

Flink應用程序通常使用Apache Kafka進行數據輸入和輸出。 本文將指導您逐步使用Apache Flink和Kafka。

先決條件

  • Apache Kafka 0.9.x
  • 吉特
  • Maven 3.x或更高版本

創建您的Flink流項目

第一步是創建Java應用程序,最簡單的方法是使用flink-quickstart-java原型,該原型包含核心依賴關系和打包任務。 本文與Apache Flink快速入門示例相似,重點明確介紹了MapR Streams的數據輸入和輸出。

在此應用程序中,我們將創建兩個作業:

  • WriteToKafka :生成隨機字符串,然后使用Kafka Flink連接器及其Producer API將其發布到MapR Streams主題。
  • ReadFromKafka :讀取相同的主題,并使用Kafka Flink連接器及其使用方在標準輸出中顯示消息。 API。

完整項目可在GitHub上找到:

  • Flink和Kakfa應用

讓我們使用Apache Maven創建項目:

mvn archetype:generate \-DarchetypeGroupId=org.apache.flink\-DarchetypeArtifactId=flink-quickstart-java \-DarchetypeVersion=1.1.2 \-DgroupId=com.grallandco.demos \-DartifactId=kafka-flink-101 \-Dversion=1.0-SNAPSHOT \-DinteractiveMode=false

Maven將創建以下結構:

tree kafka-flink-101/ kafka-flink-101/ ├── pom.xml └── src└── main├── java│?? └── com│?? └── grallandco│?? └── demos│?? ├── BatchJob.java│?? ├── SocketTextStreamWordCount.java│?? ├── StreamingJob.java│?? └── WordCount.java└── resources└── log4j.properties7 directories, 6 files

該項目被配置為創建一個Jar文件,該文件包含您的flink項目代碼,還包括運行該文件所需的所有依賴項。

該項目包含其他一些示例工作,本文不需要它們,您可以將其用于教育目的,也可以將其從項目中刪除。

添加Kafka連接器

打開pom.xml并將以下依賴項添加到您的項目中:

第一步,我們必須添加Flink Kafka連接器作為依賴項,以便我們可以使用Kafka接收器。 將此添加到“依賴項”部分的pom.xml文件中:

您現在必須添加Flink Kafka Connector依賴項才能使用Kafka接收器。 在<dependencies>元素中添加以下條目:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.9_2.10</artifactId><version>${flink.version}</version></dependency>

現在,Flink項目已準備就緒,可以通過Kafka連接器使用DataStream,因此您可以從Apache Kafka發送和接收消息。

安裝并啟動Kafka

下載Kafka,在終端中輸入以下命令:

curl -O http://www.us.apache.org/dist/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz tar -xzf kafka_2.11-0.9.0.0.tgz cd kafka_2.11-0.9.0.0

Kafka使用ZooKeeper,如果您沒有運行Zookeeper,則可以使用以下命令啟動它:

./bin/zookeeper-server-start.sh config/zookeeper.properties

通過在新終端中運行以下命令來啟動Kafka代理:

./bin/kafka-server-start.sh config/server.properties

在另一個終端中,運行以下命令來創建一個名為flink-demo的Kafka主題:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-demo

使用Kafka工具將消息發布和使用到flink-demo主題。

制片人

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink-demo

消費者

./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic flink-demo --from-beginning

在生產者窗口中,您可以發布一些消息,并在消費者窗口中查看它們。 我們將使用這些工具來跟蹤Kafka和Flink之間的交互。

編寫您的Flink應用程序

現在讓我們使用Flink Kafka Connector將消息發送到Kafka并使用它們。

制片人

生產者使用SimpleStringGenerator()類生成消息,并將該字符串發送到flink-demo主題。

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", “localhost:9092"); DataStream<String> stream = env.addSource(new SimpleStringGenerator());stream.addSink(new FlinkKafkaProducer09<>("flink-demo", new SimpleStringSchema(), properties));env.execute();}

SimpleStringGenerator()方法代碼在此處可用。

主要步驟是:

  • 在任何Flink應用程序的基礎上創建一個新的StreamExecutionEnvironment
  • 在應用程序環境中創建一個新的DataStream時, SimpleStringGenerator類將Flink中所有流數據源的Source接口實現SourceFunction 。
  • 將FlinkKafkaProducer09器添加到主題。

消費者

使用者只需從flink-demo主題中讀取消息,然后將它們打印到控制臺中即可。

public static void main(String[] args) throws Exception {// create execution environmentStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", “localhost:9092");properties.setProperty("group.id", "flink_consumer");DataStream<String> stream = env.addSource(new FlinkKafkaConsumer09<>("flink-demo", new SimpleStringSchema(), properties) );stream.map(new MapFunction<String, String>() {private static final long serialVersionUID = -6867736771747690202L;@Overridepublic String map(String value) throws Exception {return "Stream Value: " + value;}}).print();env.execute();}

主要步驟是:

  • 在任何Flink應用程序的基礎上創建一個新的StreamExecutionEnvironment
  • 使用消費者信息創建一組屬性,在此應用程序中,我們只能設置消費者group.id 。
  • 使用FlinkKafkaConsumer09從主題flink-demo獲取消息

生成并運行應用程序

讓我們直接從Maven(或從您最喜歡的IDE)運行應用程序。

1-建立專案:

$ mvn clean package

2-運行Flink生產者作業

$ mvn exec:java -Dexec.mainClass=com.mapr.demos.WriteToKafka

3-運行Flink消費者工作

$ mvn exec:java -Dexec.mainClass=com.mapr.demos.ReadFromKafka

在終端中,您應該看到生產者生成的消息

現在,您可以在Flink群集上部署并執行此作業。

結論

在本文中,您學習了如何將Flink與kafka結合使用來寫入和讀取數據流。

翻譯自: https://www.javacodegeeks.com/2016/10/getting-started-apache-flink-kafka.html

總結

以上是生活随笔為你收集整理的Apache Flink和Kafka入门的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 啪啪精品| 一区二区中文 | 国产女同在线观看 | 国产无套丰满白嫩对白 | 久久免费视频6 | 青春草免费视频 | 女人天堂网| 国产绿帽一区二区三区 | 在线看国产 | 中文字幕网站 | 亚洲va国产天堂va久久 en | 老太婆av| 免费观看黄一级视频 | 欧美三日本三级少妇三级99观看视频 | 亚洲午夜精品一区 | 日本天堂网在线观看 | 久久综合影视 | 黄色一机片 | 国产大片一区二区三区 | 成人久久久精品乱码一区二区三区 | 嫩草伊人久久精品少妇av | 免费a在线 | 中文字幕一区二区三区5566 | 亚洲国产成人精品久久 | 国产免费一区,二区 | 误杀1电影免费观看高清完整版 | 成人妇女淫片aaaa视频 | 色婷婷综合久久久久中文字幕 | 国产精品久免费的黄网站 | 美女免费黄色 | 亚洲网站在线看 | 疯狂撞击丝袜人妻 | 天天操夜夜添 | 黄色在线播放视频 | 伊人影院综合在线 | 蜜桃视频在线观看一区二区 | 91黄色视屏| 亚洲免费不卡视频 | 天堂аⅴ在线最新版在线 | 国产黄色大片 | 中文字幕av片 | 黑人干亚洲女 | 热久久网站 | 亚洲三级在线看 | 日本人妻不卡一区二区三区中文字幕 | 人人九九精 | 久久久久久久久久久久久女过产乱 | 色综合天天操 | 欧美大色一区 | 国产日韩精品久久 | 国产精品粉嫩 | 国产精品系列在线观看 | 竹菊影视一区二区三区 | 在线不卡二区 | 久久精品人人做人人爽 | 国产精品成av人在线视午夜片 | 欧美无砖砖区免费 | 国产区小视频 | 一级毛片基地 | 肌肉猛男裸体gay网站免费 | 亚洲在线日韩 | 精品无码久久久久成人漫画 | 激情免费视频 | 自拍偷自拍亚洲精品播放 | 奇米影视在线 | 日av中文字幕 | 麻豆视频免费网站 | 成人综合激情 | 亚洲免费大片 | 亚洲中文字幕久久无码 | 91免费视频网站 | 一区不卡在线 | 欧美暧暧视频 | 一级黄色片国产 | 天天操夜夜撸 | 日本不卡视频一区二区三区 | 男女被到爽流尿 | 免费一二区 | 国产精品96久久久久久 | 色一情一乱一伦 | 一极黄色大片 | 精品久| 日韩av中文字幕在线播放 | 91麻豆精品国产 | 精品一区二区三区四区五区六区 | 99精品区 | av青青草原 | 日韩图片一区 | a一级黄色 | 黄色小视频免费 | 久草青青草 | 又黄又爽视频在线观看 | 日韩精品一卡二卡 | 国产午夜一区二区 | 日韩第一页 | 在线观看亚洲网站 | 99精品久久久久久久婷婷 | 国产国产精品 | 免费成人在线观看视频 |