MediaSource.cpp 23.4 KB
Newer Older
1
/*
xiongziliang committed
2
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
3 4 5
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
xiongziliang committed
6 7 8
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
9 10 11
 */

#include "MediaSource.h"
xiongziliang committed
12
#include "Record/MP4Reader.h"
13
#include "Util/util.h"
14
#include "Network/sockutil.h"
15
#include "Network/TcpSession.h"
xiongziliang committed
16 17
using namespace toolkit;
namespace mediakit {
18

xiongziliang committed
19 20
recursive_mutex s_media_source_mtx;
MediaSource::SchemaVhostAppStreamMap s_media_source_map;
21

22
string getOriginTypeString(MediaOriginType type){
23
#define SWITCH_CASE(type) case MediaOriginType::type : return #type
24
    switch (type) {
25 26 27 28 29 30 31 32
        SWITCH_CASE(unknown);
        SWITCH_CASE(rtmp_push);
        SWITCH_CASE(rtsp_push);
        SWITCH_CASE(rtp_push);
        SWITCH_CASE(pull);
        SWITCH_CASE(ffmpeg_pull);
        SWITCH_CASE(mp4_vod);
        SWITCH_CASE(device_chn);
xiongziliang committed
33
        default : return "unknown";
34 35 36
    }
}

xiongziliang committed
37 38 39 40
MediaSource::MediaSource(const string &schema, const string &vhost, const string &app, const string &stream_id){
    GET_CONFIG(bool, enableVhost, General::kEnableVhost);
    if (!enableVhost) {
        _vhost = DEFAULT_VHOST;
xiongziliang committed
41
    } else {
xiongziliang committed
42
        _vhost = vhost.empty() ? DEFAULT_VHOST : vhost;
xiongziliang committed
43
    }
xiongziliang committed
44 45 46
    _schema = schema;
    _app = app;
    _stream_id = stream_id;
47
    _create_stamp = time(NULL);
xiongziliang committed
48 49 50 51 52 53 54
}

MediaSource::~MediaSource() {
    unregist();
}

const string& MediaSource::getSchema() const {
xiongziliang committed
55
    return _schema;
xiongziliang committed
56 57 58
}

const string& MediaSource::getVhost() const {
xiongziliang committed
59
    return _vhost;
xiongziliang committed
60 61 62 63
}

const string& MediaSource::getApp() const {
    //获取该源的id
xiongziliang committed
64
    return _app;
xiongziliang committed
65 66 67
}

const string& MediaSource::getId() const {
xiongziliang committed
68
    return _stream_id;
xiongziliang committed
69 70
}

xiongziliang committed
71 72 73 74 75
int MediaSource::getBytesSpeed(TrackType type){
    if(type == TrackInvalid){
        return _speed[TrackVideo].getSpeed() + _speed[TrackAudio].getSpeed();
    }
    return _speed[type].getSpeed();
76 77
}

78 79 80 81 82 83 84 85 86
uint64_t MediaSource::getCreateStamp() const {
    return _create_stamp;
}

uint64_t MediaSource::getAliveSecond() const {
    //使用Ticker对象获取存活时间的目的是防止修改系统时间导致回退
    return _ticker.createdTime() / 1000;
}

xiongziliang committed
87
vector<Track::Ptr> MediaSource::getTracks(bool ready) const {
88 89 90
    auto listener = _listener.lock();
    if(!listener){
        return vector<Track::Ptr>();
xiongziliang committed
91
    }
92
    return listener->getTracks(const_cast<MediaSource &>(*this), ready);
xiongziliang committed
93 94 95 96 97 98
}

void MediaSource::setListener(const std::weak_ptr<MediaSourceEvent> &listener){
    _listener = listener;
}

99 100 101 102 103 104 105 106 107 108 109 110 111
std::weak_ptr<MediaSourceEvent> MediaSource::getListener(bool next) const{
    if (!next) {
        return _listener;
    }
    auto listener = dynamic_pointer_cast<MediaSourceEventInterceptor>(_listener.lock());
    if (!listener) {
        //不是MediaSourceEventInterceptor对象或者对象已经销毁
        return _listener;
    }
    //获取被拦截的对象
    auto next_obj = listener->getDelegate();
    //有则返回之
    return next_obj ? next_obj : _listener;
xiongziliang committed
112 113
}

114 115 116 117 118 119 120
int MediaSource::totalReaderCount(){
    auto listener = _listener.lock();
    if(!listener){
        return readerCount();
    }
    return listener->totalReaderCount(*this);
}
xiongziliang committed
121

122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
MediaOriginType MediaSource::getOriginType() const {
    auto listener = _listener.lock();
    if (!listener) {
        return MediaOriginType::unknown;
    }
    return listener->getOriginType(const_cast<MediaSource &>(*this));
}

string MediaSource::getOriginUrl() const {
    auto listener = _listener.lock();
    if (!listener) {
        return "";
    }
    return listener->getOriginUrl(const_cast<MediaSource &>(*this));
}

std::shared_ptr<SockInfo> MediaSource::getOriginSock() const {
    auto listener = _listener.lock();
    if (!listener) {
        return nullptr;
    }
    return listener->getOriginSock(const_cast<MediaSource &>(*this));
}

xiongziliang committed
146
bool MediaSource::seekTo(uint32_t stamp) {
xiongziliang committed
147 148 149 150
    auto listener = _listener.lock();
    if(!listener){
        return false;
    }
xiongziliang committed
151
    return listener->seekTo(*this, stamp);
xiongziliang committed
152 153 154 155 156 157 158 159 160 161
}

bool MediaSource::close(bool force) {
    auto listener = _listener.lock();
    if(!listener){
        return false;
    }
    return listener->close(*this,force);
}

162
void MediaSource::onReaderChanged(int size) {
xiongziliang committed
163
    auto listener = _listener.lock();
164 165
    if (listener) {
        listener->onReaderChanged(*this, size);
166
    }
xiongziliang committed
167 168
}

169 170 171
bool MediaSource::setupRecord(Recorder::type type, bool start, const string &custom_path){
    auto listener = _listener.lock();
    if (!listener) {
172
        WarnL << "未设置MediaSource的事件监听者,setupRecord失败:" << getSchema() << "/" << getVhost() << "/" << getApp() << "/" << getId();
173 174 175 176 177 178 179 180 181 182 183 184 185
        return false;
    }
    return listener->setupRecord(*this, type, start, custom_path);
}

bool MediaSource::isRecording(Recorder::type type){
    auto listener = _listener.lock();
    if(!listener){
        return false;
    }
    return listener->isRecording(*this, type);
}

xiongziliang committed
186
void MediaSource::startSendRtp(const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function<void(const SockException &ex)> &cb){
187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202
    auto listener = _listener.lock();
    if (!listener) {
        cb(SockException(Err_other, "尚未设置事件监听器"));
        return;
    }
    return listener->startSendRtp(*this, dst_url, dst_port, ssrc, is_udp, cb);
}

bool MediaSource::stopSendRtp() {
    auto listener = _listener.lock();
    if (!listener) {
        return false;
    }
    return listener->stopSendRtp(*this);
}

xiongziliang committed
203
void MediaSource::for_each_media(const function<void(const MediaSource::Ptr &src)> &cb) {
xiongziliang committed
204
    decltype(s_media_source_map) copy;
xiongziliang committed
205
    {
xiongziliang committed
206
        //拷贝s_media_source_map后再遍历,考虑到是高频使用的全局单例锁,并且在上锁时会执行回调代码
xiongziliang committed
207
        //很容易导致多个锁交叉死锁的情况,而且该函数使用频率不高,拷贝开销相对来说是可以接受的
xiongziliang committed
208 209
        lock_guard<recursive_mutex> lock(s_media_source_mtx);
        copy = s_media_source_map;
xiongziliang committed
210 211 212
    }

    for (auto &pr0 : copy) {
xiongziliang committed
213 214 215 216 217 218 219 220 221 222 223 224 225 226
        for (auto &pr1 : pr0.second) {
            for (auto &pr2 : pr1.second) {
                for (auto &pr3 : pr2.second) {
                    auto src = pr3.second.lock();
                    if(src){
                        cb(src);
                    }
                }
            }
        }
    }
}

template<typename MAP, typename FUNC>
227
static bool searchMedia(MAP &map, const string &schema, const string &vhost, const string &app, const string &id, FUNC &&func) {
xiongziliang committed
228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249
    auto it0 = map.find(schema);
    if (it0 == map.end()) {
        //未找到协议
        return false;
    }
    auto it1 = it0->second.find(vhost);
    if (it1 == it0->second.end()) {
        //未找到vhost
        return false;
    }
    auto it2 = it1->second.find(app);
    if (it2 == it1->second.end()) {
        //未找到app
        return false;
    }
    auto it3 = it2->second.find(id);
    if (it3 == it2->second.end()) {
        //未找到streamId
        return false;
    }
    return func(it0, it1, it2, it3);
}
250

xiongziliang committed
251 252 253 254 255 256 257 258 259 260 261
template<typename MAP, typename IT0, typename IT1, typename IT2>
static void eraseIfEmpty(MAP &map, IT0 it0, IT1 it1, IT2 it2) {
    if (it2->second.empty()) {
        it1->second.erase(it2);
        if (it1->second.empty()) {
            it0->second.erase(it1);
            if (it0->second.empty()) {
                map.erase(it0);
            }
        }
    }
262
}
xiongziliang committed
263

xiongziliang committed
264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288
static MediaSource::Ptr find_l(const string &schema, const string &vhost_in, const string &app, const string &id, bool create_new) {
    string vhost = vhost_in;
    GET_CONFIG(bool,enableVhost,General::kEnableVhost);
    if(vhost.empty() || !enableVhost){
        vhost = DEFAULT_VHOST;
    }

    MediaSource::Ptr ret;
    {
        lock_guard<recursive_mutex> lock(s_media_source_mtx);
        //查找某一媒体源,找到后返回
        searchMedia(s_media_source_map, schema, vhost, app, id,
                    [&](MediaSource::SchemaVhostAppStreamMap::iterator &it0, MediaSource::VhostAppStreamMap::iterator &it1,
                        MediaSource::AppStreamMap::iterator &it2, MediaSource::StreamMap::iterator &it3) {
            ret = it3->second.lock();
            if (!ret) {
                //该对象已经销毁
                it2->second.erase(it3);
                eraseIfEmpty(s_media_source_map, it0, it1, it2);
                return false;
            }
            return true;
        });
    }

289 290 291
    if(!ret && create_new && schema != HLS_SCHEMA){
        //未查找媒体源,则读取mp4创建一个
        //播放hls不触发mp4点播(因为HLS也可以用于录像,不是纯粹的直播)
xiongziliang committed
292 293 294 295 296 297 298 299
        ret = MediaSource::createFromMP4(schema, vhost, app, id);
    }
    return ret;
}

static void findAsync_l(const MediaInfo &info, const std::shared_ptr<TcpSession> &session, bool retry,
                        const function<void(const MediaSource::Ptr &src)> &cb){
    auto src = find_l(info._schema, info._vhost, info._app, info._streamid, true);
300
    if (src || !retry) {
301 302 303 304 305
        cb(src);
        return;
    }

    void *listener_tag = session.get();
xiongziliang committed
306
    weak_ptr<TcpSession> weak_session = session;
307

308
    GET_CONFIG(int, maxWaitMS, General::kMaxStreamWaitTimeMS);
xiongziliang committed
309
    auto on_timeout = session->getPoller()->doDelayTask(maxWaitMS, [cb, listener_tag]() {
310 311
        //最多等待一定时间,如果这个时间内,流未注册上,那么返回未找到流
        NoticeCenter::Instance().delListener(listener_tag, Broadcast::kBroadcastMediaChanged);
312 313 314 315
        cb(nullptr);
        return 0;
    });

xiongziliang committed
316
    auto cancel_all = [on_timeout, listener_tag]() {
317
        //取消延时任务,防止多次回调
xiongziliang committed
318
        on_timeout->cancel();
319 320 321 322
        //取消媒体注册事件监听
        NoticeCenter::Instance().delListener(listener_tag, Broadcast::kBroadcastMediaChanged);
    };

xiongziliang committed
323 324
    function<void()> close_player = [cb, cancel_all]() {
        cancel_all();
325 326 327 328
        //告诉播放器,流不存在,这样会立即断开播放器
        cb(nullptr);
    };

xiongziliang committed
329 330 331
    auto on_regist = [weak_session, info, cb, cancel_all](BroadcastMediaChangedArgs) {
        auto strong_session = weak_session.lock();
        if (!strong_session) {
332
            //自己已经销毁
xiongziliang committed
333
            cancel_all();
334 335 336
            return;
        }

xiongziliang committed
337 338 339 340 341
        if (!bRegist ||
            sender.getSchema() != info._schema ||
            sender.getVhost() != info._vhost ||
            sender.getApp() != info._app ||
            sender.getId() != info._streamid) {
342 343 344 345
            //不是自己感兴趣的事件,忽略之
            return;
        }

xiongziliang committed
346
        cancel_all();
347

348
        //播发器请求的流终于注册上了,切换到自己的线程再回复
xiongziliang committed
349 350
        strong_session->async([weak_session, info, cb]() {
            auto strongSession = weak_session.lock();
351
            if (!strongSession) {
352 353 354 355
                return;
            }
            DebugL << "收到媒体注册事件,回复播放器:" << info._schema << "/" << info._vhost << "/" << info._app << "/" << info._streamid;
            //再找一遍媒体源,一般能找到
356
            findAsync_l(info, strongSession, false, cb);
357 358
        }, false);
    };
359

360
    //监听媒体注册事件
xiongziliang committed
361
    NoticeCenter::Instance().addListener(listener_tag, Broadcast::kBroadcastMediaChanged, on_regist);
362
    //广播未找到流,此时可以立即去拉流,这样还来得及
xiongziliang committed
363
    NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastNotFoundStream, info, static_cast<SockInfo &>(*session), close_player);
364
}
xiongziliang committed
365 366 367 368 369

