日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Web服务http日志收集

發布時間:2023/12/16 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Web服务http日志收集 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

需求場景:

架構設計:

端口號:

準備資源:

Linux版本:

準備Java環境

ElasticSearch:

下載安裝

配置:

啟動:

Zookeeper

下載安裝:

配置:

啟動:

Kafka

下載安裝:

配置:

啟動命令:

準備和調試:

Logstash:

下載安裝:

配置:

logstash服務配置:

被監聽的服務配置:

啟動命令:

Kibana:

下載安裝:

配置:

啟動

操作:

案例:

Web服務代碼改造:

集成Kafka:

pom.xml添加依賴:

?添加kafka的配置:

?利用切面捕獲入庫信息:

需要注意的細節:


需求場景:

目前已經運行了一個JavaWeb應用,需要能自定義收集http的request和response,做到報文可追溯,可統計,方便查詢,同時不能對現有web服務的http請求造成影響。

?

架構設計:

1 利用AOP獲取controller層的reqeust和response,并根據自定義要求做Json序列化

2 AOP獲取的內容發送到Kafka,利用MQ的特性,減少對http請求造成的延時

3 LogStash從Kafka中消費Json信息,對內容清洗后按照規則發送給不同的es索引

4 ElasticSearch存儲內容

5 三種方式檢索es中的內容:

?第一種:在服務器中搭建Kibana,所有用戶利用該Kibana來操作es;(推薦)

?第二種:curl直接操作es

?第三種:客戶端本地安裝Kibana,利用客戶端Kibana來操作es

?

端口號:

9200:ElasticSearch的端口

9092:Kafka的端口

2181:ZooKeeper的端口

5601: Kibana的端口

這些端口號是組建默認的端口,可以根據自己的需要進行配置,如果云平臺有安全組限制或者實例中有防火墻的限制,需要打開它們。

?

準備資源:

幾個控件可以分開安裝,也可裝在一臺機器上,建議初期都在一臺里,網絡傳輸消耗少。

本篇重在介紹如何把它們串聯起來,所有控件先只做單節點,而且這些控件集群配置相對都很簡單,在以前的博客中都有介紹過。

最低配置:需要4核8G,或者兩臺2核4G的也可以。

AWS中最低是C4.xlarge 推薦C4.2xlarge

?

Linux版本:

CentOS-7-x86_64-GenericCloud-1802(小版本號無要求,centos7即可)

Jdk:jdk-8u171-linux-x64.tar.gz(小版本號無要求,jdk8即可)

Centos用戶,如果是新機器需要重設下密碼:

$sudo passwd centos

?

準備Java環境

Jdk:jdk-8u171-linux-x64.tar.gz 配置java環境 $vi ~/.bashrc export JAVA_HOME=/opt/jdk/jdk1.8.0_171 export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar export PATH=$PATH:$JAVA_HOME/bin $source ~/.bashrc

?

ElasticSearch:

下載安裝

$wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.6.1.zip

$unzip elasticsearch-5.6.1.zip

最好安裝一個中文插件(如果是純英文場景可以跳過此步驟)

wget https://github.com/yejingtao/forblog/raw/master/ik/elasticsearch-analysis-ik-5.6.1.zip

中文插件的原因和使用方式:

https://blog.csdn.net/yejingtao703/article/details/78392902

配置:

涉及到的配置文件都在config下

1、elasticsearch.yml:elastic結點、集群的配置信息;

2、jvm.options:jvm的配置信息,里面找到默認啟動內存是2G,最低可以改成512m

3、log4j2.properties:elastic的log的配置文件。

?

需要修改綁定地址,否則es不能被外部訪問:

修改config 下的配置文件elasticsearch.yml,將#network.host: 192.168.0.1注釋放開,同時修改成你對外的IP

啟動:

$cd bin

$./elasticsearch

如果遇到這樣的報錯:

ERROR: [2] bootstrap checks failed

[1]: max file descriptors [4096] forelasticsearch process is too low, increase to at least [65536]

[2]: max virtual memory areas vm.max_map_count[65530] is too low, increase to at least [262144]

解決問題1:執行下命令ulimit -n?65536

解決問題2:需要修改/etc/sysctl.conf配置文件

echo "vm.max_map_count=262144">>/etc/sysctl.conf

sysctl –p//立刻生效

其中在解決ulimit -n?65536問題是被卡住了,原因是centos用戶沒有ulimit命令的權限

解決方案:

$sudo vi /etc/security/limits.conf 在配置文件的最后添加6行: centos soft nproc 16384 centos hard nproc 16384 centos soft nofile 65536 centos hard nofile 65536 centos soft memlock 4000000 centos hard memlock 4000000 保存退出后需要重新切換一次centos用戶 $su – centos

