日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

NGINX-RTMP复杂度分析

發(fā)布時間:2024/2/28 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 NGINX-RTMP复杂度分析 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
很好奇nginx如何處理異步請求,我看nginx-rtmp在處理異步時狀態(tài)也還行,所以調(diào)試下看看。
純異步做rtmp協(xié)議,真是非常復(fù)雜,特別是需要做回源。無數(shù)的回調(diào)和處理邏輯。NGINX-RTMP幾個簡化問題的方法:
1. RTMP-CHUNK協(xié)議解析直接在一個函數(shù)里做,避免添加狀態(tài)。
2. CHUNK的長度可以計算得出,所以收到要求的長度的數(shù)據(jù)后,才開始協(xié)議解析。
3. 收發(fā)數(shù)據(jù),協(xié)議解析,包邏輯處理,三部分分離。
4. 包收到后通過各種handler回調(diào),包發(fā)送時只是發(fā)送到chain,異步的發(fā)送和包發(fā)送分離。
5. 使用ATM模型來收發(fā)數(shù)據(jù)。
6. 使用chain作為收發(fā)數(shù)據(jù)的緩存。收取時,chain達(dá)到長度才開始處理;發(fā)送包時,緩存到chain,然后啟動異步發(fā)送即可。
整個機(jī)制雖然有所簡化,比起同步的調(diào)用,要復(fù)雜至少10倍。


首先,下載和解壓nginx:
? ? ?tar xf nginx-1.5.0.tar.gz && tar xf nginx-rtmp-module-1.0.4.tar.gz
? ? ?cd nginx-1.5.0 && ./configure --add-module=/home/winlin/nginx-rtmp-module-1.0.4 --with-http_ssl_module --prefix=`pwd`/release
修改編譯參數(shù),將優(yōu)化去掉:
? ? ?sed -i "s/-O /-O0 /g" objs/Makefile
編譯:
? ? ?make && make install
配置RTMP:
? ? ?vi conf/nginx.conf
添加如下:
? ? daemon off;
? ? master_process off;
? ? rtmp {
? ? ? ? server {
? ? ? ? ? ? listen 19352;
? ? ? ? ? ? application live {
? ? ? ? ? ? ? ? live on;
? ? ? ? ? ? ? ? allow publish all;
? ? ? ? ? ? ? ? allow play all;
? ? ? ? ? ? }
? ? ? ? }
? ? }
配置可以在nginx.c中看到:
? ? { ngx_string("daemon"),
? ? ? NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_FLAG,
? ? ? ngx_conf_set_flag_slot,
? ? ? 0,
? ? ? offsetof(ngx_core_conf_t, daemon),
? ? ? NULL },
跳到函數(shù):ngx_conf_set_flag_slot,可以看到應(yīng)該配置為on或off。
開始調(diào)試。


ultimate IDE的project如附件:


目錄結(jié)構(gòu)如下:
E:\Chnvideo\research\nginx
main
nginx-1.5.0
nginx-rtmp-module-1.0.4
只要將nginx和rtmp模塊解壓到這個目錄,然后將當(dāng)前目錄(E:\Chnvideo\research\nginx)在IDE打開即可。
IDE=>New assembly => package nests 輸入:E:\Chnvideo\research\nginx


Rtmp模塊初始化
rtmp模塊初始化函數(shù)調(diào)用如下:
(gdb) bt
#0 ?ngx_rtmp_optimize_servers (cf=0x7fffffffe1d0, ports=0x7fffffffddb0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp.c:605
#1 ?0x00000000004ae362 in ngx_rtmp_block (cf=0x7fffffffe1d0, cmd=0x711680, conf=0xa4da78) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp.c:312
#2 ?0x000000000041f173 in ngx_conf_handler (cf=0x7fffffffe1d0, last=1) at src/core/ngx_conf_file.c:387
#3 ?0x000000000041ed1e in ngx_conf_parse (cf=0x7fffffffe1d0, filename=0xa4ce00) at src/core/ngx_conf_file.c:243
#4 ?0x000000000041b9cc in ngx_init_cycle (old_cycle=0x7fffffffe310) at src/core/ngx_cycle.c:268
#5 ?0x0000000000406230 in main (argc=1, argv=0x7fffffffe5d8) at src/core/nginx.c:333
(gdb) f
#0 ?ngx_rtmp_optimize_servers (cf=0x7fffffffe1d0, ports=0x7fffffffddb0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp.c:605
605 ? ? ? ? ? ? ? ? ls->handler = ngx_rtmp_init_connection;
默認(rèn)將accept之后的處理函數(shù)設(shè)置為了ngx_rtmp_init_connection。