void MediaSource::findAsync(const MediaInfo &info, const std::shared_ptr<TcpSession> &session,const function<void(const Ptr &src)> &cb){
    return findAsync_l(info, session, true, cb);
}

370 371 372 373
MediaSource::Ptr MediaSource::find(const string &schema, const string &vhost, const string &app, const string &id) {
    return find_l(schema, vhost, app, id, false);
}

374 375 376 377 378 379 380 381 382 383 384 385
MediaSource::Ptr MediaSource::find(const string &vhost, const string &app, const string &stream_id){
    auto src = MediaSource::find(RTMP_SCHEMA, vhost, app, stream_id);
    if (src) {
        return src;
    }
    src = MediaSource::find(RTSP_SCHEMA, vhost, app, stream_id);
    if (src) {
        return src;
    }
    return MediaSource::find(HLS_SCHEMA, vhost, app, stream_id);
}

xiongziliang committed
386
void MediaSource::emitEvent(bool regist){
387 388
    auto listener = _listener.lock();
    if (listener) {
xiongziliang committed
389 390 391 392 393
        //触发回调
        listener->onRegist(*this, regist);
    }
    //触发广播
    NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaChanged, regist, *this);
394
    InfoL << (regist ? "媒体注册:" : "媒体注销:") << _schema << " " << _vhost << " " << _app << " " << _stream_id;
