日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

ZLMediaKit接收ffmpeg rtmp推流

發布時間:2023/12/20 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ZLMediaKit接收ffmpeg rtmp推流 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

一 關鍵類

二 推流緩沖


webrtc拉流篇,可參考

https://mp.csdn.net/mp_blog/creation/editor/122743325

RTMP采用的封裝格式是FLV。所以在指定輸出流媒體的時候須要指定其封裝格式為“flv”。同理,其余流媒體協議也須要指定其封裝格式。例如采用UDP推送流媒體的時候,能夠指定其封裝格式為“mpegts”。

一 關鍵類

環形緩沖,聚合了_RingStorage

template<typename T>

class RingBuffer : public enable_shared_from_this<RingBuffer<T> > {

public:

? ? typedef std::shared_ptr<RingBuffer> Ptr;

? ? typedef _RingReader<T> RingReader;

? ? typedef _RingStorage<T> RingStorage;

? ? typedef _RingReaderDispatcher<T> RingReaderDispatcher;

? ? typedef function<void(int size)> onReaderChanged;

? ? RingBuffer(int max_size = 1024, const onReaderChanged &cb = nullptr) {

? ? ? ? _on_reader_changed = cb;

? ? ? ? _storage = std::make_shared<RingStorage>(max_size);

? ? }

? ? ~RingBuffer() {}

? ? void write(T in, bool is_key = true) {

? ? ? ? if (_delegate) {

? ? ? ? ? ? _delegate->onWrite(std::move(in), is_key);

? ? ? ? ? ? return;

? ? ? ? }

? ? ? ? LOCK_GUARD(_mtx_map);

? ? ? ? for (auto &pr : _dispatcher_map) {

? ? ? ? ? ? auto &second = pr.second;

? ? ? ? ? ? //切換線程后觸發onRead事件

? ? ? ? ? ? pr.first->async([second, in, is_key]() {

? ? ? ? ? ? ? ? second->write(std::move(const_cast<T &>(in)), is_key);

? ? ? ? ? ? }, false);

? ? ? ? }

? ? ? ? _storage->write(std::move(in), is_key);

? ? }

? ? int readerCount() {

? ? ? ? return _total_count;

? ? }

? ? void clearCache(){

? ? ? ? LOCK_GUARD(_mtx_map);

? ? ? ? _storage->clearCache();

? ? ? ? for (auto &pr : _dispatcher_map) {

? ? ? ? ? ? auto &second = pr.second;

? ? ? ? ? ? //切換線程后清空緩存

? ? ? ? ? ? pr.first->async([second]() {

? ? ? ? ? ? ? ? second->clearCache();

? ? ? ? ? ? }, false);

? ? ? ? }

? ? }

private:

? ? struct HashOfPtr {

? ? ? ? std::size_t operator()(const EventPoller::Ptr &key) const {

? ? ? ? ? ? return (std::size_t) key.get();

? ? ? ? }

? ? };

private:

? ? mutex _mtx_map;

? ? atomic_int _total_count {0};

? ? typename RingStorage::Ptr _storage;

? ? typename RingDelegate<T>::Ptr _delegate;

? ? onReaderChanged _on_reader_changed;

? ? unordered_map<EventPoller::Ptr, typename RingReaderDispatcher::Ptr, HashOfPtr> _dispatcher_map;

};

存放ffmpeg rtmp推流,demuxer,解碼,重新編碼后的rtppacket數據

template<typename T>

class _RingStorage {

public:

? ? typedef std::shared_ptr<_RingStorage> Ptr;

? ? _RingStorage(int max_size) {

? ? ? ? //gop緩存個數不能小于32

? ? ? ? if(max_size < RING_MIN_SIZE){

? ? ? ? ? ? max_size = RING_MIN_SIZE;

? ? ? ? }

? ? ? ? _max_size = max_size;

? ? }

? ? ~_RingStorage() {}

? ? /**

? ? ?* 寫入環形緩存數據

? ? ?* @param in 數據

? ? ?* @param is_key 是否為關鍵幀

? ? ?* @return 是否觸發重置環形緩存大小

? ? ?*/

? ? ?void write(T in, bool is_key = true) {

? ? ? ? if (is_key) {

? ? ? ? ? ? //遇到I幀,那么移除老數據

? ? ? ? ? ? _size = 0;

? ? ? ? ? ? _have_idr = true;

? ? ? ? ? ? _data_cache.clear();

? ? ? ? }

? ? ? ? if (!_have_idr) {

? ? ? ? ? ? //緩存中沒有關鍵幀,那么gop緩存無效

? ? ? ? ? ? return;

? ? ? ? }

? ? ? ? _data_cache.emplace_back(std::make_pair(is_key, std::move(in)));

? ? ? ? if (++_size > _max_size) {

? ? ? ? ? ? //GOP緩存溢出,清空關老數據

? ? ? ? ? ? _size = 0;

? ? ? ? ? ? _have_idr = false;

? ? ? ? ? ? _data_cache.clear();

? ? ? ? }

? ? }

? ? Ptr clone() const {

? ? ? ? Ptr ret(new _RingStorage());

? ? ? ? ret->_size = _size;

? ? ? ? ret->_have_idr = _have_idr;

? ? ? ? ret->_max_size = _max_size;

? ? ? ? ret->_data_cache = _data_cache;

? ? ? ? return ret;

? ? }

? ? const List<pair<bool, T> > &getCache() const {

? ? ? ? return _data_cache;

? ? }

? ? void clearCache(){

? ? ? ? _size = 0;

? ? ? ? _data_cache.clear();

? ? }

private:

? ? _RingStorage() = default;

private:

? ? bool _have_idr = false;

? ? int _size = 0;

? ? int _max_size;

? ? List<pair<bool, T> > _data_cache;

};

