日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

red5源码分析---1

發(fā)布時(shí)間:2023/12/20 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 red5源码分析---1 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

red5源碼分析—客戶端連接

本博文開始分析red5服務(wù)器以及客戶端的源碼,選取的版本為最新的1.0.7。
red5發(fā)展到現(xiàn)在,可以兼容很多流媒體傳輸協(xié)議,例如rtmp、rtmpt等等。本博文只分析rtmp協(xié)議,其他協(xié)議看看以后有沒(méi)有時(shí)間研究吧。
red5的服務(wù)器啟動(dòng)有好幾種方式,standalone、tomcat、jetty等等。本博文只分析standalone的啟動(dòng)方式。本博文假設(shè)客戶端為red5 rtmpclient。
red5 client和server的下載地址如下
https://github.com/Red5
首先看一段網(wǎng)上很常見的red5的客戶端代碼,如下所示

public class RtmpClientTest extends RTMPClient implements INetStreamEventHandler, IPendingServiceCallback, IEventDispatcher { private ConcurrentLinkedQueue<IMessage> frameBuffer = new ConcurrentLinkedQueue<IMessage>();String host = "127.0.0.1";String app = "red5test"; int port = 1935; public RtmpClientTest() { super(); Map<String, Object> map = makeDefaultConnectionParams(host, 1935, "red5test"); connect(host, 1935, map, this); } @Override public void dispatchEvent(IEvent arg0) { } @Override public void resultReceived(IPendingServiceCall call) { Object result = call.getResult(); if (result instanceof ObjectMap) { if ("connect".equals(call.getServiceMethodName())) { createStream(this); } } else { if ("createStream".equals(call.getServiceMethodName())) { if (result instanceof Integer) { Integer streamIdInt = (Integer) result; // int streamId = streamIdInt.intValue(); // publish(streamId, "testgio2", "live", this); invoke("getRoomsInfo", this); } else { disconnect(); } } else if ("getRoomsInfo".equals(call.getServiceMethodName())) { ArrayList<String> list = (ArrayList<String>) result; for (int i = 0; i < list.size(); i++) { System.out.println(list.get(i)); } } } } @Override public void onStreamEvent(Notify arg0) { ObjectMap<?, ?> map = (ObjectMap<?, ?>) notify.getCall().getArguments()[0];String code = (String) map.get("code");if (StatusCodes.NS_PUBLISH_START.equals(code)) {IMessage message = null;while ((message = frameBuffer.poll()) != null) {this.publishStreamData(streamId, message);}} else if (StatusCodes.NS_UNPUBLISHED_SUCCESS.equals(code)) {}} @Override public void connectionOpened(RTMPConnection conn, RTMP state) { super.connectionOpened(conn, state); } public static void main(String[] args) { new RtmpClientTest(); } }

RtmpClientTest實(shí)現(xiàn)的三個(gè)接口INetStreamEventHandler、IPendingServiceCallback、IEventDispatcher和回調(diào)函數(shù)有關(guān),后面的章節(jié)會(huì)分析到。
首先來(lái)看RtmpClientTest的構(gòu)造函數(shù),其父類RTMPClient的構(gòu)造函數(shù)如下

public RTMPClient() {ioHandler = new RTMPMinaIoHandler();ioHandler.setHandler(this);}

red5客戶端使用mina框架來(lái)封裝Java Nio,關(guān)于mina框架的源碼分析請(qǐng)查看博主的mina源碼分析系列文章。這里有兩個(gè)handler,RTMPMinaIoHandler和mina框架有關(guān),另一個(gè)handler就是RTMPClient自身,因?yàn)槠淅^承自BaseRTMPClientHandler,BaseRTMPClientHandler和業(yè)務(wù)有關(guān)。
回到RtmpClientTest構(gòu)造函數(shù),接下來(lái)調(diào)用connect進(jìn)行連接,connect函數(shù)實(shí)現(xiàn)在RTMPClient的父類BaseRTMPClientHandler中,

public void connect(String server, int port, Map<String, Object> connectionParams, IPendingServiceCallback connectCallback) {connect(server, port, connectionParams, connectCallback, null);}public void connect(String server, int port, Map<String, Object> connectionParams, IPendingServiceCallback connectCallback, Object[] connectCallArguments) {this.connectionParams = connectionParams;this.connectArguments = connectCallArguments;if (!connectionParams.containsKey("objectEncoding")) {connectionParams.put("objectEncoding", 0);}this.connectCallback = connectCallback;startConnector(server, port);}

connect函數(shù)一開始作了一些簡(jiǎn)單的設(shè)置,最后通過(guò)startConnector與服務(wù)器建立連接。startConnector定義在RTMPClient中,

protected void startConnector(String server, int port) {socketConnector = new NioSocketConnector();socketConnector.setHandler(ioHandler);future = socketConnector.connect(new InetSocketAddress(server, port));future.addListener(new IoFutureListener<ConnectFuture>() {public void operationComplete(ConnectFuture future) {try {session = future.getSession();} catch (Throwable e) {socketConnector.dispose(false);handleException(e);}}});future.awaitUninterruptibly(CONNECTOR_WORKER_TIMEOUT);}

這里主要構(gòu)造了一個(gè)NioSocketConnector,并調(diào)用其connect函數(shù)。connect函數(shù)會(huì)使用mina框架與服務(wù)器建立連接,下一章會(huì)分析服務(wù)器如何處理客戶端的連接請(qǐng)求。當(dāng)與服務(wù)器建立完連接(TCP連接)時(shí),根據(jù)mina框架的源碼,會(huì)回調(diào)mina框架中IoHandler的處理函數(shù),也即前面注冊(cè)的RTMPMinaIoHandler的sessionCreated和sessionOpened函數(shù),下面依次來(lái)看。

一. sessionCreated

sessionCreated的代碼如下,

public void sessionCreated(IoSession session) throws Exception {session.getFilterChain().addFirst("rtmpeFilter", new RTMPEIoFilter());RTMPMinaConnection conn = createRTMPMinaConnection();conn.setIoSession(session);session.setAttribute(RTMPConnection.RTMP_SESSION_ID, conn.getSessionId());OutboundHandshake outgoingHandshake = new OutboundHandshake();session.setAttribute(RTMPConnection.RTMP_HANDSHAKE, outgoingHandshake);if (enableSwfVerification) {String swfUrl = (String) handler.getConnectionParams().get("swfUrl");if (!StringUtils.isEmpty(swfUrl)) {outgoingHandshake.initSwfVerification(swfUrl);}}session.setAttribute(RTMPConnection.RTMP_HANDLER, handler);handler.setConnection((RTMPConnection) conn);}

sessionCreated函數(shù)首先向mina框架中添加一個(gè)過(guò)濾器RTMPEIoFilter,該過(guò)濾器用來(lái)處理RTMP協(xié)議的握手過(guò)程,具體的RTMP協(xié)議可以從網(wǎng)上下載。sessionCreated接著創(chuàng)建一個(gè)RTMPMinaConnection并進(jìn)行相應(yīng)的設(shè)置,

protected RTMPMinaConnection createRTMPMinaConnection() {return (RTMPMinaConnection) RTMPConnManager.getInstance().createConnection(RTMPMinaConnection.class);}

RTMPConnManager使用單例模式,其createConnection函數(shù)如下,

public RTMPConnection createConnection(Class<?> connCls) {RTMPConnection conn = null;if (RTMPConnection.class.isAssignableFrom(connCls)) {try {conn = createConnectionInstance(connCls);connMap.put(conn.getSessionId(), conn);} catch (Exception ex) {}}return conn;}public RTMPConnection createConnectionInstance(Class<?> cls) throws Exception {RTMPConnection conn = null;if (cls == RTMPMinaConnection.class) {conn = (RTMPMinaConnection) cls.newInstance();} else if (cls == RTMPTClientConnection.class) {conn = (RTMPTClientConnection) cls.newInstance();} else {conn = (RTMPConnection) cls.newInstance();}conn.setMaxHandshakeTimeout(maxHandshakeTimeout);conn.setMaxInactivity(maxInactivity);conn.setPingInterval(pingInterval);ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(1);executor.setDaemon(true);executor.setMaxPoolSize(1);executor.setQueueCapacity(executorQueueCapacity);executor.initialize();conn.setExecutor(executor);return conn;}

這里就是實(shí)例化一個(gè)RTMPMinaConnection,創(chuàng)建ThreadPoolTaskExecutor并進(jìn)行相應(yīng)的設(shè)置,最后添加進(jìn)connMap中。注意每個(gè)RTMPMinaConnection的SessionId是隨機(jī)生成的。
回到sessionCreated中,接下來(lái)設(shè)置剛剛構(gòu)造的RTMPMinaConnection,以及其SessionId,然后創(chuàng)建OutboundHandshake用于RTMP協(xié)議的握手,握手結(jié)束后該OutboundHandshake將會(huì)從session中移除,最后設(shè)置handler和RTMPMinaConnection。

二. sessionOpened

再來(lái)看RTMPMinaIoHandler的sessionOpened函數(shù),代碼如下

public void sessionOpened(IoSession session) throws Exception {super.sessionOpened(session);RTMPHandshake handshake = (RTMPHandshake) session.getAttribute(RTMPConnection.RTMP_HANDSHAKE);IoBuffer clientRequest1 = ((OutboundHandshake) handshake).generateClientRequest1();session.write(clientRequest1);}

這里根據(jù)從session中獲得剛剛在sessionCreated中創(chuàng)建的OutboundHandshake,調(diào)用其generateClientRequest1函數(shù)生成第一次握手請(qǐng)求的數(shù)據(jù),通過(guò)write函數(shù)發(fā)送給服務(wù)器。generateClientRequest1函數(shù)和具體的協(xié)議相關(guān),這里就不繼續(xù)往下看了。

總結(jié)一下,本章分析了如何創(chuàng)建一個(gè)RTMPClient并建立與服務(wù)器的TCP連接,然后發(fā)送第一次握手請(qǐng)求開始與服務(wù)器建立RTMP連接,下一章開始分析red5服務(wù)器端對(duì)應(yīng)的連接函數(shù)。

總結(jié)

以上是生活随笔為你收集整理的red5源码分析---1的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。