在外網的瀏覽器中確認能訪問到elasticSearch(http://ip:9200)就可以確定安裝完畢

?

建議做為服務器應用將elasticSearch轉為后臺運行:

$nohup elasticsearch-5.6.1/bin/elasticsearch &

?

Zookeeper

Zookeeper是使用Kafka的先決條件

下載安裝:

$wget?http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz

$gunzip zookeeper-3.4.10.tar.gz

?

配置:

將/conf/下zoo_sample.cfg 重命名為 zoo.cfg

vim zoo.cfg dataDir=/tmp/zookeeper (數據文件) dataLogDir=/tmp/zookeeperlog (日志文件)

啟動:

./bin/zkServer.sh start

2181是zk的默認端口,啟動后可以telnet檢查下是否啟動成功(也可以不檢查,因為zk啟動失敗的話,后面的kafka是不會啟動成功的)

?

Kafka

下載安裝:

$wget http://mirror.bit.edu.cn/apache/kafka/2.1.1/kafka_2.11-2.1.1.tgz

$gunzip kafka_2.11-2.1.1.tgz

?

配置:

$vi conf/server.properties

#listeners=PLAINTEXT://:9092改成自己的地址

advertised.listeners=PLAINTEXT也要改成外網地址,深坑,否則外面的生產者解析不到kafka的地址。

?

啟動命令:

$bin/kafka-server-start.sh config/server.properties

同理上線使用時后臺運行

$nohup bin/kafka-server-start.sh config/server.properties &

?

檢查Kafka是否啟動成功,調用以下命令查詢下kafka的topic

$bin/kafka-topics.sh --list --zookeeper {yourip}:2181

?

準備和調試:

創建好topic,給接入代碼使用

$bin/kafka-topics.sh --create --zookeeper {yourip}:2181 --replication-factor 1 --partitions 1 --topic httplog

像http日志收集這種應用場景,java代碼只做生產者,可以直接用kafka自帶的消費者來進行調試,不需要單獨開發消費代碼。

kafka消費者調測:

$bin/kafka-console-consumer.sh --bootstrap-server {yourip}:9092 --topic httplog

同理,kafka也自帶生產者調測工具,這里不再介紹

?

Logstash:

下載安裝:

$wget https://artifacts.elastic.co/downloads/logstash/logstash-5.6.3.zip

$unzip logstash-5.6.3.zip

?

配置:

logstash服務配置:

配置文件在HOME/config下

修改logstash.yml

# Bind address for the metrics REST endpoint # # http.host: "127.0.0.1" 重新配置http.host端口: # Bind port for the metrics REST endpoint, this option also accept a range # (9600-9700) and logstash will pick up the first available ports. # # http.port: 9600-9700

這里注意下就好,主要根據這個端口范圍來關心logstash的死活,這里把logstash架在kafka和elasticsearch之間,我們不會通過端口去訪問它。

?

被監聽的服務配置:

隨便一個位置添加一個啟動配置文件logstash.conf,建議也是放在logstash的home目錄下,內容如下:

input {kafka {bootstrap_servers => ["10.100.1.142:9092"]client_id => "fwapi"group_id => "fwapi"auto_offset_reset => "latest"consumer_threads => 5topics => ["httplog"]codec => json {charset => "UTF-8"}} }filter {json{source => "message"target => "message"} }output {elasticsearch{hosts => ["10.100.1.142:9200"]index => "httplog-%{+YYYY.MM.dd}"timeout => 300} }

啟動命令:

$bin/logstash -f logstash.conf

上生產時需要轉到后臺

$nohup bin/logstash -f logstash.conf &

?

?

Kibana:

下載安裝:

https://www.elastic.co/downloads/kibana

下載時請注意Kibana版本要與elasticsearc版本一致,否則會有下面這種坑

?

所以對應上述es版本的kibana安裝包是:

https://artifacts.elastic.co/downloads/kibana/kibana-5.6.1-linux-x86_64.tar.gz

?

配置:

修改config/kibana.yml

#elasticsearch.hosts: ["http://localhost:9200"] 改成自己elasticsearch的地址 #server.host: "localhost" 改成自己的kibana地址

啟動

$./kibana

上生產時需要后臺運行

$ nohup ./kibana &

?

操作:

地址:http://hostname:5601/

Dev Tools里是curl的查詢界面,

案例:

具體某一天中某一個接口的請求:GET httplog-2019.02.28/_search

如果按月查詢:GET httplog-2019.02.*/_search

按年查詢GET httplog-2019.*/_search

全量查詢GET /_search

壓力監控(QPS):

可以聚合和報表的類型:

?

成品案例:

?

?

Web服務代碼改造:

集成Kafka:

pom.xml添加依賴:

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId> </dependency>

?添加kafka的配置:

spring.kafka.producer.bootstrap-servers=10.100.129.142:9092 spring.kafka.producer.retries=0 spring.kafka.producer.batch-size=4096 spring.kafka.producer.buffer-memory=40960 package com.fw.tester.config;import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory;@Configuration @EnableKafka public class KafkaProducerConfig {@Value("${spring.kafka.producer.bootstrap-servers}")private String servers;@Value("${spring.kafka.producer.retries}")private int retries;@Value("${spring.kafka.producer.batch-size}")private int batchSize;@Value("${spring.kafka.producer.buffer-memory}")private int bufferMemory;public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ProducerConfig.RETRIES_CONFIG, retries);props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);//props.put(ProducerConfig.LINGER_MS_CONFIG, linger);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}public ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<String, String>(producerFactory());} }

