/*
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
 */

#ifndef ZLMEDIAKIT_MEDIASOURCE_H
#define ZLMEDIAKIT_MEDIASOURCE_H

#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>
#include "Common/config.h"
#include "Common/Parser.h"
#include "Util/logger.h"
#include "Util/TimeTicker.h"
#include "Util/NoticeCenter.h"
#include "Util/List.h"
#include "Rtsp/Rtsp.h"
#include "Rtmp/Rtmp.h"
#include "Extension/Track.h"
#include "Record/Recorder.h"

using namespace std;
using namespace toolkit;

// Forward declaration of toolkit::TcpSession; this header only refers to it
// through std::shared_ptr<TcpSession> (see MediaSource::findAsync).
namespace toolkit {
    class TcpSession;
} // namespace toolkit

namespace mediakit {

class MediaSource;

/**
 * Listener interface for events raised by a MediaSource.
 * Every handler receives the MediaSource that raised the event as `sender`.
 * Except for totalReaderCount(), which is pure virtual, the handlers have
 * default implementations that simply return false.
 */
class MediaSourceEvent {
public:
    friend class MediaSource;

    MediaSourceEvent() = default;
    virtual ~MediaSourceEvent() = default;

    // Notification that the progress bar was dragged (seek) to ui32Stamp.
    virtual bool seekTo(MediaSource &sender, uint32_t ui32Stamp) { return false; }

    // Notification asking the source to stop pushing the stream.
    virtual bool close(MediaSource &sender, bool force) { return false; }

    // Total number of viewers.
    virtual int totalReaderCount(MediaSource &sender) = 0;

    // Start or stop recording.
    virtual bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) { return false; }

    // Query the recording state.
    virtual bool isRecording(MediaSource &sender, Recorder::type type) { return false; }

private:
    // Notification that nobody is watching the stream; reachable only from the
    // befriended MediaSource. Defined out of line.
    void onNoneReader(MediaSource &sender);

private:
    // NOTE(review): presumably armed by onNoneReader() to close the stream
    // asynchronously - confirm against the out-of-line implementation.
    Timer::Ptr _async_close_timer;
};

/**
 * Media-related information obtained by parsing a url.
 *
 * No destructor is declared: every member is a std::string, so the
 * compiler-generated copy/move/destroy operations are correct (an empty
 * user-provided destructor would suppress the implicit move operations).
 */
class MediaInfo {
public:
    MediaInfo() = default;

    // Construct and immediately parse `url`.
    // NOTE(review): intentionally left non-explicit so existing callers that
    // rely on the implicit conversion from a string keep compiling.
    MediaInfo(const std::string &url) { parse(url); }

    // Parse `url` into this object; defined out of line.
    void parse(const std::string &url);

public:
    // NOTE(review): the fields below are presumably filled in by parse();
    // their exact contents are determined by the out-of-line implementation.
    std::string _schema;
    std::string _host;
    std::string _port;
    std::string _vhost;
    std::string _app;
    std::string _streamid;
    std::string _param_strs;
};

xiongziliang committed
82 83 84
/**
 * 媒体源,任何rtsp/rtmp的直播流都源自该对象
 */
xiongziliang committed
85
class MediaSource: public TrackSource, public enable_shared_from_this<MediaSource> {
86 87 88 89 90 91 92
public:
    typedef std::shared_ptr<MediaSource> Ptr;
    typedef unordered_map<string, weak_ptr<MediaSource> > StreamMap;
    typedef unordered_map<string, StreamMap > AppStreamMap;
    typedef unordered_map<string, AppStreamMap > VhostAppStreamMap;
    typedef unordered_map<string, VhostAppStreamMap > SchemaVhostAppStreamMap;

xiongziliang committed
93 94 95 96 97 98 99 100 101 102 103 104 105 106
    MediaSource(const string &strSchema, const string &strVhost, const string &strApp, const string &strId) ;
    virtual ~MediaSource() ;

    // 获取协议类型
    const string& getSchema() const;
    // 虚拟主机
    const string& getVhost() const;
    // 应用名
    const string& getApp() const;
    // 流id
    const string& getId() const;

    // 设置TrackSource
    void setTrackSource(const std::weak_ptr<TrackSource> &track_src);
107 108 109
    // 获取所有Track
    vector<Track::Ptr> getTracks(bool trackReady = true) const override;

xiongziliang committed
110 111
    // 设置监听者
    virtual void setListener(const std::weak_ptr<MediaSourceEvent> &listener);
112 113 114
    // 获取监听者
    const std::weak_ptr<MediaSourceEvent>& getListener() const;

115
    // 本协议获取观看者个数,可能返回本协议的观看人数,也可能返回总人数
xiongziliang committed
116
    virtual int readerCount() = 0;
117 118
    // 观看者个数,包括(hls/rtsp/rtmp)
    virtual int totalReaderCount();
119

xiongziliang committed
120
    // 获取流当前时间戳
121
    virtual uint32_t getTimeStamp(TrackType trackType) { return 0; };
122 123
    // 设置时间戳
    virtual void setTimeStamp(uint32_t uiStamp) {};
124

xiongziliang committed
125 126 127 128 129 130
    // 拖动进度条
    bool seekTo(uint32_t ui32Stamp);
    // 关闭该流
    bool close(bool force);
    // 该流无人观看
    void onNoneReader();
131
    // 开启或关闭录制
132
    virtual bool setupRecord(Recorder::type type, bool start, const string &custom_path);
133
    // 获取录制状态
134
    virtual bool isRecording(Recorder::type type);
135

xiongziliang committed
136 137 138 139 140 141
    // 同步查找流
    static Ptr find(const string &schema, const string &vhost, const string &app, const string &id, bool bMake = true) ;
    // 异步查找流
    static void findAsync(const MediaInfo &info, const std::shared_ptr<TcpSession> &session, const function<void(const Ptr &src)> &cb);
    // 遍历所有流
    static void for_each_media(const function<void(const Ptr &src)> &cb);
142 143 144

