MediaSource.h 11.1 KB
Newer Older
1
/*
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
 */

#ifndef ZLMEDIAKIT_MEDIASOURCE_H
#define ZLMEDIAKIT_MEDIASOURCE_H

#include <functional>
#include <limits>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include "Common/config.h"
#include "Common/Parser.h"
#include "Util/logger.h"
#include "Util/TimeTicker.h"
#include "Util/NoticeCenter.h"
#include "Util/List.h"
#include "Network/Socket.h"
#include "Rtsp/Rtsp.h"
#include "Rtmp/Rtmp.h"
#include "Extension/Track.h"
#include "Record/Recorder.h"
30 31

using namespace std;
xiongziliang committed
32
using namespace toolkit;
33

34 35
// Forward declaration of toolkit::TcpSession; this header only names the type
// through std::shared_ptr (see MediaSource::findAsync).
namespace toolkit {
    class TcpSession;
} // namespace toolkit
37

xiongziliang committed
38
namespace mediakit {
39

40 41 42 43 44 45 46 47 48 49 50 51 52
/**
 * Kinds of origin a media source can report (see getOriginType()).
 * The underlying type is a single byte. The numeric values written out below
 * are exactly the ones the declaration order already implied (0 through 7).
 */
enum class MediaOriginType : uint8_t {
    unknown     = 0,
    rtmp_push   = 1,
    rtsp_push   = 2,
    rtp_push    = 3,
    pull        = 4,
    ffmpeg_pull = 5,
    mp4_vod     = 6,
    device_chn  = 7
};

// Returns a string for the given origin type. Only the declaration lives in
// this header; the actual mapping is defined elsewhere.
string getOriginTypeString(MediaOriginType type);

53 54
class MediaSource;
class MediaSourceEvent{
55
public:
56
    friend class MediaSource;
57 58
    MediaSourceEvent(){};
    virtual ~MediaSourceEvent(){};
xiongziliang committed
59

60 61 62 63 64 65 66
    // 获取媒体源类型
    virtual MediaOriginType getOriginType(MediaSource &sender) const { return MediaOriginType::unknown; }
    // 获取媒体源url或者文件路径
    virtual string getOriginUrl(MediaSource &sender) const { return ""; }
    // 获取媒体源客户端相关信息
    virtual std::shared_ptr<SockInfo> getOriginSock(MediaSource &sender) const { return nullptr; }

xiongziliang committed
67
    // 通知拖动进度条
xiongziliang committed
68 69 70 71
    virtual bool seekTo(MediaSource &sender, uint32_t stamp) { return false; }
    // 通知其停止产生流
    virtual bool close(MediaSource &sender, bool force) { return false; }
    // 获取观看总人数
72
    virtual int totalReaderCount(MediaSource &sender) = 0;
73 74
    // 通知观看人数变化
    virtual void onReaderChanged(MediaSource &sender, int size);
75 76
    //流注册或注销事件
    virtual void onRegist(MediaSource &sender, bool regist) {};
77

xiongziliang committed
78 79 80 81 82
    ////////////////////////仅供MultiMediaSourceMuxer对象继承////////////////////////
    // 开启或关闭录制
    virtual bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) { return false; };
    // 获取录制状态
    virtual bool isRecording(MediaSource &sender, Recorder::type type) { return false; };
83 84
    // 获取所有track相关信息
    virtual vector<Track::Ptr> getTracks(MediaSource &sender, bool trackReady = true) const { return vector<Track::Ptr>(); };
85 86 87 88
    // 开始发送ps-rtp
    virtual void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function<void(const SockException &ex)> &cb) { cb(SockException(Err_other, "not implemented"));};
    // 停止发送ps-rtp
    virtual bool stopSendRtp(MediaSource &sender) {return false; }
xiongziliang committed
89

90 91
private:
    Timer::Ptr _async_close_timer;
92
};
93