?利用切面捕獲入庫信息:

package com.fw.tester.aspect;import org.aspectj.lang.JoinPoint; import org.aspectj.lang.annotation.AfterReturning; import org.aspectj.lang.annotation.AfterThrowing; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Before; import org.aspectj.lang.annotation.Pointcut; import org.springframework.core.annotation.Order; import org.springframework.http.HttpStatus; import org.springframework.stereotype.Component; import org.springframework.web.context.request.RequestContextHolder; import org.springframework.web.context.request.ServletRequestAttributes; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import com.alibaba.fastjson.JSON; import lombok.AllArgsConstructor; import lombok.Data; import lombok.extern.slf4j.Slf4j; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse;/*** Aspect for http log*/@Aspect @Order(5) @Component @Slf4j public class WebLogJsonAspect {@Autowiredprivate KafkaTemplate kafkaTemplate;@Value("${spring.kafka.enable}")private boolean kafkaEnable;ThreadLocal<Long> startTime = new ThreadLocal<>();@Pointcut("execution(public * com.fw.tester.controller..*.*(..))")public void webLog(){}@Before("webLog()")public void doBefore(JoinPoint joinPoint) throws Throwable {startTime.set(System.currentTimeMillis());ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();HttpServletRequest request = attributes.getRequest();LogRequest logRequest = new LogRequest(System.currentTimeMillis(), request.getRequestURL().toString(), request.getMethod(),joinPoint.getSignature().getDeclaringTypeName() + "." + joinPoint.getSignature().getName(), JSON.toJSONString(joinPoint.getArgs()));log.info(JSON.toJSONString(logRequest));if(kafkaEnable) {try {kafkaTemplate.send("httplog", JSON.toJSONString(logRequest));log.info("Send message to kafka successfully");} catch (Exception e) {log.error("Send message to kafka unsuccessfully", e);e.printStackTrace();}}}@AfterReturning(returning = "ret", pointcut = "webLog()")public void doAfterReturning(Object ret) throws Throwable {HttpServletResponse response = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getResponse();LogResponse logResponse = new LogResponse(startTime.get(), response.getStatus(), ret, System.currentTimeMillis() - startTime.get());log.info(JSON.toJSONString(logResponse));if(kafkaEnable) {try {kafkaTemplate.send("httplog", JSON.toJSONString(logResponse));log.info("Send message to kafka successfully");} catch (Exception e) {log.error("Send message to kafka unsuccessfully", e);e.printStackTrace();}}}@AfterThrowing(throwing="ex", pointcut = "webLog()")public void doThrowing(Throwable ex){LogResponse logResponse = new LogResponse(startTime.get(), HttpStatus.INTERNAL_SERVER_ERROR.value(), ex.getMessage(), System.currentTimeMillis() - startTime.get());log.info(JSON.toJSONString(logResponse));if(kafkaEnable) {try {kafkaTemplate.send("httplog", JSON.toJSONString(logResponse));log.info("Send message to kafka successfully");} catch (Exception e) {log.error("Send message to kafka unsuccessfully", e);e.printStackTrace();}}}@Data@AllArgsConstructorclass LogRequest {private long traceId;private String url;private String httpMethod;private String classMethod;private String args;}@Data@AllArgsConstructorclass LogResponse {private long traceId;private int status;private Object response;private long spendTime;}}

?完整的示例代碼請見:https://github.com/yejingtao/fw-tester

?

需要注意的細節:

在elasticsearch中盡量用“小索引”,利用Kibana的“大索引”做上層的封裝,這樣一旦出現問題方便索引的重建和恢復。

舉例:

http日志每天都用新的索引來保存記錄httplog-2019.03.02,在kibana中大索引配置成httplog-2019*,如果某一天因為新屬性導致插入失敗,只需要重建當天索引就好。


?

總結

以上是生活随笔為你收集整理的Web服务http日志收集的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。