xiongziliang committed
395 396 397 398 399 400 401
}

void MediaSource::regist() {
    {
        //减小互斥锁临界区
        lock_guard<recursive_mutex> lock(s_media_source_mtx);
        s_media_source_map[_schema][_vhost][_app][_stream_id] = shared_from_this();
402
    }
xiongziliang committed
403
    emitEvent(true);
404
}
xiongziliang committed
405 406

//反注册该源
407
bool MediaSource::unregist() {
xiongziliang committed
408 409
    bool ret;
    {
xiongziliang committed
410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427
        //减小互斥锁临界区
        lock_guard<recursive_mutex> lock(s_media_source_mtx);
        ret = searchMedia(s_media_source_map, _schema, _vhost, _app, _stream_id,
                          [&](SchemaVhostAppStreamMap::iterator &it0, VhostAppStreamMap::iterator &it1,
                              AppStreamMap::iterator &it2, StreamMap::iterator &it3) {
          auto strong_self = it3->second.lock();
          if (strong_self && this != strong_self.get()) {
              //不是自己,不允许反注册
              return false;
          }
          it2->second.erase(it3);
          eraseIfEmpty(s_media_source_map, it0, it1, it2);
          return true;
      });
    }

    if (ret) {
        emitEvent(false);
xiongziliang committed
428 429 430
    }
    return ret;
}
xiongziliang committed
431 432 433