    // 从mp4文件生成MediaSource
    static MediaSource::Ptr createFromMP4(const string &schema, const string &vhost, const string &app, const string &stream, const string &filePath = "", bool checkApp = true);
145
protected:
146
    void regist() ;
147
    bool unregist() ;
148
private:
xiongziliang committed
149 150 151 152 153 154 155 156
    string _strSchema;
    string _strVhost;
    string _strApp;
    string _strId;
    std::weak_ptr<MediaSourceEvent> _listener;
    weak_ptr<TrackSource> _track_source;
    static SchemaVhostAppStreamMap g_mapMediaSrc;
    static recursive_mutex g_mtxMediaSrc;
157 158
};

xiongziliang committed
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
///缓存刷新策略类
class FlushPolicy {
public:
    FlushPolicy(bool is_audio) {
        _is_audio = is_audio;
    };

    ~FlushPolicy() = default;

    uint32_t getStamp(const RtpPacket::Ptr &packet) {
        return packet->timeStamp;
    }

    uint32_t getStamp(const RtmpPacket::Ptr &packet) {
        return packet->timeStamp;
    }

176
    bool isFlushAble(uint32_t new_stamp, int cache_size);
xiongziliang committed
177 178
private:
    bool _is_audio;
179
    uint32_t _last_stamp= 0;
xiongziliang committed
180 181 182 183 184 185 186 187 188
};

/// Video merge-write cache template: batches incoming video packets and hands
/// them to onFlushVideo() whenever the policy says a flush is due.
/// \tparam packet packet type
/// \tparam policy cache flush policy
/// \tparam packet_list packet cache (container) type
template<typename packet, typename policy = FlushPolicy, typename packet_list = List<std::shared_ptr<packet> > >
class VideoPacketCache {
public:
    // The policy is constructed with is_audio = false.
    VideoPacketCache() : _policy(false), _cache(std::make_shared<packet_list>()) {}

    virtual ~VideoPacketCache() = default;

    /// Add a video packet to the cache.
    /// The policy is consulted first, using the new packet's timestamp and the
    /// current cache size, so a flush delivers only the packets cached before
    /// this one.
    /// \param rtp the packet to cache
    /// \param key_pos whether this packet marks a key position
    void inputVideo(const std::shared_ptr<packet> &rtp, bool key_pos) {
        if (_policy.isFlushAble(_policy.getStamp(rtp), _cache->size())) {
            flushAll();
        }

        // Append the packet at the end of the cache.
        _cache->emplace_back(rtp);
        // The flag is sticky until the next flush.
        if (key_pos) {
            _key_pos = true;
        }
    }

    /// Called with the batch of cached packets; key_pos is true when any
    /// packet in the batch was input with key_pos set.
    virtual void onFlushVideo(std::shared_ptr<packet_list> &, bool key_pos) = 0;

private:
    // Deliver the cached packets (if any) and start a new, empty batch.
    void flushAll() {
        if (_cache->empty()) {
            return;
        }
        onFlushVideo(_cache, _key_pos);
        // A fresh list is allocated instead of clearing the old one,
        // presumably so the callee may keep the flushed list.
        _cache = std::make_shared<packet_list>();
        _key_pos = false;
    }

private:
    policy _policy;
    std::shared_ptr<packet_list> _cache;
    bool _key_pos = false;
};

/// Audio merge-write cache template: batches incoming audio packets and hands
/// them to onFlushAudio() whenever the policy says a flush is due.
/// \tparam packet packet type
/// \tparam policy cache flush policy
/// \tparam packet_list packet cache (container) type
template<typename packet, typename policy = FlushPolicy, typename packet_list = List<std::shared_ptr<packet> > >
class AudioPacketCache {
public:
    // The policy is constructed with is_audio = true.
    AudioPacketCache() : _policy(true), _cache(std::make_shared<packet_list>()) {}

    virtual ~AudioPacketCache() = default;

    /// Add an audio packet to the cache.
    /// The policy is consulted first, using the new packet's timestamp and the
    /// current cache size, so a flush delivers only the packets cached before
    /// this one.
    /// \param rtp the packet to cache
    void inputAudio(const std::shared_ptr<packet> &rtp) {
        if (_policy.isFlushAble(_policy.getStamp(rtp), _cache->size())) {
            flushAll();
        }
        // Append the packet at the end of the cache.
        _cache->emplace_back(rtp);
    }

    /// Called with the batch of cached packets.
    virtual void onFlushAudio(std::shared_ptr<packet_list> &) = 0;

private:
    // Deliver the cached packets (if any) and start a new, empty batch.
    void flushAll() {
        if (_cache->empty()) {
            return;
        }
        onFlushAudio(_cache);
        // A fresh list is allocated instead of clearing the old one,
        // presumably so the callee may keep the flushed list.
        _cache = std::make_shared<packet_list>();
    }

private:
    policy _policy;
    std::shared_ptr<packet_list> _cache;
};

} /* namespace mediakit */


#endif //ZLMEDIAKIT_MEDIASOURCE_H