1.12. Flink Kafka Connector in Detail: Consumer Consumption Strategies, Dynamic Topic Loading, Consumer Offset Auto-Commit, the Producer, and Fault Tolerance

發(fā)布時間:2024/9/27 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 1.12.Flink Kafka-Connector详解、Consumer消费策略设置、动态加载Topic、Consumers Offset 自动提交、Producer、容错等 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

1.12. Flink Kafka Connector in Detail
1.12.1. Kafka Consumer Consumption Strategy Settings
1.12.2. Kafka Consumer Fault Tolerance
1.12.3. Dynamic Topic Loading
1.12.4. Kafka Consumer Offset Auto-Commit
1.12.5. Kafka Producer
1.12.6. Kafka Producer Fault Tolerance - Kafka 0.9 and 0.10
1.12.7. Kafka Producer Fault Tolerance - Kafka 0.11

1.12. Flink Kafka Connector in Detail

• Kafka's partition mechanism integrates tightly with Flink's parallelism mechanism.
• Kafka can serve as both a source and a sink for Flink.
• When a job fails, the application can be recovered by restoring the Kafka offsets.

Java example:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

import java.util.Properties;

/**
 * kafkaSink
 *
 * Created by xxxx on 2020/10/09.
 */
public class StreamingKafkaSink {
    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint configuration
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Configure the state backend
        //env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints", true));

        DataStreamSource<String> text = env.socketTextStream("hadoop100", 9001, "\n");

        String brokerList = "hadoop110:9092";
        String topic = "t1";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", brokerList);

        // Option 1: set the transaction timeout on the FlinkKafkaProducer011 itself
        //prop.setProperty("transaction.timeout.ms", 60000 * 15 + "");
        // Option 2: raise the broker-side maximum transaction timeout
        //FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(brokerList, topic, new SimpleStringSchema());

        // Kafka producer with exactly-once semantics
        FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(
                topic,
                new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()),
                prop,
                FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);

        text.addSink(myProducer);

        env.execute("StreamingFromCollection");
    }
}

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

/**
 * kafkaSource
 *
 * Created by xxxx on 2020/10/09.
 */
public class StreamingKafkaSource {
    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint configuration
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Configure the state backend
        //env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints", true));

        String topic = "t1";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "hadoop110:9092");
        prop.setProperty("group.id", "con1");

        FlinkKafkaConsumer011<String> myConsumer =
                new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), prop);
        myConsumer.setStartFromGroupOffsets(); // default consumption strategy

        DataStreamSource<String> text = env.addSource(myConsumer);
        text.print().setParallelism(1);

        env.execute("StreamingFromCollection");
    }
}

Scala example:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

/**
 * Created by xxxx on 2020/10/09.
 */
object StreamingKafkaSinkScala {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Implicit conversions
    import org.apache.flink.api.scala._

    // Checkpoint configuration
    env.enableCheckpointing(5000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    // Configure the state backend
    //env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints", true))

    val text = env.socketTextStream("hadoop100", 9001, '\n')

    val topic = "t1"
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "hadoop110:9092")

    // Option 1: set the transaction timeout on the FlinkKafkaProducer011 itself
    //prop.setProperty("transaction.timeout.ms", 60000 * 15 + "")
    // Option 2: raise the broker-side maximum transaction timeout
    //FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(brokerList, topic, new SimpleStringSchema());

    // Use the exactly-once variant of the producer
    val myProducer = new FlinkKafkaProducer011[String](
      topic,
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
      prop,
      FlinkKafkaProducer011.Semantic.EXACTLY_ONCE)

    text.addSink(myProducer)

    env.execute("StreamingFromCollectionScala")
  }
}

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

/**
 * Created by xxxx on 2020/10/09.
 */
object StreamingKafkaSourceScala {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Implicit conversions
    import org.apache.flink.api.scala._

    // Checkpoint configuration
    env.enableCheckpointing(5000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    // Configure the state backend
    //env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints", true))

    val topic = "t1"
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "hadoop110:9092")
    prop.setProperty("group.id", "con1")

    val myConsumer = new FlinkKafkaConsumer011[String](topic, new SimpleStringSchema(), prop)

    val text = env.addSource(myConsumer)
    text.print()

    env.execute("StreamingFromCollectionScala")
  }
}

1.12.1. Kafka Consumer Consumption Strategy Settings

• setStartFromGroupOffsets() [default consumption strategy]
  • By default, resumes from the offsets last committed for the consumer group.
  • If the application starts for the first time and no committed offsets are found, consumption starts according to the Kafka auto.offset.reset property.
• setStartFromEarliest()
  • Consumes from the earliest available records, ignoring any committed offsets.
• setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long>)
  • Starts each listed partition from an explicitly specified offset, as shown in the sketch after this list.
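A minimal Java sketch of switching between these start positions. It reuses the topic and consumer properties from the source example above; the partition number and offset value in the last variant are made-up illustrations:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

String topic = "t1";
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", "hadoop110:9092");
prop.setProperty("group.id", "con1");

FlinkKafkaConsumer011<String> myConsumer =
        new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), prop);

// Default: resume from the offsets committed for group.id; fall back to auto.offset.reset.
myConsumer.setStartFromGroupOffsets();

// Ignore committed offsets and read every partition from the beginning.
//myConsumer.setStartFromEarliest();

// Start each listed partition from an explicit offset (values below are illustrative).
//Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
//specificOffsets.put(new KafkaTopicPartition("t1", 0), 23L);
//myConsumer.setStartFromSpecificOffsets(specificOffsets);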

1.12.2. Kafka Consumer Fault Tolerance

• When checkpointing is enabled, the Kafka consumer periodically stores the Kafka offsets together with the state of the other operators. When the job fails and restarts, Flink restores state from the most recent checkpoint and resumes consuming Kafka data from the offsets stored there.
• To get a fault-tolerant Kafka consumer, checkpointing must be enabled (condensed again in the sketch below):

  • env.enableCheckpointing(5000); // checkpoint every 5 s
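Condensed from the full examples above, this is the minimal checkpoint configuration the fault-tolerant consumer relies on (a sketch; the interval and retention policy are the same values used earlier in this section):

// Checkpoint every 5 s; the Kafka offsets are stored in the checkpoint together with operator state.
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Keep the last checkpoint when the job is cancelled, so a later restart can resume from it.
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);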

1.12.3. Dynamic Topic Loading
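The original notes leave this section empty. As a hedged sketch: versions of the 0.11 connector that support topic/partition discovery can subscribe to topics by regular expression and periodically pick up newly created topics and partitions when the flink.partition-discovery.interval-millis property is set. The pattern, interval, and broker address below are illustrative assumptions, not values from the original:

import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

Properties prop = new Properties();
prop.setProperty("bootstrap.servers", "hadoop110:9092");
prop.setProperty("group.id", "con1");
// Check for newly created topics/partitions every 30 s (illustrative interval).
prop.setProperty("flink.partition-discovery.interval-millis", "30000");

// Subscribe to every topic whose name matches the pattern, e.g. t1, t2, t42 (illustrative pattern).
FlinkKafkaConsumer011<String> myConsumer =
        new FlinkKafkaConsumer011<>(Pattern.compile("t[0-9]+"), new SimpleStringSchema(), prop);

With this setup, matching topics created after the job starts, and partitions added to already-subscribed topics, are consumed automatically without restarting the job.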

1.12.4. Kafka Consumer Offset Auto-Commit

• The behavior differs depending on whether checkpointing is enabled for the job (both configurations are sketched after this list).
• With checkpointing disabled: offsets are committed via Kafka's auto-commit mechanism, configured with the following two properties:

  • enable.auto.commit
  • auto.commit.interval.ms

• With checkpointing enabled: offsets are committed only when a checkpoint completes, which keeps the offsets stored in Kafka consistent with the offsets in the checkpointed state.

  • This is controlled with setCommitOffsetsOnCheckpoints(boolean).
  • The parameter defaults to true, meaning offsets are committed whenever a checkpoint completes.
  • In this case, Kafka's auto-commit mechanism is ignored.
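A minimal Java sketch of the two configurations, reusing the topic and consumer properties from the source example above (the auto-commit interval is an illustrative value):

String topic = "t1";
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", "hadoop110:9092");
prop.setProperty("group.id", "con1");

// With checkpointing DISABLED: rely on Kafka's own auto-commit mechanism.
prop.setProperty("enable.auto.commit", "true");
prop.setProperty("auto.commit.interval.ms", "5000");

FlinkKafkaConsumer011<String> myConsumer =
        new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), prop);

// With checkpointing ENABLED: offsets are committed when a checkpoint completes and the
// auto-commit properties above are ignored; true is already the default.
myConsumer.setCommitOffsetsOnCheckpoints(true);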

1.12.5. Kafka Producer

1.12.6. Kafka Producer Fault Tolerance - Kafka 0.9 and 0.10

• If checkpointing is enabled in Flink, FlinkKafkaProducer09 and FlinkKafkaProducer010 can provide at-least-once semantics, provided the following two settings are also configured (see the sketch after this list):

  • setLogFailuresOnly(false)
  • setFlushOnCheckpoint(true)

• Note: it is also recommended to increase the Kafka producer's retry count,
retries (the default value is 0).
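A minimal Java sketch of an at-least-once producer, shown here with FlinkKafkaProducer09 (in recent connector versions FlinkKafkaProducer010 is configured the same way); the broker address and topic reuse the values from the examples above, and the retries value is an illustrative choice:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;

Properties prop = new Properties();
prop.setProperty("bootstrap.servers", "hadoop110:9092");
// Recommended: allow the Kafka client to retry failed sends (the default is 0).
prop.setProperty("retries", "3");

FlinkKafkaProducer09<String> myProducer =
        new FlinkKafkaProducer09<>("t1", new SimpleStringSchema(), prop);

// Fail the task on send errors instead of only logging them.
myProducer.setLogFailuresOnly(false);
// Flush buffered records on every checkpoint, so at-least-once holds.
myProducer.setFlushOnCheckpoint(true);

// 'text' is the DataStream from the sink example above.
text.addSink(myProducer);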

1.12.7. Kafka Producer Fault Tolerance - Kafka 0.11

• If checkpointing is enabled in Flink, FlinkKafkaProducer011 can provide exactly-once semantics.
• The desired semantic must be chosen explicitly (see the sketch after this list):

  • Semantic.NONE
  • Semantic.AT_LEAST_ONCE [default]
  • Semantic.EXACTLY_ONCE
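The full exactly-once example is at the top of this section; the sketch below only isolates how the semantic is chosen and the transaction-timeout concern that the commented-out "Option 1 / Option 2" lines in that example refer to (the 15-minute value is the same illustrative value used there):

Properties prop = new Properties();
prop.setProperty("bootstrap.servers", "hadoop110:9092");
// EXACTLY_ONCE uses Kafka transactions; the producer's transaction timeout must not
// exceed the broker-side transaction.max.timeout.ms (Option 1 from the sink example above).
prop.setProperty("transaction.timeout.ms", String.valueOf(60000 * 15));

FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(
        "t1",
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        prop,
        FlinkKafkaProducer011.Semantic.EXACTLY_ONCE); // or NONE / AT_LEAST_ONCE (the default)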
