日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

自定义Flume拦截器,并将收集的日志存储到Kafka中(案例)

發布時間:2024/9/27 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 自定义Flume拦截器,并将收集的日志存储到Kafka中(案例) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1.引入POM文件

如果想調用Flume,需要引入flume相關的jar包依賴,jar包依賴如下:

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>cn.com.toto.stormlogPro</artifactId><groupId>stormlogPro</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>cn.com.toto.flume</artifactId><dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.6.0</version><!-- 設置打包的時候,剔除依賴--><scope>provided</scope></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency></dependencies><build><plugins><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest><mainClass>cn.com.toto.stromlogpro.log4j.LogInfoBuilder</mainClass></manifest></archive></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.7</source><target>1.7</target></configuration></plugin></plugins></build> </project>

2.自定義的攔截器的代碼

package cn.com.toto.stromlogpro.flume;import org.apache.commons.lang.StringUtils; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor;import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.List;/*** 自定義一個點擊流收集的攔截器* * 1、實現一個Interceptor.Builder接口。* 2、Interceptor.Builder中有個configuref方法,通過configure獲取配置文件中的相應key。* 3、Interceptor.Builder中有個builder方法,通過builder創建一個自定義的AppInterceptor* 4、AppInterceptor中有兩個方法,一個是批處理,一個單條處理,將批處理的邏輯轉換為單條處理* 5、需要在單條數據中添加 appid,由于appid是變量。需要在AppInterceptor的構造器中傳入一些參數。* 6、為自定義的AppInterceptor創建有參構造器,將需要的參數傳入進來。** @author tuzq* @create 2017-06-25 12:48*/ public class AppInterceptor implements Interceptor{//4.定義成員變量appId,用來接收從配置文件中讀取的信息private String appId;public AppInterceptor(String appId) {this.appId = appId;}/*** 單條數據進行處理,通過這個方式為日志添加上系統id* @param event* @return*/@Overridepublic Event intercept(Event event) {String message = null;try {message = new String(event.getBody(), "utf-8");} catch (UnsupportedEncodingException e) {message = new String(event.getBody());}//處理邏輯if (StringUtils.isNotBlank(message)) {message = "aid:"+appId+"||msg:" +message;event.setBody(message.getBytes());//正常邏輯應該執行到這里return event;}return event;}/*** 批量數據進行處理* @param list* @return*/@Overridepublic List<Event> intercept(List<Event> list) {List<Event> resultList = new ArrayList<Event>();for (Event event : list) {Event r = intercept(event);if (r != null) {resultList.add(r);}}return resultList;}@Overridepublic void initialize() {}@Overridepublic void close() {}public static class AppInterceptorBuilder implements Interceptor.Builder{//1、獲取配置文件的appIdprivate String appId;@Overridepublic Interceptor build() {//3、構造攔截器return new AppInterceptor(appId);}@Overridepublic void configure(Context context) {//2、當出現default之后,就是點擊流告警系統this.appId = context.getString("appId","default");System.out.println("appId:"+appId);}} }

LogInfoBuilder的代碼如下:

package cn.com.toto.stromlogpro.log4j;import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.logging.Logger;/*** 通過這個工程模擬創建日志內容** @author tuzq* @create 2017-06-25 13:51*/ public class LogInfoBuilder {private final static Logger logger = Logger.getLogger("msg");public static void main(String[] args) {Random random = new Random();List<String> list = logInfoList();while(true) {logger.info(list.get(random.nextInt(list.size())));}}private static List<String> logInfoList() {List list = new ArrayList<String>();list.add("aid:1||msg:error: Caused by: java.lang.NoClassDefFoundError: com/starit/gejie/dao/SysNameDao");list.add("java.sql.SQLException: You have an error in your SQL syntax;");list.add("error Unable to connect to any of the specified MySQL hosts.");list.add("error:Servlet.service() for servlet action threw exception java.lang.NullPointerException");list.add("error:Exception in thread main java.lang.ArrayIndexOutOfBoundsException: 2");list.add("error:NoSuchMethodError: com/starit/.");list.add("error:java.lang.NoClassDefFoundError: org/coffeesweet/test01/Test01");list.add("error:java.lang.NoClassDefFoundError: org/coffeesweet/test01/Test01");list.add("error:Java.lang.IllegalStateException");list.add("error:Java.lang.IllegalMonitorStateException");list.add("error:Java.lang.NegativeArraySizeException");list.add("error:java.sql.SQLException: You have an error in your SQL syntax;");list.add("error:Java.lang.TypeNotPresentException ");list.add("error:Java.lang.UnsupprotedOperationException ");list.add("error Java.lang.IndexOutOfBoundsException");list.add("error Java.lang.ClassNotFoundException");list.add("error java.lang.ExceptionInInitializerError ");list.add("error:java.lang.IncompatibleClassChangeError ");list.add("error:java.lang.LinkageError ");list.add("error:java.lang.OutOfMemoryError ");list.add("error java.lang.StackOverflowError");list.add("error: java.lang.UnsupportedClassVersionError");list.add("error java.lang.ClassCastException");list.add("error: java.lang.CloneNotSupportedException");list.add("error: java.lang.EnumConstantNotPresentException ");list.add("error java.lang.IllegalMonitorStateException ");list.add("error java.lang.IllegalStateException ");list.add("error java.lang.IndexOutOfBoundsException ");list.add("error java.lang.NumberFormatException ");list.add("error java.lang.RuntimeException ");list.add("error java.lang.TypeNotPresentException ");list.add("error MetaSpout.java:9: variable i might not have been initialized");list.add("error MyEvaluator.java:1: class Test1 is public, should be declared in a file named Test1.java ");list.add("error Main.java:5: cannot find symbol ");list.add("error NoClassDefFoundError: asa wrong name: ASA ");list.add("error Test1.java:54: 'void' type not allowed here");list.add("error Test5.java:8: missing return statement");list.add("error:Next.java:66: cannot find symbol ");list.add("error symbol : method createTempFile(java.lang.String,java.lang.String,java.lang.String) ");list.add("error invalid method declaration; return type required");list.add("error array required, but java.lang.String found");list.add("error Exception in thread main java.lang.NumberFormatException: null 20. .");list.add("error non-static method cannot be referenced from a static context");list.add("error Main.java:5: non-static method fun1() cannot be referenced from a static context");list.add("error continue outside of loop");list.add("error MyAbstract.java:6: missing method body, or declare abstract");list.add("error Main.java:6: Myabstract is abstract; cannot be instantiated");list.add("error MyInterface.java:2: interface methods cannot have body ");list.add("error Myabstract is abstract; cannot be instantiated");list.add("error asa.java:3: modifier static not allowed here");list.add("error possible loss of precision found: long required:byte var=varlong");list.add("error java.lang.NegativeArraySizeException ");list.add("error java.lang.ArithmeticException: by zero");list.add("error java.lang.ArithmeticException");list.add("error java.lang.ArrayIndexOutOfBoundsException");list.add("error java.lang.ClassNotFoundException");list.add("error java.lang.IllegalArgumentException");list.add("error fatal error C1010: unexpected end of file while looking for precompiled header directive");list.add("error fatal error C1083: Cannot open include file: R…….h: No such file or directory");list.add("error C2011:C……clas type redefinition");list.add("error C2018: unknown character 0xa3");list.add("error C2057: expected constant expression");list.add("error C2065: IDD_MYDIALOG : undeclared identifier IDD_MYDIALOG");list.add("error C2082: redefinition of formal parameter bReset");list.add("error C2143: syntax error: missing : before ");list.add("error C2146: syntax error : missing ';' before identifier dc");list.add("error C2196: case value '69' already used");list.add("error C2509: 'OnTimer' : member function not declared in 'CHelloView'");list.add("error C2555: 'B::f1': overriding virtual function differs from 'A::f1' only by return type or calling convention");list.add("error C2511: 'reset': overloaded member function 'void (int)' not found in 'B'");list.add("error C2660: 'SetTimer' : function does not take 2 parameters");list.add("error warning C4035: 'f……': no return value");list.add("error warning C4553: '= =' : operator has no effect; did you intend '='");list.add("error C4716: 'CMyApp::InitInstance' : must return a value");list.add("error LINK : fatal error LNK1168: cannot open Debug/P1.exe for writing");list.add("error LNK2001: unresolved external symbol public: virtual _ _thiscall C (void)");list.add("error java.lang.IllegalArgumentException: Path index.jsp does not start with");list.add("error org.apache.struts.action.ActionServlet.process(ActionServlet.java:148");list.add("error org.apache.jasper.JasperException: Exception in JSP");list.add("error The server encountered an internal error () that prevented it from fulfilling this request");list.add("error org.apache.jasper.servlet.JspServletWrapper.handleJspException(JspServletWrapper.java:467");list.add("error javax.servlet.http.HttpServlet.service(HttpServlet.java:803)");list.add("error javax.servlet.jsp.JspException: Cannot find message resources under key org.apache.struts.action.MESSAGE");list.add("error Stacktrace: org.apache.jasper.servlet.JspServletWrapper.handleJspException(JspServletWrapper.java:467)");list.add("error javax.servlet.ServletException: Cannot find bean org.apache.struts.taglib.html.BEAN in any scope");list.add("error no data found");list.add("error exception in thread main org.hibernate.MappingException: Unknown entity:.");list.add("error using namespace std;");list.add("error C2065: 'cout' : undeclared identifier");list.add("error main already defined in aaa.obj");list.add("error syntax error : missing ';' before '}'");list.add("error cout : undeclared identifier");list.add("error weblogic.servlet.internal.WebAppServletContext$ServletInvocationAction.run(WebAp ");list.add("error Caused by: java.lang.reflect.InvocationTargetException");list.add("error Caused by: java.lang.NoClassDefFoundError: com/starit/gejie/dao/SysNameDao");list.add("error at com.starit.gejie.Util.Trans.BL_getSysNamesByType(Trans.java:220)");return list;} }

MyDailyRollingFileAppender的代碼如下:

package cn.com.toto.stromlogpro.log4j;/*** Created by toto on 2017/6/25.*/import org.apache.log4j.DailyRollingFileAppender; import org.apache.log4j.Priority;/*** @author tuzq* @create 2017-06-25 13:58*/ public class MyDailyRollingFileAppender extends DailyRollingFileAppender {@Overridepublic boolean isAsSevereAsThreshold(Priority priority) {return getThreshold().equals(priority);} }

MyRollingFileAppender的代碼如下:

package cn.com.toto.stromlogpro.log4j;/*** Created by toto on 2017/6/25.*/import org.apache.log4j.Priority; import org.apache.log4j.RollingFileAppender;/*** @author tuzq* @create 2017-06-25 14:01*/ public class MyRollingFileAppender extends RollingFileAppender {@Overridepublic boolean isAsSevereAsThreshold(Priority priority) {return getThreshold().equals(priority);} }

3.在Flume中的conf配置文件,并將收集的日志下沉到kafka中

a1.sources = r1 a1.channels = c1 a1.sinks = k1a1.sources.r1.type = exec a1.sources.r1.command = tail -F /export/data/flume_sources/click_log/info.log a1.sources.r1.channels = c1 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = cn.com.toto.stromlogpro.flume.AppInterceptor$AppInterceptorBuilder #通過這個參數向自定義的Flume攔截器中傳遞參數(即系統編號) a1.sources.r1.interceptors.i1.appId = 1a1.channels.c1.type=memory a1.channels.c1.capacity=10000 a1.channels.c1.transactionCapacity=100a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.topic = log_monitor a1.sinks.k1.brokerList = hadoop1:9092 a1.sinks.k1.requiredAcks = 1 a1.sinks.k1.batchSize = 20 a1.sinks.k1.channel = c1

總結

以上是生活随笔為你收集整理的自定义Flume拦截器,并将收集的日志存储到Kafka中(案例)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 黄色av小说在线观看 | 蜜桃视频一区二区三区在线观看 | 日韩精品欧美精品 | 在线观看中文字幕av | 国产精品久线在线观看 | 欧美一区二区三区在线 | 日韩极品在线 | 午夜在线你懂的 | 偷拍老头老太高潮抽搐 | 亚洲一区播放 | 91视频免费网址 | 日韩一区二区视频在线播放 | 一级免费观看 | 日韩在线视频免费观看 | 91重口味 | 青青色在线 | 欧美精品成人久久 | 国产农村妇女aaaaa视频 | 免费一级a毛片夜夜看 | 草视频在线观看 | 蜜桃在线一区二区 | 肥婆大荫蒂欧美另类 | 亚州av在线播放 | 国精品人妻无码一区二区三区喝尿 | 自拍偷拍亚洲欧洲 | 99re8在线精品视频免费播放 | 最近中文字幕免费mv视频7 | 综合视频在线 | 亚洲天堂av在线免费观看 | 亚洲污片 | 亚洲一区二区在线视频 | 久久久免费网站 | 最新自拍偷拍 | 亚洲激情视频网 | 久久久网站| 国产成人精品aa毛片 | 激情综合影院 | 色视频线观看在线播放 | 夜夜操导航 | 国产精品久久久不卡 | 五月开心网| 国产成人自拍网 | 国产视频首页 | 激情国产| 亚洲精品色| 麻豆国产原创 | 欧美一级免费在线 | 日日草| 特级毛片在线观看 | 青青草中文字幕 | 国产白袜脚足j棉袜在线观看 | 老牛影视av牛牛影视av | 伊人影院在线观看视频 | 日韩欧美综合一区 | 日韩欧美精品久久 | 日韩中文字幕免费在线观看 | 三浦惠理子aⅴ一二三区 | 亚洲综合成人网 | 日本a级片视频 | 妺妺窝人体色777777 | 特黄特色大片免费播放器使用方法 | 日本人jizz | 久久人人视频 | 成年人网站免费观看 | 男人天堂怡红院 | 已满18岁免费观看电视连续剧 | 午夜福利电影一区二区 | 91毛片网站 | 亚洲AV无码国产精品播放在线 | 久草新免费 | 日韩av导航 | 狠狠躁夜夜躁av无码中文幕 | 欧美日韩一级二级三级 | 亚洲人av在线 | 黄色免费在线网站 | 中国一区二区三区 | 亚洲综合小说 | 在线 日本 制服 中文 欧美 | 十大黄台在线观看 | 偷操| 亚洲一区二区视频网站 | 五月激情六月 | 91一二区 | 黄网在线免费看 | 男人操女人动态图 | 精品无码一区二区三区在线 | 无人码人妻一区二区三区免费 | 亚洲欧美国产一区二区三区 | 亚洲国内精品 | 亚洲av色香蕉一区二区三区 | 国产一区二区视频免费 | 亚洲在线视频 | 亚洲精品入口 | 亚洲春色av| 欧美成人一区二区三区四区 | 欧美肉丝袜videos办公室 | 国产网站免费在线观看 | 四虎成人影视 | 日韩视频在线观看一区 |