/////////////////////////////////////MediaInfo//////////////////////////////////////

434
void MediaInfo::parse(const string &url_in){
435
    _full_url = url_in;
436 437 438 439 440 441 442
    string url = url_in;
    auto pos = url.find("?");
    if (pos != string::npos) {
        _param_strs = url.substr(pos + 1);
        url.erase(pos);
    }

443
    auto schema_pos = url.find("://");
xiongziliang committed
444 445 446
    if (schema_pos != string::npos) {
        _schema = url.substr(0, schema_pos);
    } else {
447 448
        schema_pos = -3;
    }
xiongziliang committed
449 450
    auto split_vec = split(url.substr(schema_pos + 3), "/");
    if (split_vec.size() > 0) {
451 452
        auto vhost = split_vec[0];
        auto pos = vhost.find(":");
xiongziliang committed
453 454
        if (pos != string::npos) {
            _host = _vhost = vhost.substr(0, pos);
455
            _port = vhost.substr(pos + 1);
xiongziliang committed
456
        } else {
457
            _host = _vhost = vhost;
458
        }
xiongziliang committed
459
        if (_vhost == "localhost" || INADDR_NONE != inet_addr(_vhost.data())) {
460 461 462
            //如果访问的是localhost或ip,那么则为默认虚拟主机
            _vhost = DEFAULT_VHOST;
        }
463
    }
xiongziliang committed
464
    if (split_vec.size() > 1) {
465
        _app = split_vec[1];
466
    }
xiongziliang committed
467 468 469 470
    if (split_vec.size() > 2) {
        string stream_id;
        for (int i = 2; i < split_vec.size(); ++i) {
            stream_id.append(split_vec[i] + "/");
471
        }
xiongziliang committed
472 473
        if (stream_id.back() == '/') {
            stream_id.pop_back();
474
        }
475 476 477 478 479 480
        _streamid = stream_id;
    }

    auto params = Parser::parseArgs(_param_strs);
    if (params.find(VHOST_KEY) != params.end()) {
        _vhost = params[VHOST_KEY];
481
    }
482

xiongziliang committed
483 484
    GET_CONFIG(bool, enableVhost, General::kEnableVhost);
    if (!enableVhost || _vhost.empty()) {
485
        //如果关闭虚拟主机或者虚拟主机为空,则设置虚拟主机为默认
486
        _vhost = DEFAULT_VHOST;
487 488 489
    }
}

