日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Flink常见流处理API

發布時間:2024/7/5 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flink常见流处理API 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Flink 流處理API的編程可以分為environment,source,transform,sink四大部分

1 Flink支持的數據類型

??在Flink底層因為要對所有的數據序列化,反序列化對數據進行傳輸,以便通過網絡傳送它們,或者從狀態后端、檢查點和保存點讀取它們。所以Flink要有一套自己的類型提取系統,就是TypeInformation機制。Flink使用類型信息的概念來表示數據類型,并為每個數據類型生成特定的序列化器、反序列化器和比較器。這里其實就是說在轉換過程中必須是他支持的數據類型才能轉換成TypeInformation。

基本上我們一般能夠用到的數據類型常見的都支持,如下:

??(1)Flink支持所有的Java和Scala基礎數據類型,Int, Double, Long, String, …

??(2)Java和Scala元組(Tuples),最多25個字段,不支持空字段

??(3)cala樣例類(case classes),最多22個字段,不支持空字段

??(4) Java簡單對象(POJOs)

??(5)Row具有任意數量字段的元組并支持空字段

??(6)Arrays, Lists, Maps, Enums, 等等

2 執行環境Environment

??Flink編程的第一步首先是創建一個執行環境,表示當前執行程序的上下文。Environment可以通過以下幾種方式構建

??(1)getExecutionEnvironment

val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment 或 val env: ExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

??如果程序是獨立調用的,則此方法返回本地執行環境;如果從命令行客戶端調用程序以提交到集群,則此方法返回此集群的執行環境,也就是說,getExecutionEnvironment會根據查詢運行的方式決定返回什么樣的運行環境,是最常用的一種創建執行環境的方式。如果沒有設置并行度,會以flink-conf.yaml中的配置為準,默認是1。

??(2)createLocalEnvironment

val env = StreamExecutionEnvironment.createLocalEnvironment(1)

??返回本地執行環境,需要在調用時指定默認的并行度。

??(3)createRemoteEnvironment

val env = ExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", YOURJobManagerHOST,"YOURPATH//wordcount.jar")

??返回集群執行環境,將Jar提交到遠程服務器。需要在調用時指定JobManager的IP和端口號,并指定要在集群中運行的Jar包。

3 Source

??(1)從集合讀取數據

val stream = env.fromCollection(List("a","b","c"))

??(2)從文件讀取數據

val stream = env.readTextFile("YOUR_FILE_PATH")

??(3)kafka消息隊列的數據作為來源

??需要引入kafka連接器的依賴:

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 --> <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.11</artifactId><version>1.10.0</version> </dependency>

??具體代碼如下:

val properties = new Properties() properties.setProperty("bootstrap.servers", "localhost:9092") properties.setProperty("group.id", "consumer-group") properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("auto.offset.reset", "latest")val stream = env.addSource(new FlinkKafkaConsumer011[String]("topic_name", new SimpleStringSchema(), properties))

??(4)自定義Source

????除了以上的source數據來源,我們還可以自定義source。需要做的,只是傳入一個SourceFunction就可以。具體調用如下:

