日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

ELK 经典用法—企业自定义日志收集切割和mysql模块

發(fā)布時間:2024/4/17 数据库 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ELK 经典用法—企业自定义日志收集切割和mysql模块 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

ELK 經(jīng)典用法—企業(yè)自定義日志收集切割和mysql模塊

一、收集切割公司自定義的日志

很多公司的日志并不是和服務默認的日志格式一致,因此,就需要我們來進行切割了。

1、需切割的日志示例

2018-02-24 11:19:23,532 [143] DEBUG performanceTrace 1145 http://api.114995.com:8082/api/Carpool/QueryMatchRoutes 183.205.134.240 null 972533 310000 TITTL00 HUAWEI 860485038452951 3.1.146 HUAWEI 5.1 113.552344 33.332737 發(fā)送響應完成 Exception:(null)

?

2、切割的配置

在logstash 上,使用fifter 的grok 插件進行切割

input {beats {port => "5044"} }filter {grok {match => {"message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{NUMBER:thread:int}\] %{DATA:level} (?<logger>[a-zA-Z]+) %{NUMBER:executeTime:int} %{URI:url} %{IP:clientip} %{USERNAME:UserName} %{NUMBER:userid:int} %{NUMBER:AreaCode:int} (?<Board>[0-9a-zA-Z]+[-]?[0-9a-zA-Z]+) (?<Brand>[0-9a-zA-Z]+[-]?[0-9a-zA-Z]+) %{NUMBER:DeviceId:int} (?<TerminalSourceVersion>[0-9a-z\.]+) %{NUMBER:Sdk:float} %{NUMBER:Lng:float} %{NUMBER:Lat:float} (?<Exception>.*)"}remove_field => "message"}date {match => ["timestamp","dd/MMM/YYYY:H:m:s Z"]remove_field => "timestamp"}geoip {source => "clientip"target => "geoip"database => "/etc/logstash/maxmind/GeoLite2-City.mmdb"} }output {elasticsearch {hosts => ["http://192.168.10.101:9200/"]index => "logstash-%{+YYYY.MM.dd}"document_type => "apache_logs"} }

?

3、切割解析后效果

?

4、最終kibana 展示效果

① top10 clientip

?

② top5 url

?

③ 根據(jù)ip 顯示地理位置

?

⑤ top10 executeTime

?

⑥ 其他字段都可進行設置,多種圖案,也可將多個圖形放在一起展示

?

?

二、grok 用法詳解

1、簡介

  Grok是迄今為止使蹩腳的、無結構的日志結構化和可查詢的最好方式。Grok在解析 syslog logs、apache and other webserver logs、mysql logs等任意格式的文件上表現(xiàn)完美。

  Grok內(nèi)置了120多種的正則表達式庫,地址:https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns。

?

2、入門例子

① 示例

55.3.244.1 GET /index.html 15824 0.043

?

② 分析

  這條日志可切分為5個部分,IP(55.3.244.1)、方法(GET)、請求文件路徑(/index.html)、字節(jié)數(shù)(15824)、訪問時長(0.043),對這條日志的解析模式(正則表達式匹配)如下:

%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}

?

③ 寫到filter中

