日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java打印设备集中管理_Kafka+Log4j实现日志集中管理

發(fā)布時(shí)間:2023/12/4 编程问答 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java打印设备集中管理_Kafka+Log4j实现日志集中管理 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

記錄如何使用Kafka+Log4j實(shí)現(xiàn)集中日志管理的過程。

引言

前面寫的《Spring+Log4j+ActiveMQ實(shí)現(xiàn)遠(yuǎn)程記錄日志——實(shí)戰(zhàn)+分析》得到了許多同學(xué)的認(rèn)可,在認(rèn)可的同時(shí),也有同學(xué)提出可以使用Kafka來集中管理日志,于是今天就來學(xué)習(xí)一下。

特別說明,由于網(wǎng)絡(luò)上關(guān)于Kafka+Log4j的完整例子并不多,我也是一邊學(xué)習(xí)一邊使用,因此如果有解釋得不好或者錯(cuò)誤的地方,歡迎批評指正,如果你有好的想法,也歡迎留言探討。

第一部分 搭建Kafka環(huán)境

安裝Kafka

下載:http://kafka.apache.org/downloads.html

tar zxf kafka-.tgz

cd kafka-

啟動Zookeeper

啟動Zookeeper前需要配置一下config/zookeeper.properties:

接下來啟動Zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

啟動Kafka Server

啟動Kafka Server前需要配置一下config/server.properties。主要配置以下幾項(xiàng),內(nèi)容就不說了,注釋里都很詳細(xì):

然后啟動Kafka Server:

bin/kafka-server-start.sh config/server.properties

創(chuàng)建Topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看創(chuàng)建的Topic

>bin/kafka-topics.sh --list --zookeeper localhost:2181

啟動控制臺Producer,向Kafka發(fā)送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

This is a message

This is another message

^C

啟動控制臺Consumer,消費(fèi)剛剛發(fā)送的消息

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

This is a message

This is another message

刪除Topic

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

注:只有當(dāng)delete.topic.enable=true時(shí),該操作才有效

配置Kafka集群(單臺機(jī)器上)

首先拷貝server.properties文件為多份(這里演示4個(gè)節(jié)點(diǎn)的Kafka集群,因此還需要拷貝3份配置文件):

cp config/server.properties config/server1.properties

cp config/server.properties config/server2.properties

cp config/server.properties config/server3.properties

修改server1.properties的以下內(nèi)容:

broker.id=1

port=9093

log.dir=/tmp/kafka-logs-1

同理修改server2.properties和server3.properties的這些內(nèi)容,并保持所有配置文件的zookeeper.connect屬性都指向運(yùn)行在本機(jī)的zookeeper地址localhost:2181。注意,由于這幾個(gè)Kafka節(jié)點(diǎn)都將運(yùn)行在同一臺機(jī)器上,因此需要保證這幾個(gè)值不同,這里以累加的方式處理。例如在server2.properties上:

broker.id=2

port=9094

log.dir=/tmp/kafka-logs-2

把server3.properties也配置好以后,依次啟動這些節(jié)點(diǎn):

bin/kafka-server-start.sh config/server1.properties &

bin/kafka-server-start.sh config/server2.properties &

bin/kafka-server-start.sh config/server3.properties &

Topic & Partition

Topic在邏輯上可以被認(rèn)為是一個(gè)queue,每條消費(fèi)都必須指定它的Topic,可以簡單理解為必須指明把這條消息放進(jìn)哪個(gè)queue里。為了使得Kafka的吞吐率可以線性提高,物理上把Topic分成一個(gè)或多個(gè)Partition,每個(gè)Partition在物理上對應(yīng)一個(gè)文件夾,該文件夾下存儲這個(gè)Partition的所有消息和索引文件。

現(xiàn)在在Kafka集群上創(chuàng)建備份因子為3,分區(qū)數(shù)為4的Topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 4 --topic kafka

說明:備份因子replication-factor越大,則說明集群容錯(cuò)性越強(qiáng),就是當(dāng)集群down掉后,數(shù)據(jù)恢復(fù)的可能性越大。所有的分區(qū)數(shù)里的內(nèi)容共同組成了一份數(shù)據(jù),分區(qū)數(shù)partions越大,則該topic的消息就越分散,集群中的消息分布就越均勻。

然后使用kafka-topics.sh的--describe參數(shù)查看一下Topic為kafka的詳情:

輸出的第一行是所有分區(qū)的概要,接下來的每一行是一個(gè)分區(qū)的描述。可以看到Topic為kafka的消息,PartionCount=4,ReplicationFactor=3正是我們創(chuàng)建時(shí)指定的分區(qū)數(shù)和備份因子。