val stream = env.addSource( new MySource() )case class CustomSource(id:String,times:String) class MySource extends SourceFunction[CustomSource]{// running表示數據源是否還在正常運行var running: Boolean = trueoverride def cancel(): Unit = {running = false}override def run(ctx: SourceFunction.SourceContext[CustomSource]): Unit = {while(running){ctx.collect(CustomSource(UUID.randomUUID().toString,System.currentTimeMillis().toString))Thread.sleep(100)}} }

4 Transform

??(1)map:輸入一個元素,輸出一個元素,可以用來做一些清洗,轉換工作。DataStream → DataStream

val streamMap = stream.map { x => x * 2 }

??(2)flatMap:和Map相似,可以理解為將輸入的元素壓平,從而對輸出結果的數量不做要求,可以為0、1或者多個,多用于拆分操作。DataStream → DataStream

val streamFlatMap = stream.flatMap{x => x.split(" ") }

??(3)filter:過濾篩選,將所有符合判斷條件的結果集輸出,DataStream → DataStream

val streamFilter = stream.filter{x => x > 1 }

??(4)KeyBy:邏輯地將一個流拆分成不相交的分區,每個分區包含具有相同key的元素,在內部以hash的形式實現的,返回KeyedStream。DataStream -> KeyedStream

注意:以下類型無法作為key①POJO類,且沒有實現hashCode函數②任意形式的數組類型

dataStream.keyBy("someKey") // Key by field "someKey" dataStream.keyBy(0) // Key by the first element of a Tuple

??(5)滾動聚合算子(Rolling Aggregation)

對KeyedStream按指定字段滾動聚合并輸出每一次滾動聚合后的結果,常見的有sum(),min(),max(),minBy(),maxBy()等,KeyedStream → DataStream

min(),max(), minBy(),maxBy()這些算子可以針對KeyedStream的每一個支流做聚合

keyedStream.sum(0) keyedStream.sum("key") keyedStream.min(0) keyedStream.min("key") keyedStream.max(0) keyedStream.max("key") keyedStream.minBy(0) keyedStream.minBy("key") keyedStream.maxBy(0) keyedStream.maxBy("key")

??min和minBy的區別是min返回的是一個最小值,而minBy返回的是其字段中包含的最小值的元素(同樣元原理適用于max和maxBy)

??(6)fold:用一個初始的一個值,與其每個元素進行滾動合并操作。KeyedStream → DataStream

val result: DataStream[String] =keyedStream.fold("start")((str, i) => { str + "-" + i })

當應用于序列(1,2,3,4,5)時,發出序列“start-1”、“start-1-2”、“start-1-2”,…

??(6)reduce:一個分組數據流的聚合操作,合并當前的元素和上次聚合的結果,產生一個新的值,返回的流中包含每一次聚合的結果,而不是只返回最后一次聚合的最終結果。KeyedStream → DataStream

case class WC(val word: String, val count: Int)val wordCounts = stream.groupBy("word").reduce {(w1, w2) => new WC(w1.word, w1.count + w2.count) }

??(7) Split 和 Select

??Split :根據某些特征把一個DataStream拆分成兩個或者多個DataStream。DataStream → SplitStream

??Select:從一個SplitStream中獲取一個或者多個DataStream。SplitStream→DataStream

val split = someDataStream.split((num: Int) =>(num % 2) match {case 0 => List("even")case 1 => List("odd")} )val even = split select "even" val odd = split select "odd" val all = split.select("even","odd")

??(8) Connect和 CoMap、CoFlatMap

??Connect:連接兩個保持他們類型的數據流,兩個數據流被Connect之后,只是被放在了同一個流中,內部依然保持各自的數據形式不發生任何變化,兩個流相互獨立。DataStream,DataStream → ConnectedStreams

??CoMap,CoFlatMap:作用于ConnectedStreams上,功能與map和flatMap一樣,對ConnectedStreams中的每一個Stream分別進行map和flatMap處理。ConnectedStreams → DataStream

someStream : DataStream[Int] = ... otherStream : DataStream[String] = ... val connectedStreams = someStream.connect(otherStream)connectedStreams.map((_ : Int) => true,(_ : String) => false ) connectedStreams.flatMap((_ : Int) => true,(_ : String) => false )

??Connect與 Union 區別:①Union之前兩個流的類型必須是一樣,Connect可以不一樣,在之后的coMap中再去調整成為一樣的。② Connect只能操作兩個流,Union可以操作多個。

??(9)iterate

??在流程中創建一個反饋循環,將一個操作的輸出重定向到之前的操作。DataStream --> IterativeStream --> DataStream

initialStream.iterate {iteration => {val iterationBody = iteration.map {/*do something*/}(iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))} }

??(10)extract timestamps

??提取記錄中的時間戳來跟需要事件時間的window一起發揮作用。DataStream --> DataStream

stream.assignTimestamps { timestampExtractor }

5 Sink

??官方提供了一部分的框架的sink。除此以外,需要用戶自定義實現sink。

Apache Kafka (source/sink) Apache Cassandra (sink) Amazon Kinesis Streams (source/sink) Elasticsearch (sink) Hadoop FileSystem (sink) RabbitMQ (source/sink) Apache NiFi (source/sink) Twitter Streaming API (source)Apache ActiveMQ (source/sink) Apache Flume (sink) Redis (sink) Akka (sink) Netty (source)

??(1)kafka

??需要添加依賴

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.11</artifactId><version>1.10.0</version> </dependency>

??主函數中添加sink

datastream.addSink(new FlinkKafkaProducer011[String]("localhost:9092", "test", new SimpleStringSchema()))

??(2)redis

??添加依賴

<dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version> </dependency>

??定義一個redis的mapper類,用于定義保存到redis時調用的命令:

class RedisExampleMapper extends RedisMapper[(String, String)]{override def getCommandDescription: RedisCommandDescription = {new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME")}override def getKeyFromData(data: (String, String)): String = data._1override def getValueFromData(data: (String, String)): String = data._2 }

??在主函數中調用:

val conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build() stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))

??(3)Elasticsearch

??添加依賴

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch6_2.11</artifactId><version>1.10.0</version> </dependency>

