日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > php >内容正文

php

PHP下kafka的实践(已经测试)

發布時間:2024/9/20 php 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 PHP下kafka的实践(已经测试) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

kafka

簡介

Kafka?是一種高吞吐量的分布式發布訂閱消息系統

kafka角色必知

producer:生產者。 consumer:消費者。 topic: 消息以topic為類別記錄,Kafka將消息種子(Feed)分類, 每一類的消息稱之為一個主題(Topic)。 broker:以集群的方式運行,可以由一個或多個服務組成,每個服務叫做一個broker;消費者可以訂閱一個或多個主題(topic), 并從Broker拉數據,從而消費這些已發布的消息。

經典模型

1. 一個主題下的分區不能小于消費者數量,即一個主題下消費者數量不能大于分區屬,大了就浪費了空閑了 2. 一個主題下的一個分區可以同時被不同消費組其中某一個消費者消費 3. 一個主題下的一個分區只能被同一個消費組的一個消費者消費

常用參數說明

request.required.acks

Kafka producer的ack有3中機制,初始化producer時的producerconfig可以通過配置request.required.acks不同的值來實現。0:這意味著生產者producer不等待來自broker同步完成的確認繼續發送下一條(批)消息。此選項提供最低的延遲但最弱的耐久性保證(當服務器發生故障時某些數據會丟失,如leader已死,但producer并不知情,發出去的信息broker就收不到)。1:這意味著producer在leader已成功收到的數據并得到確認后發送下一條message。此選項提供了更好的耐久性為客戶等待服務器確認請求成功(被寫入死亡leader但尚未復制將失去了唯一的消息)。-1:這意味著producer在follower副本確認接收到數據后才算一次發送完成。 此選項提供最好的耐久性,我們保證沒有信息將丟失,只要至少一個同步副本保持存活。三種機制,性能依次遞減 (producer吞吐量降低),數據健壯性則依次遞增。

auto.offset.reset

1. earliest:自動將偏移重置為最早的偏移量 2. latest:自動將偏移量重置為最新的偏移量(默認) 3. none:如果consumer group沒有發現先前的偏移量,則向consumer拋出異常。 4. 其他的參數:向consumer拋出異常(無效參數)

kafka安裝和簡單測試

安裝kafka(不需要安裝,解包即可)

# 官方下載地址:http://kafka.apache.org/downloads # wget https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.12-1.1.1.tgz tar -xzf kafka_2.12-1.1.1.tgz cd kafka_2.12-1.1.0

啟動kafka server

# 需先啟動zookeeper # -daemon 可啟動后臺守護模式 bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties

啟動kafka客戶端測試

# 創建一個話題,test話題2個分區 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test Created topic "test".# 顯示所有話題 bin/kafka-topics.sh --list --zookeeper localhost:2181 test# 顯示話題信息 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test Topic:test PartitionCount:2 ReplicationFactor:1 Configs:Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0Topic: test Partition: 1 Leader: 0 Replicas: 0 Isr: 0# 啟動一個生產者(輸入消息) bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test [等待輸入自己的內容 出現>輸入即可] >i am a new msg ! >i am a good msg ?# 啟動一個消費者(等待消息) # 注意這里的--from-beginning,每次都會從頭開始讀取,你可以嘗試去掉和不去掉看下效果 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning [等待消息] i am a new msg ! i am a good msg ?

安裝kafka的php擴展

# 先安裝rdkfka庫文件 git clone https://github.com/edenhill/librdkafka.git cd librdkafka/ ./configure make sudo make installgit clone https://github.com/arnaud-lb/php-rdkafka.git cd php-rdkafka phpize ./configure make all -j 5 sudo make installvim [php]/php.ini extension=rdkafka.so

php代碼實踐

生產者