Nginx的回調(diào)函數(shù)
在ngx_rtmp_init_connection(ngx_rtmp_init.c:132)中設(shè)置斷點,可以看到接受連接和處理handshake的過程。
#0 ?ngx_rtmp_init_connection (c=0x7ffff7fad190) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_init.c:31
#1 ?0x000000000042befe in ngx_event_accept (ev=0xa7b370) at src/event/ngx_event_accept.c:357
#2 ?0x0000000000435d26 in ngx_epoll_process_events (cycle=0xa4cca0, timer=18446744073709551615, flags=1) at src/event/modules/ngx_epoll_module.c:683
#3 ?0x0000000000428f02 in ngx_process_events_and_timers (cycle=0xa4cca0) at src/event/ngx_event.c:249
#4 ?0x0000000000433181 in ngx_single_process_cycle (cycle=0xa4cca0) at src/os/unix/ngx_process_cycle.c:315
#5 ?0x00000000004064c8 in main (argc=1, argv=0x7fffffffe5d8) at src/core/nginx.c:409
在ngx_event_accept(ngx_event_accept.c:206)中,接受到連接后進(jìn)行處理,其中設(shè)置了recvchain:
? ? ? ? c->recv_chain = ngx_recv_chain;
? ? ? ? c->send_chain = ngx_send_chain;
這兩個值是宏定義,在ngx_event.h:458:
extern ngx_os_io_t ?ngx_io;
#define ngx_recv_chain ? ? ? ngx_io.recv_chain
#define ngx_send_chain ? ? ? ngx_io.send_chain
這個ngx_io是個extern的變量,grep搜索,發(fā)現(xiàn)定義在:
[winlin@dev6 nginx-1.5.0]$ find src -name "*.c"|xargs grep -in "ngx_io\;"
src/core/ngx_connection.c:13:ngx_os_io_t ?ngx_io;
[winlin@dev6 nginx-1.5.0]$ find src -name "*.c"|xargs grep -in "ngx_io ="|grep epoll
src/event/modules/ngx_epoll_module.c:329: ? ?ngx_io = ngx_os_io;
所以會在epoll這個模塊被重置,可以設(shè)置斷點。
b ngx_epoll_module.c:329
Breakpoint 6, ngx_epoll_init (cycle=0xa4cca0, timer=0) at src/event/modules/ngx_epoll_module.c:329
329 ? ? ? ? ngx_io = ngx_os_io;
(gdb) p &ngx_io
$32 = (ngx_os_io_t *) 0xa222a0
(gdb) p &ngx_os_io
$33 = (ngx_os_io_t *) 0x703440
(gdb) p ngx_os_io
$34 = {recv = 0x42f8b0 <ngx_unix_recv>, recv_chain = 0x42f9c8 <ngx_readv_chain>, udp_recv = 0x42fc74 <ngx_udp_unix_recv>, send = 0x42fd34 <ngx_unix_send>, send_chain = 0x435ee4 <ngx_linux_sendfile_chain>, flags = 1}
(gdb) p ngx_io
$35 = {recv = 0, recv_chain = 0, udp_recv = 0, send = 0, send_chain = 0, flags = 0}
在connection.c中的初始化只是默認(rèn)初始化為0,只有在ngx_os_io這個里面才是真正用到的。
這個ngx_os_io是在這里定義的:
[winlin@dev6 nginx-1.5.0]$ find src -name "*.c"|xargs grep -in "ngx_os_io \="|grep linux
src/os/unix/ngx_linux_init.c:75: ? ?ngx_os_io = ngx_linux_io;
這個結(jié)構(gòu)體定義為:
static ngx_os_io_t ngx_linux_io = {
? ? ngx_unix_recv,
? ? ngx_readv_chain,
? ? ngx_unix_send,
? ? ngx_linux_sendfile_chain,
};
函數(shù)定義在ngx_readv_chain.c:19和ngx_linux_sendfile_chain.c:38(對于rtmp還是用的writev,不是用的sendfile)。


