
SparkStreaming "Could not read data from write ahead log record": error analysis and fix

# seen when the WAL is enabled:
org.apache.spark.SparkException: Could not read data from write ahead log record FileBasedWriteAheadLogSegment


With checkpointing and the WAL enabled, Spark Streaming occasionally throws the error above. It does not bring the whole application down, but the data of the failed job is lost. The root cause is that the WAL file has already been deleted, by Spark Streaming's own cleanup mechanism. It usually also indicates some degree of rate mismatch or batch backlog in the streaming job.

Looking at the driver log, you can find entries similar to the following:

2017-03-23 13:55:00 INFO [Logging.scala:58] Attempting to clear 0 old log files in hdfs://alps-cluster/tmp/banyan/checkpoint/RhinoWechatConsumer/receivedBlockMetadata older than 1490248380000:
2017-03-23 13:55:05 INFO [Logging.scala:58] Attempting to clear 1 old log files in hdfs://alps-cluster/tmp/banyan/checkpoint/RhinoWechatConsumer/receivedBlockMetadata older than 1490248470000: hdfs://alps-cluster/tmp/banyan/checkpoint/RhinoWechatConsumer/receivedBlockMetadata/log-1490248404471-1490248464471
2017-03-23 13:55:05 INFO [Logging.scala:58] Cleared log files in hdfs://alps-cluster/tmp/banyan/checkpoint/RhinoWechatConsumer/receivedBlockMetadata older than 1490248470000
2017-03-23 13:55:05 ERROR [Logging.scala:74] Task 41 in stage 35.0 failed 4 times; aborting job
2017-03-23 13:55:05 ERROR [Logging.scala:95] Error running job streaming job 1490248470000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 41 in stage 35.0 failed 4 times, most recent failure: Lost task 41.3 in stage 35.0 (TID 4273, alps60): org.apache.spark.SparkException: Could not read data from write ahead log record FileBasedWriteAheadLogSegment(hdfs://alps-cluster/tmp/banyan/checkpoint/RhinoWechatConsumer/receivedData/0/log-1490248403649-1490248463649,44333482,118014)
    at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:143)

You can see that the WAL file with timestamp 1490248403649 was removed by the cleanup routine ("cleared log files ... older than 1490248470000"), and the task that still needed to read that WAL segment then failed.
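To make the comparison concrete, here is a small standalone sketch (illustration only, not Spark source; the log-<startTime>-<endTime> file-name convention is inferred from the paths in the log above) showing why that segment was considered expired:

// Illustration only (not Spark source). Assumes WAL segments are named
// "log-<startTime>-<endTime>", as seen in the driver log above.
object WalExpiryCheck {
  private val SegmentName = """log-(\d+)-(\d+)""".r

  // Same test the cleaner applies: a segment is expired when its end time is older than threshTime.
  def isExpired(fileName: String, threshTime: Long): Boolean = fileName match {
    case SegmentName(_, endTime) => endTime.toLong < threshTime
    case _                       => false
  }

  def main(args: Array[String]): Unit = {
    // The segment the failed task needed vs. the "older than" threshold from the log.
    println(isExpired("log-1490248403649-1490248463649", 1490248470000L)) // true: already deleted
  }
}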

The official Spark documentation has no configuration for this at all, so we go straight to the source code. (Spark has many pitfalls like this; you have to read the source to find out how to hack around them or discover the hidden configuration.)


1. Searching for the log message leads to the clean method in FileBasedWriteAheadLog (the class that manages the FileBasedWriteAheadLogSegment files). The logic after it is the actual deletion, which we can ignore for now; the key question is how this threshTime is set and how to adjust it.

/**
 * Delete the log files that are older than the threshold time.
 *
 * Its important to note that the threshold time is based on the time stamps used in the log
 * files, which is usually based on the local system time. So if there is coordination necessary
 * between the node calculating the threshTime (say, driver node), and the local system time
 * (say, worker node), the caller has to take account of possible time skew.
 *
 * If waitForCompletion is set to true, this method will return only after old logs have been
 * deleted. This should be set to true only for testing. Else the files will be deleted
 * asynchronously.
 */
def clean(threshTime: Long, waitForCompletion: Boolean): Unit = {
  val oldLogFiles = synchronized {
    val expiredLogs = pastLogs.filter { _.endTime < threshTime }
    pastLogs --= expiredLogs
    expiredLogs
  }
  logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
    s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")


2. Tracing the call chain outwards step by step: ReceivedBlockHandler -> ReceiverSupervisorImpl -> CleanUpOldBlocks. CleanUpOldBlocks is part of an RPC exchange with the ReceiverTracker, so searching for it directly continues the chain: CleanUpOldBlocks -> ReceiverTracker -> JobGenerator.

In JobGenerator.clearCheckpointData there is the following logic:

/** Clear DStream checkpoint data for the given `time`. */
private def clearCheckpointData(time: Time) {
  ssc.graph.clearCheckpointData(time)

  // All the checkpoint information about which batches have been processed, etc have
  // been saved to checkpoints, so its safe to delete block metadata and data WAL files
  val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
  jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
  jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
  markBatchFullyProcessed(time)
}
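Putting this together with the driver log: the cleanup threshold is simply the batch time minus the largest rememberDuration of the input streams. A rough reconstruction follows; it assumes the 30 s remember window derived in the next section, and the "later batch" time is an assumption chosen so the numbers line up with the log:

// Rough reconstruction of the observed threshold. Assumptions: a 30 s remember window
// (derived below) and a cleanup triggered by a batch two 15 s intervals after the
// failed batch 1490248470000.
object ThresholdReconstruction {
  def main(args: Array[String]): Unit = {
    val laterBatchTimeMs      = 1490248500000L // hypothetical batch time triggering the cleanup
    val maxRememberDurationMs = 30000L         // 30 s
    // Mirrors: receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
    println(laterBatchTimeMs - maxRememberDurationMs) // 1490248470000 -> the "older than" value in the log
  }
}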

So ssc.graph carries a maxRememberDuration (the largest rememberDuration among its input streams)! That means there is a chance to change it through ssc.

A quick search of the code turns up the relevant method:

jssc.remember(new Duration(2 * 3600 * 1000));
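For completeness, here is a minimal Scala driver sketch showing where this call usually goes. The app name, the 15 s batch interval and the 2-hour window are placeholders mirroring the Java call above, not taken from the original job:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

// Minimal sketch with placeholder values: set the remember window once, after creating
// the context and before starting it, so WAL/block data is kept around long enough.
object RememberWindowExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("wal-remember-sketch") // placeholder app name
    val ssc  = new StreamingContext(conf, Seconds(15))           // 15 s batches, as in this article
    ssc.remember(Minutes(120))  // same 2-hour window as jssc.remember(new Duration(2 * 3600 * 1000))
    // ... define receivers / DStreams and output operations here ...
    ssc.start()
    ssc.awaitTermination()
  }
}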


Reflections:

From the earlier log we saw that the default cleanup window is only a few tens of seconds, yet the code shows this parameter can only be set once (each assignment first checks that the current value is null; the initial value is null). So where do those few tens of seconds come from? It wasn't obvious at first, so a project-wide search for remember turned up the initialization code in DStream (where slideDuration comes from InputDStream). Working through the calculation: our batchInterval is 15 s and the other two values are not set, so checkpointDuration ends up as 15 s and rememberDuration as 30 s.

override def slideDuration: Duration = {
  if (ssc == null) throw new Exception("ssc is null")
  if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null")
  ssc.graph.batchDuration
}

/**
 * Initialize the DStream by setting the "zero" time, based on which
 * the validity of future times is calculated. This method also recursively initializes
 * its parent DStreams.
 */
private[streaming] def initialize(time: Time) {
  if (zeroTime != null && zeroTime != time) {
    throw new SparkException("ZeroTime is already initialized to " + zeroTime
      + ", cannot initialize it again to " + time)
  }
  zeroTime = time

  // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
  if (mustCheckpoint && checkpointDuration == null) {
    checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
    logInfo("Checkpoint interval automatically set to " + checkpointDuration)
  }

  // Set the minimum value of the rememberDuration if not already set
  var minRememberDuration = slideDuration
  if (checkpointDuration != null && minRememberDuration <= checkpointDuration) {
    // times 2 just to be sure that the latest checkpoint is not forgotten (#paranoia)
    minRememberDuration = checkpointDuration * 2
  }
  if (rememberDuration == null || rememberDuration < minRememberDuration) {
    rememberDuration = minRememberDuration
  }

  // Initialize the dependencies
  dependencies.foreach(_.initialize(zeroTime))
}
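Plugging our settings into the code above gives the roughly 30 s window seen in the log. A small standalone calculation (pure Scala, mirroring the formulas in initialize(); the 15 s batch interval is our own setting):

// Worked example of the defaults in initialize() above for our job:
// batch interval 15 s, no explicit checkpoint interval, no remember() call.
object DefaultRememberWindow {
  def main(args: Array[String]): Unit = {
    val slideDurationMs = 15000L
    // checkpointDuration = slideDuration * ceil(10 s / slideDuration)  -> 15 s
    val checkpointMs    = slideDurationMs * math.ceil(10000.0 / slideDurationMs).toLong
    // minRememberDuration = checkpointDuration * 2                     -> 30 s
    val rememberMs      = checkpointMs * 2
    println(s"checkpointDuration = $checkpointMs ms, rememberDuration = $rememberMs ms")
    // So by default, WAL data older than (batch time - 30 s) becomes eligible for deletion.
  }
}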


Reposted from: https://www.cnblogs.com/lhfcws/p/6605085.html
