kafka安装及Kafka-PHP扩展的使用
實話說,如果用于隊列的話,跟PHP比較配的,還是Redis。用的順手,呵呵,只是Redis不能有多個consumer。但Kafka官方對PHP不支持,PHP擴展是愛好者或使用者寫的。下面就開始講Kafka的安裝吧。我以CentOS6.4為例,64位。
一. 首先確認下jdk有沒有安裝
使用命令
[root@localhost ~]# java -version java version "1.8.0_73" Java(TM) SE Runtime Environment (build 1.8.0_73-b02) Java HotSpot(TM) 64-Bit Server VM (build 25.73-b02, mixed mode)如果有以上信息的話,就往下安裝吧,有些可能是jdk對不上,那就裝到對的上的。如果沒有安裝,就看一下下面的jdk安裝方法:
http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
到這個地址下載jdk8版本,我下載的是jdk-8u73-linux-x64.tar.gz,然后解壓到/usr/local/jdk/下。
然后打開/etc/profile文件
[root@localhost ~]# vim /etc/profile把下面這段代碼寫到文件里
export JAVA_HOME=/usr/local/jdk/jdk1.8.0_73 export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar export PATH=$JAVA_HOME/bin:$PATH最后
[root@localhost ~]# source /etc/profile這時jdk就生效了,可以使用 java -version驗證下。
二. 接下來安裝Kafka
1. 下載Kafka
到http://kafka.apache.org/downloads.html下載相應的版本,我使用的是kafka_2.9.1-0.8.2.2.tgz
wget http://apache.fayea.com/kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz
2. 下載完解壓到你喜歡的目錄
我是解壓到 /usr/local/kafka/kafka_2.9.1-0.8.2.2
3. 運行默認的Kafka
啟動Zookeeper server
[root@localhost kafka_2.9.1-0.8.2.2]# sh bin/zookeeper-server-start.sh config/zookeeper.properties &啟動Kafka server
[root@localhost kafka_2.9.1-0.8.2.2]# sh bin/kafka-server-start.sh config/server.properties &運行生產者producer
[root@localhost kafka_2.9.1-0.8.2.2]# sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test運行消費者consumer
[root@localhost kafka_2.9.1-0.8.2.2]# sh bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning這樣,在producer那邊輸入內容,consumer馬上就能接收到。
4. 當有跨機的producer或consumer連接時
需要配置config/server.properties的host.name,要不然跨機的連不上。
三. Kafka-PHP擴展
使用了一圈,就https://github.com/nmred/kafka-php可以用。
我是使用composer安裝的,以下是示例:
producer.php
<?php require 'vendor/autoload.php';while (1) {$part = mt_rand(0, 1);$produce = \Kafka\Produce::getInstance('kafka0:2181', 3000);// get available partitions$partitions = $produce->getAvailablePartitions('topic_name');var_dump($partitions);// send message$produce->setRequireAck(-1);$produce->setMessages('topic_name', 0, array(date('Y-m-d H:i:s'));sleep(3); }consumer.php
require 'vendor/autoload.php';$consumer = \Kafka\Consumer::getInstance('kafka0:2181'); $group = 'topic_name'; $consumer->setGroup($group); $consumer->setFromOffset(true); $consumer->setTopic('topic_name', 0); $consumer->setMaxBytes(102400); $result = $consumer->fetch(); print_r($result); foreach ($result as $topicName => $partition) {foreach ($partition as $partId => $messageSet) {var_dump($partition->getHighOffset());foreach ($messageSet as $message) {var_dump((string)$message);}var_dump($partition->getMessageOffset());} }來源:https://blog.csdn.net/weiwenjuan0923/article/details/76152744
?
總結
以上是生活随笔為你收集整理的kafka安装及Kafka-PHP扩展的使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 地暖中央空调怎么选?
- 下一篇: 动态规划算法php,php算法学习之动态