另外:Leader是指負(fù)責(zé)這個(gè)分區(qū)所有讀寫的節(jié)點(diǎn);Replicas是指這個(gè)分區(qū)所在的所有節(jié)點(diǎn)(不論它是否活著);ISR是Replicas的子集,代表存有這個(gè)分區(qū)信息而且當(dāng)前活著的節(jié)點(diǎn)。

拿partition:0這個(gè)分區(qū)來說,該分區(qū)的Leader是server0,分布在id為0,1,2這三個(gè)節(jié)點(diǎn)上,而且這三個(gè)節(jié)點(diǎn)都活著。

再來看下Kafka集群的日志:

其中kafka-logs-0代表server0的日志,kafka-logs-1代表server1的日志,以此類推。

從上面的配置可知,id為0,1,2,3的節(jié)點(diǎn)分別對應(yīng)server0, server1, server2, server3。而上例中的partition:0分布在id為0, 1, 2這三個(gè)節(jié)點(diǎn)上,因此可以在server0, server1, server2這三個(gè)節(jié)點(diǎn)上看到有kafka-0這個(gè)文件夾。這個(gè)kafka-0就代表Topic為kafka的partion0。

第二部分 Kafka+Log4j項(xiàng)目整合

先來看下Maven項(xiàng)目結(jié)構(gòu)圖:

作為Demo,文件不多。先看看pom.xml引入了哪些jar包:

org.apache.kafka

kafka_2.9.2

0.8.2.1

org.apache.kafka

kafka-clients

0.8.2.1

com.google.guava

guava

18.0

重要的內(nèi)容是log4j.properties:

log4j.rootLogger=INFO,console

# for package com.demo.kafka, log would be sent to kafka appender.

log4j.logger.com.demo.kafka=DEBUG,kafka

# appender kafka

log4j.appender.kafka=kafka.producer.KafkaLog4jAppender

log4j.appender.kafka.topic=kafka

# multiple brokers are separated by comma ",".

log4j.appender.kafka.brokerList=localhost:9092, localhost:9093, localhost:9094, localhost:9095

log4j.appender.kafka.compressionType=none

log4j.appender.kafka.syncSend=true

log4j.appender.kafka.layout=org.apache.log4j.PatternLayout

log4j.appender.kafka.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n

# appender console

log4j.appender.console=org.apache.log4j.ConsoleAppender

log4j.appender.console.target=System.out

log4j.appender.console.layout=org.apache.log4j.PatternLayout

log4j.appender.console.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n

App.Java里面就很簡單啦,主要是通過log4j輸出日志:

package com.demo.kafka;

import org.apache.log4j.Logger;

public class App {

private static final Logger LOGGER = Logger.getLogger(App.class);

public static void main(String[] args) throws InterruptedException {

for (int i = 0; i < 20; i++) {

LOGGER.info("Info [" + i + "]");

Thread.sleep(1000);

}

}

}

MyConsumer.java用于消費(fèi)kafka中的信息:

package com.demo.kafka;

import java.util.List;

import java.util.Map;

import java.util.Properties;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import com.google.common.collect.ImmutableMap;

import kafka.consumer.Consumer;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

import kafka.message.MessageAndMetadata;

public class MyConsumer {

private static final String ZOOKEEPER = "localhost:2181";

//groupName可以隨意給,因?yàn)閷τ趉afka里的每條消息,每個(gè)group都會完整的處理一遍

private static final String GROUP_NAME = "test_group";

private static final String TOPIC_NAME = "kafka";

private static final int CONSUMER_NUM = 4;

private static final int PARTITION_NUM = 4;

public static void main(String[] args) {

// specify some consumer properties

Properties props = new Properties();

props.put("zookeeper.connect", ZOOKEEPER);

props.put("zookeeper.connectiontimeout.ms", "1000000");

props.put("group.id", GROUP_NAME);

// Create the connection to the cluster

ConsumerConfig consumerConfig = new ConsumerConfig(props);

ConsumerConnector consumerConnector =

Consumer.createJavaConsumerConnector(consumerConfig);

// create 4 partitions of the stream for topic “test”, to allow 4

// threads to consume

Map>> topicMessageStreams =

consumerConnector.createMessageStreams(

ImmutableMap.of(TOPIC_NAME, PARTITION_NUM));

List> streams = topicMessageStreams.get(TOPIC_NAME);

// create list of 4 threads to consume from each of the partitions

ExecutorService executor = Executors.newFixedThreadPool(CONSUMER_NUM);

// consume the messages in the threads

for (final KafkaStream stream : streams) {

executor.submit(new Runnable() {

public void run() {

for (MessageAndMetadata msgAndMetadata : stream) {

// process message (msgAndMetadata.message())

System.out.println(new String(msgAndMetadata.message()));

}

}

});

}

}

}

