
Flink 1.7.2 DataSet Transformation Examples

Source code

  • https://github.com/opensourceteams/flink-maven-scala

Overview

  • Flink DataSet transformation examples (each built on the common skeleton sketched below)
  • map, flatMap, filter, reduce, groupBy, reduceGroup, combineGroup, aggregate (sum, max, min)
  • distinct, join, join function, leftOuterJoin, rightOuterJoin, fullOuterJoin, union, first, coGroup, cross
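
Every example below is a self-contained program with the same shape. A minimal sketch of that skeleton, assuming a Flink 1.7.2 project with the Scala DataSet API (flink-scala) on the classpath; the object name is illustrative:

import org.apache.flink.api.scala._

object Skeleton {

  def main(args: Array[String]): Unit = {
    // batch entry point; picks a local or cluster environment automatically
    val env = ExecutionEnvironment.getExecutionEnvironment
    val input = env.fromElements("c a b d a c", "d c a b c d")

    // ...apply one or more transformations here...

    input.print() // print() also triggers execution of the plan
  }
}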

Transformations

map

  • Applies a function to every element of the DataSet, one element at a time
  • Example: append a suffix string to every line
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.map

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements("c a b d a c", "d c a b c d")

    val dataSet2 = dataSet.map(_.toUpperCase + "字符串連接")

    dataSet2.print()
  }
}
  • Output
C A B D A C字符串連接
D C A B C D字符串連接

flatMap

  • Applies a function to every element and flattens the returned collections into a single DataSet
  • Example: split every line into words and gather all the words into one DataSet
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.flatmap

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements("c a b d a c", "d c a b c d")

    val dataSet2 = dataSet.flatMap(_.toUpperCase().split(" "))

    dataSet2.print()
  }
}
  • Output
C
A
B
D
A
C
D
C
A
B
C
D

filter

  • Traverses the DataSet and keeps only the elements that satisfy a predicate
  • Example: keep only non-empty strings
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.filter

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
  * filter: keeps only the elements that satisfy the predicate
  */
object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements("c a b d a c", "d c a b c d")

    val dataSet2 = dataSet.flatMap(_.toUpperCase().split(" ")).filter(_.nonEmpty)

    dataSet2.print()
  }
}
  • Output
C
A
B
D
A
C
D
C
A
B
C
D

reduce

  • Combines the elements of the DataSet pairwise with a reduce function until a single value remains
  • Example: sum of all elements
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.reduce

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
  * Accumulates all elements pairwise, i.e. computes their sum
  */
object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(3, 5, 8, 9)

    // 3 + 5 + 8 + 9
    val dataSet2 = dataSet.reduce((a, b) => {
      println(s"$a + $b = ${a + b}")
      a + b
    })

    dataSet2.print()
  }
}
  • Output
3 + 5 = 8
8 + 8 = 16
16 + 9 = 25
25

reduce (after groupBy)

  • Groups all elements by the given key, then runs the reduce within each group
  • Example: per-key sum
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.reduce

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
  * Groups by key, then accumulates the values within each group
  */
object ReduceGroupRun2 {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 1), ("b", 1), ("c", 1), ("a", 1), ("c", 1), ("d", 1), ("f", 1), ("g", 1), ("f", 1))

    // group on field 0 (the key), then sum field 1 within each group
    val dataSet2 = dataSet.groupBy(0).reduce((x, y) => {
      (x._1, x._2 + y._2)
    })

    dataSet2.print()
  }
}
  • Output
(d,1)
(a,2)
(f,2)
(b,1)
(c,2)
(g,1)

groupBy (case class fields)

  • Groups the elements by a named field of a case class
  • Example: per-key word count
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.groupByClassFields

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
  * Groups by a case-class field, then sums the counts within each group
  */
object ReduceGroupRun {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements("a", "b", "c", "a", "c", "d", "f", "g", "f")

    // wrap every word in a WordCount, group on the "word" field, then sum the counts
    val dataSet2 = dataSet.map(WordCount(_, 1)).groupBy("word")
      .reduce((x, y) => WordCount(x.word, x.count + y.count))

    dataSet2.print()
  }

  case class WordCount(word: String, count: Int)
}
  • Output
WordCount(d,1)
WordCount(a,2)
WordCount(f,2)
WordCount(b,1)
WordCount(c,2)
WordCount(g,1)

groupBy (key selector)

  • Groups the elements with a key-selector function
  • Example: per-key sum
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.groupByKeySelector

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
  * Groups with a key-selector function, then sums the values within each group
  */
object ReduceGroupRun {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements("a", "b", "c", "a", "c", "d", "f", "g", "f")

    // map every word to (word, 1), group by the first tuple field, then sum
    val dataSet2 = dataSet.map((_, 1)).groupBy(_._1).reduce((x, y) => (x._1, x._2 + y._2))

    dataSet2.print()
  }
}
  • Output
(d,1)
(a,2)
(f,2)
(b,1)
(c,2)
(g,1)

reduceGroup

  • Groups the elements by key; all elements of a group are handed to the reduceGroup function in a single call (a more idiomatic variant is sketched after the output below)
  • Example: per-key sum
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.reduceGroup

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector

/**
  * All elements with the same key are passed to the function in a single call
  */
object ReduceGroupRun {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val dataSet = env.fromElements("a", "a", "c", "b", "a")

    // intermediate data: (a,1) (a,1) (c,1) (b,1) (a,1)
    val result = dataSet.map((_, 1)).groupBy(0)
      .reduceGroup((in, out: Collector[(String, Int)]) => {
        var count = 0
        var word = ""
        while (in.hasNext) {
          val next = in.next()
          word = next._1
          count = count + next._2
        }
        out.collect((word, count))
      })

    result.print()
  }
}
  • Output
(a,3)
(b,1)
(c,1)
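
The while-loop above works, but the Scala API also accepts a function over a plain Iterator, which reads more naturally. A minimal sketch on the same data (this overload is part of the Scala DataSet API; the names are illustrative):

val result = dataSet
  .map((_, 1))
  .groupBy(0)
  .reduceGroup { in: Iterator[(String, Int)] =>
    val group = in.toList // all elements of one group share the same key
    (group.head._1, group.map(_._2).sum)
  }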

combineGroup

  • Like reduceGroup, but executed as a combiner: Flink may invoke it locally before the data is shuffled, so with parallelism greater than 1 it can emit partial aggregates rather than final results (see the sketch after the output below)
  • Example: per-key sum; with parallelism 1 the result happens to match reduceGroup
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.combineGroup

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector

/**
  * All elements with the same key are passed to the function in a single call
  */
object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val dataSet = env.fromElements("a", "a", "c", "b", "a")

    // intermediate data: (a,1) (a,1) (c,1) (b,1) (a,1)
    val result = dataSet.map((_, 1)).groupBy(0)
      .combineGroup((in, out: Collector[(String, Int)]) => {
        var count = 0
        var word = ""
        while (in.hasNext) {
          val next = in.next()
          word = next._1
          count = count + next._2
        }
        out.collect((word, count))
      })

    result.print()
  }
}
  • Output
(a,3)
(b,1)
(c,1)
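
Because a combiner's output may be partial when parallelism is greater than 1, the usual pattern is to pre-aggregate with combineGroup and finish with a grouped reduceGroup. A hedged sketch of that two-step pattern, using the same tuple shape as above:

val partial = dataSet
  .map((_, 1))
  .groupBy(0)
  .combineGroup { (in, out: Collector[(String, Int)]) =>
    // local pre-aggregation; may see only part of a key's elements
    val group = in.toList
    out.collect((group.head._1, group.map(_._2).sum))
  }

val total = partial
  .groupBy(0)
  .reduceGroup { (in, out: Collector[(String, Int)]) =>
    // final aggregation over the partial sums
    val group = in.toList
    out.collect((group.head._1, group.map(_._2).sum))
  }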

Aggregate sum

  • Sums field 1 (the Int value) of Tuple2(String, Int); without a preceding groupBy the aggregation runs over the whole DataSet, and the String field of the result comes from an arbitrary element
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.sum

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
  * Sums field 1 over the whole DataSet (no grouping)
  */
object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 3), ("b", 1), ("c", 5), ("a", 1), ("c", 1), ("d", 1), ("f", 1), ("g", 1), ("f", 1))

    // 3 + 1 + 5 + 1 + 1 + 1 + 1 + 1 + 1 = 15; the String field of the result is arbitrary
    val dataSet2 = dataSet.sum(1)

    dataSet2.print()
  }
}
  • Output
(f,15)

Aggregate max

  • Takes the maximum of field 1 of Tuple2(String, Int) over the whole DataSet (no grouping); the String field of the result is arbitrary
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.max

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
  * Takes the maximum of field 1 over the whole DataSet (no grouping)
  */
object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 3), ("b", 1), ("c", 5), ("a", 1), ("c", 1), ("d", 1), ("f", 1), ("g", 1), ("f", 1))

    // the maximum of field 1 is 5; the String field of the result is arbitrary
    val dataSet2 = dataSet.max(1)

    dataSet2.print()
  }
}
  • Output
(f,5)

Aggregate min

  • Takes the minimum of field 1 of Tuple2(String, Int) over the whole DataSet (no grouping); the String field of the result is arbitrary
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.min

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
  * Takes the minimum of field 1 over the whole DataSet (no grouping)
  */
object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 3), ("b", 1), ("c", 5), ("a", 1), ("c", 1), ("d", 1), ("f", 1), ("g", 1), ("f", 1))

    // the minimum of field 1 is 1; the String field of the result is arbitrary
    val dataSet2 = dataSet.min(1)

    dataSet2.print()
  }
}
  • Output
(f,1)

Aggregate sum (groupBy)

  • Groups by field 0 of Tuple2(String, Int), then sums field 1 within each group
  • Example: per-key sum
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.sum

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
  * Groups on field 0, then sums field 1 within each group
  */
object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 1), ("b", 1), ("c", 1), ("a", 1), ("c", 1), ("d", 1), ("f", 1), ("g", 1), ("f", 1))

    val dataSet2 = dataSet.groupBy(0).sum(1)

    dataSet2.print()
  }
}
  • Output
(d,1)
(a,2)
(f,2)
(b,1)
(c,2)
(g,1)

Aggregate max (groupBy) vs. maxBy

  • Groups by key, then takes the maximum of field 1 within each group; the non-aggregated fields of the result may come from any element of the group, whereas maxBy returns a complete element that holds the maximum (see the sketch after the output below)
  • Example: per-key maximum
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.max

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
  * Groups on field 0, then takes the maximum of field 1 within each group
  */
object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 2), ("b", 1), ("c", 4), ("a", 1), ("c", 1), ("d", 1), ("f", 1), ("g", 1), ("f", 1))

    val dataSet2 = dataSet.groupBy(0).max(1)

    dataSet2.print()
  }
}
  • Output
(d,1)
(a,2)
(f,1)
(b,1)
(c,4)
(g,1)
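
The difference between max and maxBy only matters when the tuple has fields that are neither the grouping key nor the aggregated field. A short sketch (both calls are part of the GroupedDataSet API):

val byMax   = dataSet.groupBy(0).max(1)   // max of field 1; other fields may come from any element of the group
val byMaxBy = dataSet.groupBy(0).maxBy(1) // the complete element that holds the group's maximum

With the two-field tuples used here the results coincide; with wider tuples, maxBy is the safer choice when the whole element is needed.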

Aggregate min (groupBy) vs. minBy

  • Groups by key, then takes the minimum of field 1 within each group; as with max, minBy differs in that it returns the complete element holding the minimum
  • Example: per-key minimum
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.min

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
  * Groups on field 0, then takes the minimum of field 1 within each group
  */
object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 2), ("b", 1), ("c", 4), ("a", 1), ("c", 1), ("d", 1), ("f", 1), ("g", 1), ("f", 1))

    val dataSet2 = dataSet.groupBy(0).min(1)

    dataSet2.print()
  }
}
  • Output
(d,1)
(a,1)
(f,1)
(b,1)
(c,1)
(g,1)

distinct

  • Deduplicates on the specified field (here field 1): only the first element seen for each distinct value of that field is kept; a whole-element variant is sketched after the output below
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.distinct

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
  * Deduplicates on field 1: one element is kept per distinct value of field 1
  */
object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 3), ("b", 1), ("c", 5), ("a", 1), ("c", 1), ("d", 1), ("f", 1), ("g", 1), ("f", 1))

    val dataSet2 = dataSet.distinct(1)

    dataSet2.print()
  }
}
  • Output
(a,3)
(b,1)
(c,5)
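
distinct can also deduplicate on the whole element rather than on a single field. A short sketch:

val uniqueRows = dataSet.distinct()  // deduplicate on the entire tuple
val uniqueVals = dataSet.distinct(1) // deduplicate on field 1, as in the example above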

join

  • Inner join: emits a pair for every combination of left and right elements whose keys match
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.join

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 3), ("b", 1), ("c", 5), ("a", 1), ("c", 1), ("d", 1), ("f", 1), ("g", 1), ("f", 1))
    val dataSet2 = env.fromElements(("d", 1), ("f", 1), ("g", 1), ("f", 1))

    // inner join on field 0 of both sides
    val dataSet3 = dataSet.join(dataSet2).where(0).equalTo(0)

    dataSet3.print()
  }
}
  • Output
((d,1),(d,1))
((f,1),(f,1))
((f,1),(f,1))
((f,1),(f,1))
((f,1),(f,1))
((g,1),(g,1))

join (function)

  • Inner join with a join function that merges each matched pair into a single result
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.joinFunction

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 3), ("b", 1), ("c", 5), ("a", 1), ("c", 1), ("d", 1), ("f", 2), ("g", 5))
    val dataSet2 = env.fromElements(("g", 1), ("f", 1))

    // inner join on field 0, merging each matched pair with a join function
    val dataSet3 = dataSet.join(dataSet2).where(0).equalTo(0) {
      (x, y) => (x._1, x._2 + y._2)
    }

    dataSet3.print()
  }
}
  • Output
(f,3)
(g,6)

leftOuterJoin

  • Left outer join: every element of the left DataSet appears in the result; when it has no match on the right, the join function receives null for the right side
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.leftOuterJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 3), ("b", 1), ("c", 5), ("a", 1), ("c", 1), ("d", 1), ("f", 2), ("g", 5))
    val dataSet2 = env.fromElements(("g", 1), ("f", 1))

    // left outer join: y is null when the left element has no match on the right
    val dataSet3 = dataSet.leftOuterJoin(dataSet2).where(0).equalTo(0) {
      (x, y) => {
        var count = 0
        if (y != null) {
          count = y._2
        }
        (x._1, x._2 + count)
      }
    }

    dataSet3.print()
  }
}
  • Output
(d,1)
(a,3)
(a,1)
(f,3)
(b,1)
(c,5)
(c,1)
(g,6)

rightOuterJoin

  • Right outer join: every element of the right DataSet appears in the result; when it has no match on the left, the join function receives null for the left side
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.rightOuterJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 3), ("b", 1), ("c", 5), ("a", 1), ("c", 1), ("d", 1), ("f", 2), ("g", 5))
    val dataSet2 = env.fromElements(("g", 1), ("f", 1))

    // right outer join: x is null when the right element has no match on the left
    val dataSet3 = dataSet.rightOuterJoin(dataSet2).where(0).equalTo(0) {
      (x, y) => {
        var count = 0
        if (x != null) {
          count = x._2
        }
        (y._1, y._2 + count) // take the key from the right side, which is never null here
      }
    }

    dataSet3.print()
  }
}
  • Output
(f,3)
(g,6)

fullOuterJoin

  • Full outer join: elements from both sides appear in the result; the side without a match is null
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.fullOuterJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 3), ("b", 1), ("c", 5), ("a", 1), ("c", 1), ("d", 1), ("f", 2), ("g", 5))
    val dataSet2 = env.fromElements(("g", 1), ("f", 1))

    // full outer join: either x or y (but never both) may be null
    val dataSet3 = dataSet.fullOuterJoin(dataSet2).where(0).equalTo(0) {
      (x, y) => {
        val key    = if (x != null) x._1 else y._1
        val countX = if (x != null) x._2 else 0
        val countY = if (y != null) y._2 else 0
        (key, countX + countY)
      }
    }

    dataSet3.print()
  }
}
  • Output
(d,1)
(a,3)
(a,1)
(f,3)
(b,1)
(c,5)
(c,1)
(g,6)

union

  • Concatenates two DataSets of the same type into one; duplicates are kept
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.union

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 1), ("g", 1), ("f", 1))
    val dataSet2 = env.fromElements(("d", 1), ("f", 1), ("g", 1), ("f", 1))

    // union of the two data sets; duplicates are kept
    val dataSet3 = dataSet.union(dataSet2)

    dataSet3.print()
  }
}
  • Output
(a,1)
(d,1)
(g,1)
(f,1)
(f,1)
(g,1)
(f,1)

first n

  • Takes the first n elements of the DataSet; combined with grouping and group-sorting it takes the first n per group (see the sketch after the output below)
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.first

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 3), ("b", 1), ("c", 5), ("a", 1), ("c", 1), ("d", 1), ("f", 1), ("g", 1), ("f", 1))

    // take the first 3 elements of the DataSet
    val dataSet3 = dataSet.first(3)

    dataSet3.print()
  }
}
  • Output
(a,3)
(b,1)
(c,5)
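
Combined with groupBy and sortGroup, first(n) returns the first n elements of every group instead of the first n of the whole DataSet. A hedged sketch (Order is the standard enum from flink-core):

import org.apache.flink.api.common.operators.Order

// the two largest values per key
val topPerKey = dataSet
  .groupBy(0)
  .sortGroup(1, Order.DESCENDING)
  .first(2)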

coGroup

  • For every distinct key across the two DataSets, the coGroup function is called once with two iterables: all elements with that key from the first DataSet and all elements with that key from the second; either iterable may be empty. An idiomatic variant is sketched after the output below.
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.cogroup

import java.lang

import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 1), ("g", 1), ("a", 1))
    val dataSet2 = env.fromElements(("a", 1), ("f", 1))

    // coGroup on field 0: called once per distinct key with both sides' elements
    val dataSet3 = dataSet.coGroup(dataSet2).where(0).equalTo(0) {
      // the third type parameter of CoGroupFunction is the output element type;
      // here nothing is collected, the function only prints both iterables
      new CoGroupFunction[(String, Int), (String, Int), (String, Int)] {
        override def coGroup(first: lang.Iterable[(String, Int)], second: lang.Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
          println("==============開始")
          println("first")
          println(first)
          val iteratorFirst = first.iterator()
          while (iteratorFirst.hasNext()) {
            println(iteratorFirst.next())
          }

          println("second")
          println(second)
          val iteratorSecond = second.iterator()
          while (iteratorSecond.hasNext()) {
            println(iteratorSecond.next())
          }
          println("==============結束")
        }
      }
    }

    dataSet3.print()
  }
}
  • Output
==============開始
first
org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator@3500e7b0
(a,1)
(a,1)
second
org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator@41230ea2
(a,1)
==============結束
==============開始
first
org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator@14602d0a
(g,1)
second
[]
==============結束
==============開始
first
[]
second
org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator@2b0a15b5
(f,1)
==============結束
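
The Scala API also accepts a plain function of two iterators and a collector, which avoids the verbose anonymous class. A minimal sketch that sums each key's values across both DataSets (the names are illustrative):

val totals = dataSet.coGroup(dataSet2).where(0).equalTo(0) {
  (left, right, out: Collector[(String, Int)]) =>
    // either side may be empty for a given key
    val all = (left ++ right).toList
    if (all.nonEmpty) out.collect((all.head._1, all.map(_._2).sum))
}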

cross

  • Builds the Cartesian product: every element of the first DataSet is paired with every element of the second; size hints are sketched after the output below
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.cross

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 1), ("g", 1), ("f", 1))
    val dataSet2 = env.fromElements(("d", 1), ("f", 1), ("g", 1), ("f", 1))

    // Cartesian product of the two data sets
    val dataSet3 = dataSet.cross(dataSet2)

    dataSet3.print()
  }
}
  • Output
((a,1),(d,1))
((a,1),(f,1))
((a,1),(g,1))
((a,1),(f,1))
((g,1),(d,1))
((g,1),(f,1))
((g,1),(g,1))
((g,1),(f,1))
((f,1),(d,1))
((f,1),(f,1))
((f,1),(g,1))
((f,1),(f,1))
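
Since a Cartesian product is expensive, the API offers size hints that let the optimizer broadcast the smaller side. A short sketch (both methods exist alongside cross):

val viaTiny = dataSet.crossWithTiny(dataSet2) // hint: dataSet2 is the small side
val viaHuge = dataSet2.crossWithHuge(dataSet) // hint: dataSet is the huge side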
