日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

CIM即时通讯源码初步解析(一款个人推荐的带集群的开源项目)

發布時間:2023/12/10 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 CIM即时通讯源码初步解析(一款个人推荐的带集群的开源项目) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

BindMessageListener

如下代碼作為理解入口,安卓、IOS、web在連接成功后才會發起所謂登錄,登錄時會攜帶必要信息,例如個人賬號唯一標識uid、設備類型deviceId等字段發起登錄請求,目前重寫的onMessage回調方法中只是對登錄邏輯進行處理,若要實現單聊及群聊需要額外拓展參數msgType(信息參數類型),根據信息類型作出相應的邏輯處理,在登錄邏輯中:
1.Session session = JSONUtils.fromJson(redisMessage.getBody(), Session.class); String uid = session.getUid(); 獲取uid,后面根據uid及設備限定類型過濾篩選出此賬號的所有頻道(因為一個賬號可以在不同終端登錄,而同一終端只能一個賬號登錄)
2.channelList.removeIf(channel -> session.getNid().equals(channel.attr(ChannelAttr.ID).get()));意思是將要通知下線的頻道集合中過濾掉本次連接的頻道,不然也會通知剛連接上的頻道
3.Collection<Channel> 相同賬號不同終端的頻道都放在此
4.@Resource private SessionGroup sessionGroup;sessionGroup是存放所有頻道信息的容器,后續單聊、群聊拓展需要從此理解及開發邏輯

package com.farsunset.cim.component.message;import com.farsunset.cim.entity.Session; import com.farsunset.cim.sdk.server.constant.ChannelAttr; import com.farsunset.cim.sdk.server.group.SessionGroup; import com.farsunset.cim.sdk.server.model.Message; import com.farsunset.cim.util.JSONUtils; import io.netty.channel.Channel; import org.springframework.data.redis.connection.MessageListener; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Objects;/*** 集群環境下,監控多設備登錄情況,控制是否其余終端下線的邏輯*/ @Component public class BindMessageListener implements MessageListener {private static final String FORCE_OFFLINE_ACTION = "999";private static final String SYSTEM_ID = "0";/*一個賬號只能在同一個類型的終端登錄如: 多個android或ios不能同時在線一個android或ios可以和web,桌面同時在線*/private final Map<String,String[]> conflictMap = new HashMap<>();@Resourceprivate SessionGroup sessionGroup;public BindMessageListener(){conflictMap.put(Session.CHANNEL_ANDROID,new String[]{Session.CHANNEL_ANDROID,Session.CHANNEL_IOS});conflictMap.put(Session.CHANNEL_IOS,new String[]{Session.CHANNEL_ANDROID,Session.CHANNEL_IOS});conflictMap.put(Session.CHANNEL_WINDOWS,new String[]{Session.CHANNEL_WINDOWS,Session.CHANNEL_WEB,Session.CHANNEL_MAC});conflictMap.put(Session.CHANNEL_WEB,new String[]{Session.CHANNEL_WINDOWS,Session.CHANNEL_WEB,Session.CHANNEL_MAC});conflictMap.put(Session.CHANNEL_MAC,new String[]{Session.CHANNEL_WINDOWS,Session.CHANNEL_WEB,Session.CHANNEL_MAC});}@Overridepublic void onMessage(org.springframework.data.redis.connection.Message redisMessage, byte[] bytes) {Session session = JSONUtils.fromJson(redisMessage.getBody(), Session.class);String uid = session.getUid();String[] conflictChannels = conflictMap.get(session.getChannel());Collection<Channel> channelList = sessionGroup.find(uid,conflictChannels);channelList.removeIf(channel -> session.getNid().equals(channel.attr(ChannelAttr.ID).get()));/** 獲取到其他在線的終端連接,提示賬號再其他終端登錄*/channelList.forEach(channel -> {if (Objects.equals(session.getDeviceId(),channel.attr(ChannelAttr.DEVICE_ID).get())){channel.close();return;}Message message = new Message();message.setAction(FORCE_OFFLINE_ACTION);message.setReceiver(uid);message.setSender(SYSTEM_ID);message.setContent(session.getDeviceName());channel.writeAndFlush(message);channel.close();});} }

netty即時通訊SDK部分源碼

package com.farsunset.cim.sdk.server.group;import com.farsunset.cim.sdk.server.constant.ChannelAttr; import com.farsunset.cim.sdk.server.model.Message; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.function.Predicate; import java.util.stream.Collectors;public class SessionGroup extends ConcurrentHashMap<String, Collection<Channel>> {private static final Collection<Channel> EMPTY_LIST = new LinkedList<>();private final transient ChannelFutureListener remover = new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future){future.removeListener(this);remove(future.channel());}};protected String getKey(Channel channel){return channel.attr(ChannelAttr.UID).get();}public void remove(Channel channel){String uid = getKey(channel);if(uid == null){return;}Collection<Channel> collections = getOrDefault(uid,EMPTY_LIST);collections.remove(channel);if (collections.isEmpty()){remove(uid);}}public void add(Channel channel){String uid = getKey(channel);if (uid == null || !channel.isActive()){return;}channel.closeFuture().addListener(remover);Collection<Channel> collections = this.putIfAbsent(uid,new ConcurrentLinkedQueue<>(Collections.singleton(channel)));if (collections != null){collections.add(channel);}if (!channel.isActive()){remove(channel);}}public void write(String key,Message message){find(key).forEach(channel -> channel.writeAndFlush(message));}public void write(String key, Message message, Predicate<Channel> matcher){find(key).stream().filter(matcher).forEach(channel -> channel.writeAndFlush(message));}public void write(String key, Message message, Collection<String> excludedSet){find(key).stream().filter(channel -> excludedSet == null || !excludedSet.contains(channel.attr(ChannelAttr.UID).get())).forEach(channel -> channel.writeAndFlush(message));}public void write(Message message){this.write(message.getReceiver(),message);}public Collection<Channel> find(String key){return this.getOrDefault(key,EMPTY_LIST);}public Collection<Channel> find(String key,String... channel){List<String> channels = Arrays.asList(channel);return find(key).stream().filter(item -> channels.contains(item.attr(ChannelAttr.CHANNEL).get())).collect(Collectors.toList());} }

安卓發送信息接口

package com.farsunset.cim.http;import retrofit2.Call; import retrofit2.http.Field; import retrofit2.http.FormUrlEncoded; import retrofit2.http.POST;public interface MessageService {@POST("/send")@FormUrlEncodedCall<Void> send(@Field("sender") String sender,@Field("receiver") String receiver,@Field("action") String action,@Field("content") String content); }

總結

以上是生活随笔為你收集整理的CIM即时通讯源码初步解析(一款个人推荐的带集群的开源项目)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。