日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

OpenFire源码学习之二十五:消息回执与离线消息(下)

發布時間:2025/4/9 编程问答 17 豆豆
生活随笔 收集整理的這篇文章主要介紹了 OpenFire源码学习之二十五:消息回执与离线消息(下) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

這一篇緊接著上面繼續了。

方案二

基于redis的消息回執。主要流程分為下面幾個步驟:

1)將消息暫存儲與redis中,設置好消息的過期時間

2)客戶端回執消息id來消滅暫存的消息

3)開通單獨線程論壇在第1)步中的消息。根據消息的時間重新發送消息。如果消息第一次存放的時間大雨有效期(自定義10秒),解析消息中的to查找用戶是否還在線。如果在則T掉(因為它長時間不理會服務的重要命令),如果不在線則將消息放置離線表。

?

OK,先來看看消息的存儲格式吧。

1.MESSAGE消息 用戶集合

?SADD? SOGU:[username]? [VALUE(messageID)] [VALUE(messageID)] ...

2.已讀消息設備集合

?SADD? RT:[terminalid]? [VALUE(messageID)] [VALUE(messageID)] ...

3.消息內容

?HMSET? OGM:[messageID]? CREATIONDATE [VALUE]? UPDATEDATE [VALUE] STANZA [VALUE]

4.用戶、設備關聯

?SADD URT:[USERNAME]? [VALUE(terminalid)] .......

(先根據消息id查找時間,在java中排序后 查找stanza)

MESSAGE

--離線表

ZADD?OFOFFLINE:[username]? [INDEX(時間戳)] [VALUE(messageID)] 、[VALUE]、[VALUE]...... ????? ?????? [VALUE]

HMSET?OFOFFLINE:[messageID] STANZA[VALUE]

????????CREATIONDATE [VALUE]? MESSAGESIZ[VALUE]

?

將消息暫時消息存儲:

public void storeMessage(String username, Packet packet) {Jedis jedis = XMPPServer.getInstance().getGroupRedisManager().getJedis();String packetID = "";if (packet instanceof Message) packetID = ((Message)packet).getID();else if (packet instanceof IQ) packetID = ((IQ)packet).getID();else return;try {jedis.sadd("SOGU:" + username, packetID);Map<String, String> hash = new HashMap<String, String>();hash.put("STANZA", packet.toXML());hash.put("CREATIONDATE", StringUtils.dateToMillis(new Date()));jedis.hmset("OGM:" + packetID, hash);} finally {XMPPServer.getInstance().getGroupRedisManager().returnRes(jedis);}htp.execute(addMessagesToDB(packet));}private Runnable addMessagesToDB(final Packet packet) {return new Runnable() {@Overridepublic void run() {MyDBopt.insertMessage(packet);}

客戶端收到消息來回執服務端的操作

private void handle(IQ packet) {JID recipientJID = packet.getTo();if (IQ.Type.crs != packet.getType()) {// Check if the packet was sent to the server hostnameif (recipientJID != null && recipientJID.getNode() == null &&recipientJID.getResource() == null && serverName.equals(recipientJID.getDomain())) {Element childElement = packet.getChildElement();if (childElement != null && childElement.element("addresses") != null) {// to route this packetmulticastRouter.route(packet);return;}}}if (IQ.Type.crs == packet.getType()) {String username = packet.getFrom().getNode();String terminal = packet.getFrom().getTerminal();String msgId = packet.getID();if (username == null || msgId == null || "".equals(msgId)) {return ;}if (terminal == null) {terminal = username + "_" + System.currentTimeMillis()%1000000; }Jedis jedis = XMPPServer.getInstance().getGroupRedisManager().getJedis();try {jedis.sadd("URT:" + username, terminal);jedis.sadd("RT:" + terminal, packet.getID());} finally {XMPPServer.getInstance().getGroupRedisManager().returnRes(jedis);}threadPool.execute(createTask(msgId, username, terminal));return;}if (packet.getID() != null && (IQ.Type.result == packet.getType() || IQ.Type.error == packet.getType())) {// The server got an answer to an IQ packet that was sent from the serverIQResultListener iqResultListener = resultListeners.remove(packet.getID());if (iqResultListener != null) {resultTimeout.remove(packet.getID());if (iqResultListener != null) {try {iqResultListener.receivedAnswer(packet);}catch (Exception e) {Log.error("Error processing answer of remote entity. Answer: "+ packet.toXML(), e);}return;}}}try {// Check for registered components, services or remote serversif (recipientJID != null &&(routingTable.hasComponentRoute(recipientJID) || routingTable.hasServerRoute(recipientJID))) {// A component/service/remote server was found that can handle the PacketroutingTable.routePacket(recipientJID, packet, false);return;}if (isLocalServer(recipientJID)) {// Let the server handle the PacketElement childElement = packet.getChildElement();String namespace = null;if (childElement != null) {namespace = childElement.getNamespaceURI();}if (namespace == null) {if (packet.getType() != IQ.Type.result && packet.getType() != IQ.Type.error) {// Do nothing. We can't handle queries outside of a valid namespaceLog.warn("Unknown packet " + packet.toXML());}}else {// Check if communication to local users is allowedif (recipientJID != null && userManager.isRegisteredUser(recipientJID.getNode())) {PrivacyList list =PrivacyListManager.getInstance().getDefaultPrivacyList(recipientJID.getNode());if (list != null && list.shouldBlockPacket(packet)) {// Communication is blockedif (IQ.Type.set == packet.getType() || IQ.Type.get == packet.getType()) {// Answer that the service is unavailablesendErrorPacket(packet, PacketError.Condition.service_unavailable);}return;}}IQHandler handler = getHandler(namespace);if (handler == null) {if (recipientJID == null) {// Answer an error since the server can't handle the requested namespacesendErrorPacket(packet, PacketError.Condition.service_unavailable);}else if (recipientJID.getNode() == null ||"".equals(recipientJID.getNode())) {// Answer an error if JID is of the form <domain>sendErrorPacket(packet, PacketError.Condition.feature_not_implemented);}else {// JID is of the form <node@domain>// Answer an error since the server can't handle packets sent to a nodesendErrorPacket(packet, PacketError.Condition.service_unavailable);}}else {handler.process(packet);}}}else {// JID is of the form <node@domain/resource> or belongs to a remote server// or to an uninstalled componentroutingTable.routePacket(recipientJID, packet, false);}}catch (Exception e) {......}}

離線消息

離線消息的優化。

同樣可以拓展XMPP。比如

客戶端獲取離線消息,可以這么通訊。

1)先向服務器詢問,我總的離線消息的基本狀況(有多大,有多少條)

<iq id="BfI3V-47" to="8ntmorv1ep4wgcy" type="get" from="test@8ntmorv1ep4wgcy"><query xmlns="http://jabber.org/protocol/offmsg#bif"/> </iq>

2)服務端返回

<iq type="result" id="BfI3V-47" from="8ntmorv1ep4wgcy" to="test@8ntmorv1ep4wgcy"><query xmlns="http://jabber.org/protocol/offmsg#bifs"><size>1024b</><count>128</><idset>1001,1002...</></query> </iq>

3)客戶端發送分批獲取命令,一次給我發10條發完為止。

<iq id="BfI3V-47" to="8ntmorv1ep4wgcy" type="get" from="test@8ntmorv1ep4wgcy"><query xmlns="http://jabber.org/protocol/offmsg#start"/><pagesize>10</> </iq>

4)服務端開始發送消息

