

Logstash: How to handle Logstash pipeline error messages

發(fā)布時間:2024/3/12 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Logstash:如何处理 Logstash pipeline 错误信息 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Errors often come up when working with Logstash. For example, when we use a filter such as dissect, a format mismatch will cause an error. So how should we handle this kind of error? When dissect encounters a format it cannot parse, it adds a tag named _dissectfailure to the document and continues processing the event.
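This tag-and-continue behavior can be illustrated with a small standalone sketch. This is plain Python, not Logstash itself, and the field extraction is deliberately simplified to a single priority field:

```python
# Sketch of dissect's failure behavior: on a pattern mismatch the event is
# tagged "_dissectfailure" and passed on, rather than being dropped.
def dissect_priority(message):
    event = {"message": message, "tags": []}
    # the mapping expects the raw message to start with "<priority>"
    if message.startswith("<") and ">" in message:
        event["priority"] = message[1:message.index(">")]
    else:
        # format mismatch: tag the event and continue processing it
        event["tags"].append("_dissectfailure")
    return event

ok = dissect_priority("<1>Oct 16 20:21:22 www1 ...")
bad = dissect_priority(" <1>Oct 16 20:21:22 www1 ...")  # note the leading space
```

Here `ok` carries a `priority` field, while `bad` only gains the `_dissectfailure` tag and flows on unchanged.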

So how do we deal with these failed events?

A good approach is to use the elasticsearch output to store them in a separate index. Let's start with the following example.

dissect.conf

input {
  generator {
    message => "<1>Oct 16 20:21:22 www1 1,2016/10/16 20:21:20,3,THREAT,SCAN,6,2016/10/16 20:21:20,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54"
    count => 1
  }
}

filter {
  if [message] =~ "THREAT," {
    dissect {
      mapping => {
        message => "<%{priority}>%{syslog_timestamp} %{+syslog_timestamp} %{+syslog_timestamp} %{logsource} %{pan_fut_use_01},%{pan_rec_time},%{pan_serial_number},%{pan_type},%{pan_subtype},%{pan_fut_use_02},%{pan_gen_time},%{pan_src_ip},%{pan_dst_ip},%{pan_nat_src_ip},%{pan_nat_dst_ip},%{pan_rule_name},%{pan_src_user},%{pan_dst_user},%{pan_app},%{pan_vsys},%{pan_src_zone},%{pan_dst_zone},%{pan_ingress_intf},%{pan_egress_intf},%{pan_log_fwd_profile},%{pan_fut_use_03},%{pan_session_id},%{pan_repeat_cnt},%{pan_src_port},%{pan_dst_port},%{pan_nat_src_port},%{pan_nat_dst_port},%{pan_flags},%{pan_prot},%{pan_action},%{pan_misc},%{pan_threat_id},%{pan_cat},%{pan_severity},%{pan_direction},%{pan_seq_number},%{pan_action_flags},%{pan_src_location},%{pan_dst_location},%{pan_content_type},%{pan_pcap_id},%{pan_filedigest},%{pan_cloud},%{pan_user_agent},%{pan_file_type},%{pan_xff},%{pan_referer},%{pan_sender},%{pan_subject},%{pan_recipient},%{pan_report_id},%{pan_anymore}"
      }
    }
  }
}

output {
  stdout { codec => rubydebug }
}

When there are no errors, the pipeline above produces a fully parsed event, printed via the rubydebug codec.

Now let's modify the generator section above by adding a single space at the beginning of the message:

input {
  generator {
    message => " <1>Oct 16 20:21:22 www1 1,2016/10/16 20:21:20,3,THREAT,SCAN,6,2016/10/16 20:21:20,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54"
    count => 1
  }
}

dissect is very strict about the format. If the format does not match, it raises an error and adds a tag named _dissectfailure to the document. Based on this tag, we can save the document into an index named parsefailures:
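The routing decision in the output section boils down to a simple tag check. As a Python sketch (the default index name here is my own placeholder, not something Logstash defines):

```python
# Route an event to an index based on its tags, mirroring the conditional
# used in the Logstash output section below.
def choose_index(event, default_index="logstash-generic"):
    # documents tagged by a failed dissect go to a dedicated failure index
    if "_dissectfailure" in event.get("tags", []):
        return "parsefailures"
    return default_index  # "logstash-generic" is a hypothetical name

failed_event = {"message": " <1>Oct 16 ...", "tags": ["_dissectfailure"]}
good_event = {"message": "<1>Oct 16 ...", "tags": []}
```

Only tagged events end up in parsefailures; everything else takes the normal path.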

input {
  generator {
    message => " <1>Oct 16 20:21:22 www1 1,2016/10/16 20:21:20,3,THREAT,SCAN,6,2016/10/16 20:21:20,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54"
    count => 100
  }
}

filter {
  if [message] =~ "THREAT," {
    dissect {
      mapping => {
        message => "<%{priority}>%{syslog_timestamp} %{+syslog_timestamp} %{+syslog_timestamp} %{logsource} %{pan_fut_use_01},%{pan_rec_time},%{pan_serial_number},%{pan_type},%{pan_subtype},%{pan_fut_use_02},%{pan_gen_time},%{pan_src_ip},%{pan_dst_ip},%{pan_nat_src_ip},%{pan_nat_dst_ip},%{pan_rule_name},%{pan_src_user},%{pan_dst_user},%{pan_app},%{pan_vsys},%{pan_src_zone},%{pan_dst_zone},%{pan_ingress_intf},%{pan_egress_intf},%{pan_log_fwd_profile},%{pan_fut_use_03},%{pan_session_id},%{pan_repeat_cnt},%{pan_src_port},%{pan_dst_port},%{pan_nat_src_port},%{pan_nat_dst_port},%{pan_flags},%{pan_prot},%{pan_action},%{pan_misc},%{pan_threat_id},%{pan_cat},%{pan_severity},%{pan_direction},%{pan_seq_number},%{pan_action_flags},%{pan_src_location},%{pan_dst_location},%{pan_content_type},%{pan_pcap_id},%{pan_filedigest},%{pan_cloud},%{pan_user_agent},%{pan_file_type},%{pan_xff},%{pan_referer},%{pan_sender},%{pan_subject},%{pan_recipient},%{pan_report_id},%{pan_anymore}"
      }
    }
  }
}

output {
  stdout { codec => rubydebug }

  if "_dissectfailure" in [tags] {
    elasticsearch {
      index => "parsefailures"
      hosts => [ "localhost:9200" ]
    }
  }
}

Above, I deliberately increased the generator's count to 100. This ensures that Logstash has time to write the documents to Elasticsearch before it exits. Let's run Logstash again:

We see an error message indicating that the dissect filter failed to parse the events. We can now examine the parsefailures index in Kibana:

GET parsefailures/_search

{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 102,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "parsefailures",
        "_type" : "_doc",
        "_id" : "3Llu8ncBReLdFyHVZsv0",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2021-03-02T10:13:45.332Z",
          "tags" : [
            "_dissectfailure"
          ],
          "sequence" : 0,
          "message" : " <1>Oct 16 20:21:22 www1 1,2016/10/16 20:21:20,3,THREAT,SCAN,6,2016/10/16 20:21:20,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54",
          "host" : "liuxg",
          "@version" : "1"
        }
      },
      {
        "_index" : "parsefailures",
        "_type" : "_doc",
        "_id" : "37l08ncBReLdFyHVUcs4",
        "_score" : 1.0,
        "_source" : {
          "tags" : [
            "_dissectfailure"
          ],
          "host" : "liuxg",
          "@timestamp" : "2021-03-02T10:20:44.841Z",
          "sequence" : 12,
          "message" : " <1>Oct 16 20:21:22 www1 1,2016/10/16 20:21:20,3,THREAT,SCAN,6,2016/10/16 20:21:20,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54",
          "@version" : "1"
        }
      },
      ...
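The response can also be inspected programmatically. Below is a standalone sketch that tallies the failed events from a response shaped like the one above; the embedded sample dict is an abridged, hard-coded copy of that response, not a live query:

```python
# Count failed events in a parsefailures search response (sample data
# mirrors, in abridged form, the response shown above).
response = {
    "hits": {
        "total": {"value": 102, "relation": "eq"},
        "hits": [
            {"_source": {"tags": ["_dissectfailure"], "host": "liuxg", "sequence": 0}},
            {"_source": {"tags": ["_dissectfailure"], "host": "liuxg", "sequence": 12}},
        ],
    }
}

total = response["hits"]["total"]["value"]
failures = [hit["_source"] for hit in response["hits"]["hits"]
            if "_dissectfailure" in hit["_source"].get("tags", [])]
print(f"{total} documents indexed, {len(failures)} returned in this page")
```

With a real cluster, the same dict shape comes back from the search API, so the tallying logic carries over unchanged.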


Summary

When the dissect filter cannot parse an event, Logstash tags it with _dissectfailure and keeps processing. By checking for this tag in the output section, failed events can be routed to a dedicated index (here, parsefailures), where they can later be inspected in Kibana.