<?php $conf = new RdKafka\Conf(); $conf->setDrMsgCb(function ($kafka, $message) {file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND); }); $conf->setErrorCb(function ($kafka, $err, $reason) {file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND); });$rk = new RdKafka\Producer($conf); $rk->setLogLevel(LOG_DEBUG); $rk->addBrokers("127.0.0.1");$cf = new RdKafka\TopicConf(); // -1必須等所有brokers同步完成的確認 1當前服務器確認 0不確認,這里如果是0回調里的offset無返回,如果是1和-1會返回offset // 我們可以利用該機制做消息生產的確認,不過還不是100%,因為有可能會中途kafka服務器掛掉 $cf->set('request.required.acks', 0); $topic = $rk->newTopic("test", $cf);$option = 'qkl'; for ($i = 0; $i < 20; $i++) {//RD_KAFKA_PARTITION_UA自動選擇分區//$option可選$topic->produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option); }$len = $rk->getOutQLen(); while ($len > 0) {$len = $rk->getOutQLen();var_dump($len);$rk->poll(50); }

運行生產者

php producer.php # outputint(20) int(20) int(20) int(20) int(0)# 你可以查看你剛才上面啟動的消費者shell應該會輸出消息 qkl . 0 qkl . 1 qkl . 2 qkl . 3 qkl . 4 qkl . 5 qkl . 6 qkl . 7 qkl . 8 qkl . 9 qkl . 10 qkl . 11 qkl . 12 qkl . 13 qkl . 14 qkl . 15 qkl . 16 qkl . 17 qkl . 18 qkl . 19

Low Level 消費者

<?php $conf = new RdKafka\Conf(); $conf->setDrMsgCb(function ($kafka, $message) {file_put_contents("./c_dr_cb.log", var_export($message, true), FILE_APPEND); }); $conf->setErrorCb(function ($kafka, $err, $reason) {file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND); });//設置消費組 $conf->set('group.id', 'myConsumerGroup');$rk = new RdKafka\Consumer($conf); $rk->addBrokers("127.0.0.1");$topicConf = new RdKafka\TopicConf(); $topicConf->set('request.required.acks', 1); //在interval.ms的時間內自動提交確認、建議不要啟動 //$topicConf->set('auto.commit.enable', 1); $topicConf->set('auto.commit.enable', 0); $topicConf->set('auto.commit.interval.ms', 100);// 設置offset的存儲為file //$topicConf->set('offset.store.method', 'file'); // 設置offset的存儲為broker$topicConf->set('offset.store.method', 'broker'); //$topicConf->set('offset.store.path', __DIR__);//smallest:簡單理解為從頭開始消費,其實等價于上面的 earliest //largest:簡單理解為從最新的開始消費,其實等價于上面的 latest //$topicConf->set('auto.offset.reset', 'smallest');$topic = $rk->newTopic("test", $topicConf);// 參數1消費分區0 // RD_KAFKA_OFFSET_BEGINNING 重頭開始消費 // RD_KAFKA_OFFSET_STORED 最后一條消費的offset記錄開始消費 // RD_KAFKA_OFFSET_END 最后一條消費 $topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING); //$topic->consumeStart(0, RD_KAFKA_OFFSET_END); // //$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);while (true) {//參數1表示消費分區,這里是分區0//參數2表示同步阻塞多久$message = $topic->consume(0, 12 * 1000);if (is_null($message)) {sleep(1);echo "No more messages\n";continue;}switch ($message->err) {case RD_KAFKA_RESP_ERR_NO_ERROR:var_dump($message);break;case RD_KAFKA_RESP_ERR__PARTITION_EOF:echo "No more messages; will wait for more\n";break;case RD_KAFKA_RESP_ERR__TIMED_OUT:echo "Timed out\n";break;default:throw new \Exception($message->errstr(), $message->err);break;} }

High LEVEL消費者

