SOFA 源码分析 — 连接管理器
前言
RPC 框架需要維護客戶端和服務端的連接,通常是一個客戶端對應多個服務端,而客戶端看到的是接口,并不是服務端的地址,服務端地址對于客戶端來講是透明的。
那么,如何實現這樣一個 RPC 框架的網絡連接呢?
我們從 SOFA 中尋找答案。
連接管理器介紹
先從一個小 demo 開始看:
ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>().setInterfaceId(HelloService.class.getName()) // 指定接口.setProtocol("bolt") // 指定協議.setDirectUrl("bolt://127.0.0.1:9696"); // 指定直連地址HelloService helloService = consumerConfig.refer();while (true) {System.out.println(helloService.sayHello("world"));try {Thread.sleep(2000);} catch (Exception e) {} }上面的代碼中,一個 ConsumerConfig 對應一個接口服務,并指定了直連地址。
然后調用 ref 方法。每個 ConsumerConfig 綁定了一個 ConsumerBootstrap,這是一個非單例的類。
而每個 ConsumerBootstrap 又綁定了一個 Cluster,這是真正的客戶端。該類包含了一個客戶端所有的關鍵信息,例如:
這 5 個實例是 Cluster 的核心。一個客戶端的正常使用絕對離不開這 5 個元素。
我們之前分析了 5 個中的 4 個,今天分析最后一個 —— 連接管理器。
他可以說是 RPC 網絡通信的核心。
地址管理器代表的是:一個客戶端可以擁有多個接口。
連接管理器代表的是:一個客戶端可以擁有多個 TCP 連接。
很明顯,地址管理器的數據肯定比連接管理器要多。因為通常一個 TCP 連接(Server 端)可以含有多個接口。
那么 SOFA 是如何實現連接管理器的呢?
從 AbstractCluster 的 init 方法中,我們知道,該方法初始化了 Cluster。同時也初始化了 connectionHolder。
具體代碼如下:
// 連接管理器 connectionHolder = ConnectionHolderFactory.getConnectionHolder(consumerBootstrap);使用了 SPI 的方式進行的初始化。目前 RPC 框架的具體實現類只有一個 AllConnectConnectionHolder。即長連接管理器。
該類需要一個 ConsumerConfig 才能初始化。
該類中包含很多和連接相關的屬性,有 4 個 Map,未初始化的 Map,存活的節點列表,存活但亞健康的列表,失敗待重試的列表。這些 Map 的元素都會隨著服務的網絡變化而變化。
而這些 Map 中的元素則是:ConcurrentHashMap<ProviderInfo, ClientTransport> 。
即每個服務者的信息對應一個客戶端傳輸。那么這個 ClientTransport 是什么呢?看過之前文章的都知道,這個一個 RPC 和 Bolt 的膠水類。該類的默認實現 BoltClientTransport 包含了一個 RpcClient 屬性,注意,該屬性是個靜態的。也就是說,是所有實例公用的。并且,BoltClientTransport 包含一個 ProviderInfo 屬性。還有一個 Url 屬性,Connection 屬性(網絡連接)。
我們理一下:一個 ConsumerConfig 綁定一個 Cluster,一個 Cluster 綁定一個 connectionHolder,一個 connectionHolder 綁定多個 ProviderInfo 和 ClientTransport。
因為一個客戶端可以和多個服務進行通信。
代碼如何實現?
在 Cluster 中,會對 connectionHolder 進行初始化,在 Cluster 從注冊中心得到服務端列表后,會建立長連接。
從這里開始,地址管理器開始運作。
Cluster 的 updateAllProviders 方法是源頭。該方法會將服務列表添加到 connectionHolder 中。即調用 connectionHolder.updateAllProviders(providerGroups) 方法。該方法會全量更新服務端列表。
如果更新的時候,發現有新的服務,便會建立長連接。具體代碼如下:
if (!needAdd.isEmpty()) {addNode(needAdd); }addNode 方法就是添加新的節點。該方法會多線程建立 TCP 連接。
首先會根據 ProviderInfo 信息創建一個 ClientTransport,然后向線程池提交一個任務,任務內容是 initClientTransport(),即初始化客戶端傳輸。
該方法代碼如下(精簡過了):
private void initClientTransport(String interfaceId, ProviderInfo providerInfo, ClientTransport transport) {transport.connect();if (doubleCheck(interfaceId, providerInfo, transport)) {printSuccess(interfaceId, providerInfo, transport);addAlive(providerInfo, transport);} else {printFailure(interfaceId, providerInfo, transport);addRetry(providerInfo, transport);} }其中關鍵是調用 transport 的 connect 方法建立連接。
該方法的默認實現在 BoltClientTransport 中,符合我們的預期。我們知道, BoltClientTransport 有一個 RpcClient 的靜態實例。這個實例在類加載的時候,就會在靜態塊中初始化。初始化內容則是初始化他的一些屬性,例如地址解析器,連接管理器,連接監控等等。
我們再看 BoltClientTransport 的 connect 方法,該方法主要邏輯是初始化連接。方式則是通過 RpcClient 的 getConnection 方法來獲取,具體代碼如下:
connection = RPC_CLIENT.getConnection(url, url.getConnectTimeout());傳入一個 URL 和超時時間。 RpcClient 則是調用連接管理器的 getAndCreateIfAbsent 方法獲取,同樣傳入 Url,這個方法的名字很好,根據 URL 獲取連接,如果沒有,就創建一個。
有必要看看具體代碼:
public Connection getAndCreateIfAbsent(Url url) throws InterruptedException, RemotingException {// get and create a connection pool with initialized connections.ConnectionPool pool = this.getConnectionPoolAndCreateIfAbsent(url.getUniqueKey(),new ConnectionPoolCall(url));if (null != pool) {return pool.get();} else {logger.error("[NOTIFYME] bug detected! pool here must not be null!");return null;} }該方法會繼續調用自身的 getConnectionPoolAndCreateIfAbsent 方法,傳入 URL 的唯一標識,和一個 ConnectionPoolCall 對象(實現了 Callable)。
然后阻塞等待返回連接。
我們看看這個 ConnectionPoolCall 的 call 方法實現。該方法調用了連接管理器的 doCreate 方法。傳入了 URL 和一個連接池。然后 call 方法返回連接池。
doCreate 方法中,重點就是 create 方法,傳入了一個 url,返回一個 Connection,并放入連接池。默認池中只有一個長連接。
而 create 方法則是調用連接工廠的 createConnection 方法。然后調用 doCreateConnection 方法。該方法內部給了我們明確的答案:調用 Netty 的 Bootstrap 的 connect 方法。
代碼如下:
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout); ChannelFuture future = bootstrap.connect(new InetSocketAddress(targetIP, targetPort));熟悉 Netty 的同學一眼便看出來了。這是一個連接服務端的操作。而這個 BootStrap 的初始化則是在 RpcClient 初始化的時候進行的。注意:BootStrap 是可以共享的。
可以看到, ConnectionPoolCall 的 call 方法就是用來創建 Netty 連接的。回到 getAndCreateIfAbsent 方法里,繼續看 getConnectionPoolAndCreateIfAbsent 方法的實現。
該方法內部將 Callable 包裝成一個 FutureTask,目的應該是為了以后的異步運行吧,總之,最后還是同步調用了 run 方法。然后調用 get 方法阻塞等待,等待剛剛 call 方法返回的連接池。然后返回。
得到連接池,連接池調用 get 方法,從池中根據策略選取一個連接返回。目前只有一個隨機選取的策略。
這個 Connection 連接實例會保存在 BoltClientTransport 中。
在客戶端進行調用的時候, RpcClient 會根據 URL 找到對應的連接,然后,獲取這個連接對應的 Channel ,向服務端發送數據。具體代碼如下:
conn.getChannel().writeAndFlush(request).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {if (!f.isSuccess()) {conn.removeInvokeFuture(request.getId());future.putResponse(commandFactory.createSendFailedResponse(conn.getRemoteAddress(), f.cause()));logger.error("Invoke send failed, id={}", request.getId(), f.cause());}} });以上,就是 SOFA 的連接的原理和設計。
總結
連接管理器是我們分析 SOFA—RPC Cluster 中的最后一個模塊,他管理著一個客戶端對應的所有服務網絡連接。
connectionHolder 內部包含多個 Map,Map 中的 key 是 Provider,value 是 ClientTransport,ClientTransport 是 RpcClient 和 SOFA 的膠水類,通常一個 Provider 對應一個 ClientTransport。ClientTransport 其實就是一個連接的包裝。
ClientTransport 獲取連接的方式則是通過 RpcClient 的 連接管理器獲取的。該連接管理器內部包含一個連接工廠,會根據 URL 創建連接。創建連接的凡是則是通過 Netty 的 BootStrap 來創建。
當我們使用 Provider 對應的 ClientTransport 中的 RpcClient 發送數據的時候,則會根據 URL 找到對應 Connection,并獲取他的 Channel ,向服務端發送數據。
好了,以上就是 SOFA—RPC 連接管理的分析。
篇幅有限,如有錯誤,還請指正。
轉載于:https://www.cnblogs.com/stateis0/p/9006080.html
總結
以上是生活随笔為你收集整理的SOFA 源码分析 — 连接管理器的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 《程序员代码面试指南》第八章 数组和矩阵
- 下一篇: cocos2dx3.0五种屏幕适配模式,