日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

spark-stream 访问 Redis

發布時間:2023/11/29 数据库 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spark-stream 访问 Redis 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

最近在spark-stream上寫了一些流計算處理程序,程序架構如下

程序運行在Spark-stream上,我的目標是kafka、Redis的參數都支持在啟動時指定。

在寫代碼時參考了這篇文章 https://www.iteblog.com/archi...,該文講的比較清楚,但是有兩個問題:

  • 用scala實現的

  • Redis服務器的地址是寫死的,我的程序要挪個位置,要重新改代碼編譯。

  • 當時倒騰了一些時間,現在寫出來和大家分享,提高后來者的效率。

    如上圖Spark是分布式引擎,Driver中創建的Redis Pool,在Worker上又得重新創建,參考文章中是定義一個Redis連接池管理類,Redis Pool是類的靜態變量,類加載時由JVM自動創建。這個和我的預期有差距。

    在Driver中創建Redis管理對象,然后將該對象廣播,然后在Worker上獲取該廣播對象,從而實現參數可變,但是Redis管理對象在每個Worker上又只實例化了一次。

    Driver

    Driver 指定序列化方式,Spark支持兩種序列化方式,Java 和 Kryo,Kryo更高效。

    資料上說Kryo方式需要注冊類,但是我沒有注冊也能成功運行。

    public static void main(String[] args) {if (args.length < 3) {System.err.println("Usage: kafka_spark_redis <brokers> <topics> <redisServer>\n" +" <brokers> Kafka broker列表\n" +" <topics> 要消費的topic列表\n" +" <redisServer> redis 服務器地址 \n\n");System.exit(1);}/* 解析參數 */String brokers = args[0];String topics = args[1];String redisServer = args[2];// 創建stream context,兩秒鐘的數據算一批SparkConf sparkConf = new SparkConf().setAppName("kafka_spark_redis"); // sparkConf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer");//java的序列號速度沒有Kryo速度快sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); // sparkConf.set("spark.kryo.registrator", "MyRegistrator");JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));JavaSparkContext sc = jssc.sparkContext();HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));HashMap<String, String> kafkaParams = new HashMap<String, String>();kafkaParams.put("metadata.broker.list", brokers);kafkaParams.put("group.id","kakou-test");//Redis連接池管理類RedisClient redisClient = new RedisClient(redisServer);//創建redis連接池管理類//廣播Reids連接池管理對象final Broadcast<RedisClient> broadcastRedis = sc.broadcast(redisClient);// 創建流處理對象JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc,String.class, /* kafka key class */String.class, /* kafka value class */StringDecoder.class, /* key 解碼類 */StringDecoder.class, /* value 解碼類 */kafkaParams, /* kafka 參數,如設置kafka broker */topicsSet /* 待消費的topic名稱 */);// 將行分拆為單詞JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {//@Override// kafka傳來key-value對public String call(Tuple2<String, String> tuple2) {// 取value值return tuple2._2();}});/* 大量省略 */........}

    RedisClient

    RedisClient 是自己實現的類,在類中重載write/read這兩個序列化和反序列化函數,需要注意的是如果是Java Serializer 需要實現其它的接口。

    在Driver廣播時會觸發調用write序列化函數。

    public class RedisClient implements KryoSerializable {public static JedisPool jedisPool;public String host;public RedisClient(){Runtime.getRuntime().addShutdownHook(new CleanWorkThread());}public RedisClient(String host){this.host=host;Runtime.getRuntime().addShutdownHook(new CleanWorkThread());jedisPool = new JedisPool(new GenericObjectPoolConfig(), host);}static class CleanWorkThread extends Thread{@Overridepublic void run() {System.out.println("Destroy jedis pool");if (null != jedisPool){jedisPool.destroy();jedisPool = null;}}}public Jedis getResource(){return jedisPool.getResource();}public void returnResource(Jedis jedis){jedisPool.returnResource(jedis);}public void write(Kryo kryo, Output output) {kryo.writeObject(output, host);}public void read(Kryo kryo, Input input) {host=kryo.readObject(input, String.class);this.jedisPool =new JedisPool(new GenericObjectPoolConfig(), host) ;} }

    Worker

    在foreachRDD中獲取廣播變量,由廣播變量觸發先調用RedisClient的無參反序列化函數,然后再調用反序列化函數,我們的做法是在反序列化函數中創建Redis Pool。

    //標準輸出,對車輛的車牌和黑名單進行匹配,對與匹配成功的,保存到redis上。paircar.foreachRDD(new Function2<JavaRDD<HashMap<String, String>>, Time, Void>() {public Void call(JavaRDD<HashMap<String, String>> rdd, Time time) throws Exception {Date now=new Date();rdd.foreachPartition(new VoidFunction<Iterator<HashMap<String, String>>>() {public void call(Iterator<HashMap<String, String>> it) throws Exception {String tmp1;String tmp2;Date now=new Date();RedisClient redisClient=broadcastRedis.getValue();Jedis jedis=redisClient.getResource();......redisClient.returnResource(jedis);}});

    結語

    Spark對分布式計算做了封裝,但很多場景下還是要了解它的工作機制,很多問題和性能優化都和Spark的工作機制緊密相關。

    總結

    以上是生活随笔為你收集整理的spark-stream 访问 Redis的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。