xiongziliang committed
94 95 96 97 98 99
//该对象用于拦截感兴趣的MediaSourceEvent事件
class MediaSourceEventInterceptor : public MediaSourceEvent{
public:
    MediaSourceEventInterceptor(){}
    ~MediaSourceEventInterceptor() override {}

100 101 102
    void setDelegate(const std::weak_ptr<MediaSourceEvent> &listener);
    std::shared_ptr<MediaSourceEvent> getDelegate() const;

103 104 105 106
    MediaOriginType getOriginType(MediaSource &sender) const override;
    string getOriginUrl(MediaSource &sender) const override;
    std::shared_ptr<SockInfo> getOriginSock(MediaSource &sender) const override;

xiongziliang committed
107 108 109
    bool seekTo(MediaSource &sender, uint32_t stamp) override;
    bool close(MediaSource &sender, bool force) override;
    int totalReaderCount(MediaSource &sender) override;
110
    void onReaderChanged(MediaSource &sender, int size) override;
111
    void onRegist(MediaSource &sender, bool regist) override;
xiongziliang committed
112 113
    bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) override;
    bool isRecording(MediaSource &sender, Recorder::type type) override;
114
    vector<Track::Ptr> getTracks(MediaSource &sender, bool trackReady = true) const override;
115 116
    void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function<void(const SockException &ex)> &cb) override;
    bool stopSendRtp(MediaSource &sender) override;
xiongziliang committed
117

118
private:
xiongziliang committed
119 120 121
    std::weak_ptr<MediaSourceEvent> _listener;
};

122 123 124
/**
 * 解析url获取媒体相关信息
 */
125
class MediaInfo{
126
public:
xiongziliang committed
127 128 129
    ~MediaInfo() {}
    MediaInfo() {}
    MediaInfo(const string &url) { parse(url); }
130
    void parse(const string &url);
xiongziliang committed
131

132
public:
133
    string _full_url;
134 135 136 137 138 139
    string _schema;
    string _host;
    string _port;
    string _vhost;
    string _app;
    string _streamid;
140
    string _param_strs;
141 142
};

143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188
/**
 * Byte-rate meter: accumulates byte counts and converts them to bytes/s using
 * the time elapsed on an internal Ticker since the previous computation.
 */
class BytesSpeed {
public:
    BytesSpeed() = default;
    ~BytesSpeed() = default;

    /**
     * Add bytes to the statistics.
     * @param bytes number of bytes to account for
     * @return *this
     */
    BytesSpeed& operator += (uint64_t bytes) {
        _bytes += bytes;
        if (_bytes > 1024 * 1024) {
            // More than 1MB accumulated: compute the speed right away.
            computeSpeed();
        }
        return *this;
    }

    /**
     * Get the speed in bytes/s.
     * @return the freshly computed speed, or the previous result when called
     *         again less than one second (1000 ticker units) after the last
     *         computation
     */
    int getSpeed() {
        if (_ticker.elapsedTime() < 1000) {
            // Queried more often than once per second: return the last result.
            return _speed;
        }
        return computeSpeed();
    }

private:
    /**
     * Recompute the speed from the bytes accumulated since the last reset,
     * then restart the measuring window.
     * @return the new speed, or the previous one when no time has elapsed
     */
    int computeSpeed() {
        auto elapsed = _ticker.elapsedTime();
        if (!elapsed) {
            // No measurable time elapsed; avoid dividing by zero.
            return _speed;
        }
        const uint64_t rate = _bytes * 1000 / elapsed;
        // _speed is an int: saturate rather than let the implicit narrowing
        // conversion wrap a very large rate into a garbage (possibly negative) value.
        const uint64_t limit = static_cast<uint64_t>(std::numeric_limits<int>::max());
        _speed = static_cast<int>(rate < limit ? rate : limit);
        _ticker.resetTime();
        _bytes = 0;
        return _speed;
    }

private:
    int _speed = 0;       // last computed speed, bytes/s
    uint64_t _bytes = 0;  // bytes accumulated since the last computation
    Ticker _ticker;       // measures the time since the last computation
};

