/*
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
 */

#ifndef SRC_RTMP_RTMPMEDIASOURCE_H_
#define SRC_RTMP_RTMPMEDIASOURCE_H_

xiongzilaing committed
14 15
#include <mutex>
#include <memory>
xzl committed
16 17 18 19 20
#include <string>
#include <functional>
#include <unordered_map>
#include "amf.h"
#include "Rtmp.h"
xiongziliang committed
21
#include "RtmpDemuxer.h"
xiongziliang committed
22
#include "Common/config.h"
23
#include "Common/MediaSource.h"
xiongzilaing committed
24 25 26 27 28 29 30
#include "Util/util.h"
#include "Util/logger.h"
#include "Util/RingBuffer.h"
#include "Util/TimeTicker.h"
#include "Util/ResourcePool.h"
#include "Util/NoticeCenter.h"
#include "Thread/ThreadPool.h"
xiongziliang committed
31
using namespace toolkit;
xzl committed
32

xiongziliang committed
33
// Default value of the `ring_size` constructor argument of RtmpMediaSource,
// i.e. the size handed to the ring buffer that caches RTMP packets.
#define RTMP_GOP_SIZE 512
xiongziliang committed
34
namespace mediakit {
xzl committed
35

36 37 38 39 40 41 42 43
/**
 * Data abstraction of an RTMP media source.
 * RTMP has three key elements: metadata, config frames and ordinary frames.
 * Metadata is optional, and some codecs (MP3 for example) have no config frame.
 * Once these three elements are available, implementing RTMP publishing or an
 * RTMP server is straightforward.
 * In the RTMP push/pull protocol the metadata is delivered first, then the
 * config frames, and after that ordinary frames continuously.
 */
class RtmpMediaSource : public MediaSource, public RingDelegate<RtmpPacket::Ptr> {
public:
    using Ptr = std::shared_ptr<RtmpMediaSource>;
    using RingType = RingBuffer<RtmpPacket::Ptr>;

    /**
     * Constructor
     * @param vhost virtual host name
     * @param app application name
     * @param stream_id stream id
     * @param ring_size fixed size of the ring buffer; 0 means adaptive
     */
    RtmpMediaSource(const std::string &vhost,
                    const std::string &app,
                    const std::string &stream_id,
                    int ring_size = RTMP_GOP_SIZE) :
            MediaSource(RTMP_SCHEMA, vhost, app, stream_id), _ring_size(ring_size) {
    }

    virtual ~RtmpMediaSource() = default;

    /**
     * Get the ring buffer of this media source.
     * The ring is created lazily by onWrite(), so the returned pointer is
     * empty until the first non-config packet has been written.
     * NOTE(review): _ring is read here without holding _mtx - confirm callers
     * only use this from the thread that writes packets.
     */
    const RingType::Ptr &getRing() const {
        return _ring;
    }

    /**
     * Get the number of players (ring readers).
     * @return reader count of the ring, or 0 when the ring does not exist yet
     */
    int readerCount() override {
        return _ring ? _ring->readerCount() : 0;
    }

    /**
     * Get the metadata.
     * NOTE(review): the lock only covers the return statement; the returned
     * reference refers to the member itself, so a concurrent setMetaData()
     * may modify it while the caller is still reading.
     */
    const AMFValue &getMetaData() const {
        std::lock_guard<std::recursive_mutex> lock(_mtx);
        return _metadata;
    }

    /**
     * Visit all cached config frames.
     * @param f callable invoked once per cached config packet while the
     *          internal lock is held
     */
    template<typename FUNC>
    void getConfigFrame(const FUNC &f) {
        std::lock_guard<std::recursive_mutex> lock(_mtx);
        for (auto &pr : _config_frame_map) {
            f(pr.second);
        }
    }

    /**
     * Set the metadata.
     * If the ring already exists (packets have been written), the source is
     * registered right away.
     */
    virtual void setMetaData(const AMFValue &metadata) {
        std::lock_guard<std::recursive_mutex> lock(_mtx);
        _metadata = metadata;
        if (_ring) {
            regist();
        }
    }

