android 短信编解码方式,中移短信cmpp协议/smpp协议 netty实现编解码
性能測試
在48core,128G內存的物理服務器上測試協議解析效率:35K條/s, cpu使用率25%.
Build
執行mvn package . jdk1.6以上.
增加了業務處理API
業務層實現接口:BusinessHandlerInterface,或者繼承AbstractBusinessHandler抽象類實現業務即可。 連接保活,消息重發,消息持久化,連接鑒權都已封裝,不須要業務層再實現。
如何實現自己的Handler,比如按短短信計費
參考 CMPPChargingDemoTest 里的擴展位置
實體類說明
CMPP的連接端口
com.zx.sms.connect.manager.cmpp.CMPPEndpointEntity 表示一個Tcp連接的發起端,或者接收端。用來記錄連接的IP.port,以及CMPP協議的用戶名,密碼,業務處理的ChannelHandler集合等其它端口參數。包含三個子類:
com.zx.sms.connect.manager.cmpp.CMPPServerEndpointEntity 服務監聽端口,包含一個List屬性。 一個服務端口包含多個CMPPServerChildEndpointEntity端口
com.zx.sms.connect.manager.cmpp.CMPPServerChildEndpointEntity 服務接收端口,包含CMPP連接用戶名,密碼,以及協議版本等信息
com.zx.sms.connect.manager.cmpp.CMPPClientEndpointEntity 客戶端端口,包含CMPP連接用戶名,密碼,以及協議版本,以及服務端IP.port. 用于連接服務端
端口連接器接口
com.zx.sms.connect.manager.EndpointConnector 負責一個端口的打開,關閉,查看當前連接數,新增連接,移除連接。每個端口的實體類都對應一個EndpointConnector.當CMPP連接建立完成,將連接加入連接器管理,并給pipeLine上掛載業務處理的ChannelHandler.
com.zx.sms.connect.manager.cmpp.CMPPServerEndpointConnector 這個類的open()調用netty的ServerBootstrap.bind()開一個服務監聽
com.zx.sms.connect.manager.cmpp.CMPPServerChildEndpointConnector 用來收集CMPPServerChildEndpointEntity端口下的所有連接。它的open()方法為空.
com.zx.sms.connect.manager.cmpp.CMPPClientEndpointConnector 這個類open()調用netty的Bootstrap.connect()開始一個TCP連接
端口管理器
com.zx.sms.connect.manager.EndpointManager 該類是單例模式,管理所有端口,并負責所有端口的打開,關閉,以及端口信息保存,以及連接斷線重連。
CMPP協議的連接登陸管理
com.zx.sms.session.cmpp.SessionLoginManager 這是一個netty的ChannelHandler實現,主要負責CMPP連接的建立。當CMPP連接建立完成后,會調用EndpointConnector.addChannel(channel)方法,把連接加入連接器管理,連接器負責給channel的pipeline上掛載業務處理的Handler,最后觸發 SessionState.Connect事件,通知業務處理Handler連接已建立成功。
CMPP的連接狀態管理器
com.zx.sms.session.cmpp.SessionStateManager 這是一個netty的ChannelHandler實現。負責每個連接上CMPP消息的存儲,短信重發,流量窗口控制,過期短信的處理
CMPP協議解析器
CMPP20MessageCodecAggregator [2.0協議] CMPPMessageCodecAggregator [這是3.0協議] 聚合了CMPP主要消息協議的解析,編碼,長短信拆分,合并處理。
短信持久化存儲實現 StoredMapFactory
使用BDB的StoreMap實現消息持久化,防止系統意外丟失短信。
程序啟動處理流程
程序啟動類 new 一個CMPPEndpointEntity的實體類并設置IP,port,用戶名,密碼,業務處理的Handler等參數,
程序啟動類 調用EndpointManager.addEndpointEntity(endpoint)方法,將端口加入管理器
程序啟動類 調用EndpointManager.openAll()或者EndpointManager.openEndpoint()方法打開端口。
EndpointManager會調用EndpointEntity.buildConnector()創建一個端口連接器,并調用EndpointConnector.open()方法打開端口。
如果是CMPPClientEndpointEntity的話,就會向服務器發起TCP連接請求,如果是CMPPServerEndpointEntity則會在本機開啟一個服務端口等客戶端連接。
TCP連接建立完成后。netty會調用EndpointConnector.initPipeLine()方法初始化PipeLine,把CMPP協議解析器,SessionLoginManager加到PipeLine里去,然后netty觸發ChannelActive事件。
在SessionLoginManager類里,客戶端收到ChannelActive事件后會發送一個CMPPConnnect消息,請求建立CMPP連接.
同樣在SessionLoginManager.channelRead()方法里,服務端會收到CMPPConnnect消息,開始對用戶名,密碼進行鑒權,并給客戶端鑒權結果。
鑒權通過后,SessionLoginManager調用EndpointConnector.addChannel(channel)方法,把channel加入ArrayList,并給pipeLine上掛載SessionStateManager和業務處理的ChannelHandler,如心跳處理,日志記錄,長短信合并拆分處理類。
EndpointConnector.addChannel(channel)完成后,SessionLoginManager調用ctx.fireUserEventTriggered()方法,觸發 SessionState.Connect事件。
以上CMPP連接建立完成。
業務處理類收到SessionState.Connect事件,開始業務處理,如從MQ獲取短信下發,或開啟Consumer接收MQ推送的消息。
SessionStateManager會攔截所有read()和write()的消息,進行消息持久化,消息重發,流量控制。
增加同步調用api
smsgate自開發以來,一直使用netty的異步發送消息,但實際使用場景中同步發送消息的更方便,或者能方便的取到response。因此增加一個同步調用的api。即:發送消息后等接收到對應的響應后才返回。 使用方法如下:
//因為長短信要拆分,因此返回一個promiseList.每個拆分后的短信對應一個promise
List futures = ChannelUtil.syncWriteLongMsgToEntity("client",submitmessage);
for(Promise future: futures){
//調用sync()方法,阻塞線程。等待接收response
future.sync();
//接收成功,如果失敗可以獲取失敗原因,比如遇到連接突然中斷錯誤等等
if(future.isSuccess()){
//打印收到的response消息
logger.info("response:{}",future.get());
}else{
打印錯誤原因
logger.error("response:{}",future.cause());
}
}
//或者不阻塞進程,不調用sync()方法。
List promises = ChannelUtil.syncWriteLongMsgToEntity("client",submitmessage);
for(Promise promise: promises){
//接收到response后回調Listener方法
promise.addListener(new GenericFutureListener() {
@Override
public void operationComplete(Future future) throws Exception {
//接收成功,如果失敗可以獲取失敗原因,比如遇到連接突然中斷錯誤等等
if(future.isSuccess()){
//打印收到的response消息
logger.info("response:{}",future.get());
}else{
打印錯誤原因
logger.error("response:{}",future.cause());
}
}
});
}
CMPP Api使用舉例
public class TestCMPPEndPoint {
private static final Logger logger = LoggerFactory.getLogger(TestCMPPEndPoint.class);
@Test
public void testCMPPEndpoint() throws Exception {
ResourceLeakDetector.setLevel(Level.ADVANCED);
final EndpointManager manager = EndpointManager.INS;
CMPPServerEndpointEntity server = new CMPPServerEndpointEntity();
server.setId("server");
server.setHost("127.0.0.1");
server.setPort(7890);
server.setValid(true);
//使用ssl加密數據流
server.setUseSSL(false);
CMPPServerChildEndpointEntity child = new CMPPServerChildEndpointEntity();
child.setId("child");
child.setChartset(Charset.forName("utf-8"));
child.setGroupName("test");
child.setUserName("901783");
child.setPassword("ICP001");
child.setValid(true);
child.setVersion((short)0x30);
child.setMaxChannels((short)4);
child.setRetryWaitTimeSec((short)30);
child.setMaxRetryCnt((short)3);
child.setReSendFailMsg(true);
//child.setWriteLimit(200);
//child.setReadLimit(200);
List serverhandlers = new ArrayList();
serverhandlers.add(new CMPPMessageReceiveHandler()); //在這個handler里接收短信
child.setBusinessHandlerSet(serverhandlers);
server.addchild(child);
manager.addEndpointEntity(server);
CMPPClientEndpointEntity client = new CMPPClientEndpointEntity();
client.setId("client");
client.setHost("127.0.0.1");
//client.setLocalhost("127.0.0.1");
//client.setLocalport(65521);
client.setPort(7890);
client.setChartset(Charset.forName("utf-8"));
client.setGroupName("test");
client.setUserName("901783");
client.setPassword("ICP001");
client.setMaxChannels((short)10);
client.setVersion((short)0x30);
client.setRetryWaitTimeSec((short)30);
client.setUseSSL(false);
//client.setWriteLimit(100);
client.setReSendFailMsg(true);
client.setSupportLongmsg(SupportLongMessage.BOTH);
List clienthandlers = new ArrayList();
clienthandlers.add( new CMPPSessionConnectedHandler(10000)); //在這個handler里發送短信
client.setBusinessHandlerSet(clienthandlers);
manager.addEndpointEntity(client);
manager.openEndpoint(server);
Thread.sleep(1000);
for(int i=0;i<=child.getMaxChannels()+1;i++)
manager.openEndpoint(client);
System.out.println("start.....");
//Thread.sleep(300000);
LockSupport.park();
EndpointManager.INS.close();
}
}
SMPP Api使用舉例
public class TestSMPPEndPoint {
private static final Logger logger = LoggerFactory.getLogger(TestSMPPEndPoint.class);
@Test
public void testSMPPEndpoint() throws Exception {
final EndpointManager manager = EndpointManager.INS;
SMPPServerEndpointEntity server = new SMPPServerEndpointEntity();
server.setId("smppserver");
server.setHost("127.0.0.1");
server.setPort(2776);
server.setValid(true);
//使用ssl加密數據流
server.setUseSSL(false);
SMPPServerChildEndpointEntity child = new SMPPServerChildEndpointEntity();
child.setId("smppchild");
child.setSystemId("901782");
child.setPassword("ICP");
child.setValid(true);
child.setChannelType(ChannelType.DUPLEX);
child.setMaxChannels((short)3);
child.setRetryWaitTimeSec((short)30);
child.setMaxRetryCnt((short)3);
child.setReSendFailMsg(true);
child.setIdleTimeSec((short)15);
//child.setWriteLimit(200);
//child.setReadLimit(200);
List serverhandlers = new ArrayList();
serverhandlers.add(new SMPPSessionConnectedHandler(10000));
child.setBusinessHandlerSet(serverhandlers);
server.addchild(child);
SMPPClientEndpointEntity client = new SMPPClientEndpointEntity();
client.setId("smppclient");
client.setHost("127.0.0.1");
client.setPort(2776);
client.setSystemId("901782");
client.setPassword("ICP");
client.setChannelType(ChannelType.DUPLEX);
client.setMaxChannels((short)12);
client.setRetryWaitTimeSec((short)100);
client.setUseSSL(false);
client.setReSendFailMsg(true);
//client.setWriteLimit(200);
//client.setReadLimit(200);
client.setSupportLongmsg(SupportLongMessage.SEND); //接收長短信時不自動合并
List clienthandlers = new ArrayList();
clienthandlers.add( new SMPPMessageReceiveHandler());
client.setBusinessHandlerSet(clienthandlers);
manager.addEndpointEntity(server);
manager.addEndpointEntity(client);
manager.openAll();
manager.startConnectionCheckTask();
Thread.sleep(1000);
for(int i=0;i
manager.openEndpoint(client);
System.out.println("start.....");
LockSupport.park();
EndpointManager.INS.close();
}
}
SGIP Api使用舉例
public class TestSgipEndPoint {
private static final Logger logger = LoggerFactory.getLogger(TestSgipEndPoint.class);
@Test
public void testsgipEndpoint() throws Exception {
ResourceLeakDetector.setLevel(Level.ADVANCED);
final EndpointManager manager = EndpointManager.INS;
SgipServerEndpointEntity server = new SgipServerEndpointEntity();
server.setId("sgipserver");
server.setHost("127.0.0.1");
server.setPort(8001);
server.setValid(true);
//使用ssl加密數據流
server.setUseSSL(false);
SgipServerChildEndpointEntity child = new SgipServerChildEndpointEntity();
child.setId("sgipchild");
child.setLoginName("333");
child.setLoginPassowrd("0555");
child.setValid(true);
child.setChannelType(ChannelType.DUPLEX);
child.setMaxChannels((short)3);
child.setRetryWaitTimeSec((short)30);
child.setMaxRetryCnt((short)3);
child.setReSendFailMsg(false);
child.setIdleTimeSec((short)30);
//child.setWriteLimit(200);
//child.setReadLimit(200);
child.setSupportLongmsg(SupportLongMessage.SEND); //接收長短信時不自動合并
List serverhandlers = new ArrayList();
serverhandlers.add(new SgipReportRequestMessageHandler());
serverhandlers.add(new SGIPMessageReceiveHandler());
child.setBusinessHandlerSet(serverhandlers);
server.addchild(child);
manager.addEndpointEntity(server);
SgipClientEndpointEntity client = new SgipClientEndpointEntity();
client.setId("sgipclient");
client.setHost("127.0.0.1");
client.setPort(8001);
client.setLoginName("333");
client.setLoginPassowrd("0555");
client.setChannelType(ChannelType.DUPLEX);
client.setMaxChannels((short)10);
client.setRetryWaitTimeSec((short)100);
client.setUseSSL(false);
client.setReSendFailMsg(true);
//client.setWriteLimit(200);
//client.setReadLimit(200);
List clienthandlers = new ArrayList();
clienthandlers.add(new SGIPSessionConnectedHandler(10000));
client.setBusinessHandlerSet(clienthandlers);
manager.addEndpointEntity(client);
manager.openAll();
Thread.sleep(1000);
for(int i=0;i
manager.openEndpoint(client);
System.out.println("start.....");
LockSupport.park();
EndpointManager.INS.close();
}
}
Demo 執行日志
11:31:52.842 [workGroup2] INFO c.z.s.c.m.AbstractEndpointConnector - handlers is not shareable . clone it success. com.zx.sms.codec.smpp.SMPP2CMPPBusinessHandler@1d7059df
11:31:52.852 [workGroup1] INFO c.z.s.c.m.AbstractEndpointConnector - handlers is not shareable . clone it success. com.zx.sms.codec.smpp.SMPP2CMPPBusinessHandler@75e134be
11:31:52.852 [workGroup1] INFO c.z.s.c.m.AbstractEndpointConnector - handlers is not shareable . clone it success. com.zx.sms.handler.api.gate.SessionConnectedHandler@aa80b58
11:31:52.869 [workGroup1] INFO c.z.s.s.AbstractSessionLoginManager - login in success on channel [id: 0xfdc7b81e, L:/127.0.0.1:11481 - R:/127.0.0.1:2776]
11:31:52.867 [workGroup2] INFO c.z.s.s.AbstractSessionLoginManager - login in success on channel [id: 0x1fba3767, L:/127.0.0.1:2776 - R:/127.0.0.1:11481]
11:31:53.863 [busiWork-3] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:343, speed : 343/s
11:31:54.872 [busiWork-1] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:1381, speed : 1038/s
11:31:55.873 [busiWork-8] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:2704, speed : 1323/s
11:31:56.875 [busiWork-2] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:4010, speed : 1306/s
11:31:57.880 [busiWork-5] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:5416, speed : 1406/s
11:31:58.881 [busiWork-7] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:7442, speed : 2026/s
11:31:59.882 [busiWork-8] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:9581, speed : 2139/s
11:32:00.883 [busiWork-2] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:12865, speed : 3284/s
11:32:01.884 [busiWork-5] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:15937, speed : 3072/s
11:32:02.886 [busiWork-5] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:19489, speed : 3552/s
11:32:03.887 [busiWork-6] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:23065, speed : 3576/s
11:32:04.888 [busiWork-2] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:26337, speed : 3272/s
總結
以上是生活随笔為你收集整理的android 短信编解码方式,中移短信cmpp协议/smpp协议 netty实现编解码的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: BUUCTF-WEB:[HCTF 201
- 下一篇: 将某内存单元数据做乘法 + 内存间数据的