Common Flink Stream Processing APIs
Programming against Flink's stream processing API can be divided into four parts: environment, source, transform, and sink.
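As a minimal sketch of how those four parts fit together (the socket source on localhost:9999 and the word-count logic are illustrative assumptions, not something prescribed by this article):

import org.apache.flink.streaming.api.scala._

object StreamingSkeleton {
  def main(args: Array[String]): Unit = {
    // 1. Environment: obtain the execution context
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. Source: read a text stream (a socket source is assumed here)
    val text: DataStream[String] = env.socketTextStream("localhost", 9999)

    // 3. Transform: split lines into words and keep a running count per word
    val counts: DataStream[(String, Int)] = text
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    // 4. Sink: print the results to stdout
    counts.print()

    // Nothing runs until execute() is called
    env.execute("streaming skeleton")
  }
}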
1 Data Types Supported by Flink
At its lowest level Flink has to serialize and deserialize all data so it can be shipped over the network or read back from state backends, checkpoints, and savepoints. Flink therefore has its own type extraction system, the TypeInformation mechanism: every data type is represented by type information, from which Flink generates a dedicated serializer, deserializer, and comparator. In practice this means that the types used in transformations must be ones Flink can turn into TypeInformation.
Basically all of the commonly used data types are supported:

(1) All Java and Scala basic types: Int, Double, Long, String, ...
(2) Java and Scala tuples (Tuples), with up to 25 fields; null fields are not supported
(3) Scala case classes, with up to 22 fields; null fields are not supported
(4) Java simple objects (POJOs)
(5) Row: a tuple with an arbitrary number of fields that does support null fields
(6) Arrays, Lists, Maps, Enums, and so on
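As a brief sketch of how this surfaces in the Scala API (the Person case class is an illustrative assumption): importing org.apache.flink.streaming.api.scala._ makes the TypeInformation for a type available implicitly, and it can also be obtained explicitly:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._

// A case class is analyzed by Flink's type extraction and serialized natively
case class Person(name: String, age: Int)

object TypeInfoExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // The implicit TypeInformation[Person] is derived by the Scala API macro
    val people: DataStream[Person] = env.fromElements(Person("alice", 30), Person("bob", 25))

    // The same TypeInformation can be requested explicitly
    val personTypeInfo: TypeInformation[Person] = createTypeInformation[Person]
    println(personTypeInfo)

    people.print()
    env.execute("type info example")
  }
}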
2 Execution Environment

The first step in a Flink program is to create an execution environment, which represents the context of the current program. An environment can be created in the following ways:

(1) getExecutionEnvironment
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
// or, for the DataStream API
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

If the program is invoked standalone, this method returns a local execution environment; if it is invoked from a command-line client to submit it to a cluster, it returns that cluster's execution environment. In other words, getExecutionEnvironment decides which environment to return based on how the job is being run, and it is the most common way to create one. If no parallelism is set, the value configured in flink-conf.yaml is used, which defaults to 1.
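If a different parallelism is needed, it can be set on the environment or on individual operators. A minimal sketch (the value 4 and the stream variable are assumptions for illustration):

// Override the configured default parallelism for the whole job
env.setParallelism(4)

// Parallelism can also be set per operator
val words = stream.flatMap(_.split(" ")).setParallelism(2)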
(2) createLocalEnvironment

val env = StreamExecutionEnvironment.createLocalEnvironment(1)

Returns a local execution environment; the default parallelism has to be specified when calling it.
(3) createRemoteEnvironment

val env = ExecutionEnvironment.createRemoteEnvironment("jobmanager-hostname", 6123, "YOURPATH//wordcount.jar")

Returns a cluster execution environment and submits the Jar to the remote server. The JobManager's host and port (6123 by default) and the Jar package to run on the cluster must be specified when calling it.
3 Source
??(1)從集合讀取數據
val stream = env.fromCollection(List("a", "b", "c"))

(2) Reading from a file

val stream = env.readTextFile("YOUR_FILE_PATH")

(3) Using a Kafka message queue as the source
The Kafka connector dependency needs to be added:
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.10.0</version>
</dependency>

The code looks like this:
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")

val stream = env.addSource(new FlinkKafkaConsumer011[String]("topic_name", new SimpleStringSchema(), properties))

(4) Custom sources
Besides the sources above, a custom source can be defined; all that is needed is to pass a SourceFunction to addSource. It is used as follows:
val stream = env.addSource(new MySource())

case class CustomSource(id: String, times: String)

class MySource extends SourceFunction[CustomSource] {
  // running indicates whether the source is still running normally
  var running: Boolean = true

  override def cancel(): Unit = {
    running = false
  }

  override def run(ctx: SourceFunction.SourceContext[CustomSource]): Unit = {
    while (running) {
      ctx.collect(CustomSource(UUID.randomUUID().toString, System.currentTimeMillis().toString))
      Thread.sleep(100)
    }
  }
}

4 Transform
(1) map: takes one element and emits one element; typically used for cleaning and conversion work. DataStream → DataStream
val streamMap = stream.map { x => x * 2 }

(2) flatMap: similar to map, but it "flattens" each input element, so there is no constraint on the number of outputs per input (zero, one, or many); mostly used for splitting. DataStream → DataStream
val streamFlatMap = stream.flatMap { x => x.split(" ") }

(3) filter: keeps only the elements that satisfy the predicate. DataStream → DataStream
val streamFilter = stream.filter { x => x > 1 }

(4) keyBy: logically partitions a stream into disjoint partitions, each containing the elements with the same key; implemented internally via hashing; returns a KeyedStream. DataStream → KeyedStream
Note: the following types cannot be used as keys: ① POJO classes that do not implement hashCode; ② arrays of any kind.
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0)          // Key by the first element of a Tuple

(5) Rolling aggregations
These aggregate a KeyedStream on a given field and emit the result of every rolling aggregation step; common ones are sum(), min(), max(), minBy(), and maxBy(). KeyedStream → DataStream

min(), max(), minBy(), and maxBy() aggregate each keyed sub-stream of the KeyedStream independently.
keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")

The difference between min and minBy is that min returns only the minimum value of the given field, whereas minBy returns the element that contains the minimum value of that field (the same principle applies to max and maxBy).
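A small sketch of that difference (the sample (id, temperature, timestamp) tuples are illustrative; env is assumed to be the StreamExecutionEnvironment from section 2, with import org.apache.flink.streaming.api.scala._ in scope):

// Records: (sensorId, temperature, timestamp)
val readings: DataStream[(String, Double, Long)] = env.fromElements(
  ("s1", 35.0, 1L),
  ("s1", 31.0, 2L),
  ("s1", 33.0, 3L)
)

val keyed = readings.keyBy(0)

// min(1): the rolling minimum temperature is correct, but the remaining
// fields are not guaranteed to come from the record holding that minimum
val rollingMin = keyed.min(1)

// minBy(1): emits the entire record that currently holds the minimum temperature
val rollingMinBy = keyed.minBy(1)

rollingMin.print()
rollingMinBy.print()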
(6) fold: rolls an initial value together with each element of the keyed stream. KeyedStream → DataStream

val result: DataStream[String] = keyedStream.fold("start")((str, i) => { str + "-" + i })

Applied to the sequence (1, 2, 3, 4, 5), this emits the sequence "start-1", "start-1-2", "start-1-2-3", ...
(7) reduce: an aggregation over a keyed stream that merges the current element with the previously aggregated value to produce a new value; the returned stream contains every intermediate aggregation result rather than only the final one. KeyedStream → DataStream
case class WC(word: String, count: Int)

val wordCounts = stream
  .keyBy("word")
  .reduce { (w1, w2) => WC(w1.word, w1.count + w2.count) }

(8) Split and Select
Split: splits a DataStream into two or more DataStreams according to some criteria. DataStream → SplitStream

Select: retrieves one or more DataStreams from a SplitStream. SplitStream → DataStream
val split = someDataStream.split(
  (num: Int) => (num % 2) match {
    case 0 => List("even")
    case 1 => List("odd")
  }
)

val even = split.select("even")
val odd = split.select("odd")
val all = split.select("even", "odd")

(9) Connect and CoMap / CoFlatMap
Connect: connects two data streams while preserving their types. After two streams are connected they merely share one stream; internally each keeps its own data form unchanged, and the two remain independent of each other. DataStream, DataStream → ConnectedStreams

CoMap, CoFlatMap: applied to ConnectedStreams; they behave like map and flatMap, applying a separate map/flatMap function to each of the streams inside the ConnectedStreams. ConnectedStreams → DataStream
val someStream: DataStream[Int] = ...
val otherStream: DataStream[String] = ...

val connectedStreams = someStream.connect(otherStream)

connectedStreams.map(
  (_: Int) => true,
  (_: String) => false
)
connectedStreams.flatMap(
  (_: Int) => true,
  (_: String) => false
)

Differences between Connect and Union: ① union requires both streams to have the same type, while connect does not (the types can be unified afterwards in the coMap); ② connect only operates on two streams, while union can merge more than two.
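For contrast, a minimal union sketch (the element values are illustrative; env is assumed to be the StreamExecutionEnvironment from section 2, with import org.apache.flink.streaming.api.scala._ in scope):

// All streams passed to union must have the same element type
val s1: DataStream[Int] = env.fromElements(1, 2, 3)
val s2: DataStream[Int] = env.fromElements(4, 5, 6)
val s3: DataStream[Int] = env.fromElements(7, 8, 9)

// union accepts any number of streams and yields a single DataStream[Int]
val merged: DataStream[Int] = s1.union(s2, s3)
merged.print()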
(10) iterate

Creates a feedback loop in the dataflow by redirecting the output of one operator back to an earlier operator. DataStream → IterativeStream → DataStream
initialStream.iterate {
  iteration => {
    val iterationBody = iteration.map { /* do something */ }
    (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
  }
}

(11) Extract timestamps
Extracts the timestamps carried in the records so that windows operating on event time can be used. DataStream → DataStream
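As a slightly fuller, hedged sketch (the SensorReading type, its field names, and the 1-second out-of-orderness bound are illustrative assumptions), a common pattern is assignTimestampsAndWatermarks with a bounded-out-of-orderness extractor:

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// Illustrative record type carrying an event-time timestamp in milliseconds
case class SensorReading(id: String, timestamp: Long, temperature: Double)

val withTimestamps = sensorStream.assignTimestampsAndWatermarks(
  // Tolerate events arriving up to 1 second out of order
  new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
    override def extractTimestamp(element: SensorReading): Long = element.timestamp
  }
)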
More generally:

stream.assignTimestamps { timestampExtractor }

5 Sink
Flink ships sinks for a number of frameworks out of the box; for anything else a custom sink has to be implemented by the user.
Apache Kafka (source/sink)
Apache Cassandra (sink)
Amazon Kinesis Streams (source/sink)
Elasticsearch (sink)
Hadoop FileSystem (sink)
RabbitMQ (source/sink)
Apache NiFi (source/sink)
Twitter Streaming API (source)
Apache ActiveMQ (source/sink)
Apache Flume (sink)
Redis (sink)
Akka (sink)
Netty (source)

(1) Kafka
The dependency needs to be added:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.10.0</version>
</dependency>

Add the sink in the main method:
datastream.addSink(new FlinkKafkaProducer011[String]("localhost:9092", "test", new SimpleStringSchema()))

(2) Redis
Add the dependency:
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>

Define a Redis mapper class that specifies the command used when writing to Redis:
class RedisExampleMapper extends RedisMapper[(String, String)] {
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME")
  }
  override def getKeyFromData(data: (String, String)): String = data._1
  override def getValueFromData(data: (String, String)): String = data._2
}

Call it in the main method:
val conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build()
stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))

(3) Elasticsearch
Add the dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.10.0</version>
</dependency>

Call it in the main method:
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

val input: DataStream[String] = ...

// REST hosts of the Elasticsearch cluster (9200 is the HTTP port)
val httpHosts = new java.util.ArrayList[HttpHost]
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"))
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"))

val esSinkBuilder = new ElasticsearchSink.Builder[String](
  httpHosts,
  new ElasticsearchSinkFunction[String] {
    // Build an IndexRequest for each element and hand it to the indexer
    override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
      val json = new java.util.HashMap[String, String]
      json.put("data", element)
      val request: IndexRequest = Requests.indexRequest()
        .index("my-index")
        .`type`("my-type")
        .source(json)
      indexer.add(request)
    }
  }
)

// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
esSinkBuilder.setBulkFlushMaxActions(1)

// provide a RestClientFactory for custom configuration on the internally created REST client
esSinkBuilder.setRestClientFactory(restClientBuilder => {
  restClientBuilder.setDefaultHeaders(...)
  restClientBuilder.setMaxRetryTimeoutMillis(...)
  restClientBuilder.setPathPrefix(...)
  restClientBuilder.setHttpClientConfigCallback(...)
})

// finally, build and add the sink to the job's pipeline
input.addSink(esSinkBuilder.build())

(4) Custom JDBC sink
Using MySQL as an example, add the dependency:
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>

Add a MysqlJdbcSink:
class MysqlJdbcSink() extends RichSinkFunction[(String, String)] {
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  // open mainly creates the connection
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root")
    insertStmt = conn.prepareStatement("INSERT INTO mysqljdbcsink (id, name) VALUES (?, ?)")
    updateStmt = conn.prepareStatement("UPDATE mysqljdbcsink SET id = ? WHERE name = ?")
  }

  // use the connection to execute the SQL
  override def invoke(value: (String, String), context: SinkFunction.Context[_]): Unit = {
    updateStmt.setString(1, value._1)
    updateStmt.setString(2, value._2)
    updateStmt.execute()
    if (updateStmt.getUpdateCount == 0) {
      insertStmt.setString(1, value._1)
      insertStmt.setString(2, value._2)
      insertStmt.execute()
    }
  }

  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
  }
}

Call it in the main method:
dataStream.addSink(new MysqlJdbcSink())

6 UDF Functions
6.1 Function Classes
Function classes: every step in a Flink job, including sources, sinks, and the transformations in between, accepts a so-called function class as the parameter of its operator, which gives a more flexible way to implement custom behavior. Flink exposes interfaces (implemented as interfaces or abstract classes) for all of its UDFs, for example MapFunction, FilterFunction, ProcessFunction, and so on.
An implementation of the FilterFunction interface looks like this:
class MyFilter extends FilterFunction[String] {
  override def filter(value: String): Boolean = {
    value.contains("flink")
  }
}

val filterStream = stream.filter(new MyFilter)

The function can also be implemented as an anonymous class:
val filterStream = stream.filter(new RichFilterFunction[String] {
  override def filter(value: String): Boolean = {
    value.contains("flink")
  }
})

6.2 Rich Functions
"Rich functions" are a function-class interface provided by the DataStream API; every Flink function class has a Rich version. They differ from regular functions in that they can access the context of the runtime environment and have lifecycle methods, which makes more complex functionality possible. Examples are RichMapFunction, RichFlatMapFunction, and RichFilterFunction.
A Rich Function has a notion of a lifecycle. The typical lifecycle methods are:

① open() is the initialization method of a rich function; it is called before an operator such as map or filter is invoked.

② close() is the last method called in the lifecycle and is used for cleanup work.

③ getRuntimeContext() provides information from the function's RuntimeContext, such as the parallelism the function runs with, the task name, and its state.
class MyFlatMap extends RichFlatMapFunction[Int, (Int, Int)] {
  var subTaskIndex = 0

  override def open(configuration: Configuration): Unit = {
    subTaskIndex = getRuntimeContext.getIndexOfThisSubtask
    // initialization work can go here
  }

  override def flatMap(in: Int, out: Collector[(Int, Int)]): Unit = {
    if (in % 2 == subTaskIndex) {
      out.collect((subTaskIndex, in))
    }
  }

  override def close(): Unit = {
    // cleanup work can go here
  }
}