日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka系列九、kafka事务原理、事务API和使用场景

發布時間:2024/4/17 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka系列九、kafka事务原理、事务API和使用场景 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、事務場景

  • 最簡單的需求是producer發的多條消息組成一個事務這些消息需要對consumer同時可見或者同時不可見 。
  • producer可能會給多個topic,多個partition發消息,這些消息也需要能放在一個事務里面,這就形成了一個典型的分布式事務。
  • kafka的應用場景經常是應用先消費一個topic,然后做處理再發到另一個topic,這個consume-transform-produce過程需要放到一個事務里面,比如在消息處理或者發送的過程中如果失敗了,消費位點也不能提交。
  • producer或者producer所在的應用可能會掛掉,新的producer啟動以后需要知道怎么處理之前未完成的事務 。
  • 流式處理的拓撲可能會比較深,如果下游只有等上游消息事務提交以后才能讀到,可能會導致rt非常長吞吐量也隨之下降很多,所以需要實現read committed和read uncommitted兩種事務隔離級別。
  • 二、幾個關鍵概念和推導

    1.因為producer發送消息可能是分布式事務,所以引入了常用的2PC,所以有事務協調者(Transaction Coordinator)。Transaction Coordinator和之前為了解決腦裂和驚群問題引入的Group Coordinator在選舉和failover上面類似。

    2.事務管理中事務日志是必不可少的,kafka使用一個內部topic來保存事務日志,這個設計和之前使用內部topic保存位點的設計保持一致。事務日志是Transaction Coordinator管理的狀態的持久化,因為不需要回溯事務的歷史狀態,所以事務日志只用保存最近的事務狀態。
    3.因為事務存在commit和abort兩種操作,而客戶端又有read committed和read uncommitted兩種隔離級別,所以消息隊列必須能標識事務狀態,這個被稱作Control Message。
    4.producer掛掉重啟或者漂移到其它機器需要能關聯的之前的未完成事務所以需要有一個唯一標識符來進行關聯,這個就是TransactionalId,一個producer掛了,另一個有相同TransactionalId的producer能夠接著處理這個事務未完成的狀態。注意不要把TransactionalId和數據庫事務中常見的transaction id搞混了,kafka目前沒有引入全局序,所以也沒有transaction id,這個TransactionalId是用戶提前配置的。
    5. TransactionalId能關聯producer,也需要避免兩個使用相同TransactionalId的producer同時存在,所以引入了producer epoch來保證對應一個TransactionalId只有一個活躍的producer epoch

    三、事務語義

    2.1.? 多分區原子寫入

    事務能夠保證Kafka topic下每個分區的原子寫入。事務中所有的消息都將被成功寫入或者丟棄。例如,處理過程中發生了異常并導致事務終止,這種情況下,事務中的消息都不會被Consumer讀取。現在我們來看下Kafka是如何實現原子的“讀取-處理-寫入”過程的。

    首先,我們來考慮一下原子“讀取-處理-寫入”周期是什么意思。簡而言之,這意味著如果某個應用程序在某個topic tp0的偏移量X處讀取到了消息A,并且在對消息A進行了一些處理(如B = F(A))之后將消息B寫入topic tp1,則只有當消息A和B被認為被成功地消費并一起發布,或者完全不發布時,整個讀取過程寫入操作是原子的。

    現在,只有當消息A的偏移量X被標記為消耗時,消息A才被認為是從topic tp0消耗的,消費到的數據偏移量(record offset)將被標記為提交偏移量(Committing offset)。在Kafka中,我們通過寫入一個名為offsets topic的內部Kafka topic來記錄offset commit。消息僅在其offset被提交給offsets topic時才被認為成功消費。

    由于offset commit只是對Kafkatopic的另一次寫入,并且由于消息僅在提交偏移量時被視為成功消費,所以跨多個主題和分區的原子寫入也啟用原子“讀取-處理-寫入”循環:提交偏移量X到offset topic和消息B到tp1的寫入將是單個事務的一部分,所以整個步驟都是原子的。

    2.2.? 粉碎“僵尸實例”

    我們通過為每個事務Producer分配一個稱為transactional.id的唯一標識符來解決僵尸實例的問題。在進程重新啟動時能夠識別相同的Producer實例。

    API要求事務性Producer的第一個操作應該是在Kafka集群中顯示注冊transactional.id。 當注冊的時候,Kafka broker用給定的transactional.id檢查打開的事務并且完成處理。 Kafka也增加了一個與transactional.id相關的epoch。Epoch存儲每個transactional.id內部元數據。

    一旦這個epoch被觸發,任何具有相同的transactional.id和更舊的epoch的Producer被視為僵尸,并被圍起來, Kafka會拒絕來自這些Procedure的后續事務性寫入。

    2.3.? 讀事務消息

    現在,讓我們把注意力轉向數據讀取中的事務一致性。

    Kafka Consumer只有在事務實際提交時才會將事務消息傳遞給應用程序。也就是說,Consumer不會提交作為整個事務一部分的消息,也不會提交屬于中止事務的消息。

    值得注意的是,上述保證不足以保證整個消息讀取的原子性,當使用Kafka consumer來消費來自topic的消息時,應用程序將不知道這些消息是否被寫為事務的一部分,因此他們不知道事務何時開始或結束;此外,給定的Consumer不能保證訂閱屬于事務一部分的所有Partition,并且無法發現這一點,最終難以保證作為事務中的所有消息被單個Consumer處理。

    簡而言之:Kafka保證Consumer最終只能提供非事務性消息或提交事務性消息。它將保留來自未完成事務的消息,并過濾掉已中止事務的消息。

    四 、事務處理Java API

    ?producer提供了五個事務方法:

  • initTransactions
  • beginTransaction
  • sendOffsets
  • commitTransaction
  • abortTransaction
  • 1、api分類

    在一個原子操作中,根據包含的操作類型,可以分為三種情況,前兩種情況是事務引入的場景,最后一種情況沒有使用價值。

  • 只有Producer生產消息;
  • 消費消息和生產消息并存,這個是事務場景中最常用的情況,就是我們常說的“consume-transform-produce ”模式
  • 只有consumer消費消息,這種操作其實沒有什么意義,跟使用手動提交效果一樣,而且也不是事務屬性引入的目的,所以一般不會使用這種情況
  • 2、事務配置

    1、創建消費者代碼,需要:

    • 將配置中的自動提交屬性(auto.commit)進行關閉
    • 而且在代碼里面也不能使用手動提交commitSync( )或者commitAsync( )
    • 設置isolation.level

    2、創建生成者,代碼如下,需要:

    • 配置transactional.id屬性
    • 配置enable.idempotence屬性

    3、“只有寫”應用程序示例

    package com.example.demo.transaction;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Properties; import java.util.concurrent.Future;public class TransactionProducer {private static Properties getProps(){Properties props = new Properties();props.put("bootstrap.servers", "47.52.199.53:9092");props.put("retries", 2); // 重試次數props.put("batch.size", 100); // 批量發送大小props.put("buffer.memory", 33554432); // 緩存大小,根據本機內存大小配置props.put("linger.ms", 1000); // 發送頻率,滿足任務一個條件發送props.put("client.id", "producer-syn-2"); // 發送端id,便于統計props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("transactional.id","producer-1"); // 每臺機器唯一props.put("enable.idempotence",true); // 設置冪等性return props;}public static void main(String[] args) {KafkaProducer<String, String> producer = new KafkaProducer<>(getProps());// 初始化事務   producer.initTransactions();try {Thread.sleep(2000);// 開啟事務 producer.beginTransaction();// 發送消息到producer-synproducer.send(new ProducerRecord<String, String>("producer-syn","test3"));// 發送消息到producer-asynFuture<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<String, String>("producer-asyn","test4"));// 提交事務 producer.commitTransaction();}catch (Exception e){e.printStackTrace();// 終止事務 producer.abortTransaction();}} }

    4、消費-生產并存(consume-Transform-Produce)

    package com.example.demo.transaction;import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition;import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.concurrent.Future;public class consumeTransformProduce {private static Properties getProducerProps(){Properties props = new Properties();props.put("bootstrap.servers", "47.52.199.51:9092");props.put("retries", 3); // 重試次數props.put("batch.size", 100); // 批量發送大小props.put("buffer.memory", 33554432); // 緩存大小,根據本機內存大小配置props.put("linger.ms", 1000); // 發送頻率,滿足任務一個條件發送props.put("client.id", "producer-syn-2"); // 發送端id,便于統計props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("transactional.id","producer-2"); // 每臺機器唯一props.put("enable.idempotence",true); // 設置冪等性return props;}private static Properties getConsumerProps(){Properties props = new Properties();props.put("bootstrap.servers", "47.52.199.51:9092");props.put("group.id", "test_3");props.put("session.timeout.ms", 30000); // 如果其超時,將會可能觸發rebalance并認為已經死去,重新選舉Leaderprops.put("enable.auto.commit", "false"); // 開啟自動提交props.put("auto.commit.interval.ms", "1000"); // 自動提交時間props.put("auto.offset.reset","earliest"); // 從最早的offset開始拉取,latest:從最近的offset開始消費props.put("client.id", "producer-syn-1"); // 發送端id,便于統計props.put("max.poll.records","100"); // 每次批量拉取條數props.put("max.poll.interval.ms","1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("isolation.level","read_committed"); // 設置隔離級別return props;}public static void main(String[] args) {// 創建生產者KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerProps());// 創建消費者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConsumerProps());// 初始化事務 producer.initTransactions();// 訂閱主題consumer.subscribe(Arrays.asList("consumer-tran"));for(;;){// 開啟事務 producer.beginTransaction();// 接受消息ConsumerRecords<String, String> records = consumer.poll(500);// 處理邏輯try {Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();for(ConsumerRecord record : records){// 處理消息System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());// 記錄提交的偏移量commits.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset()));// 產生新消息Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<>("consumer-send",record.value()+"send"));}// 提交偏移量producer.sendOffsetsToTransaction(commits,"group0323");// 事務提交 producer.commitTransaction();}catch (Exception e){e.printStackTrace();producer.abortTransaction();}}} }

    ?

    在一個事務中,既有生產消息操作又有消費消息操作,即常說的Consume-tansform-produce模式。如下實例代碼

    五、事務工作原理

    1、事務協調器和事務日志

    在Kafka 0.11.0中與事務API一起引入的組件是上圖右側的事務Coordinator和事務日志。

    事務Coordinator是每個KafkaBroker內部運行的一個模塊。事務日志是一個內部的Kafka Topic。每個Coordinator擁有事務日志所在分區的子集,即, 這些borker中的分區都是Leader。

    每個transactional.id都通過一個簡單的哈希函數映射到事務日志的特定分區,事務日志文件__transaction_state-0。這意味著只有一個Broker擁有給定的transactional.id。

    通過這種方式,我們利用Kafka可靠的復制協議和Leader選舉流程來確保事務協調器始終可用,并且所有事務狀態都能夠持久存儲。

    值得注意的是,事務日志只保存事務的最新狀態而不是事務中的實際消息。消息只存儲在實際的Topic的分區中。事務可以處于諸如“Ongoing”,“prepare commit”和“Completed”之類的各種狀態中。正是這種狀態和關聯的元數據存儲在事務日志中。

    2、事務數據流

    數據流在抽象層面上有四種不同的類型。

    A. producer和事務coordinator的交互

      執行事務時,Producer向事務協調員發出如下請求:

  • initTransactions API向coordinator注冊一個transactional.id。 此時,coordinator使用該transactional.id關閉所有待處理的事務,并且會避免遇到僵尸實例,由具有相同的transactional.id的Producer的另一個實例啟動的任何事務將被關閉和隔離。每個Producer會話只發生一次
  • 當Producer在事務中第一次將數據發送到分區時,首先向coordinator注冊分區。
  • 當應用程序調用commitTransaction或abortTransaction時,會向coordinator發送一個請求以開始兩階段提交協議。
  • B. Coordinator和事務日志交互

      隨著事務的進行,Producer發送上面的請求來更新Coordinator上事務的狀態。事務Coordinator會在內存中保存每個事務的狀態,并且把這個狀態寫到事務日志中(這是以三種方式復制的,因此是持久保存的)。

      事務Coordinator是讀寫事務日志的唯一組件。如果一個給定的Borker故障了,一個新的Coordinator會被選為新的事務日志的Leader,這個事務日志分割了這個失效的代理,它從傳入的分區中讀取消息并在內存中重建狀態。

    C.Producer將數據寫入目標Topic所在分區

      在Coordinator的事務中注冊新的分區后,Producer將數據正常地發送到真實數據所在分區。這與producer.send流程完全相同,但有一些額外的驗證,以確保Producer不被隔離。

    D.Topic分區和Coordinator的交互

  • 在Producer發起提交(或中止)之后,協調器開始兩階段提交協議。
  • 在第一階段,Coordinator將其內部狀態更新為“prepare_commit”并在事務日志中更新此狀態。一旦完成了這個事務,無論發生什么事,都能保證事務完成。
  • Coordinator然后開始階段2,在那里它將事務提交標記寫入作為事務一部分的Topic分區。
  • 這些事務標記不會暴露給應用程序,但是在read_committed模式下被Consumer使用來過濾掉被中止事務的消息,并且不返回屬于開放事務的消息(即那些在日志中但沒有事務標記與他們相關聯)。
  • 一旦標記被寫入,事務協調器將事務標記為“完成”,并且Producer可以開始下一個事務。
  • 六、事務相關配置

    1、Broker configs

    1、ransactional.id.timeout.ms:

    在ms中,事務協調器在生產者TransactionalId提前過期之前等待的最長時間,并且沒有從該生產者TransactionalId接收到任何事務狀態更新。默認是604800000(7天)。這允許每周一次的生產者作業維護它們的id

    2、max.transaction.timeout.ms

    事務允許的最大超時。如果客戶端請求的事務時間超過此時間,broke將在InitPidRequest中返回InvalidTransactionTimeout錯誤。這可以防止客戶機超時過大,從而導致用戶無法從事務中包含的主題讀取內容。

    默認值為900000(15分鐘)。這是消息事務需要發送的時間的保守上限。

    3、transaction.state.log.replication.factor

    事務狀態topic的副本數量。默認值:3

    4、transaction.state.log.num.partitions

    事務狀態主題的分區數。默認值:50

    5、transaction.state.log.min.isr

    事務狀態主題的每個分區ISR最小數量。默認值:2

    6、transaction.state.log.segment.bytes 事務狀態主題的segment大小。默認值:104857600字節

    2、Producer configs

    1、enable.idempotence:開啟冪等

    2、transaction.timeout.ms:事務超時時間

    事務協調器在主動中止正在進行的事務之前等待生產者更新事務狀態的最長時間。

    這個配置值將與InitPidRequest一起發送到事務協調器。如果該值大于max.transaction.timeout。在broke中設置ms時,請求將失敗,并出現InvalidTransactionTimeout錯誤。

    默認是60000。這使得交易不會阻塞下游消費超過一分鐘,這在實時應用程序中通常是允許的。

    3、transactional.id

    用于事務性交付的TransactionalId。這支持跨多個生產者會話的可靠性語義,因為它允許客戶端確保使用相同TransactionalId的事務在啟動任何新事務之前已經完成。如果沒有提供TransactionalId,則生產者僅限于冪等交付。

    3、Consumer configs

    1、isolation.level

    • read_uncommitted:以偏移順序使用已提交和未提交的消息。
    • read_committed:僅以偏移量順序使用非事務性消息或已提交事務性消息。為了維護偏移排序,這個設置意味著我們必須在使用者中緩沖消息,直到看到給定事務中的所有消息。

    七、事務性能以及如何優化

    1、Producer打開事務之后的性能

    讓我們把注意力轉向事務如何執行。首先,事務只造成中等的寫入放大。

    額外的寫入在于:

  • 對于每個事務,我們都有額外的RPC向Coordinator注冊分區。
  • 在完成事務時,必須將一個事務標記寫入參與事務的每個分區。同樣,事務Coordinator在單個RPC中批量綁定到同一個Borker的所有標記,所以我們在那里保存RPC開銷。但是在事務中對每個分區進行額外的寫操作是無法避免的。
  • 最后,我們將狀態更改寫入事務日志。這包括寫入添加到事務的每批分區,“prepare_commit”狀態和“complete_commit”狀態。
  • 我們可以看到,開銷與作為事務一部分寫入的消息數量無關。所以擁有更高吞吐量的關鍵是每個事務包含更多的消息。

    實際上,對于Producer以最大吞吐量生產1KB記錄,每100ms提交消息導致吞吐量僅降低3%。較小的消息或較短的事務提交間隔會導致更嚴重的降級。

    增加事務時間的主要折衷是增加了端到端延遲。回想一下,Consum閱讀事務消息不會傳遞屬于公開傳輸的消息。因此,提交之間的時間間隔越長,消耗的應用程序就越需要等待,從而增加了端到端的延遲。

    2、Consumer打開之后的性能

    Consumer在開啟事務的場景比Producer簡單得多,它需要做的是:

    • 過濾掉屬于中止事務的消息。
    • 不返回屬于公開事務一部分的事務消息。

    因此,當以read_committed模式讀取事務消息時,事務Consumer的吞吐量沒有降低。這樣做的主要原因是我們在讀取事務消息時保持零拷貝讀取。

    此外,Consumer不需要任何緩沖等待事務完成。相反,Broker不允許提前抵消包括公開事務。

    因此,Consumer是非常輕巧和高效的。感興趣的讀者可以在本文檔(鏈接2)中了解Consumer設計的細節。

    八、進一步閱讀

    我們剛剛講述了Apache Kafka中事務的表面。 幸運的是,幾乎所有的設計細節都保存在在線文檔中。 相關文件是:

    最初的Kafka KIP(鏈接3):它提供了關于數據流的設計細節,并且詳細介紹了公共接口,特別是與事務相關的配置選項。

    原始設計文檔(鏈接4):不是為了內核,這是源代碼之外的權威地方 - 了解每個事務性RPC如何處理,如何維護事務日志,如何清除事務性數據等等。

    KafkaProducerjavadocs(鏈接5):這是學習如何使用新API的好地方。頁面開始處的示例以及send方法的文檔是很好的起點。

    九、結論

    在這篇文章中,我們了解了ApacheKafka中關于事務API的關鍵設計目標,我們理解了事務API的語義,并對API的實際工作有了更高層次的理解。

    如果我們考慮“讀取-處理-寫入”周期,這篇文章主要介紹了讀寫路徑,處理本身就是一個黑盒子。事實是,在處理階段中可以做很多事情,使得一次處理不可能保證單獨使用事務API。例如,如果處理對其他存儲系統有副作用,則這里覆蓋的API不足以保證exactly once。

    Kafka Streams框架使用事務API向上移動整個價值鏈,并為各種各樣的流處理應用提供exactly once,甚至能夠在處理期間更新某些附加狀態并進行存儲。

    后續的博客文章將介紹KafkaStreams如何提供一次處理語義,以及如何編寫利用它的應用程序。

    最后,對于那些渴望了解上述API實現細節的人,我們將會有另一篇博客文章,其中涵蓋了這里描述的一些更有趣的解決方案。

    十、鏈接

    1.? https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/

    2.? https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc/edit?usp=sharing

    3.? https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging

    4.? https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit?usp=sharing

    5.? https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

    6. https://my.oschina.net/xiaominmin/blog/1816437

    7. https://blog.csdn.net/ransom0512/article/details/78840042

    8. ?? https://blog.csdn.net/mlljava1111/article/details/81180351

    轉載于:https://www.cnblogs.com/wangzhuxing/p/10125437.html

    總結

    以上是生活随笔為你收集整理的kafka系列九、kafka事务原理、事务API和使用场景的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。