filter { grok { match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"} } }

?

④ 解析后效果

client: 55.3.244.1 method: GET request: /index.html bytes: 15824 duration: 0.043

?

3、解析任意格式日志

(1)解析任意格式日志的步驟:

① 先確定日志的切分原則,也就是一條日志切分成幾個部分。

② 對每一塊進行分析,如果Grok中正則滿足需求,直接拿來用。如果Grok中沒用現(xiàn)成的,采用自定義模式。

③ 學會在Grok Debugger中調(diào)試。

?

(2)grok 的分類

  • 滿足自帶的grok 正則 grok_pattern

① 可以查詢

# less /usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-4.1.1/patterns/grok-patterns

?

② 使用格式

grok_pattern 由零個或多個 %{SYNTAX:SEMANTIC}組成

例: %{IP:clientip}

  其中SYNTAX 是表達式的名字,是由grok提供的:例如數(shù)字表達式的名字是NUMBER,IP地址表達式的名字是IP

  SEMANTIC 表示解析出來的這個字符的名字,由自己定義,例如IP字段的名字可以是 client

?

  • 自定義SYNTAX

使用格式:(?<field_name>the pattern here)

例:(?<Board>[0-9a-zA-Z]+[-]?[0-9a-zA-Z]+)

?

(3)正則解析容易出錯,強烈建議使用Grok Debugger調(diào)試,姿勢如下(我打開這個網(wǎng)頁不能用)

?

三、使用mysql 模塊,收集mysql 日志

1、官方文檔使用介紹

https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-module-mysql.html

?

2、配置filebeat ,使用mysql 模塊收集mysql 的慢查詢

# vim filebeat.yml

#=========================== Filebeat prospectors ============================= filebeat.modules: - module: mysqlerror:enabled: truevar.paths: ["/var/log/mariadb/mariadb.log"]slowlog:enabled: truevar.paths: ["/var/log/mariadb/mysql-slow.log"] #----------------------------- Redis output -------------------------------- output.redis:hosts: ["192.168.10.102"]password: "ilinux.io"key: "httpdlogs"datatype: "list"db: 0timeout: 5

?

3、elk—logstash 切割mysql 的慢查詢?nèi)罩?/h3>

① 切割配置

# vim mysqllogs.conf

input {redis {host => "192.168.10.102"port => "6379"password => "ilinux.io"data_type => "list"key => "httpdlogs"threads => 2} }

filter {
  if [fields][type] == "pachongmysql" {
    grok {
      match => {
        "message" => "^#\ Time:\ (?<Time>.*)"
      }
      match => {
        "message" => "^#\ User\@Host:\ (?<User>.*)\[exiuapp\]\ \@\ \ \[%{IP:hostip}\]\ \ Id:\ \ \ \ %{NUMBER:Id:int}"
      }
      match => {
        "message" => "^#\ Query_time:\ %{NUMBER:Query_time:float}\ \ Lock_time:\ %{NUMBER:Lock_time:float}\ Rows_sent:\ %{NUMBER:Rows_sent:int}\ \ Rows_examined:\ %{NUMBER:Rows_examined:int}"
      }
      match => {
        "message" => "^use\ (?<database>.*)"
      }
      match => {
        "message" => "^SET\ timestamp=%{NUMBER:timestamp:int}\;"
      }
      match => {
        "message" => "(?<sql>.*);"
      }
      remove_field => "message"
    }
  }
}

output {

elasticsearch {hosts => ["http://192.168.10.101:9200/"]index => "logstash-%{+YYYY.MM.dd}"document_type => "mysql_logs"} }?

② 切割后顯示結果

?

?

4、kibana 最終顯示效果

① 哪幾個的數(shù)據(jù)庫最多,例:top2 庫

表無法顯示,因為有些語句不涉及表,切割不出來

?

② 哪幾個sql語句出現(xiàn)的最多,例:top5 sql語句

?

③ 哪幾個sql語句出現(xiàn)的最多,例:top5 sql語句

?

④ 哪幾臺服務器慢查詢?nèi)罩旧傻淖疃?#xff0c;例:top5 服務器

?

⑤ 哪幾個用戶慢查詢?nèi)罩旧傻淖疃?#xff0c;例:top2 用戶

?

可以合并顯示

?

5、使用mysql 模塊收集mysql 的慢查詢

(1)filebeat 配置和上邊一樣

?

(2)elk—logstash 切割mysql 的錯誤日志

# vim mysqllogs.conf

filter {grok {match => { "message" => "(?<timestamp>\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}) %{NUMBER:pid:int} \[%{DATA:level}\] (?<content>.*)" }}date {match => ["timestamp","dd/MMM/YYYY:H:m:s Z"]remove_field => "timestamp"} }

?

(3)就不在展示結果了

?

四、ELK 收集多實例日志

很多情況下,公司資金不足,不會一對一收集日志;因此,一臺logstash 使用多實例收集處理多臺agent 的日志很有必要。

1、filebeat 的配置

主要是output 的配置,只需不同agent 指向不同的端口即可

① agent 1 配置指向5044 端口

#----------------------------- Logstash output -------------------------------- output.logstash:# The Logstash hostshosts: ["192.168.10.107:5044"]

② agent 2 配置指向5045 端口

#----------------------------- Logstash output -------------------------------- output.logstash:# The Logstash hostshosts: ["192.168.10.107:5045"]

?

2、logstash 的配置

針對不同的agent ,input 指定對應的端口

① agent 1

input {beats {port => "5044"} } output { #可以在output 加以區(qū)分elasticsearch {hosts => ["http://192.168.10.107:9200/"]index => "logstash-apache1-%{+YYYY.MM.dd}"document_type => "apache1_logs"} }

② agent 1

input {beats {port => "5045"} } output { #可以在output 加以區(qū)分elasticsearch {hosts => ["http://192.168.10.107:9200/"]index => "logstash-apache2-%{+YYYY.MM.dd}"document_type => "apache2_logs"} }

開啟對應的服務就ok 了

轉載于:https://www.cnblogs.com/dengbingbing/p/10485962.html

與50位技術專家面對面20年技術見證,附贈技術全景圖

總結

以上是生活随笔為你收集整理的ELK 经典用法—企业自定义日志收集切割和mysql模块的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。