RtmpMediaSource.h 6.06 KB
Newer Older
xiongziliang committed
1
/*
xiongziliang committed
2
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
xiongziliang committed
3 4 5
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
xiongziliang committed
6 7 8
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
xzl committed
9 10 11 12 13
 */

#ifndef SRC_RTMP_RTMPMEDIASOURCE_H_
#define SRC_RTMP_RTMPMEDIASOURCE_H_

xiongzilaing committed
14 15
#include <mutex>
#include <memory>
xzl committed
16 17 18 19 20
#include <string>
#include <functional>
#include <unordered_map>
#include "amf.h"
#include "Rtmp.h"
xiongziliang committed
21
#include "RtmpDemuxer.h"
xiongziliang committed
22
#include "Common/config.h"
23
#include "Common/MediaSource.h"
xiongzilaing committed
24 25 26 27 28 29 30
#include "Util/util.h"
#include "Util/logger.h"
#include "Util/RingBuffer.h"
#include "Util/TimeTicker.h"
#include "Util/ResourcePool.h"
#include "Util/NoticeCenter.h"
#include "Thread/ThreadPool.h"
xiongziliang committed
31
using namespace toolkit;
xzl committed
32

xiongziliang committed
33
#define RTMP_GOP_SIZE 512
xiongziliang committed
34
namespace mediakit {
xzl committed
35

36 37 38 39 40 41 42
/**
 * rtmp媒体源的数据抽象
 * rtmp有关键的三要素,分别是metadata、config帧,普通帧
 * 其中metadata是非必须的,有些编码格式也没有config帧(比如MP3)
 * 只要生成了这三要素,那么要实现rtmp推流、rtmp服务器就很简单了
 * rtmp推拉流协议中,先传递metadata,然后传递config帧,然后一直传递普通帧
 */
43
class RtmpMediaSource : public MediaSource, public RingDelegate<RtmpPacket::Ptr>, public PacketCache<RtmpPacket>{
xzl committed
44
public:
45
    typedef std::shared_ptr<RtmpMediaSource> Ptr;
xiongziliang committed
46 47
    typedef std::shared_ptr<List<RtmpPacket::Ptr> > RingDataType;
    typedef RingBuffer<RingDataType> RingType;
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110

    /**
     * 构造函数
     * @param vhost 虚拟主机名
     * @param app 应用名
     * @param stream_id 流id
     * @param ring_size 可以设置固定的环形缓冲大小,0则自适应
     */
    RtmpMediaSource(const string &vhost,
                    const string &app,
                    const string &stream_id,
                    int ring_size = RTMP_GOP_SIZE) :
            MediaSource(RTMP_SCHEMA, vhost, app, stream_id), _ring_size(ring_size) {
    }

    virtual ~RtmpMediaSource() {}

    /**
     * 	获取媒体源的环形缓冲
     */
    const RingType::Ptr &getRing() const {
        return _ring;
    }

    /**
     * 获取播放器个数
     * @return
     */
    int readerCount() override {
        return _ring ? _ring->readerCount() : 0;
    }

    /**
     * 获取metadata
     */
    const AMFValue &getMetaData() const {
        lock_guard<recursive_mutex> lock(_mtx);
        return _metadata;
    }

    /**
     * 获取所有的config帧
     */
    template<typename FUNC>
    void getConfigFrame(const FUNC &f) {
        lock_guard<recursive_mutex> lock(_mtx);
        for (auto &pr : _config_frame_map) {
            f(pr.second);
        }
    }

    /**
     * 设置metadata
     */
    virtual void setMetaData(const AMFValue &metadata) {
        lock_guard<recursive_mutex> lock(_mtx);
        _metadata = metadata;
        if(_ring){
            regist();
        }
    }

    /**
111 112 113 114 115 116 117 118
     * 更新metadata
     */
    void updateMetaData(const AMFValue &metadata) {
        lock_guard<recursive_mutex> lock(_mtx);
        _metadata = metadata;
    }

    /**
119 120 121 122 123 124 125 126
     * 输入rtmp包
     * @param pkt rtmp包
     * @param key 是否为关键帧
     */
    void onWrite(const RtmpPacket::Ptr &pkt, bool key = true) override {
        lock_guard<recursive_mutex> lock(_mtx);
        if(pkt->typeId == MSG_VIDEO){
            //有视频,那么启用GOP缓存
127
            _have_video = true;
128 129 130 131 132 133
        }
        if (pkt->isCfgFrame()) {
            _config_frame_map[pkt->typeId] = pkt;
            return;
        }

xiongziliang committed
134 135 136
        //保存当前时间戳
        _track_stamps_map[pkt->typeId] = pkt->timeStamp;

137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
        if (!_ring) {
            weak_ptr<RtmpMediaSource> weakSelf = dynamic_pointer_cast<RtmpMediaSource>(shared_from_this());
            auto lam = [weakSelf](const EventPoller::Ptr &, int size, bool) {
                auto strongSelf = weakSelf.lock();
                if (!strongSelf) {
                    return;
                }
                strongSelf->onReaderChanged(size);
            };

            //rtmp包缓存最大允许512个,如果是纯视频(25fps)大概为20秒数据
            //但是这个是GOP缓存的上限值,真实的GOP缓存大小等于两个I帧之间的包数的两倍
            //而且每次遇到I帧,则会清空GOP缓存,所以真实的GOP缓存远小于此值
            _ring = std::make_shared<RingType>(_ring_size,std::move(lam));
            onReaderChanged(0);

            if(_metadata){
                regist();
            }
        }
157
        PacketCache<RtmpPacket>::inputPacket(pkt->typeId == MSG_VIDEO, pkt, key);
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
    }

    /**
     * 获取当前时间戳
     */
    uint32_t getTimeStamp(TrackType trackType) override {
        lock_guard<recursive_mutex> lock(_mtx);
        switch (trackType) {
            case TrackVideo:
                return _track_stamps_map[MSG_VIDEO];
            case TrackAudio:
                return _track_stamps_map[MSG_AUDIO];
            default:
                return MAX(_track_stamps_map[MSG_VIDEO], _track_stamps_map[MSG_AUDIO]);
        }
    }
174 175

private:
xiongziliang committed
176 177

    /**
178 179
    * 批量flush rtmp包时触发该函数
    * @param rtmp_list rtmp包列表
xiongziliang committed
180 181
    * @param key_pos 是否包含关键帧
    */
182 183 184
    void onFlush(std::shared_ptr<List<RtmpPacket::Ptr> > &rtmp_list, bool key_pos) override {
        //如果不存在视频,那么就没有存在GOP缓存的意义,所以is_key一直为true确保一直清空GOP缓存
        _ring->write(rtmp_list, _have_video ? key_pos : true);
xiongziliang committed
185 186
    }

187 188 189 190
    /**
     * 每次增减消费者都会触发该函数
     */
    void onReaderChanged(int size) {
191
        if (size == 0) {
192 193 194
            onNoneReader();
        }
    }
195 196

private:
197 198 199 200
    int _ring_size;
    bool _have_video = false;
    mutable recursive_mutex _mtx;
    AMFValue _metadata;
xiongziliang committed
201
    RingType::Ptr _ring;
202 203
    unordered_map<int, uint32_t> _track_stamps_map;
    unordered_map<int, RtmpPacket::Ptr> _config_frame_map;
xzl committed
204 205
};

xiongziliang committed
206
} /* namespace mediakit */
xzl committed
207 208

#endif /* SRC_RTMP_RTMPMEDIASOURCE_H_ */