日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java调用lingo_使用Lingo增强JMS

發布時間:2023/12/10 编程问答 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java调用lingo_使用Lingo增强JMS 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

雖然activemq+jencks的jms輕量級解決方案已經很好地在psa中work了,尤其spring的JmsTemplate使得代碼更簡單,但是還是存在問題。

問題來自暑期做psa的時候,linke突然提出要求,需要MDP返回些處理信息,比如處理結果、異常,以便后臺監控和前臺顯示,但是,MDP沒有返回值也沒法返回異常,當時我只能無奈。

Lingo解決了這個問題,它擴展了JMS,允許異步的函數調用。

在下載了lingo-1.0-M1后(雖然1.2.1發布了,但是還沒有文檔支持,所以暫且用1.0),參考其自帶的example,了解了它異步函數調用的代碼思路。

客戶端擁有服務端的方法接口,客戶端將callback和相關參數代入接口,進行異步調用,而服務端的接口實現中利用callback來返回必要的信息。

callback實現了EventListener,提供了返回值和異常的接口,另外涉及到兩個方面,首先,callback本身需要輪詢,其次,callback可以由實例池管理。

第一個方面主要參考了lingo的example,使用semaphore來進行輪詢。

第二個方面并沒有利用實例池,而是利用ThreadPoolExecutor來newFixedThreadPool,管理不同的異步調用線程,來完成對callback的調度。

配置部分:

encoding="UTF-8"?>

/p>

BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd">

class="org.activemq.spring.BrokerFactoryBean">

value="classpath:activemq.xml"

/>

class="org.activemq.ActiveMQConnectionFactory">

name="brokerURL">

tcp://localhost:61616

class="org.activemq.message.ActiveMQQueue">

index="0">

lingo.demo

id="invocationFactory"

class="org.logicblaze.lingo.LingoRemoteInvocationFactory">

class="org.logicblaze.lingo.SimpleMetadataStrategy">

value="true" />

class="org.logicblaze.lingo.jms.JmsProxyFactoryBean">

value="org.openepo.jms.lingo.MailService"

/>

ref="jmsFactory" />

ref="destination" />

name="remoteInvocationFactory"

ref="invocationFactory" />

class="org.logicblaze.lingo.jms.JmsServiceExporter">

ref="serverImpl" />

value="org.openepo.jms.lingo.MailService"

/>

ref="jmsFactory" />

ref="destination" />

ref="invocationFactory" />

class="org.openepo.jms.lingo.MailServiceImpl"

/>

class="org.openepo.jms.lingo.AsyncManager"

singleton="false">

/>

/>

ResultListener和ResultListenerImpl:callback接口及實現。

ResultListener.java:

package org.openepo.jms.lingo;

import java.util.EventListener;

public interface ResultListener extends EventListener {

public void onResult(Object result);

// lifecycle end methods

public void stop();

public void onException(Exception e);

}

ResultListenerImpl.java:

package org.openepo.jms.lingo;

import java.util.ArrayList;

import java.util.List;

public class ResultListenerImpl implements ResultListener

{

private List results = new ArrayList();

private Object semaphore = new Object();

private boolean stopped;

private Exception onException;

private long waitTime = 1000;

public synchronized void onResult(Object result)

{

results.add(result);

synchronized (semaphore) {

semaphore.notifyAll();

}

}

// lifecycle end methods

public void stop() {

stopped = true;

}

public void onException(Exception e)

{

onException = e;

}

public Exception getOnException()

{

return

onException;

}

public List getResults()

{

return results;

}

public boolean isStopped()

{

return stopped;

}

public void waitForAsyncResponses(int

messageCount) {

System.out.println("Waiting for: " + messageCount +

" responses to arrive");

long start =

System.currentTimeMillis();

for (int i = 0; i

< 10; i++) {

try {

if (hasReceivedResponses(messageCount))

{

break;

}

synchronized (semaphore) {

semaphore.wait(waitTime);

}

}

catch (InterruptedException e) {

System.out.println("Caught: " + e);

}

}

long end =

System.currentTimeMillis() - start;

System.out.println("End of wait for " + end + " millis");

}

protected boolean hasReceivedResponse()

{

return

results.isEmpty();

}

protected synchronized boolean

hasReceivedResponses(int messageCount) {

return results.size()

>= messageCount;

}

public long getWaitTime()

{

return waitTime;

}

public void setWaitTime(long waitTime)

{

this.waitTime =

waitTime;

}

}