處理函數(shù)的改變
ngx_rtmp_init_connection里面調(diào)用了ngx_rtmp_init_session和ngx_rtmp_handshake。
ngx_rtmp_init_session先調(diào)用ngx_rtmp_set_chunk_size,然后調(diào)用 ngx_rtmp_fire_event激發(fā)了事件NGX_RTMP_CONNECT。
ngx_rtmp_set_chunk_size,第一次將chunk-size設(shè)為128(NGX_RTMP_DEFAULT_CHUNK_SIZE),不需要發(fā)包。
ngx_rtmp_fire_event的handler可以搜索:NGX_RTMP_CONNECT,定義在:ngx_rtmp_limit_postconfiguration
h = ngx_array_push(&cmcf->events[NGX_RTMP_CONNECT]);
*h = ngx_rtmp_limit_connect;
所以NGX_RTMP_CONNECT的handler應(yīng)該是ngx_rtmp_limit_connect,這個函數(shù)檢查一個共享的變量,然后判斷是否超過連接數(shù),返回錯誤或者OK:
? ? ? rc = n > (ngx_uint_t) lmcf->max_conn ? NGX_ERROR : NGX_OK;
若可以接受這個連接,則開始握手,調(diào)用函數(shù)ngx_rtmp_handshake如下:
#0 ?ngx_rtmp_handshake (s=0xa665b0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_handshake.c:576
#1 ?0x00000000004af3ab in ngx_rtmp_init_connection (c=0x7ffff7fad190) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_init.c:132
#2 ?0x000000000042befe in ngx_event_accept (ev=0xa7b370) at src/event/ngx_event_accept.c:357
#3 ?0x0000000000435d26 in ngx_epoll_process_events (cycle=0xa4cca0, timer=18446744073709551615, flags=1) at src/event/modules/ngx_epoll_module.c:683
#4 ?0x0000000000428f02 in ngx_process_events_and_timers (cycle=0xa4cca0) at src/event/ngx_event.c:249
#5 ?0x0000000000433181 in ngx_single_process_cycle (cycle=0xa4cca0) at src/os/unix/ngx_process_cycle.c:315
#6 ?0x00000000004064c8 in main (argc=1, argv=0x7fffffffe5d8) at src/core/nginx.c:409
在ngx_rtmp_handshake中,初始化rtmp的讀寫函數(shù)為:
c->read->handler = ?ngx_rtmp_handshake_recv;
c->write->handler = ngx_rtmp_handshake_send;
然后,設(shè)置狀態(tài)機(jī):
? ? ? s->hs_stage = NGX_RTMP_HANDSHAKE_SERVER_RECV_CHALLENGE;
并直接進(jìn)入下一個狀態(tài):
? ? ? ngx_rtmp_handshake_recv(c->read);
可見,還是使用了狀態(tài)機(jī),只是將狀態(tài)組合成了幾個大的階段,通過設(shè)置不同的c->read/write->hander來處理。
在handshake完畢的函數(shù)ngx_rtmp_cycle(s)里面,將handler設(shè)置成了ngx_rtmp_recv/ngx_rtmp_send。


HandshakeBuffer
設(shè)置斷點,看狀態(tài)如何改變:
(gdb) b ngx_rtmp_handshake_recv
Breakpoint 15 at 0x4b0526: file /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_handshake.c, line 375.
(gdb) b ngx_rtmp_handshake_send
Breakpoint 16 at 0x4b09a6: file /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_handshake.c, line 490.
ngx_rtmp_handshake_recv,接受消息,處理時,先接受指定長度的數(shù)據(jù),然后再處理。
b = s->hs_buf;
while (b->last != b->end) {
? ? n = c->recv(c, b->last, b->end - b->last);
? ? if (n == NGX_AGAIN) {
? ? ? ? ngx_add_timer(rev, s->timeout);
? ? ? ? if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
? ? ? ? ? ? ngx_rtmp_finalize_session(s);
? ? ? ? }
? ? ? ? return;
? ? }
? ? b->last += n;
}
第一次時,會收1537字節(jié)然后處理,:
? ? ?(gdb) p b->end-b->last
? ? ? $76 = 1537
所以這個s->hs_buf應(yīng)該初始化為1537字節(jié),即handshake的字節(jié)。在調(diào)用handshake時初始化為NGX_RTMP_HANDSHAKE_BUFSIZE長度。


獲取時間:
ngx_time_update這個函數(shù)會更新全局變量 ngx_current_msec ,調(diào)用的是ngx_gettimeofday,如果過多頻繁的調(diào)用會有性能問題。
所以應(yīng)該不會隨意調(diào)用它,很多時候只是直接用,相當(dāng)于cache了這個時間。


