CIM即时通讯源码初步解析(一款个人推荐的带集群的开源项目)
BindMessageListener
如下代碼作為理解入口,安卓、IOS、web在連接成功后才會發起所謂登錄,登錄時會攜帶必要信息,例如個人賬號唯一標識uid、設備類型deviceId等字段發起登錄請求,目前重寫的onMessage回調方法中只是對登錄邏輯進行處理,若要實現單聊及群聊需要額外拓展參數msgType(信息參數類型),根據信息類型作出相應的邏輯處理,在登錄邏輯中:
1.Session session = JSONUtils.fromJson(redisMessage.getBody(), Session.class); String uid = session.getUid(); 獲取uid,后面根據uid及設備限定類型過濾篩選出此賬號的所有頻道(因為一個賬號可以在不同終端登錄,而同一終端只能一個賬號登錄)
2.channelList.removeIf(channel -> session.getNid().equals(channel.attr(ChannelAttr.ID).get()));意思是將要通知下線的頻道集合中過濾掉本次連接的頻道,不然也會通知剛連接上的頻道
3.Collection<Channel> 相同賬號不同終端的頻道都放在此
4.@Resource private SessionGroup sessionGroup;sessionGroup是存放所有頻道信息的容器,后續單聊、群聊拓展需要從此理解及開發邏輯
netty即時通訊SDK部分源碼
package com.farsunset.cim.sdk.server.group;import com.farsunset.cim.sdk.server.constant.ChannelAttr; import com.farsunset.cim.sdk.server.model.Message; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.function.Predicate; import java.util.stream.Collectors;public class SessionGroup extends ConcurrentHashMap<String, Collection<Channel>> {private static final Collection<Channel> EMPTY_LIST = new LinkedList<>();private final transient ChannelFutureListener remover = new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future){future.removeListener(this);remove(future.channel());}};protected String getKey(Channel channel){return channel.attr(ChannelAttr.UID).get();}public void remove(Channel channel){String uid = getKey(channel);if(uid == null){return;}Collection<Channel> collections = getOrDefault(uid,EMPTY_LIST);collections.remove(channel);if (collections.isEmpty()){remove(uid);}}public void add(Channel channel){String uid = getKey(channel);if (uid == null || !channel.isActive()){return;}channel.closeFuture().addListener(remover);Collection<Channel> collections = this.putIfAbsent(uid,new ConcurrentLinkedQueue<>(Collections.singleton(channel)));if (collections != null){collections.add(channel);}if (!channel.isActive()){remove(channel);}}public void write(String key,Message message){find(key).forEach(channel -> channel.writeAndFlush(message));}public void write(String key, Message message, Predicate<Channel> matcher){find(key).stream().filter(matcher).forEach(channel -> channel.writeAndFlush(message));}public void write(String key, Message message, Collection<String> excludedSet){find(key).stream().filter(channel -> excludedSet == null || !excludedSet.contains(channel.attr(ChannelAttr.UID).get())).forEach(channel -> channel.writeAndFlush(message));}public void write(Message message){this.write(message.getReceiver(),message);}public Collection<Channel> find(String key){return this.getOrDefault(key,EMPTY_LIST);}public Collection<Channel> find(String key,String... channel){List<String> channels = Arrays.asList(channel);return find(key).stream().filter(item -> channels.contains(item.attr(ChannelAttr.CHANNEL).get())).collect(Collectors.toList());} }安卓發送信息接口
package com.farsunset.cim.http;import retrofit2.Call; import retrofit2.http.Field; import retrofit2.http.FormUrlEncoded; import retrofit2.http.POST;public interface MessageService {@POST("/send")@FormUrlEncodedCall<Void> send(@Field("sender") String sender,@Field("receiver") String receiver,@Field("action") String action,@Field("content") String content); }總結
以上是生活随笔為你收集整理的CIM即时通讯源码初步解析(一款个人推荐的带集群的开源项目)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 计算机组装与维护配置清单作业,计算机组装
- 下一篇: subject.login(token)