xiongziliang committed
490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509
MediaSource::Ptr MediaSource::createFromMP4(const string &schema, const string &vhost, const string &app, const string &stream, const string &file_path , bool check_app){
    GET_CONFIG(string, appName, Record::kAppName);
    if (check_app && app != appName) {
        return nullptr;
    }
#ifdef ENABLE_MP4
    try {
        MP4Reader::Ptr pReader(new MP4Reader(vhost, app, stream, file_path));
        pReader->startReadMP4();
        return MediaSource::find(schema, vhost, app, stream);
    } catch (std::exception &ex) {
        WarnL << ex.what();
        return nullptr;
    }
#else
    WarnL << "创建MP4点播失败,请编译时打开\"ENABLE_MP4\"选项";
    return nullptr;
#endif //ENABLE_MP4
}

xiongziliang committed
510 511
/////////////////////////////////////MediaSourceEvent//////////////////////////////////////

512 513 514 515 516 517
void MediaSourceEvent::onReaderChanged(MediaSource &sender, int size){
    if (size || totalReaderCount(sender)) {
        //还有人观看该视频,不触发关闭事件
        return;
    }
    //没有任何人观看该视频源,表明该源可以关闭了
xiongziliang committed
518
    GET_CONFIG(string, record_app, Record::kAppName);
519
    GET_CONFIG(int, stream_none_reader_delay, General::kStreamNoneReaderDelayMS);
520
    //如果mp4点播, 无人观看时我们强制关闭点播
xiongziliang committed
521
    bool is_mp4_vod = sender.getApp() == record_app;
522
    weak_ptr<MediaSource> weak_sender = sender.shared_from_this();
523

524 525 526
    _async_close_timer = std::make_shared<Timer>(stream_none_reader_delay / 1000.0, [weak_sender, is_mp4_vod]() {
        auto strong_sender = weak_sender.lock();
        if (!strong_sender) {
527 528 529 530
            //对象已经销毁
            return false;
        }

531 532
        if (strong_sender->totalReaderCount()) {
            //还有人观看该视频,不触发关闭事件
533
            return false;
534
        }
535

536
        if (!is_mp4_vod) {
537 538
            //直播时触发无人观看事件,让开发者自行选择是否关闭
            WarnL << "无人观看事件:"
539 540 541 542 543 544
                  << strong_sender->getSchema() << "/"
                  << strong_sender->getVhost() << "/"
                  << strong_sender->getApp() << "/"
                  << strong_sender->getId();
            NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastStreamNoneReader, *strong_sender);
        } else {
545 546
            //这个是mp4点播,我们自动关闭
            WarnL << "MP4点播无人观看,自动关闭:"
547 548 549 550 551
                  << strong_sender->getSchema() << "/"
                  << strong_sender->getVhost() << "/"
                  << strong_sender->getApp() << "/"
                  << strong_sender->getId();
            strong_sender->close(false);
552
        }
553 554
        return false;
    }, nullptr);