timer的處理,超時:
假設(shè)收取失敗,返回的是NGX_AGAIN(-2),則會使用定時器:ngx_add_timer設(shè)置超時,然后調(diào)用ngx_handle_read_event偵聽READ事件。
ngx_add_timer會添加到rbtree里面去:
? ? ? timer = {key = 1380262079704, left = 0x719700, right = 0x719700, parent = 0x0, color = 0 '\000', data = 0 '\000'}
NGX_EAGAIN返回后,會調(diào)用ngx_event_expire_timers函數(shù),這個會取全局的rbtree的root:ngx_event_timer_rbtree
? ? ? (gdb) p ngx_event_timer_rbtree?
? ? ? $98 = {root = 0xa7b468, sentinel = 0x719700, insert = 0x414ded <ngx_rbtree_insert_timer_value>}
? ? ?(gdb) p *ngx_event_timer_rbtree.root
? ? ? $101 = {key = 1380262079704, left = 0x719700, right = 0x719700, parent = 0x0, color = 0 '\000', data = 0 '\000'}
這個root就是ngx_add_timer添加的那個選項,rev就是handshake的那個結(jié)構(gòu)。
rbtree的key是超時時間,所以最小的超時時間會放在前面,ngx_event_add_timer函數(shù)里面:
? ? ? key = ngx_current_msec + timer;
rbtree的節(jié)點是直接指向evt對象的,所以ngx_event_expire_timers可以直接從rbtree的node獲取evt:
? ? ? ev = (ngx_event_t *) ((char *) node - offsetof(ngx_event_t, timer));
然后設(shè)置為timeout后,直接就調(diào)用handler:
? ? ?ev->timedout = 1;
? ? ?ev->handler(ev); // ngx_rtmp_handshake_recv
因為timeout,所以就關(guān)掉連接了。


