日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Oozie JMS通知消息实现--根据作业ID来过滤消息

發布時間:2023/12/31 编程问答 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Oozie JMS通知消息实现--根据作业ID来过滤消息 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一,介紹

本文使用Oozie的消息通知功能,并根據JMS規范中的消息選擇器(Selector)實現 根據作業的ID來過濾消息。

首先搭建好JMS Provider(ActiveMQ) ,并進行相關配置,這樣Oozie Server就可以把消息發送給JMS Provider了,我們使用了ActiveMQ作為消息服務器。相關配置可參考:Oozie 使用ActiveMQ實現 JMS通知

在ActiveMQ中配置好Topic,該Topic名稱為${username}。即該${username}提交的所有的作業,作業的執行結果都會發送到名為${username}的Topic上。

對于Oozie而言,每提交一個作業會生成一個JobID,而我們的需求是,只對某些JobID感興趣,并不是對該用戶提交的所有作業感興趣。因此,需要根據JobID來過濾訂閱到${username}這個Topic上的消息。

?

二,實現JMS 消息選擇器

根據JMS規范,在創建一個消費者時,可使用消息選擇器。這樣消費者就只能接收到被消息選擇器過濾以后的消息了。而消息選擇器使用 消息屬性 和 消息頭作為條件表達式,這些條件表達式使用boolean邏輯來聲明應該將哪一條消息傳遞給JMS消費者。請注意:消息選擇器無法參考消息體內的數據,它只能使用消息頭和消息屬性。

于是,我們就需要知道Oozie生成的JMS消息的消息頭和消息屬性是什么?參考官方文檔可以看出,Oozie生成的JMS消息的頭部由JMSHeaderConstants類來定義。其源代碼如下:

1 /** 2 * 3 * Class holding constants used in JMS selectors 4 */ 5 public final class JMSHeaderConstants { 6 // JMS Application specific properties for selectors 7 public static final String EVENT_STATUS = "eventStatus"; 8 public static final String SLA_STATUS = "slaStatus"; 9 public static final String APP_NAME = "appName"; 10 public static final String USER = "user"; 11 public static final String MESSAGE_TYPE = "msgType"; 12 public static final String APP_TYPE = "appType"; 13 14 //public static final String JOBID = "jobId";// add for my specific selectors 15 // JMS Header property 16 public static final String MESSAGE_FORMAT = "msgFormat"; 17 18 }

從中可以看出,可以使用EVENT_STATUS、SLA_STATUS、USER……相關屬性構造選擇器。但是官方JMSHeaderConstants類的源代碼中并沒有JobID這個屬性。

因此,需要修改源代碼,添加JobID,以使得我們消費者能夠根據JobID進行過濾。(如上,第14行就是我自己添加的代碼)

此外,還需要在JobMessage類的構造方法里面添加一行:

jmsMessageProperties.put(JMSHeaderConstants.JOBID, id);//add for my specific selectors

至此,Oozie端的代碼修改完畢。

這兩個類在oozie-client這個maven工程中

進入到該工程的文件夾下,使用

mvn clean compile

mvn clean package

生成相關的jar文件

將生成的oozie-client-4.1.0.jar文件替換掉原來的Oozie 安裝目錄下的lib包下的對應的jar包。重啟Oozie即可。

另外,Apache Oozie-4.1.0是不支持Spark作業的。而Cloudera-Oozie-4.1.0則是支持Spark作業的。

Apache oozie-client.jar 與Cloudera的 oozie-client.jar對比如下:

?

5月 6, 中午12點09:47.473 FATAL org.apache.oozie.service.Services SERVER[datanode1] Runtime Exception during Services Load. Check your list of 'oozie.services' or 'oozie.services.ext'5月 6, 中午12點09:47.480 FATAL org.apache.oozie.service.Services SERVER[datanode1] E0103: Could not load service classes, resource [spark-action-0.1.xsd] not found org.apache.oozie.service.ServiceException: E0103: Could not load service classes, resource [spark-action-0.1.xsd] not foundat org.apache.oozie.service.Services.loadServices(Services.java:309)at org.apache.oozie.service.Services.init(Services.java:213)at org.apache.oozie.servlet.ServicesLoader.contextInitialized(ServicesLoader.java:46)at org.apache.catalina.core.StandardContext.listenerStart(StandardContext.java:4210)at org.apache.catalina.core.StandardContext.start(StandardContext.java:4709)at org.apache.catalina.core.ContainerBase.addChildInternal(ContainerBase.java:802)at org.apache.catalina.core.ContainerBase.addChild(ContainerBase.java:779)at org.apache.catalina.core.StandardHost.addChild(StandardHost.java:583)at org.apache.catalina.startup.HostConfig.deployWAR(HostConfig.java:944)at org.apache.catalina.startup.HostConfig.deployWARs(HostConfig.java:779)at org.apache.catalina.startup.HostConfig.deployApps(HostConfig.java:505)at org.apache.catalina.startup.HostConfig.start(HostConfig.java:1322)at org.apache.catalina.startup.HostConfig.lifecycleEvent(HostConfig.java:325)at org.apache.catalina.util.LifecycleSupport.fireLifecycleEvent(LifecycleSupport.java:142)at org.apache.catalina.core.ContainerBase.start(ContainerBase.java:1068)at org.apache.catalina.core.StandardHost.start(StandardHost.java:822)at org.apache.catalina.core.ContainerBase.start(ContainerBase.java:1060)at org.apache.catalina.core.StandardEngine.start(StandardEngine.java:463)at org.apache.catalina.core.StandardService.start(StandardService.java:525)at org.apache.catalina.core.StandardServer.start(StandardServer.java:759)at org.apache.catalina.startup.Catalina.start(Catalina.java:595)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:606)at org.apache.catalina.startup.Bootstrap.start(Bootstrap.java:289)at org.apache.catalina.startup.Bootstrap.main(Bootstrap.java:414) Caused by: java.lang.IllegalArgumentException: resource [spark-action-0.1.xsd] not found

?

?

這樣,Oozie發送給JMS消息服務器的消息,在頭部中都會帶一個JobID了,而我們就根據這個JobID屬性進行消息過濾。

可參考Oozie官方文檔中給出的一句話:

JMS messages published are javax.jms.TextMessage . The body contains JSON and the header contains multiple properties that can be used asselectors. The header properties are not repeated in the body of the message to keep the messages small.

由于JobID已經在body里面了,故Oozie并沒有把它放到Header中去。

?

三,使用消息選擇器來過濾消息

由于現在Oozie發送給ActiveMQ的每條JMS消息都會在頭部帶一個JobID,故現在可使用JobID作為消息選擇器過濾消息了。

String selector=JMSHeaderConstants.JOBID + "='" + jobid + "'"; MessageConsumer consumer = session.createConsumer(topic, selector);

至此,就可以實現根據作業ID來接收該JobID所對應的作業的執行結果信息了。

?

總結

以上是生活随笔為你收集整理的Oozie JMS通知消息实现--根据作业ID来过滤消息的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。