日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 综合教程 >内容正文

综合教程

skynet源码分析之网络层——网关服务器

發(fā)布時(shí)間:2024/8/26 综合教程 57 生活家
生活随笔 收集整理的這篇文章主要介紹了 skynet源码分析之网络层——网关服务器 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

在上一篇文章里介紹Lua層通過(guò)lualib/skynet/socket.lua這個(gè)庫(kù)與網(wǎng)絡(luò)底層交互(http://www.cnblogs.com/RainRill/p/8707328.html)。除此之外,skynet還提供一個(gè)通用模板lualib/snax/gateserver來(lái)啟動(dòng)一個(gè)網(wǎng)關(guān)服務(wù)器,通過(guò)TCP連接和客戶端交換數(shù)據(jù),這個(gè)庫(kù)不能與socket.lua共用,因?yàn)檫@個(gè)庫(kù)接管了底層傳來(lái)的socket類消息,具體用法參考官方wikihttps://github.com/cloudwu/skynet/wiki/GateServer。

1. 概述

gateserver注冊(cè)接收網(wǎng)絡(luò)底層傳過(guò)來(lái)的socket消息,通過(guò)netpack.filter解析消息包(第6行),稍后會(huì)著重分析如何解析,解析完返回的type有6中類型,每種類型指定特定的回調(diào)函數(shù)。注:當(dāng)一個(gè)包不完整時(shí),type為nil,這種情況不需要處理。

"open":新連接建立;"close":關(guān)閉連接;"warning":當(dāng)fd上待發(fā)送的數(shù)據(jù)累積超過(guò)1M時(shí),會(huì)收到這個(gè)消息;’"error":發(fā)生錯(cuò)誤,關(guān)閉fd;“data”:表示收到一個(gè)完整的tcp包,回調(diào)函數(shù)把這個(gè)包傳給邏輯層去處理;

 1  -- lualib/snax/gateserver.lua
 2  skynet.register_protocol {
 3      name = "socket",
 4      id = skynet.PTYPE_SOCKET,       -- PTYPE_SOCKET = 6
 5      unpack = function ( msg, sz )
 6          return netpack.filter( queue, msg, sz)
 7      end,
 8      dispatch = function (_, _, q, type, ...)
 9          queue = q
10          if type then
11               MSG[type](...)
12          end
13      end
14  }

“more”:表示收到的數(shù)據(jù)不止一個(gè)tcp包,netpack.filter會(huì)把包依次放到隊(duì)列里,然后回調(diào)函數(shù)一個(gè)個(gè)從隊(duì)列中pop出來(lái)(第11行)

 1 -- lualib/snax/gateserver.lua
 2 local function dispatch_queue()
 3     local fd, msg, sz = netpack.pop(queue)
 4     if fd then
 5     -- may dispatch even the handler.message blocked
 6     -- If the handler.message never block, the queue should be --
 7     -- empty, so only fork once and then exit.
 8         skynet.fork(dispatch_queue)
 9             dispatch_msg(fd, msg, sz)
10 
11             for fd, msg, sz in netpack.pop, queue do
12                  dispatch_msg(fd, msg, sz)
13             end
14         end
15     end
16 
17     MSG.more = dispatch_queue

2. 如何解析TCP數(shù)據(jù)包

為了說(shuō)明如何解析TCP數(shù)據(jù)包,先了解下網(wǎng)絡(luò)底層是采用什么策略接收數(shù)據(jù)的。單個(gè)socket每次從內(nèi)核嘗試讀取的數(shù)據(jù)字節(jié)數(shù)為sz(第6行),這個(gè)值保存在s->p.size中,初始是MIN_READ_BUFFER(64b),當(dāng)實(shí)際讀到的數(shù)據(jù)等于sz時(shí),sz擴(kuò)大一倍(8-9行);如果小于sz的一半,則設(shè)置sz為原來(lái)的一半(10-11行)。

比如,客戶端發(fā)了一個(gè)1kb的數(shù)據(jù),socket線程會(huì)從內(nèi)核里依次讀取64b,128b,256b,512b,64b數(shù)據(jù),總共需讀取5次,即會(huì)向gateserver服務(wù)發(fā)5條消息,一個(gè)TCP包被切割成5個(gè)數(shù)據(jù)塊。第5次嘗試讀取1024b數(shù)據(jù),所以可能會(huì)讀到其他TCP包的數(shù)據(jù)(只要客戶端有發(fā)送其他數(shù)據(jù))。接下來(lái),客戶端再發(fā)一個(gè)1kb的數(shù)據(jù),socket線程只需從內(nèi)核讀取一次即可。

 1 // skynet-src/socket_server.c
 2 static int
 3 forward_message_tcp(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message * result) {
 4     int sz = s->p.size;
 5     char * buffer = MALLOC(sz);
 6     int n = (int)read(s->fd, buffer, sz);
 7     ...
 8     if (n == sz) {
 9         s->p.size *= 2;
10     } else if (sz > MIN_READ_BUFFER && n*2 < sz) {
11         s->p.size /= 2;
12     }
13 }

netpack做的工作就是把這些數(shù)據(jù)塊組裝成一個(gè)完整的TCP包,再交給gateserver去處理。注:netpack約定,tcp包頭兩字節(jié)(大端方式)表示數(shù)據(jù)包長(zhǎng)度。如果采用sproto打包方式,需附加4字節(jié)(32位)的session值。所以客戶端傳過(guò)來(lái)1kb數(shù)據(jù),實(shí)際數(shù)據(jù)只有1024-2-4=1018字節(jié)。

數(shù)據(jù)結(jié)構(gòu):

16-19行,用數(shù)組實(shí)現(xiàn)的隊(duì)列。當(dāng)客戶端連續(xù)發(fā)了幾個(gè)小的tcp包,gateserver收到的一條消息包可能包含多個(gè)tcp包,存放到這個(gè)隊(duì)列里

第20行,存放不完整的tcp包的指針數(shù)組,每一項(xiàng)是指向一個(gè)鏈表,fd hash值相同的組成一個(gè)鏈表。

 1  // lualib-src/lua-netpack.c
 2  struct netpack {
 3      int id; //socket id
 4      int size; //數(shù)據(jù)塊長(zhǎng)度
 5      void * buffer; //數(shù)據(jù)塊
 6  };
 7  
 8  struct uncomplete { //不完整tcp包結(jié)構(gòu)
 9      struct netpack pack; //數(shù)據(jù)塊信息
10      struct uncomplete * next; //鏈表,指向下一個(gè)
11      int read; //已讀的字節(jié)數(shù)
12      int header; //第一個(gè)字節(jié)(代表數(shù)據(jù)長(zhǎng)度的高8位)
13  };
14  
15  struct queue {
16      int cap;
17      int head;
18      int tail;
19      struct netpack queue[QUEUESIZE]; //一次從內(nèi)核讀取多個(gè)tcp包時(shí)放入該隊(duì)列里
20      struct uncomplete * hash[HASHSIZE]; //指針數(shù)組,數(shù)組里每個(gè)位置指向一個(gè)不完整的tcp包鏈表,fd hash值相同的組成一個(gè)鏈表
21  }; 

解析流程:

最終會(huì)調(diào)用filter_data_這個(gè)接口解析,下面著重介紹:

參數(shù):fd socket; buffer從內(nèi)核中讀到的數(shù)據(jù)塊;size數(shù)據(jù)塊大小

先看后半段46-82行,當(dāng)queue里并沒(méi)有該socket剩余的數(shù)據(jù)塊,執(zhí)行46行分支。

46-51行,是一個(gè)不完整的tcp包,只有一個(gè)字節(jié)數(shù)據(jù),說(shuō)明表示長(zhǎng)度的頭部?jī)勺止?jié)數(shù)據(jù)都還差一個(gè)字節(jié),構(gòu)造一個(gè)uncomplete結(jié)構(gòu)(簡(jiǎn)稱uc),然后存在queue->hash里。uc->read設(shè)置為-1,uc->header存放這一個(gè)字節(jié)。返回給Lua層的type是nil,Lua層不需要處理。

