Throttling Spark Writes to Elasticsearch


Contents

    • 1. Bulk writes to ES from Spark
    • 2. Writing to Elasticsearch from Java Spark
    • 3. Extending the es-hadoop source
      • 1. MyEsSparkSQL
      • 2. MyEsDataFrameWriter

1. Bulk writes to ES from Spark

Normally, when a Spark job needs to write to ES, we use ES-Hadoop. Refer to the official documentation here and pick the version that matches your stack; if Hive, Spark, and the rest are all in use, you can simply depend on the full artifact:

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>7.1.1</version>
</dependency>

Since this project only uses Spark (Spark 2.3, Scala 2.11, Elasticsearch 7.1.1), importing just the Spark connector is enough:

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>7.1.1</version>
</dependency>

2. Writing to Elasticsearch from Java Spark

The Java code that writes to ES can look like this:

@Data
public class UserProfileRecord {
    public String uid;
    public String want_val;
}

// Configure the es-hadoop connector; ES_MAPPING_ID makes "uid" the document _id.
SparkConf sparkConf = new SparkConf()
        .setAppName(JOB_NAME)
        .set(ConfigurationOptions.ES_NODES, esHost)
        .set(ConfigurationOptions.ES_PORT, esPort)
        .set(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, esUser)
        .set(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, esPass)
        .set(ConfigurationOptions.ES_BATCH_SIZE_ENTRIES, "500")
        .set(ConfigurationOptions.ES_MAPPING_ID, "uid");

SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
Dataset<Row> wantedCols = sparkSession.read().parquet(path);

// Map each parquet row to a UserProfileRecord.
Dataset<UserProfileRecord> searchUserProfile = wantedCols.mapPartitions(
        new MapPartitionsFunction<Row, UserProfileRecord>() {
            @Override
            public Iterator<UserProfileRecord> call(Iterator<Row> input) throws Exception {
                List<UserProfileRecord> cleanProfileList = new LinkedList<>();
                while (input.hasNext()) {
                    UserProfileRecord aRecord = new UserProfileRecord();
                    ......... // field mapping elided in the original
                    cleanProfileList.add(aRecord);
                }
                return cleanProfileList.iterator();
            }
        }, Encoders.bean(UserProfileRecord.class));

EsSparkSQL.saveToEs(searchUserProfile.repartition(3), this.writeIndex);

Because the ES cluster currently has only 3 nodes, repartition(3) is used to bring the number of writing tasks down to 3 and ease the pressure on ES. In practice the primary shards sustain an average write rate of around 30k docs/s, but when the job produces a large amount of data the write phase takes a long time, still puts significant load on the cluster, and causes some queries to time out.
I dug through the official documentation and found that the tunables are limited: essentially you adjust the number of partitions and ConfigurationOptions.ES_BATCH_SIZE_ENTRIES to throttle writes to ES (see the sketch below). I experimented with both and the effect was marginal.
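For reference, a minimal sketch of those knobs, assuming the es-hadoop 7.x option names; the values here are illustrative, not recommendations, and they bound the size of each bulk request rather than the overall write rate:

import org.apache.spark.SparkConf

// Illustrative values only; these shape each bulk request, not the total throughput.
val conf = new SparkConf()
  .set("es.batch.size.entries", "500")      // max documents per bulk request
  .set("es.batch.size.bytes", "1mb")        // max payload size per bulk request
  .set("es.batch.write.retry.wait", "30s")  // pause between retries when bulks are rejected
// Fewer partitions also means fewer concurrent writers hitting the cluster:
// searchUserProfile.repartition(3)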
My first thought was to use the Elasticsearch Java client and issue the REST requests myself (which would make rate control easy), but after reading the es-hadoop source I saw that it uses the transport client (the TCP-based protocol ES uses for internal communication), which should be more efficient than plain HTTP/REST, and it also handles the mapping between partitions and the index's replicas, so it clearly carries a lot of optimization. In the end I decided to keep es-hadoop and modify its source instead.

3. Extending the es-hadoop source

Two Scala files were added (yes, Scala was forced into the project 😂):
MyEsSparkSQL
MyEsDataFrameWriter

Note that the package must be org.elasticsearch.spark.sql, because the code below relies on es-hadoop classes that are only accessible from inside that package.

1. MyEsSparkSQL

package org.elasticsearch.spark.sql

import org.apache.commons.logging.LogFactory
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SparkSession
import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException
import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_QUERY
import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_RESOURCE_READ
import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_RESOURCE_WRITE
import org.elasticsearch.hadoop.cfg.PropertiesSettings
import org.elasticsearch.hadoop.rest.InitializationUtils
import org.elasticsearch.hadoop.util.ObjectUtils
import org.elasticsearch.spark.cfg.SparkSettingsManager

import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.collection.JavaConverters.propertiesAsScalaMapConverter
import scala.collection.Map

object MyEsSparkSQL {

  private val init = { ObjectUtils.loadClass("org.elasticsearch.spark.rdd.CompatUtils", classOf[ObjectUtils].getClassLoader) }

  @transient private[this] val LOG = LogFactory.getLog(EsSparkSQL.getClass)

  //
  // Read
  //

  def esDF(sc: SQLContext): DataFrame = esDF(sc, Map.empty[String, String])
  def esDF(sc: SQLContext, resource: String): DataFrame = esDF(sc, Map(ES_RESOURCE_READ -> resource))
  def esDF(sc: SQLContext, resource: String, query: String): DataFrame = esDF(sc, Map(ES_RESOURCE_READ -> resource, ES_QUERY -> query))

  def esDF(sc: SQLContext, cfg: Map[String, String]): DataFrame = {
    val esConf = new SparkSettingsManager().load(sc.sparkContext.getConf).copy()
    esConf.merge(cfg.asJava)

    sc.read.format("org.elasticsearch.spark.sql").options(esConf.asProperties.asScala.toMap).load
  }

  def esDF(sc: SQLContext, resource: String, query: String, cfg: Map[String, String]): DataFrame = {
    esDF(sc, collection.mutable.Map(cfg.toSeq: _*) += (ES_RESOURCE_READ -> resource, ES_QUERY -> query))
  }

  def esDF(sc: SQLContext, resource: String, cfg: Map[String, String]): DataFrame = {
    esDF(sc, collection.mutable.Map(cfg.toSeq: _*) += (ES_RESOURCE_READ -> resource))
  }

  // SparkSession variant
  def esDF(ss: SparkSession): DataFrame = esDF(ss.sqlContext, Map.empty[String, String])
  def esDF(ss: SparkSession, resource: String): DataFrame = esDF(ss.sqlContext, Map(ES_RESOURCE_READ -> resource))
  def esDF(ss: SparkSession, resource: String, query: String): DataFrame = esDF(ss.sqlContext, Map(ES_RESOURCE_READ -> resource, ES_QUERY -> query))
  def esDF(ss: SparkSession, cfg: Map[String, String]): DataFrame = esDF(ss.sqlContext, cfg)
  def esDF(ss: SparkSession, resource: String, query: String, cfg: Map[String, String]): DataFrame = esDF(ss.sqlContext, resource, query, cfg)
  def esDF(ss: SparkSession, resource: String, cfg: Map[String, String]): DataFrame = esDF(ss.sqlContext, resource, cfg)

  //
  // Write
  //

  def saveToEs(srdd: Dataset[_], resource: String): Unit = {
    saveToEs(srdd, Map(ES_RESOURCE_WRITE -> resource))
  }

  def saveToEs(srdd: Dataset[_], resource: String, cfg: Map[String, String]): Unit = {
    saveToEs(srdd, collection.mutable.Map(cfg.toSeq: _*) += (ES_RESOURCE_WRITE -> resource))
  }

  def saveToEs(srdd: Dataset[_], cfg: Map[String, String]): Unit = {
    if (srdd != null) {
      if (srdd.isStreaming) {
        throw new EsHadoopIllegalArgumentException("Streaming Datasets should not be saved with 'saveToEs()'. Instead, use " +
          "the 'writeStream().format(\"es\").save()' methods.")
      }
      val sparkCtx = srdd.sqlContext.sparkContext
      val sparkCfg = new SparkSettingsManager().load(sparkCtx.getConf)
      val esCfg = new PropertiesSettings().load(sparkCfg.save())
      esCfg.merge(cfg.asJava)

      // Need to discover ES Version before checking index existence
      InitializationUtils.discoverClusterInfo(esCfg, LOG)
      InitializationUtils.checkIdForOperation(esCfg)
      InitializationUtils.checkIndexExistence(esCfg)

      sparkCtx.runJob(srdd.toDF().rdd, new MyEsDataFrameWriter(srdd.schema, esCfg.save()).write _)
    }
  }
}

This class is essentially a straight copy of EsSparkSQL; the only change is the last statement of def saveToEs(srdd: Dataset[_], cfg: Map[String, String]): Unit, which goes from

sparkCtx.runJob(srdd.toDF().rdd, new EsDataFrameWriter(srdd.schema, esCfg.save()).write _)

to

sparkCtx.runJob(srdd.toDF().rdd, new MyEsDataFrameWriter(srdd.schema, esCfg.save()).write _)
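
On the driver side nothing else needs to change: the job from section 2 simply calls MyEsSparkSQL.saveToEs instead of EsSparkSQL.saveToEs. A minimal Scala sketch, where searchUserProfile and writeIndex are placeholders for the dataset and index name used earlier:

import org.elasticsearch.spark.sql.MyEsSparkSQL

// Same call shape as EsSparkSQL.saveToEs; repartition(3) keeps the writer count at 3.
MyEsSparkSQL.saveToEs(searchUserProfile.repartition(3), writeIndex)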

2. MyEsDataFrameWriter

package org.elasticsearch.spark.sql

import java.util.concurrent.atomic.AtomicInteger

import lombok.extern.slf4j.Slf4j
import org.apache.spark.TaskContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.elasticsearch.hadoop.rest.RestService
import org.elasticsearch.hadoop.serialization.{BytesConverter, JdkBytesConverter}
import org.elasticsearch.hadoop.serialization.builder.ValueWriter
import org.elasticsearch.hadoop.serialization.field.FieldExtractor
import org.elasticsearch.spark.rdd.EsRDDWriter

/**
 * Created by chencc on 2020/8/31.
 */
@Slf4j
class MyEsDataFrameWriter(schema: StructType, override val serializedSettings: String)
  extends EsRDDWriter[Row](serializedSettings: String) {

  override protected def valueWriter: Class[_ <: ValueWriter[_]] = classOf[DataFrameValueWriter]

  override protected def bytesConverter: Class[_ <: BytesConverter] = classOf[JdkBytesConverter]

  override protected def fieldExtractor: Class[_ <: FieldExtractor] = classOf[DataFrameFieldExtractor]

  override protected def processData(data: Iterator[Row]): Any = { (data.next, schema) }

  override def write(taskContext: TaskContext, data: Iterator[Row]): Unit = {
    val writer = RestService.createWriter(settings, taskContext.partitionId.toLong, -1, log)

    taskContext.addTaskCompletionListener((TaskContext) => writer.close())

    if (runtimeMetadata) {
      writer.repository.addRuntimeFieldExtractor(metaExtractor)
    }

    // Throttle: after every 500 documents, pause this task for 100 ms.
    val counter = new AtomicInteger(0)
    while (data.hasNext) {
      counter.incrementAndGet()
      writer.repository.writeToIndex(processData(data))
      if (counter.get() >= 500) {
        Thread.sleep(100)
        counter.set(0)
        log.info("wrote a batch of 500 docs, sleeping 100 milliseconds")
      }
    }
  }
}

MyEsDataFrameWriter overrides EsRDDWriter's write method and adds a sleep; in practice the batch size and pause can be tuned to whatever the live cluster can tolerate.
The hardcoded 500 and 100 could also be made configurable through SparkConf for more flexibility; a sketch of one way to do that follows.
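A minimal sketch only: es.throttle.batch.entries and es.throttle.sleep.ms are made-up property names, and the snippet is the loop inside write above rewritten to read them from the connector's settings object (anything set on the SparkConf, or in the cfg map passed to saveToEs, ends up there via the serialized settings):

// Hypothetical property names -- they are not part of es-hadoop.
val throttleBatch   = Option(settings.getProperty("es.throttle.batch.entries")).map(_.toInt).getOrElse(500)
val throttleSleepMs = Option(settings.getProperty("es.throttle.sleep.ms")).map(_.toLong).getOrElse(100L)

var written = 0
while (data.hasNext) {
  writer.repository.writeToIndex(processData(data))
  written += 1
  if (written >= throttleBatch) {
    Thread.sleep(throttleSleepMs)   // pause this task to cap its write rate
    written = 0
  }
}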

With this in place, the rate at which Spark writes to ES can be throttled quite effectively.

Summary

That covers throttling Spark writes to Elasticsearch; hopefully it helps if you run into the same problem.