??在主函數中調用:

import org.apache.flink.api.common.functions.RuntimeContext import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkimport org.apache.http.HttpHost import org.elasticsearch.action.index.IndexRequest import org.elasticsearch.client.Requestsimport java.util.ArrayList import java.util.Listval input: DataStream[String] = ...val httpHosts = new java.util.ArrayList[HttpHost] httpHosts.add(new HttpHost("127.0.0.1", 9300, "http")) httpHosts.add(new HttpHost("10.2.3.1", 9300, "http"))val esSinkBuilder = new ElasticsearchSink.Builer[String](httpHosts,new ElasticsearchSinkFunction[String] {def createIndexRequest(element: String): IndexRequest = {val json = new java.util.HashMap[String, String]json.put("data", element)return Requests.indexRequest().index("my-index").type("my-type").source(json)}} )// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered esSinkBuilder.setBulkFlushMaxActions(1)// provide a RestClientFactory for custom configuration on the internally created REST client esSinkBuilder.setRestClientFactory(restClientBuilder -> {restClientBuilder.setDefaultHeaders(...)restClientBuilder.setMaxRetryTimeoutMillis(...)restClientBuilder.setPathPrefix(...)restClientBuilder.setHttpClientConfigCallback(...)} )// finally, build and add the sink to the job's pipeline input.addSink(esSinkBuilder.build)

??(4)JDBC 自定義sink

??以mysql為例,添加依賴

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.44</version> </dependency>

??添加MysqlJdbcSink

class MysqlJdbcSink() extends RichSinkFunction[(String, String)]{var conn: Connection = _var insertStmt: PreparedStatement = _var updateStmt: PreparedStatement = _// open 主要是創建連接override def open(parameters: Configuration): Unit = {super.open(parameters)conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root")insertStmt = conn.prepareStatement("INSERT INTO mysqljdbcsink (id, name) VALUES (?, ?)")updateStmt = conn.prepareStatement("UPDATE mysqljdbcsink SET id = ? WHERE name = ?")}// 調用連接,執行sqloverride def invoke(value: (String, String), context: SinkFunction.Context[_]): Unit = {updateStmt.setString(1, value._1)updateStmt.setString(2, value._2)updateStmt.execute()if (updateStmt.getUpdateCount == 0) {insertStmt.setString(1, value._1)insertStmt.setString(2, value._2)insertStmt.execute()}}override def close(): Unit = {insertStmt.close()updateStmt.close()conn.close()} }

??主函數中調用

dataStream.addSink(new MysqlJdbcSink())

6 UDF函數

6.1 函數類(Function Classes)

??函數類:就是在Flink里面每一步運算,轉換,包括source和sink。每一個算子里面的參數都可以傳入一個所謂的函數類。就提供了更多更靈活的實現自己功能的方法。Flink暴露了所有udf函數的接口(實現方式為接口或者抽象類)。例如MapFunction, FilterFunction, ProcessFunction等等

??實現了FilterFunction接口如下:

class MyFilter extends FilterFunction[String] {override def filter(value: String): Boolean = {value.contains("flink")} } val filterStream = stream.filter(new FlinkFilter)

??將函數實現成匿名類

val filterStream = stream.filter(new RichFilterFunction[String] {override def filter(value: String): Boolean = {value.contains("flink")}} )

6.2 富函數(Rich Functions)

??“富函數”是DataStream API提供的一個函數類的接口,所有Flink函數類都有其Rich版本。它與常規函數的不同在于,可以獲取運行環境的上下文,并擁有一些生命周期方法,所以可以實現更復雜的功能。如RichMapFunction, RichFlatMapFunction,RichFilterFunction

??Rich Function有一個生命周期的概念。典型的生命周期方法有:

??①open()方法是rich function的初始化方法,當一個算子例如map或者filter被調用之前open()會被調用。

??②close()方法是生命周期中的最后一個調用的方法,做一些清理工作。

??③getRuntimeContext()方法提供了函數的RuntimeContext的一些信息,例如函數執行的并行度,任務的名字,以及state狀態

class MyFlatMap extends RichFlatMapFunction[Int, (Int, Int)] {var subTaskIndex = 0override def open(configuration: Configuration): Unit = {subTaskIndex = getRuntimeContext.getIndexOfThisSubtask// 以下可以做一些初始化工作, }override def flatMap(in: Int, out: Collector[(Int, Int)]): Unit = {if (in % 2 == subTaskIndex) {out.collect((subTaskIndex, in))} }override def close(): Unit = {// 以下做一些清理工作} } 創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

總結

以上是生活随笔為你收集整理的Flink常见流处理API的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。