MailService和MailServiceImpl:服務代碼。

MailService.java:

package org.openepo.jms.lingo;

import java.util.List;

public interface MailService {

public void

asyncSendMail(List mails,

ResultListener listener);

}

MailServiceImpl.java:

package org.openepo.jms.lingo;

import java.util.List;

public class MailServiceImpl implements MailService

{

public void

asyncSendMail(List mails,

ResultListener listener) {

try {

for (Mail mail : mails) {

sendMail(mail);

Thread.sleep(2000);// 服務端時耗

listener.onResult(mail.getContent() +

" Sended Successfully.");

}

listener.stop();

} catch

(Exception e) {

listener.onException(e);

}

}

public void sendMail(Mail mail) throws

Exception {

// 可以取消下面的注釋來查看服務端將異常傳給客戶端

//throw new Exception("Error occurs

on server side.");

}

}

在服務端方法中,可以利用callback將處理結果,是否結束和異常信息返回客戶端.

Mail.java:

package org.openepo.jms.lingo;

import java.io.Serializable;

public class Mail implements Serializable

{

private static final long serialVersionUID = 1L;

private String content;

public String getContent() {

return content;

}

public void setContent(String content)

{

this.content = content;

}

public Mail(String content) {

this.content = content;

}

}

AsyncManager:各類異步調用的方法可以集中在這個類中,利用線程池來統一控制callback實例。

AsyncManager.java:

package org.openepo.jms.lingo;

import java.util.List;

import java.util.concurrent.Executors;

import java.util.concurrent.ThreadPoolExecutor;

public class AsyncManager {

static private int threadSize =

10;???//callback池大小

static private ThreadPoolExecutor executor =

(ThreadPoolExecutor) Executors

.newFixedThreadPool(threadSize);?//callback池

public void setThreadSize(int threadSize)

{

AsyncManager.threadSize = threadSize;

}

private MailService mailClient;

public void setMailClient(MailService mailClient)

{

this.mailClient = mailClient;

}

public AsyncManager() {

}

public void sendMails(final

List mails)

{

// callback對象

final ResultListenerImpl callBack = new

ResultListenerImpl();

callBack.setWaitTime(2000);

// 異步調用

mailClient.asyncSendMail(mails, callBack);

// 調用線程池中的callback

executor.execute(new Runnable()

{

public void run() {

// callBack 阻塞等待n個消息

callBack.waitForAsyncResponses(mails.size());

if (callBack.getOnException() != null) {

// 服務端異常

System.out.println("Server

Exception: "

+

callBack.getOnException().getMessage());

} else

{

// 得到服務端處理結果,打印結果

for (Object result :

callBack.getResults())

{

System.out.println("Result:

" + result);

}

}

}

});

}

}

上面匿名類的run方法中,在callback的waitForAsyncResponses方法結束后,可以檢查callback中的信息,進行異常處理等。

下面是測試用例:

@Test

public void test() {

List mails = new

ArrayList();

mails.add(new Mail("mail1"));

mails.add(new Mail("mail2"));

// 計算時間

long startTime = System.currentTimeMillis();

try {

// 異步調用

asyncManager.sendMails(mails);

// 沒有阻塞

System.out.println("Cost time "

+ (System.currentTimeMillis() -

startTime) + "ms");

//為查看結果,sleep主線程

Thread.sleep(10000);

} catch (InterruptedException e)

{

e.printStackTrace();

}

}

使用Lingo對JMS增強后,通過callback,使得異步調用的確比較OO了,但是更重要的是服務端的信息可以通過callback返回給客戶端,客戶端可以相應地進行處理。

多出了許多代碼,自然復雜度有所增加,但是lingo-1.2.1后,增加了annotation,減少了callback的代碼量。

總結

以上是生活随笔為你收集整理的java调用lingo_使用Lingo增强JMS的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。