日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 综合教程 >内容正文

综合教程

Flink 如何分流数据

發(fā)布時(shí)間:2024/5/24 综合教程 50 生活家
生活随笔 收集整理的這篇文章主要介紹了 Flink 如何分流数据 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

場(chǎng)景
分流方式
如何分流

使用Filter分流
使用Split分流
使用Side Output分流

場(chǎng)景

獲取流數(shù)據(jù)的時(shí)候,通常需要根據(jù)所需把流拆分出其他多個(gè)流,根據(jù)不同的流再去作相應(yīng)的處理。

舉個(gè)例子:創(chuàng)建一個(gè)商品實(shí)時(shí)流,商品有季節(jié)標(biāo)簽,需要對(duì)不同標(biāo)簽的商品做統(tǒng)計(jì)處理,這個(gè)時(shí)候就需要把商品數(shù)據(jù)流根據(jù)季節(jié)標(biāo)簽分流。

分流方式

使用Filter分流
使用Split分流
使用Side Output分流

如何分流

先模擬一個(gè)實(shí)時(shí)的數(shù)據(jù)流

import lombok.Data;
@Data
public class Product {
    public Integer id;
    public String seasonType;
}

自定義Source

import common.Product;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.ArrayList;
import java.util.Random;

public class ProductStremingSource implements SourceFunction<Product> {
    private boolean isRunning = true;

    @Override
    public void run(SourceContext<Product> ctx) throws Exception {
        while (isRunning){
            // 每一秒鐘產(chǎn)生一條數(shù)據(jù)
            Product product = generateProduct();
            ctx.collect(product);
            Thread.sleep(1000);
        }
    }

    private Product generateProduct(){
        int i = new Random().nextInt(100);
        ArrayList<String> list = new ArrayList();
        list.add("spring");
        list.add("summer");
        list.add("autumn");
        list.add("winter");
        Product product = new Product();
        product.setSeasonType(list.get(new Random().nextInt(4)));
        product.setId(i);
        return product;
    }
    @Override
    public void cancel() {

    }
}

輸出:

使用Filter分流

使用 filter 算子根據(jù)數(shù)據(jù)的字段進(jìn)行過濾。

import common.Product;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import source.ProductStremingSource;

public class OutputStremingDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Product> source = env.addSource(new ProductStremingSource());

        // 使用Filter分流
        SingleOutputStreamOperator<Product> spring = source.filter(product -> "spring".equals(product.getSeasonType()));
        SingleOutputStreamOperator<Product> summer = source.filter(product -> "summer".equals(product.getSeasonType()));
        SingleOutputStreamOperator<Product> autumn  = source.filter(product -> "autumn".equals(product.getSeasonType()));
        SingleOutputStreamOperator<Product> winter  = source.filter(product -> "winter".equals(product.getSeasonType()));
        source.print();
        winter.printToErr();

        env.execute("output");
    }
}

結(jié)果輸出(紅色為季節(jié)標(biāo)簽是winter的分流輸出):

使用Split分流

重寫OutputSelector內(nèi)部類的select()方法,根據(jù)數(shù)據(jù)所需要分流的類型反正不同的標(biāo)簽下,返回SplitStream,通過SplitStream的select()方法去選擇相應(yīng)的數(shù)據(jù)流。

只分流一次是沒有問題的,但是不能使用它來做連續(xù)的分流。

SplitStream已經(jīng)標(biāo)記過時(shí)了

public class OutputStremingDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Product> source = env.addSource(new ProductStremingSource());

        // 使用Split分流
        SplitStream<Product> dataSelect = source.split(new OutputSelector<Product>() {
            @Override
            public Iterable<String> select(Product product) {
                List<String> seasonTypes = new ArrayList<>();
                String seasonType = product.getSeasonType();
                switch (seasonType){
                    case "spring":
                        seasonTypes.add(seasonType);
                        break;
                    case "summer":
                        seasonTypes.add(seasonType);
                        break;
                    case "autumn":
                        seasonTypes.add(seasonType);
                        break;
                    case "winter":
                        seasonTypes.add(seasonType);
                        break;
                    default:
                        break;
                }
                return seasonTypes;
            }
        });
        DataStream<Product> spring = dataSelect.select("machine");
        DataStream<Product> summer = dataSelect.select("docker");
        DataStream<Product> autumn = dataSelect.select("application");
        DataStream<Product> winter = dataSelect.select("middleware");
        source.print();
        winter.printToErr();

        env.execute("output");
    }
}

使用Side Output分流

推薦使用這種方式

首先需要定義一個(gè)OutputTag用于標(biāo)識(shí)不同流

可以使用下面的幾種函數(shù)處理流發(fā)送到分流中:

ProcessFunction
KeyedProcessFunction
CoProcessFunction
KeyedCoProcessFunction
ProcessWindowFunction
ProcessAllWindowFunction

之后再用getSideOutput(OutputTag)選擇流。

public class OutputStremingDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Product> source = env.addSource(new ProductStremingSource());

        // 使用Side Output分流
        final OutputTag<Product> spring = new OutputTag<Product>("spring");
        final OutputTag<Product> summer = new OutputTag<Product>("summer");
        final OutputTag<Product> autumn = new OutputTag<Product>("autumn");
        final OutputTag<Product> winter = new OutputTag<Product>("winter");
        SingleOutputStreamOperator<Product> sideOutputData = source.process(new ProcessFunction<Product, Product>() {
            @Override
            public void processElement(Product product, Context ctx, Collector<Product> out) throws Exception {
                String seasonType = product.getSeasonType();
                switch (seasonType){
                    case "spring":
                        ctx.output(spring,product);
                        break;
                    case "summer":
                        ctx.output(summer,product);
                        break;
                    case "autumn":
                        ctx.output(autumn,product);
                        break;
                    case "winter":
                        ctx.output(winter,product);
                        break;
                    default:
                        out.collect(product);
                }
            }
        });

        DataStream<Product> springStream = sideOutputData.getSideOutput(spring);
        DataStream<Product> summerStream = sideOutputData.getSideOutput(summer);
        DataStream<Product> autumnStream = sideOutputData.getSideOutput(autumn);
        DataStream<Product> winterStream = sideOutputData.getSideOutput(winter);

        // 輸出標(biāo)簽為:winter 的數(shù)據(jù)流
        winterStream.print();

        env.execute("output");
    }
}

結(jié)果輸出:

更多文章:www.ipooli.com

掃碼關(guān)注公眾號(hào)《ipoo》

總結(jié)

以上是生活随笔為你收集整理的Flink 如何分流数据的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。