Apache Flink: Reading a Local File, Processing the Data, and Importing It into ES

Published: 2024/9/16

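Requirements

  • A file exists on the local disk
  • Read that local data source with Flink
  • Process the data and import it into ES
  • Submit the Flink job

Environment

  • Flink: 1.8.2
  • Elasticsearch: 6.2.3
  • JDK: 1.8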

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.vincent</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.8.2</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <hadoop.version>2.8.5</hadoop.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Elasticsearch 6.x -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Hadoop -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_2.11</artifactId>
            <version>1.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.41</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.6</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.vincent.Test</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.0.0,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <!-- This profile helps to make things run out of the box in IntelliJ -->
    <!-- It adds Flink's core classes to the runtime class path. -->
    <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
    <profiles>
        <profile>
            <id>add-dependencies-for-IDEA</id>
            <activation>
                <property>
                    <name>idea.version</name>
                </property>
            </activation>
            <dependencies>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-java</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
            </dependencies>
        </profile>
    </profiles>
</project>

Define a utility class, ElasticSearchSinkUtil.java

package com.vincent;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.util.ExceptionUtils;
import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;

public class ElasticSearchSinkUtil {

    /** Parses a comma-separated list of "host:port" entries into HttpHost instances. */
    public static List<HttpHost> getEsAddresses(String hosts) {
        String[] hostList = hosts.split(",");
        List<HttpHost> addresses = new ArrayList<>();
        for (String host : hostList) {
            String[] ipPort = host.split(":");
            String ip = ipPort[0];
            String port = ipPort[1];
            addresses.add(new HttpHost(ip, Integer.parseInt(port)));
        }
        return addresses;
    }

    /** Builds an Elasticsearch sink with a failure handler and attaches it to the stream. */
    public static <T> void addSink(List<HttpHost> hosts, int bulkFlushMaxActions, int parallelism,
                                   SingleOutputStreamOperator<T> data, ElasticsearchSinkFunction<T> func) {
        ElasticsearchSink.Builder<T> esSinkBuilder = new ElasticsearchSink.Builder<>(hosts, func);
        // Flush a bulk request once this many actions are buffered.
        esSinkBuilder.setBulkFlushMaxActions(bulkFlushMaxActions);
        esSinkBuilder.setFailureHandler(new ActionRequestFailureHandler() {
            @Override
            public void onFailure(ActionRequest actionRequest, Throwable throwable, int restStatusCode,
                                  RequestIndexer requestIndexer) throws Throwable {
                String description = actionRequest.getDescription();
                System.out.println("----------");
                System.out.println(description);
                System.out.println("===========");
                if (ExceptionUtils.findThrowable(throwable, SocketTimeoutException.class).isPresent()) {
                    System.out.println("Timeout exception");
                } else if (ExceptionUtils.findThrowable(throwable, EsRejectedExecutionException.class).isPresent()) {
                    // Case 1: the ES queue is full (rejected execution), so re-queue the request.
                    System.out.println("ES queue is full");
                    requestIndexer.add(actionRequest);
                } else if (ExceptionUtils.findThrowable(throwable, ElasticsearchParseException.class).isPresent()) {
                    System.out.println("Parse exception: " + description);
                } else if (ExceptionUtils.findThrowable(throwable, ElasticsearchException.class).isPresent()) {
                    System.out.println("Elasticsearch exception occurred");
                }
                // Note: any other failure type is dropped silently here; the job keeps running.
            }
        });
        data.addSink(esSinkBuilder.build()).setParallelism(parallelism);
    }
}
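
The getEsAddresses helper above assumes that every entry has the form host:port, so an entry without a port fails with ArrayIndexOutOfBoundsException. Below is a minimal sketch of a more tolerant parser that uses only the JDK so it can be run on its own. The class name HostListParser and the fallback to port 9200 are assumptions made for illustration; neither appears in the original code.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of the original article.
final class HostListParser {

    // Assumed fallback when an entry omits the port (9200 is the usual ES HTTP port).
    static final int DEFAULT_PORT = 9200;

    /** Parses "host:port,host:port" and tolerates blanks, spaces, and missing ports. */
    static List<InetSocketAddress> parse(String hosts) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : hosts.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // skip blank entries such as those left by stray commas
            }
            int colon = trimmed.lastIndexOf(':');
            String host = colon < 0 ? trimmed : trimmed.substring(0, colon);
            // A non-numeric port still fails fast with NumberFormatException.
            int port = colon < 0 ? DEFAULT_PORT : Integer.parseInt(trimmed.substring(colon + 1));
            // createUnresolved avoids a DNS lookup at parse time.
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        for (InetSocketAddress address : parse("swarm-manager:9200, swarm-worker1")) {
            System.out.println(address.getHostString() + " -> " + address.getPort());
        }
    }
}
```

Each parsed address could then be turned into an HttpHost with new HttpHost(address.getHostString(), address.getPort()) before it is handed to the sink builder.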

Main method

package com.vincent;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.*;

import java.io.IOException;
import java.util.List;

public class Test {

    public static void main(String[] args) throws Exception {
        // The first program argument is the path to the properties file.
        String propertiesPath = args[0];
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(propertiesPath);

        List<HttpHost> esAddresses = ElasticSearchSinkUtil.getEsAddresses(parameterTool.get("es.hosts"));
        int bulk_size = parameterTool.getInt("es.bulk.flushMaxAction");
        int sinkParallelism = parameterTool.getInt("es.sink.parallelism");
        String rawPath = parameterTool.get("rawPath");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.readTextFile(rawPath);

        // Split every tab-separated line into seven string fields.
        SingleOutputStreamOperator<Tuple7<String, String, String, String, String, String, String>> map =
                dataStreamSource.map(new MapFunction<String, Tuple7<String, String, String, String, String, String, String>>() {
                    @Override
                    public Tuple7<String, String, String, String, String, String, String> map(String s) throws Exception {
                        String[] splits = s.split("\t");
                        String field1 = splits[0];
                        String field2 = splits[1];
                        String field3 = splits[2];
                        String field4 = splits[3];
                        String field5 = splits[4];
                        String field6 = splits[5];
                        String field7 = splits[6];
                        return new Tuple7<>(field1, field2, field3, field4, field5, field6, field7);
                    }
                });

        ElasticSearchSinkUtil.addSink(esAddresses, bulk_size, sinkParallelism, map,
                new ElasticsearchSinkFunction<Tuple7<String, String, String, String, String, String, String>>() {
                    @Override
                    public void process(Tuple7<String, String, String, String, String, String, String> data,
                                        RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                        IndexRequest indexRequest;
                        try {
                            indexRequest = createIndexRequest(data);
                        } catch (IOException e) {
                            e.printStackTrace();
                            return; // skip this record instead of adding a null request
                        }
                        requestIndexer.add(indexRequest);
                    }

                    public IndexRequest createIndexRequest(Tuple7<String, String, String, String, String, String, String> data) throws IOException {
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("field1", data.f0);
                        jsonObject.put("field2", data.f1);
                        // field3 and field4 hold JSON strings, so they are stored as nested objects.
                        jsonObject.put("field3", JSONObject.parseObject(data.f2));
                        jsonObject.put("field4", JSONObject.parseObject(data.f3));
                        jsonObject.put("field5", data.f4);
                        jsonObject.put("field6", data.f5);
                        jsonObject.put("field7", data.f6);
                        return Requests.indexRequest()
                                .index("my_index")
                                .type("type")
                                .source(jsonObject.toString(), XContentType.JSON);
                    }
                });

        // map.setParallelism(1).print();
        env.execute("Test");
    }
}
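
The map step above indexes splits[0] through splits[6] directly, so a line with fewer than seven tab-separated fields throws ArrayIndexOutOfBoundsException and fails the job. The sketch below shows one way the split could be guarded. It uses only the JDK, and the class name RecordParser and the convention of returning null for a malformed line are assumptions made for illustration.

```java
// Hypothetical helper for illustration; not part of the original article.
final class RecordParser {

    // The job expects exactly seven tab-separated fields per line.
    static final int FIELD_COUNT = 7;

    /** Returns the seven fields of a line, or null when the line is malformed. */
    static String[] parse(String line) {
        if (line == null) {
            return null;
        }
        // A limit of -1 keeps trailing empty fields, which split("\t") alone would drop.
        String[] parts = line.split("\t", -1);
        return parts.length == FIELD_COUNT ? parts : null;
    }

    public static void main(String[] args) {
        String good = "u1\t1568600000\t{}\t{}\t3\tA\t2019-09-16";
        String bad = "u1\t1568600000";
        System.out.println("good line parsed: " + (parse(good) != null));
        System.out.println("bad line parsed: " + (parse(bad) != null));
    }
}
```

In the job, a check like this could sit in a filter step placed before map, so that malformed lines are dropped or logged instead of stopping the pipeline.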

Define a configuration file

Keeping the settings in a properties file means they can be changed without rebuilding the job:

es.hosts=swarm-manager:9200,swarm-worker1:9200,swarm-worker2:9200
es.bulk.flushMaxAction=200
es.sink.parallelism=1
# hdfs: hdfs://swarm-manager:9001/text/000000_0, windows: E:/test/hello.txt
# rawPath=hdfs://swarm-manager:9001/text/000000_0
rawPath=E:/test/000000_0
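
Test reads four keys from this file (es.hosts, es.bulk.flushMaxAction, es.sink.parallelism, and rawPath), and a missing key only shows up as a failure once the job starts. As a rough sketch, the file could be checked beforehand with the JDK's java.util.Properties. The class name JobConfigCheck is an assumption made for illustration, and the sample text in main stands in for a real file.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check for illustration; not part of the original article.
final class JobConfigCheck {

    // The keys that the Test class reads from the properties file.
    static final String[] REQUIRED_KEYS = {
        "es.hosts", "es.bulk.flushMaxAction", "es.sink.parallelism", "rawPath"
    };

    /** Returns the required keys that are absent or blank, in declaration order. */
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        // Inline sample; a real check would load the file with a FileReader instead.
        props.load(new StringReader("es.hosts=swarm-manager:9200\nes.sink.parallelism=1\n"));
        System.out.println("Missing keys: " + missingKeys(props));
    }
}
```

Running a check like this before flink run would surface a misspelled or missing key without waiting for the job to be submitted.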

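Package and deploy

Package the application with mvn package, then copy the generated hadoop-hdfs-1.0-SNAPSHOT-shaded.jar to the server.

Start the Flink cluster

Start the cluster with the command ./flink-1.8.2/bin/start-cluster.bat (on Linux or macOS, the equivalent script is start-cluster.sh).

Run the job

Run the job with the command: flink run ./hadoop-hdfs-1.0-SNAPSHOT-shaded.jar ./flink-es.properties
Open http://<server-IP>:8081 in a browser to check the status of the running job.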
