日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

srs代码学习(4)-怎么转发流

發布時間:2024/2/28 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 srs代码学习(4)-怎么转发流 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

publish的流和play的流怎么連接呢?這個恐怕是最繞的地方了。看了一上午的代碼,淹沒于各種數據結構與流程之中后,俺終于發現了連接publish和play的關鍵連個類是

SrsSource

SrsConsumer

負責連接著連個類實例的是

SrsRtmpConn?

下面我們詳細講解連接過程


上片我們說到。在底層客戶端連接上來后,會經過一系列處理,最后繞到SrsRtmpConn類的循環函數中。就是下面的函數

[cpp]?view plaincopy
  • int?SrsConnection::cycle()??
  • {??
  • ????int?ret?=?ERROR_SUCCESS;??
  • ??????
  • ????_srs_context->generate_id();??
  • ????id?=?_srs_context->get_id();??
  • ??????
  • ????ip?=?srs_get_peer_ip(st_netfd_fileno(stfd));??
  • ??????
  • ????ret?=?do_cycle();??
  • ??????
  • ????//?if?socket?io?error,?set?to?closed.??
  • ????if?(srs_is_client_gracefully_close(ret))?{??
  • ????????ret?=?ERROR_SOCKET_CLOSED;??
  • ????}??
  • ??????
  • ????//?success.??
  • ????if?(ret?==?ERROR_SUCCESS)?{??
  • ????????srs_trace("client?finished.");??
  • ????}??
  • ??????
  • ????//?client?close?peer.??
  • ????if?(ret?==?ERROR_SOCKET_CLOSED)?{??
  • ????????srs_warn("client?disconnect?peer.?ret=%d",?ret);??
  • ????}??
  • ??
  • ????return?ERROR_SUCCESS;??
  • }??

  • 這個是SrsRtmpConn的基類SrsConnection的函數。在基類里,do_cycle()是個純虛函數。具體實現完全是靠這子類來的。

    那么rtmp類型的這個子類,到底有多么的變態呢,先看看我畫的一個流程圖,都沒有畫完。一張放不下,的截好幾張








    夠長的,這里我還只是畫到了播放的時候,發布流程還沒有畫。因為太復雜了。


    下面開始一步一步的分析

    首先看do_cycle()函數這個函數主要負責握手和連命令。并在成功后。獲取流的配置信息。關鍵代碼如下

    [cpp]?view plaincopy
  • if?((ret?=?rtmp->handshake())?!=?ERROR_SUCCESS)?{??
  • ????????srs_error("rtmp?handshake?failed.?ret=%d",?ret);??
  • ????????return?ret;??
  • ????}??
  • ????srs_verbose("rtmp?handshake?success");??
  • ??????
  • ????if?((ret?=?rtmp->connect_app(req))?!=?ERROR_SUCCESS)?{??
  • ????????srs_error("rtmp?connect?vhost/app?failed.?ret=%d",?ret);??
  • ????????return?ret;??
  • ????}??
  • ????srs_verbose("rtmp?connect?app?success");??
  • 注意這里有一個比較重要的數據結構

    [cpp]?view plaincopy
  • SrsRequest*?req??

  • 這個主要是存儲請求信息的,比如app turl streamid等等。

    在各種分析后,進入下一個cycle,service_cycle()函數

    service_cycly()函數在做了一些設置工作,設置比如chunk size。代碼如下


    [cpp]?view plaincopy
  • if?((ret?=?rtmp->set_window_ack_size((int)(2.5?*?1000?*?1000)))?!=?ERROR_SUCCESS)?{??
  • ???????srs_error("set?window?acknowledgement?size?failed.?ret=%d",?ret);??
  • ???????return?ret;??
  • ???}??
  • ???srs_verbose("set?window?acknowledgement?size?success");??
  • ?????????
  • ???if?((ret?=?rtmp->set_peer_bandwidth((int)(2.5?*?1000?*?1000),?2))?!=?ERROR_SUCCESS)?{??
  • ???????srs_error("set?peer?bandwidth?failed.?ret=%d",?ret);??
  • ???????return?ret;??
  • ???}??

  • 下面一段代碼沒有看明白。這個是一個補丁打上去的,說說為了做do token traverse。這個暫時先不研究了。

    [cpp]?view plaincopy
  • if?(true)?{??
  • ????????bool?vhost_is_edge?=?_srs_config->get_vhost_is_edge(req->vhost);??
  • ????????bool?edge_traverse?=?_srs_config->get_vhost_edge_token_traverse(req->vhost);??
  • ????????if?(vhost_is_edge?&&?edge_traverse)?{??
  • ????????????if?((ret?=?check_edge_token_traverse_auth())?!=?ERROR_SUCCESS)?{??
  • ????????????????srs_warn("token?auth?failed,?ret=%d",?ret);??
  • ????????????????return?ret;??
  • ????????????}??
  • ????????}??
  • ????}??

  • 接著設置chunk 的大小

    [cpp]?view plaincopy
  • int?chunk_size?=?_srs_config->get_chunk_size(req->vhost);??
  • ???if?((ret?=?rtmp->set_chunk_size(chunk_size))?!=?ERROR_SUCCESS)?{??
  • ???????srs_error("set?chunk_size=%d?failed.?ret=%d",?chunk_size,?ret);??
  • ???????return?ret;??
  • ???}??

  • 回應客戶端。連接ok

    [cpp]?view plaincopy
  • if?((ret?=?rtmp->response_connect_app(req,?local_ip.c_str()))?!=?ERROR_SUCCESS)?{??
  • ???????srs_error("response?connect?app?failed.?ret=%d",?ret);??
  • ???????return?ret;??
  • ???}??

  • 然后連接就結束了,進入stream_service_cycle()函數,從名字上就可以看出。這個函數是開始就如流命令時代

    [cpp]?view plaincopy
  • while?(!disposed)?{??
  • ????ret?=?stream_service_cycle();??
  • ??????
  • ????//?stream?service?must?terminated?with?error,?never?success.??
  • ????//?when?terminated?with?success,?it's?user?required?to?stop.??
  • ????if?(ret?==?ERROR_SUCCESS)?{??
  • ????????continue;??
  • ????}??
  • ??????
  • ????//?when?not?system?control?error,?fatal?error,?return.??
  • ????if?(!srs_is_system_control_error(ret))?{??
  • ????????if?(ret?!=?ERROR_SOCKET_TIMEOUT?&&?!srs_is_client_gracefully_close(ret))?{??
  • ????????????srs_error("stream?service?cycle?failed.?ret=%d",?ret);??
  • ????????}??
  • ????????return?ret;??
  • ????}??
  • ??????
  • ????//?for?republish,?continue?service??
  • ????if?(ret?==?ERROR_CONTROL_REPUBLISH)?{??
  • ????????//?set?timeout?to?a?larger?value,?wait?for?encoder?to?republish.??
  • ????????rtmp->set_send_timeout(SRS_REPUBLISH_RECV_TIMEOUT_US);??
  • ????????rtmp->set_recv_timeout(SRS_REPUBLISH_SEND_TIMEOUT_US);??
  • ??????????
  • ????????srs_trace("control?message(unpublish)?accept,?retry?stream?service.");??
  • ????????continue;??
  • ????}??
  • ??????
  • ????//?for?"some"?system?control?error,???
  • ????//?logical?accept?and?retry?stream?service.??
  • ????if?(ret?==?ERROR_CONTROL_RTMP_CLOSE)?{??
  • ????????//?TODO:?FIXME:?use?ping?message?to?anti-death?of?socket.??
  • ????????//?@see:?https://github.com/ossrs/srs/issues/39??
  • ????????//?set?timeout?to?a?larger?value,?for?user?paused.??
  • ????????rtmp->set_recv_timeout(SRS_PAUSED_RECV_TIMEOUT_US);??
  • ????????rtmp->set_send_timeout(SRS_PAUSED_SEND_TIMEOUT_US);??
  • ??????????
  • ????????srs_trace("control?message(close)?accept,?retry?stream?service.");??
  • ????????continue;??
  • ????}??
  • ??????
  • ????//?for?other?system?control?message,?fatal?error.??
  • ????srs_error("control?message(%d)?reject?as?error.?ret=%d",?ret,?ret);??
  • ????return?ret;??
  • }??

  • stream_service_cycle()函數閃亮登場

    首先進行一些安全驗證

    [cpp]?view plaincopy
  • f?((ret?=?rtmp->identify_client(res->stream_id,?type,?req->stream,?req->duration))?!=?ERROR_SUCCESS)?{??
  • ????????if?(!srs_is_client_gracefully_close(ret))?{??
  • ????????????srs_error("identify?client?failed.?ret=%d",?ret);??
  • ????????}??
  • ????????return?ret;??
  • ????}??
  • ????req->strip();??
  • ????srs_trace("client?identified,?type=%s,?stream_name=%s,?duration=%.2f",???
  • ????????srs_client_type_string(type).c_str(),?req->stream.c_str(),?req->duration);??
  • ??????
  • ????//?security?check??
  • ????if?((ret?=?security->check(type,?ip,?req))?!=?ERROR_SUCCESS)?{??
  • ????????srs_error("security?check?failed.?ret=%d",?ret);??
  • ????????return?ret;??
  • ????}??
  • ????srs_info("security?check?ok");??

  • 然后進入比較有意思的環節

    [cpp]?view plaincopy
  • SrsSource*?source?=?SrsSource::fetch(req);??
  • ???if?(!source)?{??
  • ???????if?((ret?=?SrsSource::create(req,?server,?server,?&source))?!=?ERROR_SUCCESS)?{??
  • ???????????return?ret;??
  • ???????}??
  • ???}??
  • ???srs_assert(source?!=?NULL);??

  • 根據req,尋找是否有這個源,如果沒有,那么久創建一個。主要creat()是個靜態函數。實現代碼為

    [cpp]?view plaincopy
  • int?SrsSource::create(SrsRequest*?r,?ISrsSourceHandler*?h,?ISrsHlsHandler*?hh,?SrsSource**?pps)??
  • {??
  • ????int?ret?=?ERROR_SUCCESS;??
  • ??????
  • ????string?stream_url?=?r->get_stream_url();??
  • ????string?vhost?=?r->vhost;??
  • ??????
  • ????//?should?always?not?exists?for?create?a?source.??
  • ????srs_assert?(pool.find(stream_url)?==?pool.end());??
  • ??
  • ????SrsSource*?source?=?new?SrsSource();??
  • ????if?((ret?=?source->initialize(r,?h,?hh))?!=?ERROR_SUCCESS)?{??
  • ????????srs_freep(source);??
  • ????????return?ret;??
  • ????}??
  • ??????????
  • ????pool[stream_url]?=?source;??
  • ????srs_info("create?new?source?for?url=%s,?vhost=%s",?stream_url.c_str(),?vhost.c_str());??
  • ??????
  • ????*pps?=?source;??
  • ??????
  • ????return?ret;??
  • }??

  • 創建一個新的source,并且放到poo中。pool是什么

    [cpp]?view plaincopy
  • static?std::map<std::string,?SrsSource*>?pool;??

  • 也是一個全局的靜態變量,用了存儲所欲的源。到此。謎底進一步解開了。

    同意fetch()函數也是靜態的。

    [cpp]?view plaincopy
  • static?SrsSource*?fetch(SrsRequest*?r);??
  • static?SrsSource*?fetch(std::string?vhost,?std::string?app,?std::string?stream);??
  • ok!在繞道循環函數看看接下來該怎么辦

    [cpp]?view plaincopy
  • SrsStatistic*?stat?=?SrsStatistic::instance();??
  • ????if?((ret?=?stat->on_client(_srs_context->get_id(),?req,?this,?type))?!=?ERROR_SUCCESS)?{??
  • ????????srs_error("stat?client?failed.?ret=%d",?ret);??
  • ????????return?ret;??
  • ????}??

  • 這個是做統計用的,沒啥。

    [cpp]?view plaincopy
  • bool?vhost_is_edge?=?_srs_config->get_vhost_is_edge(req->vhost);??
  • ????bool?enabled_cache?=?_srs_config->get_gop_cache(req->vhost);??
  • ????srs_trace("source?url=%s,?ip=%s,?cache=%d,?is_edge=%d,?source_id=%d[%d]",??
  • ????????req->get_stream_url().c_str(),?ip.c_str(),?enabled_cache,?vhost_is_edge,???
  • ????????source->source_id(),?source->source_id());??
  • ????source->set_cache(enabled_cache);??
  • 判斷是否是邊緣節點,是否需要gop緩沖。無他

    [cpp]?view plaincopy
  • switch?(type)?{??
  • ???????case?SrsRtmpConnPlay:?{??
  • ???????????srs_verbose("start?to?play?stream?%s.",?req->stream.c_str());??
  • ?????????????
  • ???????????//?response?connection?start?play??
  • ???????????if?((ret?=?rtmp->start_play(res->stream_id))?!=?ERROR_SUCCESS)?{??
  • ???????????????srs_error("start?to?play?stream?failed.?ret=%d",?ret);??
  • ???????????????return?ret;??
  • ???????????}??
  • ???????????if?((ret?=?http_hooks_on_play())?!=?ERROR_SUCCESS)?{??
  • ???????????????srs_error("http?hook?on_play?failed.?ret=%d",?ret);??
  • ???????????????return?ret;??
  • ???????????}??
  • ?????????????
  • ???????????srs_info("start?to?play?stream?%s?success",?req->stream.c_str());??
  • ???????????ret?=?playing(source);??
  • ???????????http_hooks_on_stop();??
  • ?????????????
  • ???????????return?ret;??
  • ???????}??
  • ???????case?SrsRtmpConnFMLEPublish:?{??
  • ???????????srs_verbose("FMLE?start?to?publish?stream?%s.",?req->stream.c_str());??
  • ?????????????
  • ???????????if?((ret?=?rtmp->start_fmle_publish(res->stream_id))?!=?ERROR_SUCCESS)?{??
  • ???????????????srs_error("start?to?publish?stream?failed.?ret=%d",?ret);??
  • ???????????????return?ret;??
  • ???????????}??
  • ?????????????
  • ???????????return?publishing(source);??
  • ???????}??
  • ???????case?SrsRtmpConnFlashPublish:?{??
  • ???????????srs_verbose("flash?start?to?publish?stream?%s.",?req->stream.c_str());??
  • ?????????????
  • ???????????if?((ret?=?rtmp->start_flash_publish(res->stream_id))?!=?ERROR_SUCCESS)?{??
  • ???????????????srs_error("flash?start?to?publish?stream?failed.?ret=%d",?ret);??
  • ???????????????return?ret;??
  • ???????????}??
  • ?????????????
  • ???????????return?publishing(source);??
  • ???????}??
  • ???????default:?{??
  • ???????????ret?=?ERROR_SYSTEM_CLIENT_INVALID;??
  • ???????????srs_info("invalid?client?type=%d.?ret=%d",?type,?ret);??
  • ???????????return?ret;??
  • ???????}??
  • ???}??

  • 大流程看。好像是根據不同走到了發布或者播放流程里。但首先。這個type是從哪里來的。怎么沒有發現呢?

    [cpp]?view plaincopy
  • int?SrsRtmpServer::identify_client(int?stream_id,?SrsRtmpConnType&?type,?string&?stream_name,?double&?duration)??
  • 在這個函數里做確認類型的。rmptserver類不在我們這次分析。


    我們分析下play的流程,函數名稱為

    [cpp]?view plaincopy
  • int?SrsRtmpConn::playing(SrsSource*?source)??
  • 關鍵代碼

    [cpp]?view plaincopy
  • SrsConsumer*?consumer?=?NULL;??
  • ???if?((ret?=?source->create_consumer(this,?consumer))?!=?ERROR_SUCCESS)?{??
  • ???????srs_error("create?consumer?failed.?ret=%d",?ret);??
  • ???????return?ret;??
  • ???}??
  • ???SrsAutoFree(SrsConsumer,?consumer);??
  • ???srs_verbose("consumer?created?success.");??

  • ?利用source創建一個consumer.創建代碼為

    [cpp]?view plaincopy
  • int?SrsSource::create_consumer(SrsConnection*?conn,?SrsConsumer*&?consumer,?bool?ds,?bool?dm,?bool?dg)??
  • {??
  • ????int?ret?=?ERROR_SUCCESS;??
  • ??????
  • ????consumer?=?new?SrsConsumer(this,?conn);??
  • ????consumers.push_back(consumer);??
  • ??????
  • ????double?queue_size?=?_srs_config->get_queue_length(_req->vhost);??
  • ????consumer->set_queue_size(queue_size);??
  • ??????
  • ????//?if?atc,?update?the?sequence?header?to?gop?cache?time.??
  • ????if?(atc?&&?!gop_cache->empty())?{??
  • ????????if?(cache_metadata)?{??
  • ????????????cache_metadata->timestamp?=?gop_cache->start_time();??
  • ????????}??
  • ????????if?(cache_sh_video)?{??
  • ????????????cache_sh_video->timestamp?=?gop_cache->start_time();??
  • ????????}??
  • ????????if?(cache_sh_audio)?{??
  • ????????????cache_sh_audio->timestamp?=?gop_cache->start_time();??
  • ????????}??
  • ????}??
  • ??????
  • ????//?copy?metadata.??
  • ????if?(dm?&&?cache_metadata?&&?(ret?=?consumer->enqueue(cache_metadata,?atc,?jitter_algorithm))?!=?ERROR_SUCCESS)?{??
  • ????????srs_error("dispatch?metadata?failed.?ret=%d",?ret);??
  • ????????return?ret;??
  • ????}??
  • ????srs_info("dispatch?metadata?success");??
  • ??????
  • ????//?copy?sequence?header??
  • ????//?copy?audio?sequence?first,?for?hls?to?fast?parse?the?"right"?audio?codec.??
  • ????//?@see?https://github.com/ossrs/srs/issues/301??
  • ????if?(ds?&&?cache_sh_audio?&&?(ret?=?consumer->enqueue(cache_sh_audio,?atc,?jitter_algorithm))?!=?ERROR_SUCCESS)?{??
  • ????????srs_error("dispatch?audio?sequence?header?failed.?ret=%d",?ret);??
  • ????????return?ret;??
  • ????}??
  • ????srs_info("dispatch?audio?sequence?header?success");??
  • ??
  • ????if?(ds?&&?cache_sh_video?&&?(ret?=?consumer->enqueue(cache_sh_video,?atc,?jitter_algorithm))?!=?ERROR_SUCCESS)?{??
  • ????????srs_error("dispatch?video?sequence?header?failed.?ret=%d",?ret);??
  • ????????return?ret;??
  • ????}??
  • ????srs_info("dispatch?video?sequence?header?success");??
  • ??????
  • ????//?copy?gop?cache?to?client.??
  • ????if?(dg?&&?(ret?=?gop_cache->dump(consumer,?atc,?jitter_algorithm))?!=?ERROR_SUCCESS)?{??
  • ????????return?ret;??
  • ????}??
  • ??????
  • ????//?print?status.??
  • ????if?(dg)?{??
  • ????????srs_trace("create?consumer,?queue_size=%.2f,?jitter=%d",?queue_size,?jitter_algorithm);??
  • ????}?else?{??
  • ????????srs_trace("create?consumer,?ignore?gop?cache,?jitter=%d",?jitter_algorithm);??
  • ????}??
  • ??
  • ????//?for?edge,?when?play?edge?stream,?check?the?state??
  • ????if?(_srs_config->get_vhost_is_edge(_req->vhost))?{??
  • ????????//?notice?edge?to?start?for?the?first?client.??
  • ????????if?((ret?=?play_edge->on_client_play())?!=?ERROR_SUCCESS)?{??
  • ????????????srs_error("notice?edge?start?play?stream?failed.?ret=%d",?ret);??
  • ????????????return?ret;??
  • ????????}??
  • ????}??
  • ??????
  • ????return?ret;??
  • }??

  • 代碼好長。主要是創建 放進數據結構中,并拷貝一些metadata進去,對于edge的處理,還沒有看明白。

    之后的動作

    [cpp]?view plaincopy
  • SrsQueueRecvThread?trd(consumer,?rtmp,?SRS_PERF_MW_SLEEP);??
  • ??????
  • ????//?start?isolate?recv?thread.??
  • ????if?((ret?=?trd.start())?!=?ERROR_SUCCESS)?{??
  • ????????srs_error("start?isolate?recv?thread?failed.?ret=%d",?ret);??
  • ????????return?ret;??
  • ????}??

  • 什么?單獨創建了一個接受線程,實現了recv never send,send never recv,據說這樣效率提高了33%

    在繞回去

    [cpp]?view plaincopy
  • //?delivery?messages?for?clients?playing?stream.??
  • wakable?=?consumer;??
  • ret?=?do_playing(source,?consumer,?&trd);??
  • wakable?=?NULL;??
  • 進入下一個循環體do_playing()

    在分析下一個函數之前。讓我總結下縮做的工作

    1)創建或者獲取了一個source

    2)創建一個consumer

    3) 創建一個接受線程


    下面開始看函數的關鍵代碼

    [cpp]?view plaincopy
  • //?setup?the?realtime.??
  • ????realtime?=?_srs_config->get_realtime_enabled(req->vhost);??
  • ????//?setup?the?mw?config.??
  • ????//?when?mw_sleep?changed,?resize?the?socket?send?buffer.??
  • ????mw_enabled?=?true;??
  • ????change_mw_sleep(_srs_config->get_mw_sleep_ms(req->vhost));??
  • ????//?initialize?the?send_min_interval??
  • ????send_min_interval?=?_srs_config->get_send_min_interval(req->vhost);??

  • 做實時性 merge write 的設置

    [cpp]?view plaincopy
  • while?(!trd->empty())?{??
  • ??????????SrsCommonMessage*?msg?=?trd->pump();??
  • ??????????srs_verbose("pump?client?message?to?process.");??
  • ????????????
  • ??????????if?((ret?=?process_play_control_msg(consumer,?msg))?!=?ERROR_SUCCESS)?{??
  • ??????????????if?(!srs_is_system_control_error(ret)?&&?!srs_is_client_gracefully_close(ret))?{??
  • ??????????????????srs_error("process?play?control?message?failed.?ret=%d",?ret);??
  • ??????????????}??
  • ??????????????return?ret;??
  • ??????????}??
  • ??????}??
  • 首先處理接受消息。主要是暫停的消息。

    下面進入核心代碼

    [cpp]?view plaincopy
  • <strong><span?style="color:#ff6666;">?int?count?=?(send_min_interval?>?0)??1?:?0;??
  • ????????if?((ret?=?consumer->dump_packets(&msgs,?count))?!=?ERROR_SUCCESS)?{??
  • ????????????srs_error("get?messages?from?consumer?failed.?ret=%d",?ret);??
  • ????????????return?ret;??
  • ????????}</span></strong>??
  • 這段代碼的作用就是,把消息,從consumer里,拷貝到本地的msg隊列里。當然。這個拷貝是淺拷貝,只是指針過來了。

    首先看msgs的定義

    [cpp]?view plaincopy
  • SrsMessageArray?msgs(SRS_PERF_MW_MSGS)??

  • 這個類里有個核心變量
    [cpp]?view plaincopy
  • SrsSharedPtrMessage**?msgs;??
  • 可以看到它保存的是一個指向指針的指針。

    那么dump_packets是怎么實現的呢?

    [cpp]?view plaincopy
  • int?SrsConsumer::dump_packets(SrsMessageArray*?msgs,?int&?count)??
  • {??
  • ????int?ret?=ERROR_SUCCESS;??
  • ??????
  • ????srs_assert(count?>=?0);??
  • ????srs_assert(msgs->max?>?0);??
  • ??????
  • ????//?the?count?used?as?input?to?reset?the?max?if?positive.??
  • ????int?max?=?count??srs_min(count,?msgs->max)?:?msgs->max;??
  • ??????
  • ????//?the?count?specifies?the?max?acceptable?count,??
  • ????//?here?maybe?1+,?and?we?must?set?to?0?when?got?nothing.??
  • ????count?=?0;??
  • ??????
  • ????if?(should_update_source_id)?{??
  • ????????srs_trace("update?source_id=%d[%d]",?source->source_id(),?source->source_id());??
  • ????????should_update_source_id?=?false;??
  • ????}??
  • ??????
  • ????//?paused,?return?nothing.??
  • ????if?(paused)?{??
  • ????????return?ret;??
  • ????}??
  • ??
  • ????//?pump?msgs?from?queue.??
  • ????if?((ret?=?queue->dump_packets(max,?msgs->msgs,?count))?!=?ERROR_SUCCESS)?{??
  • ????????return?ret;??
  • ????}??
  • ??????
  • ????return?ret;??
  • }??

  • [cpp]?view plaincopy
  • int?SrsMessageQueue::dump_packets(int?max_count,?SrsSharedPtrMessage**?pmsgs,?int&?count)??
  • {??
  • ????int?ret?=?ERROR_SUCCESS;??
  • ??????
  • ????int?nb_msgs?=?(int)msgs.size();??
  • ????if?(nb_msgs?<=?0)?{??
  • ????????return?ret;??
  • ????}??
  • ??????
  • ????srs_assert(max_count?>?0);??
  • ????count?=?srs_min(max_count,?nb_msgs);??
  • ??
  • ????SrsSharedPtrMessage**?omsgs?=?msgs.data();??
  • ????for?(int?i?=?0;?i?<?count;?i++)?{??
  • ????????pmsgs[i]?=?omsgs[i];??
  • ????}??
  • ??????
  • ????SrsSharedPtrMessage*?last?=?omsgs[count?-?1];??
  • ????av_start_time?=?last->timestamp;??
  • ??????
  • ????if?(count?>=?nb_msgs)?{??
  • ????????//?the?pmsgs?is?big?enough?and?clear?msgs?at?most?time.??
  • ????????msgs.clear();??
  • ????}?else?{??
  • ????????//?erase?some?vector?elements?may?cause?memory?copy,??
  • ????????//?maybe?can?use?more?efficient?vector.swap?to?avoid?copy.??
  • ????????//?@remark?for?the?pmsgs?is?big?enough,?for?instance,?SRS_PERF_MW_MSGS?128,??
  • ????????//??????the?rtmp?play?client?will?get?128msgs?once,?so?this?branch?rarely?execute.??
  • ????????msgs.erase(msgs.begin(),?msgs.begin()?+?count);??
  • ????}??
  • ??????
  • ????return?ret;??
  • }??

  • ok代碼我就不想分析了。只是個指針拷貝。

    下一個問題。consumer的數據是怎么來的呢?

    看source的代碼,比如音頻數據

    [cpp]?view plaincopy
  • int?SrsSource::on_audio_imp(SrsSharedPtrMessage*?msg)??

  • 代碼里有這么一段

    [cpp]?view plaincopy
  • //?copy?to?all?consumer??
  • ???if?(!drop_for_reduce)?{??
  • ???????for?(int?i?=?0;?i?<?(int)consumers.size();?i++)?{??
  • ???????????SrsConsumer*?consumer?=?consumers.at(i);??
  • ???????????if?((ret?=?consumer->enqueue(msg,?atc,?jitter_algorithm))?!=?ERROR_SUCCESS)?{??
  • ???????????????srs_error("dispatch?the?audio?failed.?ret=%d",?ret);??
  • ???????????????return?ret;??
  • ???????????}??
  • ???????}??
  • ???????srs_info("dispatch?audio?success.");??
  • ???}??

  • 到此,一個循環就結束了。

    總結

    以上是生活随笔為你收集整理的srs代码学习(4)-怎么转发流的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。