producer send源码_Kafka源码深度剖析系列(七)——Producer核心流程初探
生活随笔
收集整理的這篇文章主要介紹了
producer send源码_Kafka源码深度剖析系列(七)——Producer核心流程初探
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
本次內容我們有兩個目標:第一個初探Producer發送消息的流程第二個我們學習一下Kafka是如何構造異常體系的一、代碼分析Producer核心流程初探//因為生產中開發使用的是異步的方式發送的消息,所以我這兒直接貼的代碼//就是異步發送的代碼,大家注意這個代碼里面傳進去了兩個參數//一個是消息//一個是回調函數,這個回調函數很重要,每個消息發送完成以后這個回調函數都會被//執行,我們可以根據這個回調函數返回來的信息知道消息是否發送成功,//做相對應的應對處理。這種傳遞回調函數的代碼設計方式也值得我們積累,這樣可以增加用戶開發代碼時候的靈活性。
producer.send(new ProducerRecord<>(topic,
                    messageNo,
                    messageStr), new DemoCallBack(startTime, messageNo, messageStr));

點擊過去就會看到如下核心代碼:

    private Future doSend(ProducerRecord record, Callback callback) {
        TopicPartition tp = null;
        try {
            // first make sure the metadata for the topic is available
            // 第一步:阻塞等待獲取集群元數據
            // maxBlockTimeMs 獲取元數據最多等待的時間
            ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
            // 最多等待的時間減去等待元數據花的時間等于還可以再等待的時間
            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            // 集群元數據信息
            Cluster cluster = clusterAndWaitTime.cluster;
            // 第二步:對key和value進行序列化
            byte[] serializedKey;
            try {
                serializedKey = keySerializer.serialize(record.topic(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer");
            }
            byte[] serializedValue;
            try {
                serializedValue = valueSerializer.serialize(record.topic(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer");
            }
            // 第三步:根據分區器選擇合適的分區
            int partition = partition(record, serializedKey, serializedValue, cluster);
            // 第四步:計算消息的大小
            int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
            // 確認消息是否超出限制
            ensureValidRecordSize(serializedSize);
            // 第五步:根據元數據獲取到topic信息,封裝分區對象
            tp = new TopicPartition(record.topic(), partition);
            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            // producer callback will make sure to call both 'callback' and interceptor callback
            // 第六步:設置回調對象
            Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
            // 第七步:把消息追加到accumulator對象中
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
            // 消息存入accumulator中,如果一個批次滿了,或者是創建了一個新的批次,
            // 那么喚醒sender線程,讓sender線程開始干活,至于干什么活,我們后面再去分析
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                // 第八步:喚醒sender線程
                this.sender.wakeup();
            }
            return result.future;
            // handling exceptions and record the errors;
            // for API exceptions return them in the future,
            // for other exceptions throw directly
        } catch (ApiException e) {
            log.debug("Exception occurred during message send:", e);
            if (callback != null)
                callback.onCompletion(null, e);
            this.errors.record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            return new FutureFailure(e);
        } catch (InterruptedException e) {
            this.errors.record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw new InterruptException(e);
        } catch (BufferExhaustedException e) {
            this.errors.record();
            this.metrics.sensor("buffer-exhausted-records").record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw e;
        } catch (KafkaException e) {
            this.errors.record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw e;
        } catch (Exception e) {
            // we notify interceptor about all exceptions, since onSend is called before anything else in this method
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw e;
        }
    }

看完上面的代碼,我們也就大概完成了本次的第一個目標:初探Producer的核心流程。代碼調用的時序圖如下:Producer發送數據流程分析

二、Kafka異常體系

一直跟著分析源碼的同學能感覺得到,上面的代碼就是KafkaProducer的核心流程。這也是我們為什么挑這個時候講Kafka是如何構造異常體系的原因,一般在項目的核心流程里面去觀察這個項目的異常體系會看得比較清晰。大家發現這個流程里面捕獲了很多異常:

        } catch (ApiException e) {
            log.debug("Exception occurred during message send:", e);
            if (callback != null)
                callback.onCompletion(null, e);
            this.errors.record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            return new FutureFailure(e);
        } catch (InterruptedException e) {
            this.errors.record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw new InterruptException(e);
        } catch (BufferExhaustedException e) {
            this.errors.record();
            this.metrics.sensor("buffer-exhausted-records").record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw e;
        } catch (KafkaException e) {
            this.errors.record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw e;
        } catch (Exception e) {
            // we notify interceptor about all exceptions, since onSend is called before anything else in this method
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw e;
        }

通過看這段代碼,我們可以學習到如下3個點:
1. 核心流程捕獲各種異常(上面的這段代碼就是核心代碼)
2. 底層異常直接往上拋,比如:ensureValidRecordSize方法
3. 自定義各種異常,力求出了問題,方便精準定位問題,比如:ensureValidRecordSize方法
注意:核心流程捕獲異常的時候我們也可以考慮把異常封裝成為各種狀態碼。

Kafka自定義各種異常。舉個例子,比如我們分析初探核心流程里面有段代碼是:

            // 檢查要發送的這個消息大小,檢查是否超過了請求大小和內存緩沖大小。
            ensureValidRecordSize(serializedSize);

點擊過去:

    private void ensureValidRecordSize(int size) {
        // 默認值1M,如果超過1M拋異常
        if (size > this.maxRequestSize)
            throw new RecordTooLargeException("The message is " + size +
                    " bytes when serialized which is larger than the maximum request size you have configured with the " +
                    ProducerConfig.MAX_REQUEST_SIZE_CONFIG + " configuration.");
        // 不能超過內存緩沖的大小,如果超過內存大小拋異常
        if (size > this.totalMemorySize)
            throw new RecordTooLargeException("The message is " + size +
                    " bytes when serialized which is larger than the total memory buffer you have configured with the " +
                    ProducerConfig.BUFFER_MEMORY_CONFIG + " configuration.");
    }

RecordTooLargeException 就是自定義的異常,Kafka選擇把這種底層代碼的異常往上拋,在核心流程里統一處理。如果沒有太多工業項目設計經驗的同學,可以學習Kafka的異常體系的設計,Kafka使用的這種異常處理方式是大多數大數據項目處理異常使用的方式。

三、總結

本小節主要分析了KafkaProducer發送消息的大致步驟,另外此小節還有一個重點就是我們學習了Kafka是如何構建自己的異常體系的。系列更新第八期啦,后續還有更精彩的內容!喜歡的同學可以點贊,關注

- 關注“大數據觀止” -

《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀
producer.send(new ProducerRecord<>(topic,
                    messageNo,
                    messageStr), new DemoCallBack(startTime, messageNo, messageStr));

點擊過去就會看到如下核心代碼:

    private Future doSend(ProducerRecord record, Callback callback) {
        TopicPartition tp = null;
        try {
            // first make sure the metadata for the topic is available
            // 第一步:阻塞等待獲取集群元數據
            // maxBlockTimeMs 獲取元數據最多等待的時間
            ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
            // 最多等待的時間減去等待元數據花的時間等于還可以再等待的時間
            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            // 集群元數據信息
            Cluster cluster = clusterAndWaitTime.cluster;
            // 第二步:對key和value進行序列化
            byte[] serializedKey;
            try {
                serializedKey = keySerializer.serialize(record.topic(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer");
            }
            byte[] serializedValue;
            try {
                serializedValue = valueSerializer.serialize(record.topic(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer");
            }
            // 第三步:根據分區器選擇合適的分區
            int partition = partition(record, serializedKey, serializedValue, cluster);
            // 第四步:計算消息的大小
            int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
            // 確認消息是否超出限制
            ensureValidRecordSize(serializedSize);
            // 第五步:根據元數據獲取到topic信息,封裝分區對象
            tp = new TopicPartition(record.topic(), partition);
            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            // producer callback will make sure to call both 'callback' and interceptor callback
            // 第六步:設置回調對象
            Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
            // 第七步:把消息追加到accumulator對象中
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
            // 消息存入accumulator中,如果一個批次滿了,或者是創建了一個新的批次,
            // 那么喚醒sender線程,讓sender線程開始干活,至于干什么活,我們后面再去分析
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                // 第八步:喚醒sender線程
                this.sender.wakeup();
            }
            return result.future;
            // handling exceptions and record the errors;
            // for API exceptions return them in the future,
            // for other exceptions throw directly
        } catch (ApiException e) {
            log.debug("Exception occurred during message send:", e);
            if (callback != null)
                callback.onCompletion(null, e);
            this.errors.record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            return new FutureFailure(e);
        } catch (InterruptedException e) {
            this.errors.record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw new InterruptException(e);
        } catch (BufferExhaustedException e) {
            this.errors.record();
            this.metrics.sensor("buffer-exhausted-records").record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw e;
        } catch (KafkaException e) {
            this.errors.record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw e;
        } catch (Exception e) {
            // we notify interceptor about all exceptions, since onSend is called before anything else in this method
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw e;
        }
    }

看完上面的代碼,我們也就大概完成了本次的第一個目標:初探Producer的核心流程。代碼調用的時序圖如下:Producer發送數據流程分析

二、Kafka異常體系

一直跟著分析源碼的同學能感覺得到,上面的代碼就是KafkaProducer的核心流程。這也是我們為什么挑這個時候講Kafka是如何構造異常體系的原因,一般在項目的核心流程里面去觀察這個項目的異常體系會看得比較清晰。大家發現這個流程里面捕獲了很多異常:

        } catch (ApiException e) {
            log.debug("Exception occurred during message send:", e);
            if (callback != null)
                callback.onCompletion(null, e);
            this.errors.record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            return new FutureFailure(e);
        } catch (InterruptedException e) {
            this.errors.record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw new InterruptException(e);
        } catch (BufferExhaustedException e) {
            this.errors.record();
            this.metrics.sensor("buffer-exhausted-records").record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw e;
        } catch (KafkaException e) {
            this.errors.record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw e;
        } catch (Exception e) {
            // we notify interceptor about all exceptions, since onSend is called before anything else in this method
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw e;
        }

通過看這段代碼,我們可以學習到如下3個點:
1. 核心流程捕獲各種異常(上面的這段代碼就是核心代碼)
2. 底層異常直接往上拋,比如:ensureValidRecordSize方法
3. 自定義各種異常,力求出了問題,方便精準定位問題,比如:ensureValidRecordSize方法
注意:核心流程捕獲異常的時候我們也可以考慮把異常封裝成為各種狀態碼。

Kafka自定義各種異常。舉個例子,比如我們分析初探核心流程里面有段代碼是:

            // 檢查要發送的這個消息大小,檢查是否超過了請求大小和內存緩沖大小。
            ensureValidRecordSize(serializedSize);

點擊過去:

    private void ensureValidRecordSize(int size) {
        // 默認值1M,如果超過1M拋異常
        if (size > this.maxRequestSize)
            throw new RecordTooLargeException("The message is " + size +
                    " bytes when serialized which is larger than the maximum request size you have configured with the " +
                    ProducerConfig.MAX_REQUEST_SIZE_CONFIG + " configuration.");
        // 不能超過內存緩沖的大小,如果超過內存大小拋異常
        if (size > this.totalMemorySize)
            throw new RecordTooLargeException("The message is " + size +
                    " bytes when serialized which is larger than the total memory buffer you have configured with the " +
                    ProducerConfig.BUFFER_MEMORY_CONFIG + " configuration.");
    }

RecordTooLargeException 就是自定義的異常,Kafka選擇把這種底層代碼的異常往上拋,在核心流程里統一處理。如果沒有太多工業項目設計經驗的同學,可以學習Kafka的異常體系的設計,Kafka使用的這種異常處理方式是大多數大數據項目處理異常使用的方式。

三、總結

本小節主要分析了KafkaProducer發送消息的大致步驟,另外此小節還有一個重點就是我們學習了Kafka是如何構建自己的異常體系的。系列更新第八期啦,后續還有更精彩的內容!喜歡的同學可以點贊,關注

- 關注“大數據觀止” -

《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀
總結
以上是生活随笔為你收集整理的producer send源码_Kafka源码深度剖析系列(七)——Producer核心流程初探的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: stm32 adc过采样_产生ADC误差
- 下一篇: spinbox 上下箭头事件_[Reac