rtsp协议_Chromium(3/5):rtsp客户端
以上是之前用live555+webrtc實現的rtsp客戶端,考慮到一些原因要用chromium代替live555。
- chromium提供了非常好的連接溢出時間機制,能實現靈活的rtsp重連。
- app基于Rose,Rose已在廣泛使用chromium,live555又自帶一套消息循環和socket庫,這重復了。
說是用chromium代替live555,具體實現上是用chromium的消息循環和socket庫,至于協議處理邏輯還是用live555中代碼。
注:為讓chromium中的GURL支持解析rtsp url,需修改chromium源碼。具體是增加變量kRtspScheme,把它加到kStandardURLSchemes。
<chromium>/url/url_constants.cc const char kRtspScheme[] = "rtsp";<chromium>/url/url_util.cc const SchemeWithType kStandardURLSchemes[] = {......{kRtspScheme, SCHEME_WITH_PORT}, // Rtsp. };注:只要把live555.cpp中的use_chromium值改為false,就會使用live555自帶的消息循環和socket庫。
一、線程模型
圖1 rtsp的線程模型系統中至少存在1+1+N個線程,第一個1是main線程,第二個1是socket線程,N表示要同時接收N個rtsp設備,每個設備需要一個DecodingThread。
- main線程。main函數所在的線程,系統創建。用于和各設備建立rtsp連接。
- socket線程。使用Chromium中的base::Thread創建。使用TCP傳rtsp時,建立rtsp連接(DESCRIBE、SETUP、PLAY)和后面接收媒體流數據是用同一個socket,加上Chromium硬性規定,“同一個socket上的操作必須放在同一個線程,包括創建、連接、讀、寫、關閉”,于是新開一個線程,專門處理和socket相關任務。當要接收N個rtsp設備時,系統也只有一個socket線程。
- DecodingThread。webrtc創建。通過socket線程收到一幀后,解碼,解碼出的幀通過rtc::VideoSinkInterface::OnFrame傳給app。
二、建立階段
圖1中“live555::start”執行建立rtsp連接,過程中要發送DESCRIBE、SETUP、PLAY。以下是流程。
步驟1的Start_chromium運行在socket線程,設置next_state_是解析域名狀態,隨即調用DoLoop。
int RTSPClient::DoLoop(int result) {DCHECK_NE(next_state_, STATE_NONE);int rv = result;do {State state = next_state_;next_state_ = STATE_NONE;switch (state) {case STATE_RESOLVE_HOST:DCHECK_EQ(net::OK, rv);rv = DoResolveHost();break;case STATE_RESOLVE_HOST_COMPLETE:rv = DoResolveHostComplete(rv);break;case STATE_TRANSPORT_CONNECT:DCHECK_EQ(net::OK, rv);rv = DoTransportConnect();break;case STATE_TRANSPORT_CONNECT_COMPLETE:rv = DoTransportConnectComplete(rv);break;case STATE_TRANSPORT_WRITE_COMPLETE:rv = DoTransportWriteComplete(rv);break;case STATE_TRANSPORT_READ_COMPLETE:rv = DoTransportReadComplete(rv);break;default:NOTREACHED();rv = net::ERR_FAILED;break;}} while (rv != net::ERR_IO_PENDING && next_state_ != STATE_NONE);return rv; }void RTSPClient::OnIOComplete(int result) {DoLoop(result); }以上是基于Chromium的socket庫寫的標準DoLoop、OnIOComplete(關于這兩函數細節參考“Chromium(2/4):消息循環和socket庫”中“二、Socket庫”)。DoLoop把建立rtsp連接分為三個步驟:解析域名、連接和發請求/收應答。發請求/收應答的個數是1+N+1。N是sdp中的媒體流數目,有多少個媒體流就須要處理多少個SETUP。以下摘取當中DoTransportWriteComplete進行分析。
int RTSPClient::DoTransportWriteComplete(int result) {if (result > 0) {next_state_ = STATE_TRANSPORT_READ_COMPLETE;VALIDATE(fResponseBufferBytesLeft > 1, null_str);// why fResponseBufferBytesLeft - 1, references to begin of handleResponseBytes1:if (!read_by_rtpinterface_) {const int max_bytes = SDL_min(envir().read_buf->size(), (int)fResponseBufferBytesLeft - 1);result = ctrl_socket_->Read(envir().read_buf.get(), max_bytes, envir().iocomplete);} else {// let RTPInterface read this socket. net::ERR_IO_PENDING make DoLoop exit.if (!in_rtpinterface_iocomplete_) {SDL_Log("RTSPClient::DoTransportWriteComplete(result: %i)", result);envir().iocomplete.Run(SPECIAL_CALL_MAGIC);}result = net::ERR_IO_PENDING;}if (result > 0) {result = DoTransportReadComplete(result);}}if (result < 0 && result != net::ERR_IO_PENDING) {next_state_ = STATE_NONE;socket_io_fail();}return result; }參數result>0時,表示之前Write成功,于是發起Read。當Read返回值>0,表示此次Read是同步讀,立即調用DoTransportReadComplete,讓處理讀到的應答。否則或是異步(ERR_IO_PENDING),或錯誤,是錯誤時調用soket_io_fail。是異步時返回ERR_IO_PENDING,回到上層DoLoop,不再滿足while繼續循環條件,DoLoop以ERR_IO_PENDING退出。后續任務得靠系統觸發OnIOComplete才能處理。
Read時,為什么要使用read_by_rtpinterface_?——使用tcp時,server發出SETUP應答后,就有可能向外發rtp包。此時在554上會出現rtsp消息和rtp包混雜情況,read_by_rtpinterface_=true表示client收到過一個SETUP應答了,為區分是SETUP/PLAY應答還是rtp數據,后面一個字節一個字節接收(rtp還是按負載長度收),直到收完PALY應答,它確保了不會收走一個本屬于rtp的字節。
三、接收媒體數據階段
首先說下TCP傳輸時流媒體數據的頂層格式。
- 整個流被拆分成好多個MTU,每個MTU由兩部分組成,4字節前綴和payload。4字節前綴中第一個字節是“$”,第二個字節channel號,第三個、第四個字節是后面payload字節數。RTP包位在payload中。
- MTU長度是服務器自個設的一個值,像4+1408。而一視頻幀不可能才1000多字節,于是一幀會被拆成多個MTU。
- 會不會發生一個MTU包含多幀數據?舉個例子,#8幀只有最后400個字節了,于是放在了接下MTU a中,此時MTU a還有數百字節空著,會不會放#9幀的前面數百字節。——個人認為不會出現這種情況。
要了解live555如何接收rtsp數據可參考“live555從RTSP服務器讀取數據到使用接收到的數據流程分析”。核心函數是MultiFramedRTPSource::networkReadHandler1,它既負責從socket接收媒體流,又負責處理收到的數據。處理過程包括,1)數據存到BufferedPacket鏈表,2)收到完整的一幀后,存到app要求的fTo,并調用app設置的、收到一幀后的回調函數afterGettingFrame。
3.1 live555::frame_slice接收一幀
讓回看圖1中“live555::frame_slice”,它向socket線程接收一幀數據。以下是流程。
步驟1的continuePlaying_chromium運行在socket線程,執行兩個操作。1)調用continuePlaying,它設置相關變量,讓live555進入NeedDelivery狀態。當中有個參數叫fIsCurrentlyAwaitingData,執行前必須false,執行后設為true。2)調用RTPInterface::chromium_slice,它會循環調用SocketDescriptor::tcpReadhandle1_chromium,直到后者返回false。
tcpReadhandle1_chromium來自live555提供的tcpReadhandle1,只是把讀socket部分改為用chromium的socket庫。從socket收到數據后,原有的live555處理邏輯都不用變。
3.2 fIsCurrentlyAwaitingData變量
為什么要額外說這個變量,讓看tcpReadHandler1_chromium中代碼。
Boolean SocketDescriptor::tcpReadHandler1_chromium(int rv, ...) {... Boolean callAgain = True;switch (fTCPReadingState) {...case AWAITING_PACKET_DATA: {callAgain = False;fTCPReadingState = AWAITING_DOLLAR;// fStreamChannelId存儲著此個MTU的channel號,由channel號找到能處理它的RTPInterface。RTPInterface* rtpInterface = lookupRTPInterface(fStreamChannelId);if (rtpInterface->fNextTCPReadSize == 0) {// 已讀出該MTU的所有payload,告知caller再次以rv=0調用tcpReadHandler1_chromiumcallAgain = true;break;}if (rtpInterface->fReadHandlerProc != NULL) {fTCPReadingState = AWAITING_PACKET_DATA;rtpInterface->fLastTCPReadResult_ = rv;rtpInterface->fReadHandlerProc(rtpInterface->fOwner, 0);if (rtpInterface->fNextTCPReadSize == 0) {if (fEnv.setup_finished) {RTPSource* source = nullptr;if (rtpInterface->fOwner->isSource()) {// 此個RTPInterface用于RTPSourcesource = static_cast<RTPSource*>(rtpInterface->fOwner);} else {// 此個RTPInterface既然不用于RTPSource,那一定用于RTCPInstanceVALIDATE(rtpInterface->fOwner->isRTCPInstance(), null_str);}if (source == nullptr || source->isCurrentlyAwaitingData()) {// 如果此個RTPInterface用于RTPSource,還須滿足isCurrentlyAwaitingData=true,caller才主動發read。isCurrentlyAwaitingData=false意味著是由DummySink::continuePlaying_chromiumy主動發read。callAgain = true;break;}} else {// during setup, continue finish it.callAgain = true;break;}}}break;} // case AWAITING_PACKET_DATA} // switch (fTCPReadingState)return callAgain; }處理了MTU前綴4字節后,SocketDescriptor進入AWAITING_PACKET_DATA狀態,rtpInterface->fNextTCPReadSize存儲著4字節中后兩字節的值,即payload長度。rtpInterface->fReadHandlerProc會很快調用核心函數MultiFramedRTPSource::networkReadHandler1,后者去讀socket時,每次最多讀fNextTCPReadSize字節。隨著networkReadHandler1不斷被執行,socket讀出數據越多,fNextTCPReadSize會不斷減少,減少到0時表示已讀完這個MTU。一旦讀完MTU,后續不會再有異步觸發出OnIOComplete,此刻需要主動把callAgain置為true,通知上層的chromium_slice或OnIOComplete再次調用tcpReadHandler1_chromium,去讀下一個MTU。當isCurrentlyAwaitingData=false,意味著是由DummySink::continuePlaying_chromiumy主動發read。
fIsCurrentlyAwaitingData有什么用?——多個MTU組成一幀,總會遇到讀完一個MTU時,該幀恰好讀完,這變量讓知道什么時候已讀完一幀。它在continuePlaying時被置為true。networkReadHandler1判斷出收完一幀后,在調用app的afterGettingFrame前會把它置為false。
為避免發生線程爭搶寫fIsCurrentlyAwaitingData,要讓在socket線程執行continuePlaying。要是換放在DecodingThread執行,會產成BUG,讓看以下序列。
四、live555注釋
4.1 _Tables
class _Tables {MediaLookupTable* mediaTable;void* socketTable; };- mediaTable存儲著那些從Medium派生的對象。靜態函數MediaLookupTable::ourMedia(UsageEnvironment& env)得到這個指針。
- socketTable真正類型是HashTable*,存儲著SocketDescriptor。表中key或是sockNum(使用live555自帶socket庫),或是netSock(使用chromium的socket庫)。全局函數socketHashTable(UsageEnvironment& env, Boolean createIfNotPresent)得到這個指針。
每個RTSPClient實例有且只有一個UsageEnvironment,_Tables存放在UsageEnvironment中的liveMediaPriv成員,該成員定義的類型是void*,真正類型是_Tables*。靜態函數_Tables::getOurTables(env)得到這個指針。
綜上所述,每個RTSPClient實例有且只有一個mediaTable、一個socketTable。
4.2 MediaSession、MediaSubsession、MediaSubsessionIterator
收到sdp后,就要由它調用靜態函數MediaSession::createNew。
MediaSession* MediaSession::createNew(UsageEnvironment& env, char const* sdpDescription) {MediaSession* newSession = new MediaSession(env);if (newSession != NULL) {if (!newSession->initializeWithSDP(sdpDescription)) {delete newSession;return NULL;}}return newSession; }createNew執行兩個操作,一是創建MediaSession,二是調用新建對象的initializeWithSDP方法。于是經過createNew后,此個rtsp會話需要的MediaSession、MediaSubsession就都已創建了,并且MediaSession中的fSubsessionsHead、fSubsessionsTail都已指向了正確位置。
MediaSubsessionIterator用于枚舉MediaSession中的所有流。新建MediaSubsessionIterator對象后調用next,或reset后調用next,此個next得到的是第一條流。以下是個枚舉范例。
MediaSubsessionIterator iter(mediaSession); MediaSubsession* subsession; while ((subsession = iter.next()) != NULL) {subsession指向一條有效MediaSubsession。 }4.3 SocketDescriptor
SocketDescriptor用于封裝socket,一個SocketDescriptor對應一個socket。一次會話時創建的socket集中存放在每個會話只一個的socketTable(參考“4.1 _Tables”)。
SocketDescriptor內有個叫fSubChannelHashTable的HashTable,這表的key是streamChannelId,value是RTPInterface*。streamChannelId是怎么來的?它包含在服務器對SETUP的應答中,一般從0開始。假設有兩條流,每條流都有RTP、RTCP,通常來說,第一條流RTP的streamChannelId是0,RTCP是1;第二條流RTP的streamChannelId是2,RTCP是3。
fSubChannelHashTable有什么用?——當streamUsingTCP是true,在接收媒體數據階段,554上的流被拆分成好多個MTU,每個MTU由兩部分組成,4字節前綴和payload。4字節前綴中第一個字節是“$”,第二個字節channel號,第三個、第四個字節是后面payload字節數。channel號的值就是streamChannelId。于是SocketDescriptor根據channel號在fSubChannelHashTable找出能處理它的RTPInterface。這部分邏輯參考“3.2 fIsCurrentlyAwaitingData變量“。補說下,RTCPInstance也是通過RTPInteface來收發網絡數據。
4.4 TaskScheduler(任務調度模型)
在調用上,上層使用TaskScheduler的方法就是調用TaskScheduler的doEventLoop。
void BasicTaskScheduler0::doEventLoop(char volatile* watchVariable) {// Repeatedly loop, handling readble sockets and timed events:while (1) {if (watchVariable != NULL && *watchVariable != 0) break;SingleStep();} }三種任務用到的兩種操作函數 typedef void TaskFunc(void* clientData); typedef void BackgroundHandlerProc(void* clientData, int mask);doEventLoop是個阻塞式函數,退出條件是SignleStep在執行過程中把watchVariable置為非0。參數watchVariable是上層自個管理的變量,SingleStep在執行過程中會調用上層提供的操作函數,上層什么時候想退出循環了,就把watchVariable置為非0。doEventLoop好處是可以讓多種操作序列化到一個線程執行,這很好解決了多線程同步問題。
SingleStep是TaskScheduler的時間片函數,它其實有一個參數maxDelayTime,作用是做為select時溢出等待時間。缺省時值填0,意味著select只作即時檢查,不等待。它會依次去執行三種任務,這三種任務被放在三個獨立的集合,分別是HandlerDescriptor、TriggeredEventHandler和DelayQueue。
- HandlerDescriptor。1)用途。用在后臺讀,它和socket機制有關,一個HandlerDescriptor對應一個socket。當select機制查詢到有socket發生事件時,就在此鏈表查找處理者,然且調用相應的處理者操作。2)集合結構:鏈表。節點類型是HandlerDescriptor,操作函數原型。BackgroundHandlerProc。3)如何添加。BasicTaskScheduler::setBackgroundHandling。
為方便遍歷集合中節點,提供輔助類HandlerIterator,該類主要操作是next,HandlerIterator::next有4個特點。1)返回值是“上一次”的HandlerDescriptor。2)執行完后內部指向下一個HandlerDescriptor。3)HandlerIterator構造函數時內部指向第一個HandlerDescriptor,所以第一次next返回的是第一個HandlerDescriptor。4)當此次返回的已是最后一個時,內部指向nullptr,因而下一次next將返回nullptr,caller可根據next的返回值是否nullptr來判斷是否枚舉完了。 - TriggeredEventHandler。1)用途。有事件發生了,但不在SingleStep線程,為簡化多線程同步,就要把這些事件要執行的處理序列化到SingleStep線程。2)集合結構:數組。單元類型是TaskFunc,同時就是操作函數類型。3)如何添加。BasicTaskScheduler0::createEventTrigger。
- DelayQueue。1)用途。它有點像TriggeredEventHandler,但TriggeredEventHandler是即時執行,它則可以設置一個延時時間,一旦該節點給出時刻到了,SingleStep就會調用這個DelayQueue中的操作。2)集合結構:鏈表。節點類型是DelayQueue,操作函數類型TaskFunc。3)如何添加。BasicTaskScheduler0::scheduleDelayedTask。
總結
以上是生活随笔為你收集整理的rtsp协议_Chromium(3/5):rtsp客户端的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 谷歌改善安卓应用内浏览体验:拆分视图更好
- 下一篇: stm32怎么调用for循环内部的变量_