<?php /*** Created by PhpStorm.* User: qkl* Date: 2018/8/22* Time: 17:58*/ $conf = new \RdKafka\Conf();function rebalance(\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {global $offset;switch ($err) {case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:echo "Assign: ";var_dump($partitions);$kafka->assign(); // $kafka->assign([new RdKafka\TopicPartition("qkl01", 0, 0)]);break;case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:echo "Revoke: ";var_dump($partitions);$kafka->assign(NULL);break;default:throw new \Exception($err);} }// Set a rebalance callback to log partition assignments (optional) $conf->setRebalanceCb(function(\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {rebalance($kafka, $err, $partitions); });// Configure the group.id. All consumer with the same group.id will consume // different partitions. $conf->set('group.id', 'test-110-g100');// Initial list of Kafka brokers $conf->set('metadata.broker.list', '192.168.216.122');$topicConf = new \RdKafka\TopicConf();$topicConf->set('request.required.acks', -1); //在interval.ms的時間內自動提交確認、建議不要啟動 $topicConf->set('auto.commit.enable', 0); //$topicConf->set('auto.commit.enable', 0); $topicConf->set('auto.commit.interval.ms', 100);// 設置offset的存儲為file $topicConf->set('offset.store.method', 'file'); $topicConf->set('offset.store.path', __DIR__); // 設置offset的存儲為broker // $topicConf->set('offset.store.method', 'broker');// Set where to start consuming messages when there is no initial offset in // offset store or the desired offset is out of range. // 'smallest': start from the beginning $topicConf->set('auto.offset.reset', 'smallest');// Set the configuration to use for subscribed/assigned topics $conf->setDefaultTopicConf($topicConf);$consumer = new \RdKafka\KafkaConsumer($conf);//$KafkaConsumerTopic = $consumer->newTopic('qkl01', $topicConf);// Subscribe to topic 'test' $consumer->subscribe(['qkl01']);echo "Waiting for partition assignment... (make take some time when\n"; echo "quickly re-joining the group after leaving it.)\n";while (true) {$message = $consumer->consume(120*1000);switch ($message->err) {case RD_KAFKA_RESP_ERR_NO_ERROR:var_dump($message); // $consumer->commit($message); // $KafkaConsumerTopic->offsetStore(0, 20);break;case RD_KAFKA_RESP_ERR__PARTITION_EOF:echo "No more messages; will wait for more\n";break;case RD_KAFKA_RESP_ERR__TIMED_OUT:echo "Timed out\n";break;default:throw new \Exception($message->errstr(), $message->err);break;} }

消費組特別說明

特別注意,High LEVEL消費者設置的消費組,kafka服務器才會記錄, Low?Level消費者設置的消費組,服務器不會記錄

具體查看消費組信息,你可以翻閱本篇文章

查看服務器元數據(topic/partition/broker)

<?php$conf = new RdKafka\Conf(); $conf->setDrMsgCb(function ($kafka, $message) {file_put_contents("./xx.log", var_export($message, true), FILE_APPEND); }); $conf->setErrorCb(function ($kafka, $err, $reason) {printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason); });$conf->set('group.id', 'myConsumerGroup');$rk = new RdKafka\Consumer($conf); $rk->addBrokers("127.0.0.1");$allInfo = $rk->metadata(true, NULL, 60e3);$topics = $allInfo->getTopics();echo rd_kafka_offset_tail(100); echo "--";echo count($topics); echo "--";foreach ($topics as $topic) {$topicName = $topic->getTopic();if ($topicName == "__consumer_offsets") {continue ;}$partitions = $topic->getPartitions();foreach ($partitions as $partition) { // $rf = new ReflectionClass(get_class($partition)); // foreach ($rf->getMethods() as $f) { // var_dump($f); // } // die();$topPartition = new RdKafka\TopicPartition($topicName, $partition->getId());echo "當前的話題:" . ($topPartition->getTopic()) . " - " . $partition->getId() . " - ";echo "offset:" . ($topPartition->getOffset()) . PHP_EOL;} }

如果需遠端生產和消費

vim config/server.properties advertised.listeners=PLAINTEXT://ip:9092 # ip 未你kafka的外網ip即可

分享一個打包好的php-rdkafka的類庫

https://github.com/qkl9527/php-rdkafka-class

上面的git clone不能下載的時候可以直接打開github地址然后進行下載。

來源:https://segmentfault.com/a/1190000015765348

總結

以上是生活随笔為你收集整理的PHP下kafka的实践(已经测试)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。