xiongziliang committed
189 190 191
/**
 * 媒体源,任何rtsp/rtmp的直播流都源自该对象
 */
xiongziliang committed
192
class MediaSource: public TrackSource, public enable_shared_from_this<MediaSource> {
193 194 195 196 197 198 199
public:
    typedef std::shared_ptr<MediaSource> Ptr;
    typedef unordered_map<string, weak_ptr<MediaSource> > StreamMap;
    typedef unordered_map<string, StreamMap > AppStreamMap;
    typedef unordered_map<string, AppStreamMap > VhostAppStreamMap;
    typedef unordered_map<string, VhostAppStreamMap > SchemaVhostAppStreamMap;

xiongziliang committed
200
    MediaSource(const string &schema, const string &vhost, const string &app, const string &stream_id) ;
xiongziliang committed
201 202
    virtual ~MediaSource() ;

203 204
    ////////////////获取MediaSource相关信息////////////////

xiongziliang committed
205 206 207 208 209 210 211 212 213
    // 获取协议类型
    const string& getSchema() const;
    // 虚拟主机
    const string& getVhost() const;
    // 应用名
    const string& getApp() const;
    // 流id
    const string& getId() const;

214
    // 获取所有Track
xiongziliang committed
215
    vector<Track::Ptr> getTracks(bool ready = true) const override;
216

217 218 219 220 221
    // 获取流当前时间戳
    virtual uint32_t getTimeStamp(TrackType type) { return 0; };
    // 设置时间戳
    virtual void setTimeStamp(uint32_t stamp) {};

222 223
    // 获取数据速率,单位bytes/s
    int getBytesSpeed();
224 225 226 227
    // 获取流创建GMT unix时间戳,单位秒
    uint64_t getCreateStamp() const;
    // 获取流上线时间,单位秒
    uint64_t getAliveSecond() const;
228

229 230
    ////////////////MediaSourceEvent相关接口实现////////////////

xiongziliang committed
231
    // 设置监听者
232
    virtual void setListener(const std::weak_ptr<MediaSourceEvent> &listener);
233
    // 获取监听者
234
    std::weak_ptr<MediaSourceEvent> getListener(bool next = false) const;
235

236
    // 本协议获取观看者个数,可能返回本协议的观看人数,也可能返回总人数
xiongziliang committed
237
    virtual int readerCount() = 0;
238 239
    // 观看者个数,包括(hls/rtsp/rtmp)
    virtual int totalReaderCount();
240

241 242 243 244 245 246 247
    // 获取媒体源类型
    MediaOriginType getOriginType() const;
    // 获取媒体源url或者文件路径
    string getOriginUrl() const;
    // 获取媒体源客户端相关信息
    std::shared_ptr<SockInfo> getOriginSock() const;

xiongziliang committed
248
    // 拖动进度条
xiongziliang committed
249
    bool seekTo(uint32_t stamp);
xiongziliang committed
250 251
    // 关闭该流
    bool close(bool force);
252 253
    // 该流观看人数变化
    void onReaderChanged(int size);
254
    // 开启或关闭录制
255
    bool setupRecord(Recorder::type type, bool start, const string &custom_path);
256
    // 获取录制状态
257
    bool isRecording(Recorder::type type);
258 259 260 261
    // 开始发送ps-rtp
    void startSendRtp(const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function<void(const SockException &ex)> &cb);
    // 停止发送ps-rtp
    bool stopSendRtp();
262

263 264
    ////////////////static方法,查找或生成MediaSource////////////////

xiongziliang committed
265
    // 同步查找流
266
    static Ptr find(const string &schema, const string &vhost, const string &app, const string &id);
267 268 269 270