    /**
     * Input an rtmp packet.
     * @param pkt rtmp packet
     * @param key whether the packet is a key frame. This implementation does
     *            not read it; the key flag passed to the ring is derived from
     *            the packet itself.
     */
    void onWrite(const RtmpPacket::Ptr &pkt, bool key = true) override {
        std::lock_guard<std::recursive_mutex> lock(_mtx);
        if (pkt->typeId == MSG_VIDEO) {
            // Video is present, so enable the GOP cache
            _have_video = true;
        }
        if (pkt->isCfgFrame()) {
            // Config frames are only remembered per message type; they are not
            // written into the ring
            _config_frame_map[pkt->typeId] = pkt;
            return;
        }

        if (!_ring) {
            // Capture a weak reference so the ring callback does not keep this
            // source alive
            std::weak_ptr<RtmpMediaSource> weakSelf = std::dynamic_pointer_cast<RtmpMediaSource>(shared_from_this());
            auto lam = [weakSelf](const EventPoller::Ptr &, int size, bool) {
                auto strongSelf = weakSelf.lock();
                if (!strongSelf) {
                    return;
                }
                strongSelf->onReaderChanged(size);
            };

            // At most 512 rtmp packets are cached, roughly 20 seconds of data
            // for a video-only stream at 25fps.
            // This is only the upper bound of the GOP cache: the real GOP cache
            // size equals twice the number of packets between two I-frames.
            // The GOP cache is also cleared on every I-frame, so the real cache
            // is far smaller than this value.
            _ring = std::make_shared<RingType>(_ring_size, std::move(lam));
            onReaderChanged(0);

            if (_metadata) {
                regist();
            }
        }
        _track_stamps_map[pkt->typeId] = pkt->timeStamp;
        // Without video, disable the GOP cache (flag every packet as key) to
        // reduce buffering latency
        _ring->write(pkt, _have_video ? pkt->isVideoKeyFrame() : true);
    }

    /**
     * Get the current timestamp.
     * @param trackType TrackVideo or TrackAudio selects that track's last
     *                  written timestamp; any other value selects the larger
     *                  of the two
     * @return the timestamp, or 0 for a track that has not been written yet
     */
    uint32_t getTimeStamp(TrackType trackType) override {
        std::lock_guard<std::recursive_mutex> lock(_mtx);
        // Look up without operator[] so that a query never inserts an entry
        auto stamp_of = [this](int type_id) -> uint32_t {
            auto it = _track_stamps_map.find(type_id);
            return it == _track_stamps_map.end() ? 0 : it->second;
        };
        switch (trackType) {
            case TrackVideo:
                return stamp_of(MSG_VIDEO);
            case TrackAudio:
                return stamp_of(MSG_AUDIO);
            default: {
                const uint32_t video_stamp = stamp_of(MSG_VIDEO);
                const uint32_t audio_stamp = stamp_of(MSG_AUDIO);
                return MAX(video_stamp, audio_stamp);
            }
        }
    }

private:
    /**
     * Called every time a consumer is added or removed.
     * @param size current number of consumers
     */
    void onReaderChanged(int size) {
        if (size == 0) {
            onNoneReader();
        }
    }

private:
    // Size passed to the ring buffer when it is created
    int _ring_size;
    // Set once a video packet has been seen; enables GOP caching
    bool _have_video = false;
    // Guards the metadata, the two maps and the creation of the ring
    mutable std::recursive_mutex _mtx;
    AMFValue _metadata;
    // Created lazily by the first non-config packet
    RingType::Ptr _ring;
    // Last written timestamp, keyed by rtmp message type id
    std::unordered_map<int, uint32_t> _track_stamps_map;
    // Latest config frame, keyed by rtmp message type id
    std::unordered_map<int, RtmpPacket::Ptr> _config_frame_map;
};

xiongziliang committed
185
} /* namespace mediakit */
xzl committed
186 187

#endif /* SRC_RTMP_RTMPMEDIASOURCE_H_ */