xiongziliang committed
555 556
}

557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580
MediaOriginType MediaSourceEventInterceptor::getOriginType(MediaSource &sender) const {
    auto listener = _listener.lock();
    if (!listener) {
        return MediaOriginType::unknown;
    }
    return listener->getOriginType(sender);
}

string MediaSourceEventInterceptor::getOriginUrl(MediaSource &sender) const {
    auto listener = _listener.lock();
    if (!listener) {
        return "";
    }
    return listener->getOriginUrl(sender);
}

std::shared_ptr<SockInfo> MediaSourceEventInterceptor::getOriginSock(MediaSource &sender) const {
    auto listener = _listener.lock();
    if (!listener) {
        return nullptr;
    }
    return listener->getOriginSock(sender);
}

xiongziliang committed
581 582 583 584
bool MediaSourceEventInterceptor::seekTo(MediaSource &sender, uint32_t stamp) {
    auto listener = _listener.lock();
    if (!listener) {
        return false;
585
    }
xiongziliang committed
586 587 588 589 590 591 592
    return listener->seekTo(sender, stamp);
}

bool MediaSourceEventInterceptor::close(MediaSource &sender, bool force) {
    auto listener = _listener.lock();
    if (!listener) {
        return false;
593
    }
xiongziliang committed
594 595 596 597 598 599 600 601 602 603 604
    return listener->close(sender, force);
}

int MediaSourceEventInterceptor::totalReaderCount(MediaSource &sender) {
    auto listener = _listener.lock();
    if (!listener) {
        return sender.readerCount();
    }
    return listener->totalReaderCount(sender);
}

605
void MediaSourceEventInterceptor::onReaderChanged(MediaSource &sender, int size) {
xiongziliang committed
606 607
    auto listener = _listener.lock();
    if (!listener) {
608 609 610
        MediaSourceEvent::onReaderChanged(sender, size);
    } else {
        listener->onReaderChanged(sender, size);
xiongziliang committed
611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634
    }
}

void MediaSourceEventInterceptor::onRegist(MediaSource &sender, bool regist) {
    auto listener = _listener.lock();
    if (listener) {
        listener->onRegist(sender, regist);
    }
}

bool MediaSourceEventInterceptor::setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) {
    auto listener = _listener.lock();
    if (!listener) {
        return false;
    }
    return listener->setupRecord(sender, type, start, custom_path);
}

bool MediaSourceEventInterceptor::isRecording(MediaSource &sender, Recorder::type type) {
    auto listener = _listener.lock();
    if (!listener) {
        return false;
    }
    return listener->isRecording(sender, type);
635
}
636

