java调用lingo_使用Lingo增强JMS
雖然activemq+jencks的jms輕量級解決方案已經很好地在psa中work了,尤其spring的JmsTemplate使得代碼更簡單,但是還是存在問題。
問題來自暑期做psa的時候,linke突然提出要求,需要MDP返回些處理信息,比如處理結果、異常,以便后臺監控和前臺顯示,但是,MDP沒有返回值也沒法返回異常,當時我只能無奈。
Lingo解決了這個問題,它擴展了JMS,允許異步的函數調用。
在下載了lingo-1.0-M1后(雖然1.2.1發布了,但是還沒有文檔支持,所以暫且用1.0),參考其自帶的example,了解了它異步函數調用的代碼思路。
客戶端擁有服務端的方法接口,客戶端將callback和相關參數代入接口,進行異步調用,而服務端的接口實現中利用callback來返回必要的信息。
callback實現了EventListener,提供了返回值和異常的接口,另外涉及到兩個方面,首先,callback本身需要輪詢,其次,callback可以由實例池管理。
第一個方面主要參考了lingo的example,使用semaphore來進行輪詢。
第二個方面并沒有利用實例池,而是利用ThreadPoolExecutor來newFixedThreadPool,管理不同的異步調用線程,來完成對callback的調度。
配置部分:
encoding="UTF-8"?>
/p>
BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd">
class="org.activemq.spring.BrokerFactoryBean">
value="classpath:activemq.xml"
/>
class="org.activemq.ActiveMQConnectionFactory">
name="brokerURL">
tcp://localhost:61616
class="org.activemq.message.ActiveMQQueue">
index="0">
lingo.demo
id="invocationFactory"
class="org.logicblaze.lingo.LingoRemoteInvocationFactory">
class="org.logicblaze.lingo.SimpleMetadataStrategy">
value="true" />
class="org.logicblaze.lingo.jms.JmsProxyFactoryBean">
value="org.openepo.jms.lingo.MailService"
/>
ref="jmsFactory" />
ref="destination" />
name="remoteInvocationFactory"
ref="invocationFactory" />
class="org.logicblaze.lingo.jms.JmsServiceExporter">
ref="serverImpl" />
value="org.openepo.jms.lingo.MailService"
/>
ref="jmsFactory" />
ref="destination" />
ref="invocationFactory" />
class="org.openepo.jms.lingo.MailServiceImpl"
/>
class="org.openepo.jms.lingo.AsyncManager"
singleton="false">
/>
/>
ResultListener和ResultListenerImpl:callback接口及實現。
ResultListener.java:
package org.openepo.jms.lingo;
import java.util.EventListener;
public interface ResultListener extends EventListener {
public void onResult(Object result);
// lifecycle end methods
public void stop();
public void onException(Exception e);
}
ResultListenerImpl.java:
package org.openepo.jms.lingo;
import java.util.ArrayList;
import java.util.List;
public class ResultListenerImpl implements ResultListener
{
private List results = new ArrayList();
private Object semaphore = new Object();
private boolean stopped;
private Exception onException;
private long waitTime = 1000;
public synchronized void onResult(Object result)
{
results.add(result);
synchronized (semaphore) {
semaphore.notifyAll();
}
}
// lifecycle end methods
public void stop() {
stopped = true;
}
public void onException(Exception e)
{
onException = e;
}
public Exception getOnException()
{
return
onException;
}
public List getResults()
{
return results;
}
public boolean isStopped()
{
return stopped;
}
public void waitForAsyncResponses(int
messageCount) {
System.out.println("Waiting for: " + messageCount +
" responses to arrive");
long start =
System.currentTimeMillis();
for (int i = 0; i
< 10; i++) {
try {
if (hasReceivedResponses(messageCount))
{
break;
}
synchronized (semaphore) {
semaphore.wait(waitTime);
}
}
catch (InterruptedException e) {
System.out.println("Caught: " + e);
}
}
long end =
System.currentTimeMillis() - start;
System.out.println("End of wait for " + end + " millis");
}
protected boolean hasReceivedResponse()
{
return
results.isEmpty();
}
protected synchronized boolean
hasReceivedResponses(int messageCount) {
return results.size()
>= messageCount;
}
public long getWaitTime()
{
return waitTime;
}
public void setWaitTime(long waitTime)
{
this.waitTime =
waitTime;
}
}
MailService和MailServiceImpl:服務代碼。
MailService.java:
package org.openepo.jms.lingo;
import java.util.List;
public interface MailService {
public void
asyncSendMail(List mails,
ResultListener listener);
}
MailServiceImpl.java:
package org.openepo.jms.lingo;
import java.util.List;
public class MailServiceImpl implements MailService
{
public void
asyncSendMail(List mails,
ResultListener listener) {
try {
for (Mail mail : mails) {
sendMail(mail);
Thread.sleep(2000);// 服務端時耗
listener.onResult(mail.getContent() +
" Sended Successfully.");
}
listener.stop();
} catch
(Exception e) {
listener.onException(e);
}
}
public void sendMail(Mail mail) throws
Exception {
// 可以取消下面的注釋來查看服務端將異常傳給客戶端
//throw new Exception("Error occurs
on server side.");
}
}
在服務端方法中,可以利用callback將處理結果,是否結束和異常信息返回客戶端.
Mail.java:
package org.openepo.jms.lingo;
import java.io.Serializable;
public class Mail implements Serializable
{
private static final long serialVersionUID = 1L;
private String content;
public String getContent() {
return content;
}
public void setContent(String content)
{
this.content = content;
}
public Mail(String content) {
this.content = content;
}
}
AsyncManager:各類異步調用的方法可以集中在這個類中,利用線程池來統一控制callback實例。
AsyncManager.java:
package org.openepo.jms.lingo;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
public class AsyncManager {
static private int threadSize =
10;???//callback池大小
static private ThreadPoolExecutor executor =
(ThreadPoolExecutor) Executors
.newFixedThreadPool(threadSize);?//callback池
public void setThreadSize(int threadSize)
{
AsyncManager.threadSize = threadSize;
}
private MailService mailClient;
public void setMailClient(MailService mailClient)
{
this.mailClient = mailClient;
}
public AsyncManager() {
}
public void sendMails(final
List mails)
{
// callback對象
final ResultListenerImpl callBack = new
ResultListenerImpl();
callBack.setWaitTime(2000);
// 異步調用
mailClient.asyncSendMail(mails, callBack);
// 調用線程池中的callback
executor.execute(new Runnable()
{
public void run() {
// callBack 阻塞等待n個消息
callBack.waitForAsyncResponses(mails.size());
if (callBack.getOnException() != null) {
// 服務端異常
System.out.println("Server
Exception: "
+
callBack.getOnException().getMessage());
} else
{
// 得到服務端處理結果,打印結果
for (Object result :
callBack.getResults())
{
System.out.println("Result:
" + result);
}
}
}
});
}
}
上面匿名類的run方法中,在callback的waitForAsyncResponses方法結束后,可以檢查callback中的信息,進行異常處理等。
下面是測試用例:
@Test
public void test() {
List mails = new
ArrayList();
mails.add(new Mail("mail1"));
mails.add(new Mail("mail2"));
// 計算時間
long startTime = System.currentTimeMillis();
try {
// 異步調用
asyncManager.sendMails(mails);
// 沒有阻塞
System.out.println("Cost time "
+ (System.currentTimeMillis() -
startTime) + "ms");
//為查看結果,sleep主線程
Thread.sleep(10000);
} catch (InterruptedException e)
{
e.printStackTrace();
}
}
使用Lingo對JMS增強后,通過callback,使得異步調用的確比較OO了,但是更重要的是服務端的信息可以通過callback返回給客戶端,客戶端可以相應地進行處理。
多出了許多代碼,自然復雜度有所增加,但是lingo-1.2.1后,增加了annotation,減少了callback的代碼量。
總結
以上是生活随笔為你收集整理的java调用lingo_使用Lingo增强JMS的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python 底层原理processpo
- 下一篇: 第九大陆服务器未找到文件,第九大陆服务器