52-54行,通過(guò)頭部?jī)勺止?jié)計(jì)算tcp包的長(zhǎng)度read_size,接下來(lái)比較收到的數(shù)據(jù)size與真正需要的數(shù)據(jù)pack_size。

56-63行,size<pack_size,說(shuō)明tcp包還有未讀到的數(shù)據(jù),將已讀到的數(shù)據(jù)構(gòu)造一個(gè)uc結(jié)構(gòu),保存在queue->hash里,返回給Lua層的type是nil,Lua層不需要處理。uc->read已讀到字節(jié),uc->pack.size目標(biāo)字節(jié)數(shù)

64-73行,size=pack_size,說(shuō)明是一個(gè)完整的tcp包,大部分是這種情況,把tcp包返回給Lua層即可,此時(shí)返回的type是“data”(第66行)。

74-82行,size>pack_size,說(shuō)明不止一個(gè)tcp包的數(shù)據(jù),則先通過(guò)push_data保存第一個(gè)完整的tcp包(76行),接著通過(guò)push_more處理余下的數(shù)據(jù)(79行)。返回的type是"more"。

    push_data做的工作是將tcp包保存在隊(duì)列里,供Lua層pop出使用。

    push_more是一個(gè)遞歸操作,流程跟上面一樣,對(duì)比讀到的數(shù)據(jù)和需要的數(shù)據(jù)做對(duì)應(yīng)的處理。