MyProducer.java用于向Kafka發(fā)送消息,但不通過log4j的appender發(fā)送。此案例中可以不要。但是我還是放在這里:

package com.demo.kafka;

import java.util.ArrayList;

import java.util.List;

import java.util.Properties;

import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;

public class MyProducer {

private static final String TOPIC = "kafka";

private static final String CONTENT = "This is a single message";

private static final String BROKER_LIST = "localhost:9092";

private static final String SERIALIZER_CLASS = "kafka.serializer.StringEncoder";

public static void main(String[] args) {

Properties props = new Properties();

props.put("serializer.class", SERIALIZER_CLASS);

props.put("metadata.broker.list", BROKER_LIST);

ProducerConfig config = new ProducerConfig(props);

Producer producer = new Producer(config);

//Send one message.

KeyedMessage message =

new KeyedMessage(TOPIC, CONTENT);

producer.send(message);

//Send multiple messages.

List> messages =

new ArrayList>();

for (int i = 0; i < 5; i++) {

messages.add(new KeyedMessage

(TOPIC, "Multiple message at a time. " + i));

}

producer.send(messages);

}

}

到這里,代碼就結(jié)束了。

第三部分 運(yùn)行與驗(yàn)證

先運(yùn)行MyConsumer,使其處于監(jiān)聽狀態(tài)。同時(shí),還可以啟動Kafka自帶的ConsoleConsumer來驗(yàn)證是否跟MyConsumer的結(jié)果一致。最后運(yùn)行App.java。

先來看看MyConsumer的輸出:

再來看看ConsoleConsumer的輸出:

可以看到,盡管發(fā)往Kafka的消息去往了不同的地方,但是內(nèi)容是一樣的,而且一條也不少。最后再來看看Kafka的日志。

我們知道,Topic為kafka的消息有4個(gè)partion,從之前的截圖可知這4個(gè)partion均勻分布在4個(gè)kafka節(jié)點(diǎn)上,于是我對每一個(gè)partion隨機(jī)選取一個(gè)節(jié)點(diǎn)查看了日志內(nèi)容。

上圖中黃色選中部分依次代表在server0上查看partion0,在server1上查看partion1,以此類推。

而紅色部分是日志內(nèi)容,由于在創(chuàng)建Topic時(shí)準(zhǔn)備將20條日志分成4個(gè)區(qū)存儲,可以很清楚的看到,這20條日志確實(shí)是很均勻的存儲在了幾個(gè)partion上。

摘一點(diǎn)Infoq上的話:每個(gè)日志文件都是一個(gè)log entrie序列,每個(gè)log entrie包含一個(gè)4字節(jié)整型數(shù)值(值為N+5),1個(gè)字節(jié)的"magic value",4個(gè)字節(jié)的CRC校驗(yàn)碼,其后跟N個(gè)字節(jié)的消息體。每條消息都有一個(gè)當(dāng)前Partition下唯一的64字節(jié)的offset,它指明了這條消息的起始位置。磁盤上存儲的消息格式如下:

message length : 4 bytes (value: 1+4+n)

"magic" value : 1 byte

crc : 4 bytes

payload : n bytes

這里我們看到的日志文件的每一行,就是一個(gè)log entrie,每一行前面無法顯示的字符(藍(lán)色選中部分),就是(message length + magic value + crc)了。而log entrie的后部分,則是消息體的內(nèi)容了。

問題:

1. 如果要使用此種方式,有一種場景是提取某天或者某小時(shí)的日志,那么如何設(shè)計(jì)Topic呢?是不是要在Topic上帶入日期或者小時(shí)數(shù)?還有更好的設(shè)計(jì)方案嗎?

2. 假設(shè)按每小時(shí)設(shè)計(jì)Topic,那么如何在使用諸如logger.info()這樣的方法時(shí),自動根據(jù)時(shí)間去改變Topic呢?有類似的例子嗎?

----歡迎交流,共同進(jìn)步。

樣例下載:

------------------------------------------分割線------------------------------------------

具體下載目錄在 /2015年資料/12月/13日/Kafka+Log4j實(shí)現(xiàn)日志集中管理

------------------------------------------分割線------------------------------------------

參考頁面:

相關(guān)閱讀:

Kafka 的詳細(xì)介紹:請點(diǎn)這里

Kafka 的下載地址:請點(diǎn)這里

總結(jié)

以上是生活随笔為你收集整理的java打印设备集中管理_Kafka+Log4j实现日志集中管理的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。