kafka集群编程指南
kafka集群編程指南
@(KAFKA)[kafka, 大數(shù)據(jù)]
- kafka集群編程指南
- 一概述
- 一主要內(nèi)容
- 二關(guān)于scala與java的說(shuō)明
- 二producer的API
- 一scala版本deprecated
- 1一個(gè)簡(jiǎn)單例子
- 2指定partitioner的producer
- 關(guān)于KeyedMessage的分析
- 二java版本
- 一scala版本deprecated
- 三consumer的API
- 一high level consummer
- 二simplelow level consumer
- 原理介紹
- 為什么需要 SimpleConsumer
- 使用SimpleConsumer的缺點(diǎn)
- 使用SimpleConsumer的步驟
- 詳細(xì)步驟
- 3代碼中的一些方法分析
- 1找到topic和分區(qū)的ledaer broker
- 2查找從哪個(gè)offset開(kāi)始讀取消息
- 3錯(cuò)誤處理
- 4讀取數(shù)據(jù)
- 原理介紹
- 3關(guān)于offset的指定
- 四使用storm從kafka集群中消費(fèi)消息
- 五使用spark streaming從kafka集群中消費(fèi)消息
- 六與hadoop的集成
JAVA API:http://kafka.apache.org/082/javadoc/index.html
一、概述
(一)主要內(nèi)容
本文主要介紹了以下4部分內(nèi)容
(1)向kafka集群發(fā)送消息的producer JAVA API
(2)從kafka集群消費(fèi)消息的consumer JAVA API
(3)使用storm從kafka集群中消費(fèi)消息
(4)使用spark streaming從kafka集群中消費(fèi)消息
(二)關(guān)于scala與java的說(shuō)明
由于kafka本身是用scala語(yǔ)言寫的,但大多使用kafka集群的用戶都習(xí)慣使用java語(yǔ)言,因此,kafka使用scala語(yǔ)言寫了一個(gè)java版本的API,目前它同時(shí)支持producer與consumer。
從0.8.2版本開(kāi)始,kafka使用java語(yǔ)言重寫了producer API,并計(jì)劃于0.8.3(官方說(shuō)下一個(gè)版本,沒(méi)有具體說(shuō)哪個(gè))使用java語(yǔ)言重寫consumer API。官方推薦使用新producer API代替原有的scala語(yǔ)言寫的API。
總結(jié):
(1)kafka_0.8.2有2個(gè)版本producer API,分別是scala版本與java版本,前者放到源碼的core目錄下,后者放在源碼的client目錄下。官方推薦使用java語(yǔ)言版本,scala語(yǔ)言版本不再更新。
(2)kafka_0.8.2目前只有scala版本的consumer API,計(jì)劃于下一個(gè)版本中增加java版本的。
二、producer的API
此部分先簡(jiǎn)單介紹一下scala版本的API,然后再深入介紹java版本的API。
(一)scala版本(deprecated)
1、一個(gè)簡(jiǎn)單例子
基本步驟為創(chuàng)建producer——>使用producer發(fā)送消息——>關(guān)閉producer
(1)創(chuàng)建producer
(2)使用producer發(fā)送消息
producer.send(new KeyedMessage<Integer, String>("topic", "message”);2個(gè)參數(shù)分別為topic名稱和發(fā)送的消息內(nèi)容,均為String類型。
(3)關(guān)閉producer
producer.close();2、指定partitioner的producer
有時(shí)候,你需要指定哪條消息發(fā)送到哪個(gè)分區(qū)中去,這樣可以使得負(fù)載更加均衡,或者是滿足特定的分析要求。以下我們舉一個(gè)例子,相同ip的信息會(huì)被發(fā)送到相同的機(jī)器。
先看一些說(shuō)明,再列出完整代碼
(1)partitioner.class
server.properties中的這個(gè)參數(shù)指定了使用哪個(gè)分區(qū)類對(duì)消息進(jìn)行分區(qū),默認(rèn)為kafka.producer.DefaultPartitioner,它會(huì)根據(jù)消息的key的hash值進(jìn)行分區(qū)。
(2)指定partitioner類
我們可以定義自己的Partitioner類,然后在代碼中指定使用這個(gè)類。
這個(gè)類必須實(shí)現(xiàn)這個(gè)接口
然后在代碼中指定使用這個(gè)類:
props.put("partitioner.class", "com.lujinhong.demo.kafka.producer.SimplePartitioner");(3)完整代碼如下:
SimplePartitioner.java
package com.lujinhong.demo.kafka.producer;import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties;/*** @author lujinhong* @date 2015年8月11日 下午6:47:02* @Description:*/ // Partitioning Code: (分區(qū)函數(shù))public class SimplePartitioner implements Partitioner {public SimplePartitioner(VerifiableProperties props) {}public int partition(Object key, int a_numPartitions) {int partition = 0;String sKey = key.toString();int offset = sKey.lastIndexOf('.');if (offset > 0) {partition = Integer.parseInt(sKey.substring(offset + 1))% a_numPartitions;}return partition;} }PartitionerProducer.java
package com.lujinhong.demo.kafka.producer;/*** @author lujinhong* @date 2015年8月11日 下午6:35:07* @Description: */ import java.util.*;import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties;public class PartitionerProducer {public static void main(String[] args) {//long events = Long.parseLong(args[0]);Random rnd = new Random();Properties props = new Properties();props.put("metadata.broker.list", "192.168.172.98:9092,192.168.172.111:9092");props.put("serializer.class", "kafka.serializer.StringEncoder");props.put("partitioner.class", "com.lujinhong.demo.kafka.producer.SimplePartitioner");props.put("request.required.acks", "1");ProducerConfig config = new ProducerConfig(props);Producer<String, String> producer = new Producer<String, String>(config);for (long nEvents = 0; nEvents < 1000; nEvents++) {long runtime = new Date().getTime();String ip = "192.168.2." + rnd.nextInt(255);String msg = runtime + ",www.example.com," + ip;KeyedMessage<String, String> data = new KeyedMessage<String, String>("ljh_test", ip, msg);producer.send(data);}producer.close();} }3.關(guān)于KeyedMessage的分析
不管你是否使用partitioner,producer均是使用KeyedMessage發(fā)送消息的,KeyedMessage有三種形式:
(1)最完整的形式應(yīng)該是:
KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V)即有4個(gè)參數(shù),分別為topic名稱,消息的Key,分區(qū)所用的key,以及消息內(nèi)容
(2)忽略分區(qū)所有的key
def this(topic: String, key: K, message: V) = this(topic, key, key, message)此時(shí),當(dāng)消息的key同時(shí)作為分區(qū)的key
(3)忽略所有key
def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)此時(shí),將key設(shè)置為空。
package kafka.producer/*** A topic, key, and value.* If a partition key is provided it will override the key for the purpose of partitioning but will not be stored.*/ case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V) {if(topic == null)throw new IllegalArgumentException("Topic cannot be null.")def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)def this(topic: String, key: K, message: V) = this(topic, key, key, message)def partitionKey = {if(partKey != null)partKeyelse if(hasKey)keyelsenull }def hasKey = key != null }(二)java版本
待補(bǔ)充
三、consumer的API
high level consumer可以完成大部分的數(shù)據(jù)消費(fèi)工作,它做了高度的封裝,使得你很容易就讀取到數(shù)據(jù)。
但如果要做一引起底層的操作,比如多次讀取數(shù)據(jù),指定offset等,則需要使用simple consumer。
(一)high level consummer
提供了更高層的抽象, 詳細(xì)請(qǐng)參考:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
本demo的2個(gè)類分別完成了以下2個(gè)功能:
(1)構(gòu)建一個(gè) 或者多個(gè)KafkaStream對(duì)象,并把這個(gè)對(duì)象提交到線程池中。
(2)處理這些KafkaStream。
我們先看第一個(gè)類 HighLevelConsumerDemo的關(guān)鍵步驟:
(1)創(chuàng)建 ConsumerConfig,用于指定consumer的配置。
(2)使用上面的ConsumerConfig創(chuàng)建 ConsumerConnector對(duì)象。
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId));(3)創(chuàng)建一個(gè) KafkaStream列表
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(topic, new Integer(a_numThreads));Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);先是構(gòu)建一個(gè)(topic名稱,線程數(shù))的map,然后將其傳遞給consumer.createMessageStreams,這會(huì)創(chuàng)建一個(gè)以(topic名稱,KafkaStream列表)的map,其中KafkaStream列表的size為前面指定的線程數(shù)。最后,通過(guò)get就可以獲得KafkaStream列表。
First we create a Map that tells Kafka how many threads we are providing for which topics. The consumer.createMessageStreams is how we pass this information to Kafka. The return is a map of KafkaStream to listen on for each topic. (Note here we only asked Kafka for a single Topic but we could have asked for multiple by adding another element to the Map.)
就是說(shuō)這里其實(shí)可以指定多個(gè)topic的,但最好不要這樣做吧。
(4)創(chuàng)建一個(gè)線程池
executor = Executors.newFixedThreadPool(a_numThreads);使用java concurrent包創(chuàng)建了一個(gè)固定大小的線程池。
(5)為線程池指定每個(gè)線程執(zhí)行的內(nèi)容
int threadNumber = 0;for (final KafkaStream stream : streams) {executor.submit(new ConsumerTest(stream, threadNumber));threadNumber++;}(6)10秒鐘后,關(guān)閉consumer與線程池。
public void shutdown() {if (consumer != null) consumer.shutdown();if (executor != null) executor.shutdown();try {if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");}} catch (InterruptedException e) {System.out.println("Interrupted during shutdown, exiting uncleanly");}}
處理KafkaStream對(duì)象中的內(nèi)容
(1)通過(guò) KafkaStream.iterator()來(lái)持續(xù)獲取消息。
(2)通過(guò) it.next().message()來(lái)讀取消息內(nèi)容。
關(guān)于線程數(shù)與分區(qū)數(shù)的關(guān)系:
(1)如果線程數(shù)>分區(qū)數(shù),則有一些線程一直空閑,它獲取不到任何的消息。
(2)如果線程數(shù)<分區(qū)數(shù),則有一些線程會(huì)讀取多個(gè)分區(qū)。這將不能保證不同分區(qū)間消息獲取的時(shí)間順序,但在每一個(gè)分區(qū)內(nèi)消息還是執(zhí)照順序獲取的。比如這個(gè)線程有可能從分區(qū)1獲取100個(gè)消息,然后再?gòu)姆謪^(qū)2獲取到10個(gè)消息。
因此一般而言,線程數(shù)與分區(qū)數(shù)相等即可,或者略大一點(diǎn)以作冗余。
關(guān)于關(guān)閉consumer時(shí)zk的一些說(shuō)明
由于consumer會(huì)每隔一段時(shí)間才去將offset提交到zk,因此不要馬上關(guān)閉關(guān)閉executor,而是等待一段時(shí)間后。
Kafka does not update Zookeeper with the message offset last read after every read, instead it waits a short period of time. Due to this delay it is possible that your logic has consumed a message and that fact hasn’t been synced to zookeeper. So if your client exits/crashes you may find messages being replayed next time to start.
Also note that sometimes the loss of a Broker or other event that causes the Leader for a Partition to change can also cause duplicate messages to be replayed.
To help avoid this, make sure you provide a clean way for your client to exit instead of assuming it can be ‘kill -9’d.
As an example, the main here sleeps for 10 seconds, which allows the background consumer threads to consume data from their streams 10 seconds. Since auto commit is on, they will commit offsets every second. Then, shutdown is called, which calls shutdown on the consumer, then on the ExecutorService, and finally tries to wait for the ExecutorService to finish all outsanding work. This gives the consumer threads time to finish processing the few outstanding messages that may remain in their streams. Shutting down the consumer causes the iterators for each stream to return false for hasNext() once all messages already received from the server are processed, so the other threads should exit gracefully. Additionally, with auto commit enabled, the call to consumer.shutdown() will commit the final offsets.
完整代碼如下:
(1)HighLevelConsumerDemo
(2)ConsumerTest
package com.lujinhong.demo.kafka.consumer;/*** @author lujinhong* @date 2015年8月7日 上午10:43:58* @Description: */ import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream;public class ConsumerTest implements Runnable {private KafkaStream m_stream;private int m_threadNumber;public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {m_threadNumber = a_threadNumber;m_stream = a_stream;}public void run() {ConsumerIterator<byte[], byte[]> it = m_stream.iterator();while (it.hasNext())System.out.println( "Thread " + m_threadNumber + ": " + new String(it.next().message()));System.out.println("Shutting down Thread: " + m_threadNumber);} }(二)simple(low level) consumer
1. 原理介紹
對(duì)于大部分的應(yīng)用來(lái)說(shuō),high level的api已經(jīng)足夠完成功能。如果需要更底層的功能(如定義啟動(dòng)時(shí)的offset等),則需要使用simple(low level) consumer。
詳細(xì)請(qǐng)參考:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
For most applications, the high level consumer Api is good enough. Some applications want features not exposed to the high level consumer yet (e.g., set initial offset when restarting the consumer). They can instead use our low level SimpleConsumer Api. The logic will be a bit more complicated and you can follow the example in here.
為什么需要 SimpleConsumer
使用SimpleConsumer的原因是你需要取得分區(qū)的更大控制度,這是Consumer Group做不到的。比如說(shuō):
使用SimpleConsumer的缺點(diǎn)
使用SimpleConsumer比起Conusmer Group需要增加大量的工作。注意對(duì)于SimpleConsumer而言,沒(méi)有Group的概念。
使用SimpleConsumer的步驟
2. 詳細(xì)步驟
見(jiàn)代碼中的注釋
public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {// find the meta data about the topic and partition we are interested in//1、找到leaderPartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);if (metadata == null) {System.out.println("Can't find metadata for Topic and Partition. Exiting");return;}if (metadata.leader() == null) {System.out.println("Can't find Leader for Topic and Partition. Exiting");return;}String leadBroker = metadata.leader().host();String clientName = "Client_" + a_topic + "_" + a_partition;//2、構(gòu)建consumer對(duì)象SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);//3、指定offsetlong readOffset = getLastOffset(consumer,a_topic, a_partition, 101000000000000L, clientName);//kafka.api.OffsetRequest.LatestTime()int numErrors = 0;while (a_maxReads > 0) {if (consumer == null) {consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);}//4、構(gòu)建FetchRequest對(duì)象FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka.build();//5、通過(guò)consumer發(fā)起消費(fèi)請(qǐng)求,返回FetchResponse對(duì)象。FetchResponse fetchResponse = consumer.fetch(req);//6、如果發(fā)生錯(cuò)誤,則進(jìn)行錯(cuò)誤處理if (fetchResponse.hasError()) {numErrors++;// Something went wrong!short code = fetchResponse.errorCode(a_topic, a_partition);System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);if (numErrors > 5) break;if (code == ErrorMapping.OffsetOutOfRangeCode()) {//如果設(shè)定的時(shí)間值超過(guò)了當(dāng)前時(shí)間,則返回最新的消息。// We asked for an invalid offset. For simple case ask for the last element to resetreadOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);continue;}consumer.close();consumer = null;leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);continue;}numErrors = 0;long numRead = 0;//7、處理獲取到的每一個(gè)消息for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {long currentOffset = messageAndOffset.offset();//如果讀取到的offset小于設(shè)定的開(kāi)始o(jì)ffset,則跳過(guò)。if (currentOffset < readOffset) {System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);continue;}readOffset = messageAndOffset.nextOffset();ByteBuffer payload = messageAndOffset.message().payload();byte[] bytes = new byte[payload.limit()];payload.get(bytes);//輸出offset及消息內(nèi)容System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));numRead++;a_maxReads--;}if (numRead == 0) {try {Thread.sleep(1000);} catch (InterruptedException ie) {}}}if (consumer != null) consumer.close(); }3、代碼中的一些方法分析
(1)找到topic和分區(qū)的ledaer broker
最簡(jiǎn)單的方式是把你知識(shí)的一些broker作為參數(shù)傳給程序,可以通過(guò)配置文件、命令行等試均可。這不需要所有的broker,只需要其中一部分就可以了。
程序說(shuō)明如下:
* 先創(chuàng)建一個(gè)SimpleConsumer對(duì)象,幾個(gè)參數(shù)的含義如下:
創(chuàng)建一個(gè)TopicMetadataRequest對(duì)象,然后使用consumer去發(fā)送請(qǐng)求
List<String> topics = Collections.singletonList(a_topic); TopicMetadataRequest req = new TopicMetadataRequest(topics); kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);取得topic的元數(shù)據(jù),然后取得具體的分區(qū)元信息
List<TopicMetadata> metaData = resp.topicsMetadata();for (TopicMetadata item : metaData) {for (PartitionMetadata part : item.partitionsMetadata()) {if (part.partitionId() == a_partition) {returnMetaData = part;break loop;}}}一旦查找到所屬的信息,則馬上退出整個(gè)循環(huán)。
最后取得replica broker,并將之保存到一個(gè)List中
if (returnMetaData != null) {m_replicaBrokers.clear();for (kafka.cluster.Broker replica : returnMetaData.replicas()) {m_replicaBrokers.add(replica.host());} }完整代碼如下:
private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) { PartitionMetadata returnMetaData = null; loop: for (String seed : a_seedBrokers) {SimpleConsumer consumer = null;try {consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");List<String> topics = Collections.singletonList(a_topic);TopicMetadataRequest req = new TopicMetadataRequest(topics);kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);List<TopicMetadata> metaData = resp.topicsMetadata();for (TopicMetadata item : metaData) {for (PartitionMetadata part : item.partitionsMetadata()) {if (part.partitionId() == a_partition) {returnMetaData = part;break loop;}}}} catch (Exception e) {System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic+ ", " + a_partition + "] Reason: " + e);} finally {if (consumer != null) consumer.close();} } if (returnMetaData != null) {m_replicaBrokers.clear();for (kafka.cluster.Broker replica : returnMetaData.replicas()) {m_replicaBrokers.add(replica.host());} } return returnMetaData; }
(2)查找從哪個(gè)offset開(kāi)始讀取消息
現(xiàn)在我們需要定義從哪個(gè)offset開(kāi)始讀取數(shù)據(jù)。kafka提供了2個(gè)常量來(lái)指定offset
* kafka.api.OffsetRequest.EarliestTime() :從集群現(xiàn)在最早的數(shù)據(jù)開(kāi)始讀取
* kafka.api.OffsetRequest.LatestTime(): 從最晚的數(shù)據(jù)開(kāi)始讀取,也就是新進(jìn)入集群的消息。
不要認(rèn)為0就是開(kāi)始的offset,因?yàn)橄?huì)過(guò)期然后被刪除的。
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,long whichTime, String clientName) {TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);OffsetResponse response = consumer.getOffsetsBefore(request);if (response.hasError()) {System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );return 0;}long[] offsets = response.offsets(topic, partition);return offsets[0]; }(3)錯(cuò)誤處理
由于SimpleConsumer不會(huì)自動(dòng)處理錯(cuò)誤,我們需要處理一下這些錯(cuò)誤。
if (fetchResponse.hasError()) {numErrors++;// Something went wrong!short code = fetchResponse.errorCode(a_topic, a_partition);System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);if (numErrors > 5) break;if (code == ErrorMapping.OffsetOutOfRangeCode()) {// We asked for an invalid offset. For simple case ask for the last element to resetreadOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);continue;}consumer.close();consumer = null;leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);continue;}一旦返回錯(cuò)誤,我們先記錄錯(cuò)誤原因,關(guān)閉consumer,然后查找一個(gè)新的leader。
private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {for (int i = 0; i < 3; i++) {boolean goToSleep = false;PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);if (metadata == null) {goToSleep = true;} else if (metadata.leader() == null) {goToSleep = true;} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {// first time through if the leader hasn't changed give ZooKeeper a second to recover// second time, assume the broker did recover before failover, or it was a non-Broker issue//goToSleep = true;} else {return metadata.leader().host();}if (goToSleep) {try {Thread.sleep(1000);} catch (InterruptedException ie) {}}}System.out.println("Unable to find new leader after Broker failure. Exiting");throw new Exception("Unable to find new leader after Broker failure. Exiting");}This method uses the findLeader() logic we defined earlier to find the new leader, except here we only try to connect to one of the replicas for the topic/partition. This way if we can’t reach any of the Brokers with the data we are interested in we give up and exit hard.
Since it may take a short time for ZooKeeper to detect the leader loss and assign a new leader, we sleep if we don’t get an answer. In reality ZooKeeper often does the failover very quickly so you never sleep.
(4)讀取數(shù)據(jù)
// When calling FetchRequestBuilder, it's important NOT to call .replicaId(), which is meant for internal use only. // Setting the replicaId incorrectly will cause the brokers to behave incorrectly. FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build(); FetchResponse fetchResponse = consumer.fetch(req);if (fetchResponse.hasError()) {// See code in previous section } numErrors = 0;long numRead = 0; for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {long currentOffset = messageAndOffset.offset();if (currentOffset < readOffset) {System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);continue;}readOffset = messageAndOffset.nextOffset();ByteBuffer payload = messageAndOffset.message().payload();byte[] bytes = new byte[payload.limit()];payload.get(bytes);System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));numRead++;a_maxReads--; }if (numRead == 0) {try {Thread.sleep(1000);} catch (InterruptedException ie) {} }Note that the ‘readOffset’ asks the last read message what the next Offset would be. This way when the block of messages is processed we know where to ask Kafka where to start the next fetch.
Also note that we are explicitly checking that the offset being read is not less than the offset that we requested. This is needed since if Kafka is compressing the messages, the fetch request will return an entire compressed block even if the requested offset isn’t the beginning of the compressed block. Thus a message we saw previously may be returned again. Note also that we ask for a fetchSize of 100000 bytes. If the Kafka producers are writing large batches, this might not be enough, and might return an empty message set. In this case, the fetchSize should be increased until a non-empty set is returned.
Finally, we keep track of the # of messages read. If we didn’t read anything on the last request we go to sleep for a second so we aren’t hammering Kafka when there is no data
3、關(guān)于offset的指定
(1)定義request對(duì)象
FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000) .build();其中readOffset是將要讀取的offset。
(2)通過(guò)request去發(fā)送請(qǐng)求
FetchResponse fetchResponse = consumer.fetch(req);(3) 遍歷每一個(gè)response
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {long currentOffset = messageAndOffset.offset();//如果讀取到的offset小于設(shè)定的開(kāi)始o(jì)ffset,則跳過(guò)。if (currentOffset < readOffset) {System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);continue;}readOffset = messageAndOffset.nextOffset();ByteBuffer payload = messageAndOffset.message().payload();byte[] bytes = new byte[payload.limit()];payload.get(bytes);//輸出offset及消息內(nèi)容System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));numRead++;a_maxReads--;}(4)關(guān)于offset的指定
首先注意區(qū)分2個(gè)概念,offset與代碼中指定的值,如EaliestTime(), LastestTime(),或者是一個(gè)13位的UNIX 時(shí)間戳。
前者是一個(gè)數(shù)值,創(chuàng)建topic時(shí),offset為0,然后不斷的遞增。但要注意,0不總是代表kafka集群中最早的數(shù)值,因?yàn)槌^(guò)一定時(shí)間的數(shù)據(jù)會(huì)被刪除。kafka集群的數(shù)據(jù)文件中的文件名就是這個(gè)文件中第一個(gè)消息的offset。
后者是方便程序開(kāi)發(fā)所定義的幾個(gè)值,可以是最早的消息,最晚的消息,或者是指定一個(gè)特定的時(shí)間。因?yàn)橛脩舨豢赡苷f(shuō)從哪個(gè)offset開(kāi)始消費(fèi)消息,而應(yīng)該是從哪個(gè)時(shí)間開(kāi)始消費(fèi)消息。
下面我們看看如何通過(guò)指定一個(gè)時(shí)間來(lái)確定一個(gè)offset:
long readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);具體是這樣的:
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,long whichTime, String clientName) {TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);OffsetResponse response = consumer.getOffsetsBefore(request);if (response.hasError()) {System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );return 0;}long[] offsets = response.offsets(topic, partition);return offsets[0]; }另:clientName有什么用呢?
四、使用storm從kafka集群中消費(fèi)消息
一個(gè)簡(jiǎn)單示例:
Core Spout
Trident Spout
TridentTopology topology = new TridentTopology(); BrokerHosts zk = new ZkHosts("localhost"); TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic"); spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme()); OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);詳細(xì)請(qǐng)參考storm-kafka編程指南
五、使用spark streaming從kafka集群中消費(fèi)消息
1、定義你所需要完成的功能
這里以日志的過(guò)濾為例:
public class FiltersFuntion implements Function<Tuple2<String, String>, String>,akka.japi.Function<Tuple2<String, String>, String>關(guān)鍵是覆蓋Function中的call()方法,它定義了對(duì)每一個(gè)消息所作的處理
@Overridepublic String call(Tuple2<String, String> v1) throws Exception {2、定義應(yīng)用的結(jié)構(gòu)
public final class JavaKafkaWordCount {private static final Pattern SPACE = Pattern.compile(" ");private JavaKafkaWordCount() {}public static void main(String[] args) {if (args.length < 4) {System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");System.exit(1);}SparkConf sparkConf = new SparkConf().setAppName("ljh_JavaKafkaWordCount");// Create the context with a 1 second batch sizeJavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));int numThreads = Integer.parseInt(args[3]);Map<String, Integer> topicMap = new HashMap<String, Integer>();String[] topics = args[2].split(",");for (String topic: topics) {topicMap.put(topic, numThreads);}JavaPairReceiverInputDStream<String, String> messages =KafkaUtils.createStream(jssc, args[0], args[1], topicMap);JavaDStream<String> lines = messages.map(new FiltersFuntion() );JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic Iterable<String> call(String x) {return Lists.newArrayList(SPACE.split(x));}});JavaPairDStream<String, Integer> wordCounts = words.mapToPair(new PairFunction<String, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(String s) {return new Tuple2<String, Integer>(s, 1);}}).reduceByKey(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer i1, Integer i2) {return i1 + i2;}});wordCounts.print();jssc.start();jssc.awaitTermination();} }六、與hadoop的集成
通過(guò)一個(gè)叫camus的第三方項(xiàng)目實(shí)現(xiàn),請(qǐng)謹(jǐn)慎使用。
Providing a horizontally scalable solution for aggregating and loading data into Hadoop was one of our basic use cases. To support this use case, we provide a Hadoop-based consumer which spawns off many map tasks to pull data from the Kafka cluster in parallel. This provides extremely fast pull-based Hadoop data load capabilities (we were able to fully saturate the network with only a handful of Kafka servers).
詳細(xì)請(qǐng)參考項(xiàng)目:https://github.com/linkedin/camus/
總結(jié)
以上是生活随笔為你收集整理的kafka集群编程指南的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 关于kafka中的timestamp与o
- 下一篇: protocol buffer介绍(pr