接著看6-44行,之前收到了tcp包的部分?jǐn)?shù)據(jù)塊。

8-18行,說(shuō)明之前只讀到一個(gè)字節(jié),加上該數(shù)據(jù)塊的第一個(gè)字節(jié),組成兩個(gè)字節(jié)計(jì)算出整個(gè)包的長(zhǎng)度(12行)

第19行,目標(biāo)字節(jié)-已讀字節(jié)=需要的字節(jié)need。

20-27行,如果size<need,說(shuō)明仍然還差數(shù)據(jù)塊沒(méi)收到,此時(shí)將數(shù)據(jù)附加到之前的uc->pack.buffer里。

28-44行,其他兩種情況跟上面處理流程一樣。

 1 // lualib-src/lua-netpack.c
 2 static int
 3 filter_data_(lua_State *L, int fd, uint8_t * buffer, int size) {
 4     struct queue *q = lua_touserdata(L,1);
 5     struct uncomplete * uc = find_uncomplete(q, fd);
 6     if (uc) { //之前收到該包的部分?jǐn)?shù)據(jù)塊,
 7         // fill uncomplete
 8         if (uc->read < 0) {//之前只收到一個(gè)字節(jié),加上該數(shù)據(jù)塊的第一個(gè)字節(jié),表示整個(gè)包的長(zhǎng)度
 9             // read size
10             assert(uc->read == -1);
11             int pack_size = *buffer;
12             pack_size |= uc->header << 8 ;
13             ++buffer;
14             --size;
15             uc->pack.size = pack_size;
16             uc->pack.buffer = skynet_malloc(pack_size);
17             uc->read = 0;
18         }
19         int need = uc->pack.size - uc->read;//包還差多少字節(jié)
20         if (size < need) {
21             memcpy(uc->pack.buffer + uc->read, buffer, size);
22             uc->read += size;
23             int h = hash_fd(fd);
24             uc->next = q->hash[h];
25             q->hash[h] = uc;
26             return 1;
27         }
28         memcpy(uc->pack.buffer + uc->read, buffer, need);
29         buffer += need;
30         size -= need;
31         if (size == 0) {
32             lua_pushvalue(L, lua_upvalueindex(TYPE_DATA));
33             lua_pushinteger(L, fd);
34             lua_pushlightuserdata(L, uc->pack.buffer);
35             lua_pushinteger(L, uc->pack.size);
36             skynet_free(uc);
37             return 5;
38         }
39         // more data
40         push_data(L, fd, uc->pack.buffer, uc->pack.size, 0);
41         skynet_free(uc);
42         push_more(L, fd, buffer, size);
43         lua_pushvalue(L, lua_upvalueindex(TYPE_MORE));
44         return 2;
45     } else {
46         if (size == 1) {
47             struct uncomplete * uc = save_uncomplete(L, fd);
48             uc->read = -1;
49             uc->header = *buffer;
50             return 1;
51         }
52         int pack_size = read_size(buffer); //需要數(shù)據(jù)包的字節(jié)數(shù)
53         buffer+=2;
54         size-=2;
55 
56         if (size < pack_size) { //說(shuō)明還有未獲得的數(shù)據(jù)包
57             struct uncomplete * uc = save_uncomplete(L, fd); //保存這個(gè)數(shù)據(jù)包
58             uc->read = size;
59             uc->pack.size = pack_size;
60             uc->pack.buffer = skynet_malloc(pack_size);
61             memcpy(uc->pack.buffer, buffer, size);
62             return 1;
63         }
64         if (size == pack_size) { //說(shuō)明是一個(gè)完整包,把包返回給Lua層即可
65             // just one package
66             lua_pushvalue(L, lua_upvalueindex(TYPE_DATA));
67             lua_pushinteger(L, fd);
68             void * result = skynet_malloc(pack_size);
69             memcpy(result, buffer, size);
70             lua_pushlightuserdata(L, result);
71             lua_pushinteger(L, size);
72             return 5;
73         }
74         // more data
75         // 說(shuō)明不止同一個(gè)數(shù)據(jù)包,還有額外的
76         push_data(L, fd, buffer, pack_size, 1); //保存第一個(gè)包到q->queue中
77         buffer += pack_size;
78         size -= pack_size;
79         push_more(L, fd, buffer, size); //處理余下的包
80         lua_pushvalue(L, lua_upvalueindex(TYPE_MORE));
81         return 2;
82     }
83 }

舉例,客戶端發(fā)了一個(gè)1kb的數(shù)據(jù),socket線程會(huì)從內(nèi)核里依次讀取64b,128b,256b,512b,64b數(shù)據(jù)。gateserver會(huì)執(zhí)行5次filter_data_,分支依次是56行,20行,20行,20行,31行。

總結(jié)

以上是生活随笔為你收集整理的skynet源码分析之网络层——网关服务器的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。