EAGAIN的處理:
假若沒有timeout,則進(jìn)入到EAGAIN的處理。
ngx_event_find_timer會返回一個timeout,共epoll_wait來處理超時事件。
? ? ?timer = ngx_event_find_timer(); //60000,handshake的超時為60秒。
? ? ?events = epoll_wait(ep, event_list, (int) nevents, timer);
若fd可讀,則調(diào)用read的handler:
? ? ?if ((revents & EPOLLIN) && rev->active) {
? ? ? ? ? rev = c->read;
? ? ? ? ? rev->handler(rev);
? ? ?(gdb) p *c->read->handler
? ? ? $125 = {void (ngx_event_t *)} 0x4b0519 <ngx_rtmp_handshake_recv>
所以直接就調(diào)用到了這個處理函數(shù)ngx_rtmp_handshake_recv,先清除timer,然后讀取到handshake需要的長度的數(shù)據(jù)。
? ? ?(gdb) p b->end-b->last
? ? ? $126 = 1437
還需要讀取這些數(shù)據(jù)。若EAGAIN,還是在可讀時調(diào)用這個函數(shù)。
可見nginx來設(shè)置不同的handler,相當(dāng)于將大狀態(tài)機(jī)切割成小狀態(tài)機(jī),小狀態(tài)機(jī)里面直接switch。
若c++用類表示大狀態(tài)機(jī),用switch表示小狀態(tài)機(jī),是可以簡化的。設(shè)置不同的handler,就設(shè)置不同的狀態(tài)子類表示。


ATM模型:
ATM就是讀-寫-讀這種分離的模型 ,避免又讀又寫。即同一時刻,只能讀或者寫,做完了才能做下一個。
ngx_rtmp_handshake_recv中,會先讀取,若EAGAIN則偵聽讀事件,若讀取完畢則刪除讀事件。
? ? ?// start read, if not completed, focus read event.
? ? ?n = c->recv(c, b->last, b->end - b->last);
? ? ?if (n == NGX_AGAIN) {
? ? ? ? ? ngx_handle_read_event(c->read, 0);
? ? ? ? ? return;
? ? ?}
? ? ?// read completed, donot focus read event.
? ? ?if (rev->active) {
? ? ? ? ? ngx_del_event(rev, NGX_READ_EVENT, 0);
? ? ?}
若讀取C0C1成功,則發(fā)送S0S1S2。調(diào)用的是:
? ? ?ngx_rtmp_handshake_send(c->write);
這個函數(shù)會嘗試發(fā)送,也是ATM模型:
? ? ?// start write, if not completed, focus write event.
? ? ?n = c->send(c, b->pos, b->last - b->pos);
? ? ?if (n == NGX_AGAIN || n == 0) {
? ? ? ? ? ngx_handle_write_event(c->write, 0)
? ? ? ? ? return;
? ? ?}
? ? ?// write completed, donot focus write event.
? ? ?if (wev->active) {
? ? ? ? ? ngx_del_event(wev, NGX_WRITE_EVENT, 0);
? ? ?}
發(fā)送S0S1完畢,還是要收C2.等C0C1C2和S0S1S2都處理完畢,則進(jìn)入ngx_rtmp_handshake_done,調(diào)用ngx_rtmp_cycle函數(shù),進(jìn)入包處理邏輯。


RTMP收包
這個應(yīng)該是狀態(tài)巨多的一個場景,每個包都是一個大狀態(tài)。
首先把handler設(shè)置成收發(fā)的函數(shù)。
? ? ?c->read->handler = ?ngx_rtmp_recv;
? ? ?c->write->handler = ngx_rtmp_send;
并先開始收數(shù)據(jù)并處理:
? ? ?ngx_rtmp_recv(c->read);
ngx_rtmp_recv的 主要邏輯就是解析chunk為RTMP包,并調(diào)用處理包的函數(shù)。
ngx_rtmp_session_t* s中有個成員是 ngx_rtmp_stream_t* in_streams,這個就是chunk streams
? ? ?st = &s->in_streams[s->in_csid]; // 獲取或創(chuàng)建一個chunk_stream,直接將數(shù)組轉(zhuǎn)換為map,即預(yù)先開辟in_streams的空間。
其中:
typedef struct {
? ? ngx_rtmp_header_t ? ? ? hdr;
? ? uint32_t ? ? ? ? ? ? ? ?dtime;
? ? uint32_t ? ? ? ? ? ? ? ?len; ? ? ? ?/* current fragment length */
? ? uint8_t ? ? ? ? ? ? ? ? ext;
? ? ngx_chain_t ? ? ? ? ? ?*in;
} ngx_rtmp_stream_t;
每個ngx_rtmp_stream_t中都定義了一個chain:in。每個chain其實就是一個緩存 ,就是說給每個chunk_stream新建了一個可以寫的buffer。
? ? ?n = c->recv(c, b->last, b->end - b->last); // 先收146字節(jié)的數(shù)據(jù)。調(diào)試時可以改為10,看如何解析。
b的初始化在ngx_rtmp_init_session,設(shè)置了in_streams里面:
? ? ?s->in_streams = ngx_pcalloc(c->pool, sizeof(ngx_rtmp_stream_t) * cscf->max_streams);
收到數(shù)據(jù)后,開始解析chunk。若只讀取了10字節(jié),那么basic_header當(dāng)fmt為0時需要12字節(jié),會在這個地方再讀取:
? ? ?if (fmt == 0) {
? ? ? ? ? if (b->last - p < 4)
? ? ? ? ? ? ? ?continue;
? ? (gdb) p b->last - p
? ? $173 = 2
再次循環(huán)調(diào)用recv和解析:
? ? ?n = c->recv(c, b->last, b->end - b->last); // b->end - b->last=136,讀取了10字節(jié)。
解析時,還是會重頭開始解析一遍。 除非header解析完了,才會改變pos位置
? ? ?/* header done */
? ? ?b->pos = p; // p-b->start=12, fmt為0時的12字節(jié)basic header.
解析完的頭如下:
? ? ?(gdb) p st->hdr
? ? ? $189 = {csid = 3, timestamp = 0, mlen = 161, type = 20 '\024', msid = 0}
有個地方驗證了消息的最大長度,不能超過這個: cscf->max_message,默認(rèn)1048576即1M,這個對大碼率視頻可能需要配置
解析完頭,就得看body是否足夠了:
? ? ?size = b->last - b->pos; // buffer的數(shù)據(jù),134
? ? ?fsize = h->mlen - st->len; // 消息還沒有下載的長度。
這個時候,還需要更多的數(shù)據(jù):
? ? ?if (fsize > s->in_chunk_size) {
? ? ? ? ? st->len += s->in_chunk_size;
? ? ? ? ? b->last = b->pos + s->in_chunk_size;
這樣就算出了還需要收多少數(shù)據(jù):
? ? ?(gdb) p b->end-b->last
? ? ? $198 = 6
然后會接著收,一直收完為止。


RTMP處理包
ngx_rtmp_recv收到一個包后 ,調(diào)用ngx_rtmp_receive_message處理, 這個函數(shù)根據(jù)消息的類型來找到對應(yīng)的handler:
? ? ? evhs = &cmcf->events[h->type];
? ? ?evh = evhs->elts;
? ? ?(*evh)(s, h, in)
消息處理的鉤子函數(shù)的初始化是在ngx_rtmp_init_event_handlers,譬如AMF0的消息處理在:
? ? static size_t ? ? ? ? ? ? ? amf_events[] = {
? ? ? ? NGX_RTMP_MSG_AMF_CMD,
? ? ? ? NGX_RTMP_MSG_AMF_META,
? ? ? ? NGX_RTMP_MSG_AMF_SHARED,
? ? ? ? NGX_RTMP_MSG_AMF3_CMD,
? ? ? ? NGX_RTMP_MSG_AMF3_META,
? ? ? ? NGX_RTMP_MSG_AMF3_SHARED
? ? };
? ? /* init amf events */
? ? for(n = 0; n < sizeof(amf_events) / sizeof(amf_events[0]); ++n) {
? ? ? ? eh = ngx_array_push(&cmcf->events[amf_events[n]]);
? ? ? ? *eh = ngx_rtmp_amf_message_handler;
? ? }
在amf的處理函數(shù)中,還得解析body內(nèi)容后再調(diào)用其他鉤子,來處理特定的amf0函數(shù):
? ? ?ph = ch->elts;
? ? ?(*ph)(s, h, in) // ngx_rtmp_cmd_connect_init
也就是說,這個處理實際上是最難的,雖然一個消息能被一個處理函數(shù)處理,可是這個函數(shù)不知道很多狀態(tài)信息。
不過nginx-rtmp這樣還是處理的很漂亮,非常Nice。
其實同步的邏輯是這樣寫:
? ? ? ? ? MessagePacket* packet = NULL;
? ? ? ? ? if((ret = ctx_core->rtmp->RecvMessage(&packet)) != ErrorCode::Success){
? ? ? ? ? ? ? ?return ret;
? ? ? ? ? }
? ? ? ? ? if(packet is play){
? ? ? ? ? ? ? ?return process_play();
? ? ? ? ? }
? ? ? ? ? if(packet is publish){
? ? ? ? ? ? ? ?return process_publish();
? ? ? ? ? }
純異步的方式是超級復(fù)雜,特別是加上了回源pull和push兩種方式后。
能簡化純異步的技術(shù)有:
1. 將收包和解析處理分開,確認(rèn)消息收到了才開始解析。
2. 異步的ATM模型,讀寫分開,讀完才寫,寫完才讀。
3. 處理時使用handler,譬如handshake有handler,amf0有handler,amf0-connect有handler。
同步方式是要簡單很多。


Edge-relay-pull模式
nginx-rtmp的邊緣叫做relay,即中繼模式。有pull和push,即上行推流和下行播放。
實際上FMS是不區(qū)分這兩個的,中繼時只需要指定上游服務(wù)器地址即可。
收到請求前的調(diào)用:
ngx_rtmp_relay_create_app_conf,初始化app的配置。
ngx_rtmp_relay_push_pull,解析pull的命令和參數(shù)。
ngx_rtmp_relay_merge_app_conf
ngx_rtmp_relay_postconfiguration
ngx_rtmp_relay_init_process
連接上游的調(diào)用:
ngx_rtmp_relay_pull
ngx_rtmp_relay_create_local_ctx
ngx_rtmp_relay_create_remote_ctx
ngx_rtmp_relay_create_connection
ngx_event_connect_peer(堆棧如下)
#0 ?ngx_event_connect_peer (pc=0xa53ed8) at src/event/ngx_event_connect.c:25
#1 ?0x00000000004cb815 in ngx_rtmp_relay_create_connection (cctx=0x7fffffffdd60, name=0x7fffffffde80, target=0xa4fef0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_relay_module.c:468
#2 ?0x00000000004cba2a in ngx_rtmp_relay_create_remote_ctx (s=0xa665b0, name=0x7fffffffde80, target=0xa4fef0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_relay_module.c:527
#3 ?0x00000000004cbce3 in ngx_rtmp_relay_create (s=0xa665b0, name=0x7fffffffde80, target=0xa4fef0, create_publish_ctx=0x4cb9d5 <ngx_rtmp_relay_create_remote_ctx>, create_play_ctx=0x4cba2c <ngx_rtmp_relay_create_local_ctx>)
? ? at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_relay_module.c:607
#4 ?0x00000000004cbde9 in ngx_rtmp_relay_pull (s=0xa665b0, name=0x7fffffffde80, target=0xa4fef0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_relay_module.c:630
#5 ?0x00000000004cc226 in ngx_rtmp_relay_play (s=0xa665b0, v=0x719da0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_relay_module.c:738
#6 ?0x00000000004d0df5 in ngx_rtmp_enotify_play (s=0xa665b0, v=0x719da0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_enotify_module.c:412
#7 ?0x00000000004d4045 in ngx_rtmp_notify_play (s=0xa665b0, v=0x719da0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_notify_module.c:1407
#8 ?0x00000000004d5b6d in ngx_rtmp_log_play (s=0xa665b0, v=0x719da0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_log_module.c:844
#9 ?0x00000000004b8f7c in ngx_rtmp_cmd_play_init (s=0xa665b0, h=0xa50dc0, in=0xa51cc0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_cmd_module.c:569
#10 0x00000000004b6d2b in ngx_rtmp_amf_message_handler (s=0xa665b0, h=0xa50dc0, in=0xa51cc0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_receive.c:436
#11 0x00000000004b28f3 in ngx_rtmp_receive_message (s=0xa665b0, h=0xa50dc0, in=0xa51cc0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_handler.c:791
#12 0x00000000004b1d19 in ngx_rtmp_recv (rev=0xa7b440) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_handler.c:456
#13 0x0000000000435d26 in ngx_epoll_process_events (cycle=0xa4cca0, timer=59762, flags=1) at src/event/modules/ngx_epoll_module.c:683
#14 0x0000000000428f02 in ngx_process_events_and_timers (cycle=0xa4cca0) at src/event/ngx_event.c:249
#15 0x0000000000433181 in ngx_single_process_cycle (cycle=0xa4cca0) at src/os/unix/ngx_process_cycle.c:315
#16 0x00000000004064c8 in main (argc=1, argv=0x7fffffffe5d8) at src/core/nginx.c:409
連接后,進(jìn)行握手:
? ? ?ngx_rtmp_relay_create_connection
? ? ?ngx_event_connect_peer
? ? ?ngx_rtmp_init_session
? ? ?ngx_rtmp_client_handshake
握手完成時,回調(diào)relay的函數(shù):
? ? ?ngx_epoll_process_events
? ? ?ngx_rtmp_handshake_recv
? ? ?ngx_rtmp_handshake_recv
? ? ?ngx_rtmp_handshake_send
? ? ?ngx_rtmp_handshake_done
? ? ?ngx_rtmp_fire_event
? ? ?ngx_rtmp_relay_handshake_done
? ? ?ngx_rtmp_relay_send_connect
正常的握手是沒有回調(diào)的,就進(jìn)入了ngx_rtmp_cycle,相當(dāng)于relay在這個地方進(jìn)行了hook:
ngx_rtmp_handshake_done(ngx_rtmp_session_t *s)
? ? ?ngx_rtmp_fire_event(s, NGX_RTMP_HANDSHAKE_DONE)
? ? ?ngx_rtmp_cycle(s);


異步發(fā)送多個包
回源連接connect時,連續(xù)發(fā)送了幾個包,對于異步的socket,如何做到的呢?
ngx_rtmp_relay_send_connect
? ? return ngx_rtmp_send_chunk_size(s, cscf->chunk_size) != NGX_OK
? ? ? ? || ngx_rtmp_send_ack_size(s, cscf->ack_window) != NGX_OK
? ? ? ? || ngx_rtmp_send_amf(s, &h, out_elts,
? ? ? ? ? ? sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK
? ? ? ? ? NGX_ERROR
? ? ? ? : NGX_OK;
分析一個:ngx_rtmp_send_chunk_size
? ? ?ngx_rtmp_relay_send_connect
? ? ?ngx_rtmp_send_chunk_size
? ? ?ngx_rtmp_send_shared_packet
? ? ?ngx_rtmp_send_message
? ? ?ngx_rtmp_send
這個函數(shù)是先將包寫到buffer:
ngx_int_t
ngx_rtmp_send_chunk_size(ngx_rtmp_session_t *s, uint32_t chunk_size)
{
? ? return ngx_rtmp_send_shared_packet(s,
? ? ? ? ? ?ngx_rtmp_create_chunk_size(s, chunk_size));
}
ngx_chain_t *
ngx_rtmp_create_chunk_size(ngx_rtmp_session_t *s, uint32_t chunk_size)?
其中,ngx_rtmp_create_chunk_size就是將包寫到ngx_chain_t*,調(diào)用ngx_rtmp_prepare_message。
發(fā)送多包的邏輯是:
? ? ?if (!s->connection->write->active) {
? ? ? ? ? ngx_rtmp_send(s->connection->write);
? ? ?}
其中,ngx_rtmp_send就是一個沒有返回值的函數(shù),它主要是啟動ATM到寫模式,若寫不完,在可寫時會自動回調(diào)這個函數(shù)。
所以發(fā)送多包,實際上就是把包發(fā)送到ngx_rtmp_send(ngx_event_t *wev)的wev的chain里面,相當(dāng)于緩沖區(qū),它會保證將它發(fā)送完。
至于ngx_rtmp_send_message,根本就不會處理這個發(fā)送錯誤,它打交道的是緩沖區(qū)。
不過這樣比起同步來,也足夠麻煩了。
簡化的地方:將發(fā)送和組包分離,組包到chain,可以將多個包進(jìn)入隊列,發(fā)送只負(fù)責(zé)發(fā)就好了。
同樣,發(fā)送成功后,需要調(diào)用特殊的回調(diào)函數(shù)。
譬如處理amf的_result消息時:
ngx_rtmp_relay_postconfiguration
? ? ch = ngx_array_push(&cmcf->amf);
? ? ngx_str_set(&ch->name, "_result");
? ? ch->handler = ngx_rtmp_relay_on_result;
這樣可以回調(diào):
? ? ?ngx_rtmp_recv
? ? ?ngx_rtmp_receive_message
? ? ?ngx_rtmp_amf_message_handler
? ? ?ngx_rtmp_relay_on_result
? ? ?ngx_rtmp_relay_send_create_stream
整個異步的狀態(tài),無處不在。


邊緣數(shù)據(jù)
邊緣處理數(shù)據(jù)包的調(diào)用是:
? ? ?ngx_epoll_process_events
? ? ?ngx_rtmp_recv
? ? ?ngx_rtmp_receive_message
? ? ?ngx_rtmp_codec_av
回調(diào)是在ngx_rtmp_codec_postconfiguration中設(shè)置:
? ? ?h = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_AUDIO]);
? ? ?*h = ngx_rtmp_codec_av;
? ? ?h = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_VIDEO]);
? ? ?*h = ngx_rtmp_codec_av;
其中,回源連接的ngx_rtmp_receive_message會調(diào)用的handler包括:
? ? ?ngx_rtmp_codec_av //解碼av。
? ? ?ngx_rtmp_live_av // 轉(zhuǎn)發(fā)給所有客戶端
ngx_rtmp_live_av的調(diào)用是:
#0 ?ngx_rtmp_send (wev=0xa95450) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_handler.c:492
#1 ?0x00000000004b283f in ngx_rtmp_send_message (s=0xa665b0, out=0xab33e4, priority=0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_handler.c:736
#2 ?0x00000000004bef33 in ngx_rtmp_live_av (s=0xa53f88, h=0xaaf4a0, in=0xab1400) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_live_module.c:957
#3 ?0x00000000004b28f3 in ngx_rtmp_receive_message (s=0xa53f88, h=0xaaf4a0, in=0xab1400) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_handler.c:791
#4 ?0x00000000004b1d19 in ngx_rtmp_recv (rev=0xa7b4a8) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_handler.c:456
#5 ?0x0000000000435d26 in ngx_epoll_process_events (cycle=0xa4cca0, timer=49151, flags=1) at src/event/modules/ngx_epoll_module.c:683
#6 ?0x0000000000428f02 in ngx_process_events_and_timers (cycle=0xa4cca0) at src/event/ngx_event.c:249
#7 ?0x0000000000433181 in ngx_single_process_cycle (cycle=0xa4cca0) at src/os/unix/ngx_process_cycle.c:315
#8 ?0x00000000004064c8 in main (argc=1, argv=0x7fffffffe5d8) at src/core/nginx.c:409
其中ngx_rtmp_live_av,也是調(diào)用ngx_rtmp_send_message,發(fā)送到緩沖區(qū),然后發(fā)起發(fā)送請求而已,不必等到所有數(shù)據(jù)都發(fā)出去了。
? ? ?ngx_rtmp_live_av
? ? ?/* broadcast to all subscribers */
是在live模塊中實現(xiàn)的這個轉(zhuǎn)發(fā),即subscribe模式。
也就是說,這個回源連接收到數(shù)據(jù)后,同時轉(zhuǎn)發(fā)給了其他socket(準(zhǔn)確講是轉(zhuǎn)發(fā)給了各個客戶端的chain,然后異步發(fā)送)。

總結(jié)

以上是生活随笔為你收集整理的NGINX-RTMP复杂度分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。