日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java上传kafka的方法_哪种方法是将所有数据从Kafka主题复制到接收器(文件或Hive表)的最佳方法?...

發布時間:2025/3/11 编程问答 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java上传kafka的方法_哪种方法是将所有数据从Kafka主题复制到接收器(文件或Hive表)的最佳方法?... 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

我正在使用Kafka Consumer API將所有數據從Kafka主題復制到Hive表 . 為此,我使用HDFS作為中間步驟 . 我使用唯一的組ID并將偏移重置為“最早”,以便從頭開始獲取所有數據,并在執行后忽略提交 . 然后我遍歷Kafka主題中的記錄,并將每條記錄保存到HDFS中的臨時文件中 . 然后我使用Spark從HDFS讀取數據,然后使用日期作為文件名將其保存到Parquet文件中 . 然后,我在Hive表中創建一個帶日期的分區,最后在Parquet中將文件作為分區加載到Hive中 .

正如您在下面的代碼中看到的,我使用了幾個中間步驟,這使得我的代碼遠非最佳 . 這是從Kafka主題復制所有數據的最佳推薦方法嗎?我做了一些研究,到目前為止,這是我設法開始工作的變通方法,但是,隨著記錄數量每天增加,我的執行時間達到了可容忍的極限(從2分鐘變為6分鐘到6分鐘)周) .

代碼在這里:

def start( lowerDate: String, upperDate: String )={

// Configurations for kafka consumer

val conf = ConfigFactory.parseResources("properties.conf")

val brokersip = conf.getString("enrichment.brokers.value")

val topics_in = conf.getString("enrichment.topics_in.value")

// Crea la sesion de Spark

val spark = SparkSession

.builder()

.master("yarn")

.appName("ParaTiUserXY")

.getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

import spark.implicits._

val properties = new Properties

properties.put("key.deserializer", classOf[StringDeserializer])

properties.put("value.deserializer", classOf[StringDeserializer])

properties.put("bootstrap.servers", brokersip)

properties.put("auto.offset.reset", "earliest")

properties.put("group.id", "ParaTiUserXYZZ12345")

//Schema para transformar los valores del topico de Kafka a JSON

val my_schema = new StructType()

.add("longitudCliente", StringType)

.add("latitudCliente", StringType)

.add("dni", StringType)

.add("alias", StringType)

.add("segmentoCliente", StringType)

.add("timestampCliente", StringType)

.add("dateCliente", StringType)

.add("timeCliente", StringType)

.add("tokenCliente", StringType)

.add("telefonoCliente", StringType)

val consumer = new KafkaConsumer[String, String](properties)

consumer.subscribe( util.Collections.singletonList("parati_rt_geoevents") )

val fs = {

val conf = new Configuration()

FileSystem.get(conf)

}

val temp_path:Path = new Path("hdfs:///tmp/s70956/tmpstgtopics")

if( fs.exists(temp_path)){

fs.delete(temp_path, true)

}

while(true)

{

val records=consumer.poll(100)

for (record

val data = record.value.toString

//println(data)

val dataos: FSDataOutputStream = fs.create(temp_path)

val bw: BufferedWriter = new BufferedWriter( new OutputStreamWriter(dataos, "UTF-8"))

bw.append(data)

bw.close

val data_schema = spark.read.schema(my_schema).json("hdfs:///tmp/s70956/tmpstgtopics")

val fechaCliente = data_schema.select("dateCliente").first.getString(0)

if( fechaCliente < upperDate && fechaCliente >= lowerDate){

data_schema.select("longitudCliente", "latitudCliente","dni", "alias",

"segmentoCliente", "timestampCliente", "dateCliente", "timeCliente",

"tokenCliente", "telefonoCliente")

.coalesce(1).write.mode(SaveMode.Append).parquet("/desa/landing/parati/xyuser/" + fechaCliente)

}

else if( fechaCliente < lowerDate){

//

}

else if( fechaCliente >= upperDate){

break;

}

}

}

consumer.close()

}

總結

以上是生活随笔為你收集整理的java上传kafka的方法_哪种方法是将所有数据从Kafka主题复制到接收器(文件或Hive表)的最佳方法?...的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。