Part 10 | Several Ways to Manually Maintain Kafka Offsets in Spark Streaming


Spark Streaming's receiver-less createDirectStream method does not use a receiver: it creates an input stream that pulls messages directly from the Kafka cluster nodes, and it guarantees that each message pulled from Kafka is transformed exactly once, keeping the semantics consistent. However, when a job fails or is restarted, ensuring that processing resumes from the current consumption position (i.e. exactly-once semantics) is hard to achieve with Spark Streaming's built-in mechanisms alone, so in production the Kafka consumption position is usually maintained by managing offsets manually. This article explains how to manage Kafka offsets by hand and covers the following topics:

  • How to manage Kafka offsets with MySQL
  • How to manage Kafka offsets with Redis

How to manage Kafka offsets with MySQL

We can write code inside the Spark Streaming application to manage the Kafka offsets ourselves. The offsets can be obtained from the RDDs generated for each micro-batch, like this:

KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
  // obtain the offset ranges for this batch
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ...
}

Once the offsets have been obtained, they can be saved to an external store (MySQL, Redis, Zookeeper, HBase, and so on), as sketched below.
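The overall pattern is always the same: capture the offset ranges of the batch, process the batch, and only then persist the offsets. A minimal sketch of that ordering follows; saveOffsets is a hypothetical helper that stands in for the MySQL/Redis writers shown later in this article.

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

// Sketch only: process the batch first, persist the offsets afterwards (at-least-once).
// `saveOffsets` is a hypothetical stand-in for the MySQL/Redis writers below.
def processAndCommit(stream: InputDStream[ConsumerRecord[String, String]],
                     saveOffsets: Array[OffsetRange] => Unit): Unit = {
  stream.foreachRDD { rdd =>
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // 1. business processing runs on the executors
    rdd.foreach(record => println(s"${record.key()} -> ${record.value()}"))
    // 2. persist the offsets only after processing has succeeded
    saveOffsets(offsetRanges)
  }
}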

Example code

  • MySQL table for storing the offsets

CREATE TABLE `topic_par_group_offset` (
  `topic`     varchar(255) NOT NULL,
  `partition` int(11)      NOT NULL,
  `groupid`   varchar(255) NOT NULL,
  `offset`    bigint(20)   DEFAULT NULL,
  PRIMARY KEY (`topic`, `partition`, `groupid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  • Configuration constants: ConfigConstants

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds

object ConfigConstants {
  // Kafka configuration
  val kafkaBrokers = "kms-2:9092,kms-3:9092,kms-4:9092"
  val groupId = "group_test"
  val kafkaTopics = "test"
  val batchInterval = Seconds(5)
  val streamingStorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  val kafkaKeySer = "org.apache.kafka.common.serialization.StringSerializer"
  val kafkaValueSer = "org.apache.kafka.common.serialization.StringSerializer"
  val sparkSerializer = "org.apache.spark.serializer.KryoSerializer"
  val batchSize = 16384
  val lingerMs = 1
  val bufferMemory = 33554432
  // MySQL configuration
  val user = "root"
  val password = "123qwe"
  val url = "jdbc:mysql://localhost:3306/kafka_offset"
  val driver = "com.mysql.jdbc.Driver"
  // checkpoint configuration
  val checkpointDir = "file:///e:/checkpoint"
  val checkpointInterval = Seconds(10)
  // Redis configuration
  val redisAddress = "192.168.10.203"
  val redisPort = 6379
  val redisAuth = "123qwe"
  val redisTimeout = 3000
}
  • JDBC connection pool utility: JDBCConnPool (a quick usage sketch follows the class)

import java.sql.{Connection, PreparedStatement}
import org.apache.commons.dbcp2.BasicDataSource
import org.apache.log4j.Logger

object JDBCConnPool {
  val log: Logger = Logger.getLogger(JDBCConnPool.getClass)
  var dataSource: BasicDataSource = null

  /**
   * Create the data source
   *
   * @return
   */
  def getDataSource(): BasicDataSource = {
    if (dataSource == null) {
      dataSource = new BasicDataSource()
      dataSource.setDriverClassName(ConfigConstants.driver)
      dataSource.setUrl(ConfigConstants.url)
      dataSource.setUsername(ConfigConstants.user)
      dataSource.setPassword(ConfigConstants.password)
      dataSource.setMaxTotal(50)
      dataSource.setInitialSize(3)
      dataSource.setMinIdle(3)
      dataSource.setMaxIdle(10)
      dataSource.setMaxWaitMillis(2 * 10000)
      dataSource.setRemoveAbandonedTimeout(180)
      dataSource.setRemoveAbandonedOnBorrow(true)
      dataSource.setRemoveAbandonedOnMaintenance(true)
      dataSource.setTestOnReturn(true)
      dataSource.setTestOnBorrow(true)
    }
    return dataSource
  }

  /**
   * Release the data source
   */
  def closeDataSource() = {
    if (dataSource != null) {
      dataSource.close()
    }
  }

  /**
   * Get a database connection
   *
   * @return
   */
  def getConnection(): Connection = {
    var conn: Connection = null
    try {
      if (dataSource != null) {
        conn = dataSource.getConnection()
      } else {
        conn = getDataSource().getConnection()
      }
    } catch {
      case e: Exception =>
        log.error(e.getMessage(), e)
    }
    conn
  }

  /**
   * Close the prepared statement and the connection
   */
  def closeConnection(ps: PreparedStatement, conn: Connection) {
    if (ps != null) {
      try {
        ps.close()
      } catch {
        case e: Exception =>
          log.error("Failed to close PreparedStatement: " + e.getMessage(), e)
      }
    }
    if (conn != null) {
      try {
        conn.close()
      } catch {
        case e: Exception =>
          log.error("Failed to close Connection: " + e.getMessage(), e)
      }
    }
  }
}
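A quick usage sketch of the pool; it only assumes the topic_par_group_offset table created above and a reachable MySQL instance.

val conn = JDBCConnPool.getConnection()
val ps = conn.prepareStatement("select count(*) from topic_par_group_offset where groupid = ?")
ps.setString(1, ConfigConstants.groupId)
val rs = ps.executeQuery()
if (rs.next()) println(s"stored offset rows for ${ConfigConstants.groupId}: ${rs.getLong(1)}")
JDBCConnPool.closeConnection(ps, conn)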
  • Kafka producer: KafkaProducerTest

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerRecord}

object KafkaProducerTest {
  def main(args: Array[String]): Unit = {
    val props: Properties = new Properties()
    props.put("bootstrap.servers", ConfigConstants.kafkaBrokers)
    props.put("batch.size", ConfigConstants.batchSize.asInstanceOf[Integer])
    props.put("linger.ms", ConfigConstants.lingerMs.asInstanceOf[Integer])
    props.put("buffer.memory", ConfigConstants.bufferMemory.asInstanceOf[Integer])
    props.put("key.serializer", ConfigConstants.kafkaKeySer)
    props.put("value.serializer", ConfigConstants.kafkaValueSer)

    val producer: Producer[String, String] = new KafkaProducer[String, String](props)
    val startTime: Long = System.currentTimeMillis()
    for (i <- 1 to 100) {
      producer.send(new ProducerRecord[String, String](ConfigConstants.kafkaTopics, "Spark", Integer.toString(i)))
    }
    println("Elapsed time: " + (System.currentTimeMillis() - startTime))
    producer.close()
  }
}
  • Reading and saving offsets:

This object reads offsets from and writes offsets to the external stores, covering both MySQL and Redis; a small round-trip check follows the code.

import java.sql.ResultSet
import scala.collection.JavaConversions
import scala.collection.mutable
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.Jedis

object OffsetReadAndSave {

  /**
   * Read offsets from MySQL
   *
   * @param groupid
   * @param topic
   * @return
   */
  def getOffsetMap(groupid: String, topic: String): mutable.Map[TopicPartition, Long] = {
    val conn = JDBCConnPool.getConnection()
    val selectSql = "select * from topic_par_group_offset where groupid = ? and topic = ?"
    val ppst = conn.prepareStatement(selectSql)
    ppst.setString(1, groupid)
    ppst.setString(2, topic)

    val result: ResultSet = ppst.executeQuery()
    // offsets keyed by topic partition
    val topicPartitionOffset = mutable.Map[TopicPartition, Long]()
    while (result.next()) {
      val topicPartition: TopicPartition = new TopicPartition(result.getString("topic"), result.getInt("partition"))
      topicPartitionOffset += (topicPartition -> result.getLong("offset"))
    }
    JDBCConnPool.closeConnection(ppst, conn)
    topicPartitionOffset
  }

  /**
   * Read offsets from Redis
   *
   * @param groupid
   * @param topic
   * @return
   */
  def getOffsetFromRedis(groupid: String, topic: String): Map[TopicPartition, Long] = {
    val jedis: Jedis = JedisConnPool.getConnection()
    var offsets = mutable.Map[TopicPartition, Long]()
    val key = s"${topic}_${groupid}"
    val fields: java.util.Map[String, String] = jedis.hgetAll(key)
    for (partition <- JavaConversions.mapAsScalaMap(fields)) {
      offsets.put(new TopicPartition(topic, partition._1.toInt), partition._2.toLong)
    }
    jedis.close() // return the connection to the pool
    offsets.toMap
  }

  /**
   * Save offsets to MySQL
   *
   * @param groupid     consumer group id
   * @param offsetRange offset ranges of the batch
   */
  def saveOffsetRanges(groupid: String, offsetRange: Array[OffsetRange]) = {
    val conn = JDBCConnPool.getConnection()
    val insertSql = "replace into topic_par_group_offset(`topic`, `partition`, `groupid`, `offset`) values(?,?,?,?)"
    val ppst = conn.prepareStatement(insertSql)
    for (offset <- offsetRange) {
      ppst.setString(1, offset.topic)
      ppst.setInt(2, offset.partition)
      ppst.setString(3, groupid)
      ppst.setLong(4, offset.untilOffset)
      ppst.executeUpdate()
    }
    JDBCConnPool.closeConnection(ppst, conn)
  }

  /**
   * Save offsets to Redis
   *
   * @param groupid
   * @param offsetRange
   */
  def saveOffsetToRedis(groupid: String, offsetRange: Array[OffsetRange]) = {
    val jedis: Jedis = JedisConnPool.getConnection()
    for (offsetRange <- offsetRange) {
      val topic = offsetRange.topic
      val partition = offsetRange.partition
      val offset = offsetRange.untilOffset
      // key is topic_groupid, field is partition, value is offset
      jedis.hset(s"${topic}_${groupid}", partition.toString, offset.toString)
    }
    jedis.close() // return the connection to the pool
  }
}
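A small round-trip check of the MySQL read/write pair, assuming the table above exists and MySQL is reachable; the partition and offset values are made up purely for illustration.

import org.apache.spark.streaming.kafka010.OffsetRange

// write a single illustrative offset range, then read it back
val ranges = Array(OffsetRange(ConfigConstants.kafkaTopics, 0, 0L, 42L)) // partition 0, untilOffset 42 (illustrative)
OffsetReadAndSave.saveOffsetRanges(ConfigConstants.groupId, ranges)
val restored = OffsetReadAndSave.getOffsetMap(ConfigConstants.groupId, ConfigConstants.kafkaTopics)
restored.foreach { case (tp, offset) => println(s"${tp.topic()}-${tp.partition()} -> $offset") }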
  • Business logic

This object contains the processing logic: it consumes the Kafka data and, after processing each batch, manually saves the offsets to MySQL. On startup it checks whether offsets already exist in the external store: on a first run it consumes from the initial position, and if offsets exist it resumes from them.

What to observe: on the first run the job consumes from the beginning of the topic; stop the program manually, start it again, and it resumes from the most recently committed offsets.

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}

object ManualCommitOffset {

  def main(args: Array[String]): Unit = {
    val brokers = ConfigConstants.kafkaBrokers
    val groupId = ConfigConstants.groupId
    val topics = ConfigConstants.kafkaTopics
    val batchInterval = ConfigConstants.batchInterval

    val conf = new SparkConf()
      .setAppName(ManualCommitOffset.getClass.getSimpleName)
      .setMaster("local[1]")
      .set("spark.serializer", ConfigConstants.sparkSerializer)

    val ssc = new StreamingContext(conf, batchInterval)
    // checkpointing must be enabled, otherwise updateStateByKey throws an error
    ssc.checkpoint(ConfigConstants.checkpointDir)
    ssc.sparkContext.setLogLevel("OFF")

    // create a direct Kafka stream from the brokers and topics
    val topicSet = topics.split(" ").toSet
    // Kafka connection parameters
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )

    // read this consumer group's partition offsets for the topic from MySQL
    val offsetMap = OffsetReadAndSave.getOffsetMap(groupId, topics)
    var inputDStream: InputDStream[ConsumerRecord[String, String]] = null

    // if MySQL already holds offsets, resume consuming from them
    if (offsetMap.size > 0) {
      println("Offsets found, resuming consumption from the stored offsets!!")
      inputDStream = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, offsetMap))
    } else {
      // if MySQL holds no offsets, consume from the earliest position
      inputDStream = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))
    }

    // checkpoint interval, must be an integer multiple of batchInterval
    inputDStream.checkpoint(ConfigConstants.checkpointInterval)

    // offsets of the current batch
    var offsetRanges = Array[OffsetRange]()
    // capture the offsets of the current DStream
    val transformDS = inputDStream.transform { rdd =>
      // get the offsets
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }

    /**
     * State update function
     * @param newValues  new values in the current batch
     * @param stateValue running state value
     * @return
     */
    def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] = {
      var oldvalue = stateValue.getOrElse(0) // current state value
      // iterate over the new values and update the state
      for (newValue <- newValues) {
        oldvalue += newValue
      }
      // return the latest state
      Option(oldvalue)
    }

    // business logic
    // this example aggregates the integer message values under a single key,
    // to verify that consumption resumes from the committed offsets
    transformDS.map(meg => ("spark", meg.value().toInt))
      .updateStateByKey(updateFunc)
      .print()

    // print offsets and records to observe the output
    transformDS.foreachRDD { (rdd, time) =>
      // print every record of the RDD
      rdd.foreach { record =>
        println(s"key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
      }
      // print the consumed offset ranges
      for (o <- offsetRanges) {
        println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset},time=${time}")
      }
      // save the offsets to MySQL
      OffsetReadAndSave.saveOffsetRanges(groupId, offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
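The updateStateByKey function above simply keeps a running sum per key across batches. A standalone illustration of its behaviour, independent of Spark:

// first batch: no previous state, new values 1, 2, 3
def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] =
  Option(stateValue.getOrElse(0) + newValues.sum)

println(updateFunc(Seq(1, 2, 3), None))   // Some(6)  -- no previous state
println(updateFunc(Seq(4, 5), Some(6)))   // Some(15) -- previous state carried forward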

How to manage Kafka offsets with Redis

  • Redis connection pool (a quick usage sketch follows the class)

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object JedisConnPool {
  val config = new JedisPoolConfig
  // maximum number of connections
  config.setMaxTotal(60)
  // maximum number of idle connections
  config.setMaxIdle(10)
  config.setTestOnBorrow(true)

  // Redis server address
  val redisAddress: String = ConfigConstants.redisAddress.toString
  // port
  val redisPort: Int = ConfigConstants.redisPort.toInt
  // password
  val redisAuth: String = ConfigConstants.redisAuth.toString
  // maximum wait time for an available connection
  val redisTimeout: Int = ConfigConstants.redisTimeout.toInt

  val pool = new JedisPool(config, redisAddress, redisPort, redisTimeout, redisAuth)

  def getConnection(): Jedis = {
    pool.getResource
  }
}
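A quick usage sketch of the pool. The key follows the topic_groupid convention used below, and the field/value pair (partition 0, offset 100) is made up for illustration; with a pooled Jedis instance, close() returns the connection to the pool rather than closing the underlying socket.

val jedis = JedisConnPool.getConnection()
try {
  jedis.hset("test_group_test", "0", "100") // hypothetical: partition 0 at offset 100
  println(jedis.hgetAll("test_group_test"))
} finally {
  jedis.close() // returns the instance to the JedisPool
}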
  • Business logic

This object is essentially the same as the one above, except that the offsets are stored in Redis. The data is stored as a Redis hash with the layout [key field value] -> [topic_groupid partition offset], i.e. the key is topic_groupid, the field is the partition, and the value is the offset.

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}

object ManualCommitOffsetToRedis {

  def main(args: Array[String]): Unit = {
    val brokers = ConfigConstants.kafkaBrokers
    val groupId = ConfigConstants.groupId
    val topics = ConfigConstants.kafkaTopics
    val batchInterval = ConfigConstants.batchInterval

    val conf = new SparkConf()
      .setAppName(ManualCommitOffsetToRedis.getClass.getSimpleName)
      .setMaster("local[1]")
      .set("spark.serializer", ConfigConstants.sparkSerializer)

    val ssc = new StreamingContext(conf, batchInterval)
    // checkpointing must be enabled, otherwise updateStateByKey throws an error
    ssc.checkpoint(ConfigConstants.checkpointDir)
    ssc.sparkContext.setLogLevel("OFF")

    // create a direct Kafka stream from the brokers and topics
    val topicSet = topics.split(" ").toSet
    // Kafka connection parameters
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )

    // read this consumer group's partition offsets for the topic from Redis
    val offsetMap = OffsetReadAndSave.getOffsetFromRedis(groupId, topics)
    var inputDStream: InputDStream[ConsumerRecord[String, String]] = null

    // if Redis already holds offsets, resume consuming from them
    if (offsetMap.size > 0) {
      println("Offsets found, resuming consumption from the stored offsets!!")
      inputDStream = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, offsetMap))
    } else {
      // if Redis holds no offsets, consume from the earliest position
      inputDStream = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))
    }

    // checkpoint interval, must be an integer multiple of batchInterval
    inputDStream.checkpoint(ConfigConstants.checkpointInterval)

    // offsets of the current batch
    var offsetRanges = Array[OffsetRange]()
    // capture the offsets of the current DStream
    val transformDS = inputDStream.transform { rdd =>
      // get the offsets
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }

    /**
     * State update function
     *
     * @param newValues  new values in the current batch
     * @param stateValue running state value
     * @return
     */
    def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] = {
      var oldvalue = stateValue.getOrElse(0) // current state value
      // iterate over the new values and update the state
      for (newValue <- newValues) {
        oldvalue += newValue
      }
      // return the latest state
      Option(oldvalue)
    }

    // business logic
    // this example aggregates the integer message values under a single key,
    // to verify that consumption resumes from the committed offsets
    transformDS.map(meg => ("spark", meg.value().toInt))
      .updateStateByKey(updateFunc)
      .print()

    // print offsets and records to observe the output
    transformDS.foreachRDD { (rdd, time) =>
      // print every record of the RDD
      rdd.foreach { record =>
        println(s"key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
      }
      // print the consumed offset ranges
      for (o <- offsetRanges) {
        println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset},time=${time}")
      }
      // save the offsets to Redis
      OffsetReadAndSave.saveOffsetToRedis(groupId, offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

Summary

This article showed how to keep Kafka consumption positions in an external store, with detailed code examples for managing offsets with MySQL and Redis. Many other stores can be used to manage offsets in the same way, for example Zookeeper or HBase; the basic approach is very similar.
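As a side note not covered by the examples above, the Kafka 0.10 integration for Spark Streaming can also commit offsets back to Kafka itself after each batch, which is another form of manual offset management. A minimal sketch, assuming stream is the InputDStream returned by createDirectStream:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... business processing ...
  // commit the offsets back to Kafka asynchronously once the batch has been processed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}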

大數據技術與數倉