637 638 639 640 641 642 643 644
vector<Track::Ptr> MediaSourceEventInterceptor::getTracks(MediaSource &sender, bool trackReady) const {
    auto listener = _listener.lock();
    if (!listener) {
        return vector<Track::Ptr>();
    }
    return listener->getTracks(sender, trackReady);
}

xiongziliang committed
645
void MediaSourceEventInterceptor::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function<void(const SockException &ex)> &cb){
646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661
    auto listener = _listener.lock();
    if (listener) {
        listener->startSendRtp(sender, dst_url, dst_port, ssrc, is_udp, cb);
    } else {
        MediaSourceEvent::startSendRtp(sender, dst_url, dst_port, ssrc, is_udp, cb);
    }
}

bool MediaSourceEventInterceptor::stopSendRtp(MediaSource &sender){
    auto listener = _listener.lock();
    if (listener) {
        return listener->stopSendRtp(sender);
    }
    return false;
}

662 663 664 665 666 667 668 669 670 671 672
void MediaSourceEventInterceptor::setDelegate(const std::weak_ptr<MediaSourceEvent> &listener) {
    if (listener.lock().get() == this) {
        throw std::invalid_argument("can not set self as a delegate");
    }
    _listener = listener;
}

std::shared_ptr<MediaSourceEvent> MediaSourceEventInterceptor::getDelegate() const{
    return _listener.lock();
}

xiongziliang committed
673 674
/////////////////////////////////////FlushPolicy//////////////////////////////////////

xiongziliang committed
675
static bool isFlushAble_default(bool is_video, uint64_t last_stamp, uint64_t new_stamp, int cache_size) {
676 677
    if (new_stamp + 500 < last_stamp) {
        //时间戳回退比较大(可能seek中),由于rtp中时间戳是pts,是可能存在一定程度的回退的
xiongziliang committed
678 679 680
        return true;
    }

681 682
    //时间戳发送变化或者缓存超过1024个,sendmsg接口一般最多只能发送1024个数据包
    return last_stamp != new_stamp || cache_size >= 1024;
xiongziliang committed
683 684
}

xiongziliang committed
685
static bool isFlushAble_merge(bool is_video, uint64_t last_stamp, uint64_t new_stamp, int cache_size, int merge_ms) {
686 687
    if (new_stamp + 500 < last_stamp) {
        //时间戳回退比较大(可能seek中),由于rtp中时间戳是pts,是可能存在一定程度的回退的
xiongziliang committed
688 689 690
        return true;
    }

691
    if (new_stamp > last_stamp + merge_ms) {
xiongziliang committed
692 693 694 695
        //时间戳增量超过合并写阈值
        return true;
    }

696 697 698
    //缓存数超过1024个,这个逻辑用于避免时间戳异常的流导致的内存暴增问题
    //而且sendmsg接口一般最多只能发送1024个数据包
    return cache_size >= 1024;
xiongziliang committed
699 700
}

xiongziliang committed
701
bool FlushPolicy::isFlushAble(bool is_video, bool is_key, uint64_t new_stamp, int cache_size) {
702 703 704 705
    bool flush_flag = false;
    if (is_key && is_video) {
        //遇到关键帧flush掉前面的数据,确保关键帧为该组数据的第一帧,确保GOP缓存有效
        flush_flag = true;
706
    } else {
707 708 709 710 711 712 713
        GET_CONFIG(int, mergeWriteMS, General::kMergeWriteMS);
        if (mergeWriteMS <= 0) {
            //关闭了合并写或者合并写阈值小于等于0
            flush_flag = isFlushAble_default(is_video, _last_stamp[is_video], new_stamp, cache_size);
        } else {
            flush_flag = isFlushAble_merge(is_video, _last_stamp[is_video], new_stamp, cache_size, mergeWriteMS);
        }
714 715
    }

716 717
    if (flush_flag) {
        _last_stamp[is_video] = new_stamp;
xiongziliang committed
718
    }
719
    return flush_flag;
xiongziliang committed
720 721
}

xiongziliang committed
722
} /* namespace mediakit */