case class到底啥用
生活随笔
收集整理的這篇文章主要介紹了
case class到底啥用
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
一句話講清楚case class的作用:
自定義一個復雜的數據類型.
示例如下:
import org.apache.flink.api.common.functions.AggregateFunction import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.api.scala.typeutils.Types import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.functions.windowing.WindowFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector//-------------輸入的是帶有時間戳的數據------------ object ProcessFunctionScalaV2 {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenv.enableCheckpointing(2000)val stream: DataStream[String] = env.socketTextStream("Desktop", 9999)val typeAndData: DataStream[(String, Long)] = stream.map(x => (x.split(",")(0), x.split(",")(1).toLong))val dataStream: DataStream[(String, Long)] = typeAndData.map(x => (x._1 + "-" + scala.util.Random.nextInt(100), x._2)) // 數據前面加個隨機數,時間戳保持不變val keyByAgg: DataStream[DataJast] = dataStream.keyBy(_._1).timeWindow(Time.seconds(10)).aggregate(new CountAggregate())keyByAgg.print("第一次keyby輸出")val result: DataStream[DataJast] = keyByAgg.map(data => {val newKey: String = data.key.substring(0, data.key.indexOf("-"))println(newKey)DataJast(newKey, data.count)}).keyBy(_.key).process(new MyProcessFunction())result.print("第二次keyby輸出")env.execute()}case class DataJast(key: String, count: Long)}// 代碼來自: // https://blog.csdn.net/zhangshenghang/article/details/105322423這里的DataJast的意思就是用scala的兩個基本數據類型組合成一個新的復雜的數據類型.
總結
以上是生活随笔為你收集整理的case class到底啥用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 全战三国怎么联姻
- 下一篇: Caused by: java.lang