Kafka Cluster Programming Guide

@(KAFKA)[kafka, big data]

  • Kafka Cluster Programming Guide
  • I. Overview
    • (I) Main contents
    • (II) A note on Scala and Java
  • II. The producer API
    • (I) Scala version (deprecated)
      • 1. A simple example
      • 2. A producer with a custom partitioner
      • 3. A look at KeyedMessage
    • (II) Java version
  • III. The consumer API
    • (I) High level consumer
    • (II) Simple (low level) consumer
      • 1. How it works
        • Why SimpleConsumer
        • Drawbacks of SimpleConsumer
        • Steps for using SimpleConsumer
      • 2. Detailed steps
      • 3. Notes on some methods in the code
        • (1) Finding the leader broker for the topic and partition
        • (2) Finding which offset to start reading from
        • (3) Error handling
        • (4) Reading the data
    • 3. Specifying the offset
  • IV. Consuming messages from the Kafka cluster with Storm
  • V. Consuming messages from the Kafka cluster with Spark Streaming
  • VI. Integration with Hadoop

Java API: http://kafka.apache.org/082/javadoc/index.html

I. Overview

(I) Main contents

This article covers the following four topics:

(1) The producer Java API for sending messages to a Kafka cluster

(2) The consumer Java API for consuming messages from a Kafka cluster

(3) Consuming messages from a Kafka cluster with Storm

(4) Consuming messages from a Kafka cluster with Spark Streaming

(二)關(guān)于scala與java的說(shuō)明

由于kafka本身是用scala語(yǔ)言寫的,但大多使用kafka集群的用戶都習(xí)慣使用java語(yǔ)言,因此,kafka使用scala語(yǔ)言寫了一個(gè)java版本的API,目前它同時(shí)支持producer與consumer。

從0.8.2版本開(kāi)始,kafka使用java語(yǔ)言重寫了producer API,并計(jì)劃于0.8.3(官方說(shuō)下一個(gè)版本,沒(méi)有具體說(shuō)哪個(gè))使用java語(yǔ)言重寫consumer API。官方推薦使用新producer API代替原有的scala語(yǔ)言寫的API。

總結(jié):

(1)kafka_0.8.2有2個(gè)版本producer API,分別是scala版本與java版本,前者放到源碼的core目錄下,后者放在源碼的client目錄下。官方推薦使用java語(yǔ)言版本,scala語(yǔ)言版本不再更新。

(2)kafka_0.8.2目前只有scala版本的consumer API,計(jì)劃于下一個(gè)版本中增加java版本的。

II. The producer API

This part gives a brief look at the Scala API first, then a closer look at the Java API.

(I) Scala version (deprecated)

1. A simple example

The basic flow is: create the producer -> use it to send messages -> close it.
(1) Create the producer

Properties props = new Properties();
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", "192.168.172.117:9092");
props.put("producer.type", "sync");
Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(props));

(2) Send a message with the producer

producer.send(new KeyedMessage<Integer, String>("topic", "message"));

2個(gè)參數(shù)分別為topic名稱和發(fā)送的消息內(nèi)容,均為String類型。

(3)關(guān)閉producer

producer.close();
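
Putting the three steps together, here is a minimal runnable sketch; the broker address and topic name are placeholders taken from the surrounding examples:

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class SimpleProducerDemo {
    public static void main(String[] args) {
        // Producer configuration: value serializer, broker list and sync/async mode.
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "192.168.172.117:9092"); // placeholder broker
        props.put("producer.type", "sync");

        Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(props));
        // First argument is the topic, second is the message body; no key is given,
        // so the producer picks the partition itself.
        producer.send(new KeyedMessage<Integer, String>("ljh_test", "hello kafka"));
        producer.close();
    }
}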

2. A producer with a custom partitioner

Sometimes you need to control which partition each message goes to, either to spread the load more evenly or to meet specific analysis requirements. In the example below, messages carrying the same IP address always end up in the same partition.

A few notes first, then the complete code.

(1) partitioner.class

This producer configuration property names the partitioner class used to map messages to partitions. It defaults to kafka.producer.DefaultPartitioner, which partitions on the hash of the message key.

(2) Specifying the partitioner class

We can define our own Partitioner class and tell the producer to use it.
The class must implement this interface:

trait Partitioner {
  /**
   * Uses the key to calculate a partition bucket id for routing
   * the data to the appropriate broker partition
   * @return an integer between 0 and numPartitions-1
   */
  def partition(key: Any, numPartitions: Int): Int
}

然后在代碼中指定使用這個(gè)類:

props.put("partitioner.class", "com.lujinhong.demo.kafka.producer.SimplePartitioner");

(3) The complete code:

SimplePartitioner.java

package com.lujinhong.demo.kafka.producer;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

/**
 * @author lujinhong
 * @date 2015-08-11 18:47:02
 * @Description: partitioning code
 */
public class SimplePartitioner implements Partitioner {
    public SimplePartitioner(VerifiableProperties props) {
    }

    public int partition(Object key, int a_numPartitions) {
        int partition = 0;
        String sKey = key.toString();
        int offset = sKey.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(sKey.substring(offset + 1)) % a_numPartitions;
        }
        return partition;
    }
}

PartitionerProducer.java

package com.lujinhong.demo.kafka.producer;

import java.util.*;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

/**
 * @author lujinhong
 * @date 2015-08-11 18:35:07
 * @Description:
 */
public class PartitionerProducer {
    public static void main(String[] args) {
        // long events = Long.parseLong(args[0]);
        Random rnd = new Random();

        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.172.98:9092,192.168.172.111:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "com.lujinhong.demo.kafka.producer.SimplePartitioner");
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        for (long nEvents = 0; nEvents < 1000; nEvents++) {
            long runtime = new Date().getTime();
            String ip = "192.168.2." + rnd.nextInt(255);
            String msg = runtime + ",www.example.com," + ip;
            KeyedMessage<String, String> data = new KeyedMessage<String, String>("ljh_test", ip, msg);
            producer.send(data);
        }
        producer.close();
    }
}

3.關(guān)于KeyedMessage的分析

不管你是否使用partitioner,producer均是使用KeyedMessage發(fā)送消息的,KeyedMessage有三種形式:

(1)最完整的形式應(yīng)該是:

KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V)

That is, four parameters: the topic name, the message key, the key used for partitioning, and the message body.

(2) Omitting the partition key

def this(topic: String, key: K, message: V) = this(topic, key, key, message)

此時(shí),當(dāng)消息的key同時(shí)作為分區(qū)的key

(3)忽略所有key

def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)

此時(shí),將key設(shè)置為空。

package kafka.producer

/**
 * A topic, key, and value.
 * If a partition key is provided it will override the key for the purpose of partitioning but will not be stored.
 */
case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V) {
  if (topic == null)
    throw new IllegalArgumentException("Topic cannot be null.")

  def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)

  def this(topic: String, key: K, message: V) = this(topic, key, key, message)

  def partitionKey = {
    if (partKey != null)
      partKey
    else if (hasKey)
      key
    else
      null
  }

  def hasKey = key != null
}

(II) Java version

To be added.

III. The consumer API

The high level consumer covers most consumption needs; it is heavily abstracted, so reading data is straightforward.

For lower-level operations, such as re-reading messages or specifying offsets, use the simple consumer.

(I) High level consumer

It provides a higher-level abstraction; for details see https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

The demo's two classes do the following:
(1) Build one or more KafkaStream objects and submit them to a thread pool.
(2) Process those KafkaStreams.

The key steps of the first class, HighLevelConsumerDemo:
(1) Create a ConsumerConfig holding the consumer configuration.

private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
    Properties props = new Properties();
    props.put("zookeeper.connect", a_zookeeper);
    props.put("group.id", a_groupId);
    props.put("zookeeper.session.timeout.ms", "400");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    return new ConsumerConfig(props);
}

(2) Use that ConsumerConfig to create a ConsumerConnector.

consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId));

(3)創(chuàng)建一個(gè) KafkaStream列表

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

先是構(gòu)建一個(gè)(topic名稱,線程數(shù))的map,然后將其傳遞給consumer.createMessageStreams,這會(huì)創(chuàng)建一個(gè)以(topic名稱,KafkaStream列表)的map,其中KafkaStream列表的size為前面指定的線程數(shù)。最后,通過(guò)get就可以獲得KafkaStream列表。

First we create a Map that tells Kafka how many threads we are providing for which topics. The consumer.createMessageStreams is how we pass this information to Kafka. The return is a map of KafkaStream to listen on for each topic. (Note here we only asked Kafka for a single Topic but we could have asked for multiple by adding another element to the Map.)

In other words, you can actually subscribe to several topics here, though it is probably best not to.
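
As a rough sketch (the topic names and thread counts below are hypothetical, not from the original example), subscribing to two topics would look like this:

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("topicA", 2);   // hypothetical topic, 2 threads
topicCountMap.put("topicB", 1);   // hypothetical topic, 1 thread
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
// One list of streams per topic; each list's size equals the thread count requested above.
List<KafkaStream<byte[], byte[]>> streamsA = consumerMap.get("topicA");
List<KafkaStream<byte[], byte[]>> streamsB = consumerMap.get("topicB");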

(4)創(chuàng)建一個(gè)線程池

executor = Executors.newFixedThreadPool(a_numThreads);

A fixed-size thread pool is created with the java.util.concurrent package.

(5) Submit one task per stream to the thread pool

int threadNumber = 0;
for (final KafkaStream stream : streams) {
    executor.submit(new ConsumerTest(stream, threadNumber));
    threadNumber++;
}

(6) After 10 seconds, shut down the consumer and the thread pool.

public void shutdown() {
    if (consumer != null) consumer.shutdown();
    if (executor != null) executor.shutdown();
    try {
        if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
            System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
        }
    } catch (InterruptedException e) {
        System.out.println("Interrupted during shutdown, exiting uncleanly");
    }
}

Processing the messages in a KafkaStream:
(1) Keep pulling messages via KafkaStream.iterator().
(2) Read each message body with it.next().message().

On the relationship between the number of threads and the number of partitions:
(1) If there are more threads than partitions, some threads never receive any messages and sit idle.
(2) If there are fewer threads than partitions, some threads read from several partitions. Ordering across partitions is then not guaranteed, although messages within each partition are still delivered in order; a thread might, say, receive 100 messages from partition 1 and then 10 from partition 2.
So in general, use as many threads as there are partitions, or slightly more as spare capacity.

關(guān)于關(guān)閉consumer時(shí)zk的一些說(shuō)明

由于consumer會(huì)每隔一段時(shí)間才去將offset提交到zk,因此不要馬上關(guān)閉關(guān)閉executor,而是等待一段時(shí)間后。
Kafka does not update Zookeeper with the message offset last read after every read, instead it waits a short period of time. Due to this delay it is possible that your logic has consumed a message and that fact hasn’t been synced to zookeeper. So if your client exits/crashes you may find messages being replayed next time to start.

Also note that sometimes the loss of a Broker or other event that causes the Leader for a Partition to change can also cause duplicate messages to be replayed.

To help avoid this, make sure you provide a clean way for your client to exit instead of assuming it can be ‘kill -9’d.

As an example, the main here sleeps for 10 seconds, which allows the background consumer threads to consume data from their streams 10 seconds. Since auto commit is on, they will commit offsets every second. Then, shutdown is called, which calls shutdown on the consumer, then on the ExecutorService, and finally tries to wait for the ExecutorService to finish all outsanding work. This gives the consumer threads time to finish processing the few outstanding messages that may remain in their streams. Shutting down the consumer causes the iterators for each stream to return false for hasNext() once all messages already received from the server are processed, so the other threads should exit gracefully. Additionally, with auto commit enabled, the call to consumer.shutdown() will commit the final offsets.

The complete code:
(1) HighLevelConsumerDemo

package com.lujinhong.demo.kafka.consumer;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class HighLevelConsumerDemo {
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    public HighLevelConsumerDemo(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted during shutdown, exiting uncleanly");
        }
    }

    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // now launch all the threads
        executor = Executors.newFixedThreadPool(a_numThreads);

        // now create an object to consume the messages
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    public static void main(String[] args) {
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);

        HighLevelConsumerDemo example = new HighLevelConsumerDemo(zooKeeper, groupId, topic);
        example.run(threads);

        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
        }
        example.shutdown();
    }
}

(2) ConsumerTest

package com.lujinhong.demo.kafka.consumer;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

/**
 * @author lujinhong
 * @date 2015-08-07 10:43:58
 * @Description:
 */
public class ConsumerTest implements Runnable {
    private KafkaStream m_stream;
    private int m_threadNumber;

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext())
            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}

(II) Simple (low level) consumer

1. How it works

For most applications, the high level API is enough. If you need lower-level features (such as setting the starting offset when the consumer restarts), use the simple (low level) consumer.
See https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example for details.
For most applications, the high level consumer Api is good enough. Some applications want features not exposed to the high level consumer yet (e.g., set initial offset when restarting the consumer). They can instead use our low level SimpleConsumer Api. The logic will be a bit more complicated and you can follow the example in here.

為什么需要 SimpleConsumer

使用SimpleConsumer的原因是你需要取得分區(qū)的更大控制度,這是Consumer Group做不到的。比如說(shuō):

  • 多次讀取一個(gè)消息
  • 讀取分區(qū)中的一部分消息,而不是全部
  • 做事務(wù)保障,以保證每個(gè)消息處理且僅處理一次。
  • 使用SimpleConsumer的缺點(diǎn)

    使用SimpleConsumer比起Conusmer Group需要增加大量的工作。注意對(duì)于SimpleConsumer而言,沒(méi)有Group的概念。

  • 你必須處理你的offset,以便知道應(yīng)用在哪里停止消費(fèi)的。
  • 你必須找出哪個(gè)broker是某個(gè)分區(qū)的leader。
  • 你必須處理ledaer發(fā)生變化的情況。
  • 使用SimpleConsumer的步驟

  • 找到活動(dòng)的broker并確定哪個(gè)是topic和分區(qū)的leader。
  • 找到哪個(gè)是topic和分區(qū)的relica broker。
  • 定義一個(gè)請(qǐng)求,指定你關(guān)心的數(shù)據(jù)
  • 取得數(shù)據(jù)
  • 識(shí)別leader變化,并自動(dòng)恢復(fù)
  • 2. 詳細(xì)步驟

    見(jiàn)代碼中的注釋

public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
    // find the meta data about the topic and partition we are interested in
    // 1. find the leader
    PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
    if (metadata == null) {
        System.out.println("Can't find metadata for Topic and Partition. Exiting");
        return;
    }
    if (metadata.leader() == null) {
        System.out.println("Can't find Leader for Topic and Partition. Exiting");
        return;
    }
    String leadBroker = metadata.leader().host();
    String clientName = "Client_" + a_topic + "_" + a_partition;

    // 2. build the consumer
    SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
    // 3. decide the starting offset
    long readOffset = getLastOffset(consumer, a_topic, a_partition, 101000000000000L, clientName); // or kafka.api.OffsetRequest.LatestTime()
    int numErrors = 0;
    while (a_maxReads > 0) {
        if (consumer == null) {
            consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        }
        // 4. build the FetchRequest
        FetchRequest req = new FetchRequestBuilder()
                .clientId(clientName)
                .addFetch(a_topic, a_partition, readOffset, 100000) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
                .build();
        // 5. issue the fetch through the consumer and get back a FetchResponse
        FetchResponse fetchResponse = consumer.fetch(req);

        // 6. handle errors if any
        if (fetchResponse.hasError()) {
            numErrors++;
            // Something went wrong!
            short code = fetchResponse.errorCode(a_topic, a_partition);
            System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
            if (numErrors > 5) break;
            if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                // If the requested time is later than now, the latest messages are returned.
                // We asked for an invalid offset. For simple case ask for the last element to reset
                readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                continue;
            }
            consumer.close();
            consumer = null;
            leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
            continue;
        }
        numErrors = 0;

        long numRead = 0;
        // 7. process each message fetched
        for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
            long currentOffset = messageAndOffset.offset();
            // skip messages whose offset is below the requested starting offset
            if (currentOffset < readOffset) {
                System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                continue;
            }
            readOffset = messageAndOffset.nextOffset();
            ByteBuffer payload = messageAndOffset.message().payload();
            byte[] bytes = new byte[payload.limit()];
            payload.get(bytes);
            // print the offset and the message body
            System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
            numRead++;
            a_maxReads--;
        }

        if (numRead == 0) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ie) {
            }
        }
    }
    if (consumer != null) consumer.close();
}

3. Notes on some methods in the code

(1) Finding the leader broker for the topic and partition

The simplest approach is to pass a few brokers you already know about into the program, via a configuration file, the command line, or any other means. This does not have to be every broker in the cluster; a subset is enough.

The method works as follows:
* First create a SimpleConsumer object; its arguments are the host, port, socket timeout, buffer size and client id:

new kafka.consumer.SimpleConsumer(host, port, soTimeout, bufferSize, clientId)

* Create a TopicMetadataRequest and send it through the consumer:

List<String> topics = Collections.singletonList(a_topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
* Get the topic metadata, then pick out the metadata of the partition we care about:

List<TopicMetadata> metaData = resp.topicsMetadata();
for (TopicMetadata item : metaData) {
    for (PartitionMetadata part : item.partitionsMetadata()) {
        if (part.partitionId() == a_partition) {
            returnMetaData = part;
            break loop;
        }
    }
}

As soon as the matching partition is found, the labeled loop is exited immediately.

    • 最后取得replica broker,并將之保存到一個(gè)List中

if (returnMetaData != null) {
    m_replicaBrokers.clear();
    for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
        m_replicaBrokers.add(replica.host());
    }
}

The complete code:

private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
    PartitionMetadata returnMetaData = null;
    loop:
    for (String seed : a_seedBrokers) {
        SimpleConsumer consumer = null;
        try {
            consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
            List<String> topics = Collections.singletonList(a_topic);
            TopicMetadataRequest req = new TopicMetadataRequest(topics);
            kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

            List<TopicMetadata> metaData = resp.topicsMetadata();
            for (TopicMetadata item : metaData) {
                for (PartitionMetadata part : item.partitionsMetadata()) {
                    if (part.partitionId() == a_partition) {
                        returnMetaData = part;
                        break loop;
                    }
                }
            }
        } catch (Exception e) {
            System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic
                    + ", " + a_partition + "] Reason: " + e);
        } finally {
            if (consumer != null) consumer.close();
        }
    }
    if (returnMetaData != null) {
        m_replicaBrokers.clear();
        for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
            m_replicaBrokers.add(replica.host());
        }
    }
    return returnMetaData;
}

(2) Finding which offset to start reading from

Next we need to decide which offset to start reading from. Kafka provides two constants for this:
* kafka.api.OffsetRequest.EarliestTime(): start from the earliest data still present in the cluster
* kafka.api.OffsetRequest.LatestTime(): start from the newest data, i.e. only messages that arrive after the consumer starts.

Do not assume that 0 is the starting offset; messages expire and are deleted after the retention period.

public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                 long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);

    if (response.hasError()) {
        System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
        return 0;
    }
    long[] offsets = response.offsets(topic, partition);
    return offsets[0];
}

    (3)錯(cuò)誤處理

    由于SimpleConsumer不會(huì)自動(dòng)處理錯(cuò)誤,我們需要處理一下這些錯(cuò)誤。

if (fetchResponse.hasError()) {
    numErrors++;
    // Something went wrong!
    short code = fetchResponse.errorCode(a_topic, a_partition);
    System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
    if (numErrors > 5) break;
    if (code == ErrorMapping.OffsetOutOfRangeCode()) {
        // We asked for an invalid offset. For simple case ask for the last element to reset
        readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
        continue;
    }
    consumer.close();
    consumer = null;
    leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
    continue;
}

When an error comes back, we log the reason, close the consumer, and then look for a new leader.

private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
    for (int i = 0; i < 3; i++) {
        boolean goToSleep = false;
        PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            goToSleep = true;
        } else if (metadata.leader() == null) {
            goToSleep = true;
        } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
            // first time through if the leader hasn't changed give ZooKeeper a second to recover
            // second time, assume the broker did recover before failover, or it was a non-Broker issue
            goToSleep = true;
        } else {
            return metadata.leader().host();
        }
        if (goToSleep) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ie) {
            }
        }
    }
    System.out.println("Unable to find new leader after Broker failure. Exiting");
    throw new Exception("Unable to find new leader after Broker failure. Exiting");
}

This method uses the findLeader() logic we defined earlier to find the new leader, except here we only try to connect to one of the replicas for the topic/partition. This way if we can't reach any of the Brokers with the data we are interested in we give up and exit hard.
Since it may take a short time for ZooKeeper to detect the leader loss and assign a new leader, we sleep if we don't get an answer. In reality ZooKeeper often does the failover very quickly so you never sleep.

(4) Reading the data
// When calling FetchRequestBuilder, it's important NOT to call .replicaId(), which is meant for internal use only.
// Setting the replicaId incorrectly will cause the brokers to behave incorrectly.
FetchRequest req = new FetchRequestBuilder()
        .clientId(clientName)
        .addFetch(a_topic, a_partition, readOffset, 100000)
        .build();
FetchResponse fetchResponse = consumer.fetch(req);

if (fetchResponse.hasError()) {
    // See code in previous section
}
numErrors = 0;

long numRead = 0;
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
    long currentOffset = messageAndOffset.offset();
    if (currentOffset < readOffset) {
        System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
        continue;
    }
    readOffset = messageAndOffset.nextOffset();
    ByteBuffer payload = messageAndOffset.message().payload();
    byte[] bytes = new byte[payload.limit()];
    payload.get(bytes);
    System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
    numRead++;
    a_maxReads--;
}

if (numRead == 0) {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException ie) {
    }
}

Note that the 'readOffset' asks the last read message what the next Offset would be. This way when the block of messages is processed we know where to ask Kafka where to start the next fetch.
Also note that we are explicitly checking that the offset being read is not less than the offset that we requested. This is needed since if Kafka is compressing the messages, the fetch request will return an entire compressed block even if the requested offset isn't the beginning of the compressed block. Thus a message we saw previously may be returned again. Note also that we ask for a fetchSize of 100000 bytes. If the Kafka producers are writing large batches, this might not be enough, and might return an empty message set. In this case, the fetchSize should be increased until a non-empty set is returned.
Finally, we keep track of the number of messages read. If we didn't read anything on the last request we go to sleep for a second so we aren't hammering Kafka when there is no data.

    3、關(guān)于offset的指定

    (1)定義request對(duì)象

    FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000) .build();

    其中readOffset是將要讀取的offset。

    (2)通過(guò)request去發(fā)送請(qǐng)求

FetchResponse fetchResponse = consumer.fetch(req);

(3) Iterate over the returned message set

for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
    long currentOffset = messageAndOffset.offset();
    // skip messages whose offset is below the requested starting offset
    if (currentOffset < readOffset) {
        System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
        continue;
    }
    readOffset = messageAndOffset.nextOffset();
    ByteBuffer payload = messageAndOffset.message().payload();
    byte[] bytes = new byte[payload.limit()];
    payload.get(bytes);
    // print the offset and the message body
    System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
    numRead++;
    a_maxReads--;
}

    (4)關(guān)于offset的指定
    首先注意區(qū)分2個(gè)概念,offset與代碼中指定的值,如EaliestTime(), LastestTime(),或者是一個(gè)13位的UNIX 時(shí)間戳。

    前者是一個(gè)數(shù)值,創(chuàng)建topic時(shí),offset為0,然后不斷的遞增。但要注意,0不總是代表kafka集群中最早的數(shù)值,因?yàn)槌^(guò)一定時(shí)間的數(shù)據(jù)會(huì)被刪除。kafka集群的數(shù)據(jù)文件中的文件名就是這個(gè)文件中第一個(gè)消息的offset。

    后者是方便程序開(kāi)發(fā)所定義的幾個(gè)值,可以是最早的消息,最晚的消息,或者是指定一個(gè)特定的時(shí)間。因?yàn)橛脩舨豢赡苷f(shuō)從哪個(gè)offset開(kāi)始消費(fèi)消息,而應(yīng)該是從哪個(gè)時(shí)間開(kāi)始消費(fèi)消息。

    下面我們看看如何通過(guò)指定一個(gè)時(shí)間來(lái)確定一個(gè)offset:

long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);

getLastOffset looks like this:

public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                 long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);

    if (response.hasError()) {
        System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
        return 0;
    }
    long[] offsets = response.offsets(topic, partition);
    return offsets[0];
}

One more question: what is clientName actually for? (It is simply an identifier sent along with every request; brokers use it in logs and metrics to tell client applications apart.)

IV. Consuming messages from the Kafka cluster with Storm

A simple example:
Core Spout

BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

Trident Spout

TridentTopology topology = new TridentTopology();
BrokerHosts zk = new ZkHosts("localhost");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);

    詳細(xì)請(qǐng)參考storm-kafka編程指南

V. Consuming messages from the Kafka cluster with Spark Streaming

1. Define the processing you need

Here we take log filtering as an example:

public class FiltersFuntion implements Function<Tuple2<String, String>, String>, akka.japi.Function<Tuple2<String, String>, String>

    關(guān)鍵是覆蓋Function中的call()方法,它定義了對(duì)每一個(gè)消息所作的處理

@Override
public String call(Tuple2<String, String> v1) throws Exception {
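    // Hypothetical body, not from the original article: v1._1() is the Kafka message
    // key and v1._2() is the message payload delivered by KafkaUtils.createStream.
    // As an illustrative filter, keep lines containing "ERROR" and blank out the rest;
    // the real filtering rule depends on the application.
    String line = v1._2();
    return (line != null && line.contains("ERROR")) ? line : "";
}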

2. Define the application structure

public final class JavaKafkaWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    private JavaKafkaWordCount() {
    }

    public static void main(String[] args) {
        if (args.length < 4) {
            System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
            System.exit(1);
        }

        SparkConf sparkConf = new SparkConf().setAppName("ljh_JavaKafkaWordCount");
        // Create the context with a 1 second batch size
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        int numThreads = Integer.parseInt(args[3]);
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topics = args[2].split(",");
        for (String topic : topics) {
            topicMap.put(topic, numThreads);
        }

        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

        JavaDStream<String> lines = messages.map(new FiltersFuntion());

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String x) {
                return Lists.newArrayList(SPACE.split(x));
            }
        });

        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
            new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) {
                    return new Tuple2<String, Integer>(s, 1);
                }
            }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer i1, Integer i2) {
                    return i1 + i2;
                }
            });

        wordCounts.print();
        jssc.start();
        jssc.awaitTermination();
    }
}

VI. Integration with Hadoop

This is handled by a third-party project called Camus; use it with care.
Providing a horizontally scalable solution for aggregating and loading data into Hadoop was one of our basic use cases. To support this use case, we provide a Hadoop-based consumer which spawns off many map tasks to pull data from the Kafka cluster in parallel. This provides extremely fast pull-based Hadoop data load capabilities (we were able to fully saturate the network with only a handful of Kafka servers).

    詳細(xì)請(qǐng)參考項(xiàng)目:https://github.com/linkedin/camus/