<iq type="result" id="BfI3V-47" from="8ntmorv1ep4wgcy" to="test@8ntmorv1ep4wgcy">...... </iq> <iq type="result" id="BfI3V-47" from="8ntmorv1ep4wgcy" to="test@8ntmorv1ep4wgcy">...... </iq> .....

5)告訴客戶端我都發完了

<iq type="result" id="BfI3V-47" from="8ntmorv1ep4wgcy" to="test@8ntmorv1ep4wgcy"><query xmlns="http://jabber.org/protocol/offmsg#end"/> </iq>

6)客戶端本地校驗,回執已經接收到的消息

<message id="BfI3V-47" to="8ntmorv1ep4wgcy" from="test1@8ntmorv1ep4wgcy/Spark 2.6.3" type="crs"/>

這里本人只是做了一個簡單的示意想法。如果需要更加精準的不妨在仔細想想消息處理與格式。

?

離線消息存儲。

將消息存儲到redis中:

public void addMessageToRedis(Message message) {if (message == null) {return;}JID recipient = message.getTo();String username = recipient.getNode();// If the username is null (such as when an anonymous user), don't store.if (username == null || !UserManager.getInstance().isRegisteredUser(recipient)) {return;}elseif (!XMPPServer.getInstance().getServerInfo().getXMPPDomain().equals(recipient.getDomain())) {// Do not store messages sent to users of remote serversreturn;}String msgXML = message.getElement().asXML();Jedis jedis = XMPPServer.getInstance().getChatMessageJedisPoolManager().getJedis();try {String newDate = StringUtils.dateToMillis(new java.util.Date());String id = MessageIdTactics.mid(username);jedis.zadd("OFOFFLINE:" + username, Long.valueOf(newDate), id Map<String, String> hash = new HashMap<String, String>();hash.put("STANZA", msgXML);hash.put("MESSAGESIZ", String.valueOf(msgXML.length()));hash.put("CREATIONDATE", newDate);jedis.hmset("OFOFFLINE:" + id, hash);} finally {XMPPServer.getInstance().getChatMessageJedisPoolManager().returnRes(jedis);}if (sizeCache.containsKey(username)) {int size = sizeCache.get(username);size += msgXML.length();sizeCache.put(username, size);}htp.execute(addMessageToDB(message));}

Redis優化這塊就到這啦。主要要做的就是:

第一:存儲用戶或者MUC、Group等這些都需要設置消息存儲的生命周期。當用戶不處于活躍狀態或者長時間不登陸的。要從redis中提出。免得浪費資源。當用戶重新加載的時候再將他放置redis中

第二:將需要回執消息和離線消息分開。需要回執的消息需要設置他的生命周期。離線表最好做個定時器。輪詢消息。將超時出現范圍內的消息(比如周期為一周)的消息同步至關系表中。這里的離線消息需要將用戶的設備分開來。

這里要考慮不同的設備終端等很多不同場景,問題會比較繞口。歡迎大家和我郵件交流。

轉載于:https://www.cnblogs.com/huwf/p/4273343.html

總結

以上是生活随笔為你收集整理的OpenFire源码学习之二十五:消息回执与离线消息(下)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。