    // 忽略类型,同步查找流,可能返回rtmp/rtsp/hls类型
    static Ptr find(const string &vhost, const string &app, const string &stream_id);

xiongziliang committed
271 272 273 274
    // 异步查找流
    static void findAsync(const MediaInfo &info, const std::shared_ptr<TcpSession> &session, const function<void(const Ptr &src)> &cb);
    // 遍历所有流
    static void for_each_media(const function<void(const Ptr &src)> &cb);
275
    // 从mp4文件生成MediaSource
xiongziliang committed
276
    static MediaSource::Ptr createFromMP4(const string &schema, const string &vhost, const string &app, const string &stream, const string &file_path = "", bool check_app = true);
277

278
protected:
xiongziliang committed
279 280
    //媒体注册
    void regist();
281 282

private:
xiongziliang committed
283 284 285 286 287
    //媒体注销
    bool unregist();
    //触发媒体事件
    void emitEvent(bool regist);

288 289 290
protected:
    BytesSpeed _speed;

291
private:
292 293
    time_t _create_stamp;
    Ticker _ticker;
xiongziliang committed
294 295 296 297 298
    string _schema;
    string _vhost;
    string _app;
    string _stream_id;
    std::weak_ptr<MediaSourceEvent> _listener;
299 300
};

xiongziliang committed
301 302 303
/// Cache flush policy class.
/// Decides, per incoming packet, whether an accumulated cache should be
/// flushed; the decision logic itself is defined outside this header.
class FlushPolicy {
public:
    FlushPolicy() = default;
    ~FlushPolicy() = default;

    /// \param is_video   whether the incoming packet is video
    /// \param is_key     whether the incoming packet is a key position
    /// \param new_stamp  timestamp of the incoming packet
    /// \param cache_size number of packets currently cached
    /// \return true when the cache should be flushed
    bool isFlushAble(bool is_video, bool is_key, uint64_t new_stamp, int cache_size);

private:
    // NOTE(review): two slots, presumably one per media kind (indexed by is_video); confirm in the implementation.
    uint64_t _last_stamp[2] = {0, 0};
};

313
/// 合并写缓存模板
xiongziliang committed
314 315 316 317
/// \tparam packet 包类型
/// \tparam policy 刷新缓存策略
/// \tparam packet_list 包缓存类型
template<typename packet, typename policy = FlushPolicy, typename packet_list = List<std::shared_ptr<packet> > >
318
class PacketCache {
xiongziliang committed
319
public:
320
    PacketCache(){
xiongziliang committed
321 322 323
        _cache = std::make_shared<packet_list>();
    }

324
    virtual ~PacketCache() = default;
xiongziliang committed
325

xiongziliang committed
326 327
    void inputPacket(uint64_t stamp, bool is_video, std::shared_ptr<packet> pkt, bool key_pos) {
        if (_policy.isFlushAble(is_video, key_pos, stamp, _cache->size())) {
xiongziliang committed
328 329 330 331
            flushAll();
        }

        //追加数据到最后
332
        _cache->emplace_back(std::move(pkt));
xiongziliang committed
333 334 335 336 337
        if (key_pos) {
            _key_pos = key_pos;
        }
    }

338 339 340 341
    virtual void clearCache() {
        _cache->clear();
    }

342
    virtual void onFlush(std::shared_ptr<packet_list>, bool key_pos) = 0;
xiongziliang committed
343 344 345 346 347 348

private:
    void flushAll() {
        if (_cache->empty()) {
            return;
        }
349
        onFlush(std::move(_cache), _key_pos);
xiongziliang committed
350 351 352 353 354
        _cache = std::make_shared<packet_list>();
        _key_pos = false;
    }

private:
xiongziliang committed
355
    bool _key_pos = false;
xiongziliang committed
356 357 358 359
    policy _policy;
    std::shared_ptr<packet_list> _cache;
};

xiongziliang committed
360
} /* namespace mediakit */
361
#endif //ZLMEDIAKIT_MEDIASOURCE_H