webrtc從此處reader

template<typename T>

class _RingReader {

public:

? ? typedef std::shared_ptr<_RingReader> Ptr;

? ? friend class _RingReaderDispatcher<T>;

? ? _RingReader(const std::shared_ptr<_RingStorage<T> > &storage, bool use_cache) {

? ? ? ? _storage = storage;

? ? ? ? _use_cache = use_cache;

? ? }

? ? ~_RingReader() {}

? ? void setReadCB(const function<void(const T &)> &cb) {

? ? ? ? if (!cb) {

? ? ? ? ? ? _read_cb = [](const T &) {};

? ? ? ? } else {

? ? ? ? ? ? _read_cb = cb;

? ? ? ? ? ? flushGop();

? ? ? ? }

? ? }

? ? void setDetachCB(const function<void()> &cb) {

? ? ? ? if (!cb) {

? ? ? ? ? ? _detach_cb = []() {};

? ? ? ? } else {

? ? ? ? ? ? _detach_cb = cb;

? ? ? ? }

? ? }

private:

? ? void onRead(const T &data, bool is_key) {

? ? ? ? _read_cb(data);

? ? }

? ? void onDetach() const {

? ? ? ? _detach_cb();

? ? }

? ? void flushGop() {

? ? ? ? if (!_use_cache) {

? ? ? ? ? ? return;

? ? ? ? }

? ? ? ? _storage->getCache().for_each([&](const pair<bool, T> &pr) {

? ? ? ? ? ? onRead(pr.second, pr.first);

? ? ? ? });

? ? }

private:

? ? bool _use_cache;

? ? shared_ptr<_RingStorage<T> > _storage;

? ? function<void(void)> _detach_cb = []() {};

? ? function<void(const T &)> _read_cb = [](const T &) {};

};

二 推流緩沖

void EventPoller::runLoop(bool blocked,bool regist_self)

bool Socket::attachEvent(const SockFD::Ptr &sock, bool is_udp) /lambda表達式

ssize_t Socket::onRead(const SockFD::Ptr &sock, bool is_udp) noexcept

void TcpServer::onAcceptConnection(const Socket::Ptr &sock) /lambda表達式

void RtmpSession::onRecv(const Buffer::Ptr &buf)

void RtmpProtocol::onParseRtmp(const char *data, size_t size)

void HttpRequestSplitter::input(const char *data,size_t len)

const char *RtmpProtocol::onSearchPacketTail(const char *data,size_t len)

const char* RtmpProtocol::handle_C2(const char *data, size_t len)

const char* RtmpProtocol::handle_rtmp(const char *data, size_t len)

void RtmpProtocol::handle_chunk(RtmpPacket::Ptr packet)

void RtmpSession::onRtmpChunk(RtmpPacket::Ptr packet)?

void onWrite(RtmpPacket::Ptr pkt, bool = true) override

void RtmpDemuxer::inputRtmp(const RtmpPacket::Ptr &pkt)?

void H264RtmpDecoder::inputRtmp(const RtmpPacket::Ptr &pkt)?

inline void H264RtmpDecoder::onGetH264(const char* data, size_t len, uint32_t dts, uint32_t pts)?

bool FrameDispatcher ::?inputFrame(const Frame::Ptr &frame) override

bool H264Track::inputFrame(const Frame::Ptr &frame)?

bool H264Track::inputFrame_l(const Frame::Ptr &frame)

?bool FrameWriterInterfaceHelper::inputFrame(const Frame::Ptr &frame) override

bool MediaSink::addTrack(const Track::Ptr &track_in)/lambda表達式

bool MultiMediaSourceMuxer::onTrackFrame(const Frame::Ptr &frame_in)?

bool RtspMediaSourceMuxer::inputFrame(const Frame::Ptr &frame) override

bool RtspMuxer::inputFrame(const Frame::Ptr &frame)?

bool H264RtpEncoder::inputFrame(const Frame::Ptr &frame)?

bool H264RtpEncoder::inputFrame_l(const Frame::Ptr &frame, bool is_mark)

void H264RtpEncoder::packRtp(const char *ptr, size_t len, uint32_t pts, bool is_mark, bool gop_pos)

void H264RtpEncoder::packRtpFu(const char *ptr, size_t len, uint32_t pts, bool is_mark, bool gop_pos)

bool RtpRing::inputRtp(const RtpPacket::Ptr &rtp, bool key_pos)

void RingBuffer::write(T in, bool is_key = true)

ringbuffer超過maxsize 512之后,推流數據未被拉流的話,會清空。推流和拉流過程是獨立進行,比如只ffmepg推流,webrtc不拉流,播放放。

總結

以上是生活随笔為你收集整理的ZLMediaKit接收ffmpeg rtmp推流的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。