RtpSelector.cpp 4.26 KB
Newer Older
Gemfield committed
1
/*
xiongziliang committed
2
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
3
 *
4
 * This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit).
5
 *
xiongziliang committed
6 7 8
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
9
 */
Gemfield committed
10

11
#if defined(ENABLE_RTPPROXY)
xiongziliang committed
12
#include <stddef.h>
Gemfield committed
13
#include "RtpSelector.h"
xiongziliang committed
14
#include "RtpSplitter.h"
Gemfield committed
15

夏楚 committed
16 17 18
using namespace std;
using namespace toolkit;

19 20
namespace mediakit{

Gemfield committed
21 22
INSTANCE_IMP(RtpSelector);

23 24 25 26 27
void RtpSelector::clear(){
    lock_guard<decltype(_mtx_map)> lck(_mtx_map);
    _map_rtp_process.clear();
}

28
bool RtpSelector::getSSRC(const char *data, size_t data_len, uint32_t &ssrc){
29
    if (data_len < 12) {
30
        return false;
Gemfield committed
31
    }
32
    uint32_t *ssrc_ptr = (uint32_t *) (data + 8);
33 34
    ssrc = ntohl(*ssrc_ptr);
    return true;
Gemfield committed
35 36
}

37
RtpProcess::Ptr RtpSelector::getProcess(const string &stream_id,bool makeNew) {
Gemfield committed
38
    lock_guard<decltype(_mtx_map)> lck(_mtx_map);
39 40
    auto it = _map_rtp_process.find(stream_id);
    if (it == _map_rtp_process.end() && !makeNew) {
Gemfield committed
41 42
        return nullptr;
    }
43 44
    if (it != _map_rtp_process.end() && makeNew) {
        //已经被其他线程持有了,不得再被持有,否则会存在线程安全的问题
45
        throw ProcessExisted(StrPrinter << "RtpProcess(" << stream_id << ") already existed");
46
    }
47 48 49
    RtpProcessHelper::Ptr &ref = _map_rtp_process[stream_id];
    if (!ref) {
        ref = std::make_shared<RtpProcessHelper>(stream_id, shared_from_this());
50
        ref->attachEvent();
51
        createTimer();
Gemfield committed
52
    }
53
    return ref->getProcess();
Gemfield committed
54 55
}

56 57 58 59
void RtpSelector::createTimer() {
    if (!_timer) {
        //创建超时管理定时器
        weak_ptr<RtpSelector> weakSelf = shared_from_this();
60
        _timer = std::make_shared<Timer>(3.0f, [weakSelf] {
61 62 63 64 65 66 67 68 69 70
            auto strongSelf = weakSelf.lock();
            if (!strongSelf) {
                return false;
            }
            strongSelf->onManager();
            return true;
        }, EventPollerPool::Instance().getPoller());
    }
}

71
void RtpSelector::delProcess(const string &stream_id,const RtpProcess *ptr) {
xiongziliang committed
72 73 74 75 76 77 78 79 80 81 82 83
    RtpProcess::Ptr process;
    {
        lock_guard<decltype(_mtx_map)> lck(_mtx_map);
        auto it = _map_rtp_process.find(stream_id);
        if (it == _map_rtp_process.end()) {
            return;
        }
        if (it->second->getProcess().get() != ptr) {
            return;
        }
        process = it->second->getProcess();
        _map_rtp_process.erase(it);
Gemfield committed
84
    }
85
    process->onDetach();
Gemfield committed
86 87 88
}

void RtpSelector::onManager() {
xiongziliang committed
89 90 91 92 93 94 95 96 97 98 99
    List<RtpProcess::Ptr> clear_list;
    {
        lock_guard<decltype(_mtx_map)> lck(_mtx_map);
        for (auto it = _map_rtp_process.begin(); it != _map_rtp_process.end();) {
            if (it->second->getProcess()->alive()) {
                ++it;
                continue;
            }
            WarnL << "RtpProcess timeout:" << it->first;
            clear_list.emplace_back(it->second->getProcess());
            it = _map_rtp_process.erase(it);
Gemfield committed
100 101
        }
    }
xiongziliang committed
102 103 104 105

    clear_list.for_each([](const RtpProcess::Ptr &process) {
        process->onDetach();
    });
Gemfield committed
106 107
}

108 109
RtpProcessHelper::RtpProcessHelper(const string &stream_id, const weak_ptr<RtpSelector> &parent) {
    _stream_id = stream_id;
110
    _parent = parent;
111
    _process = std::make_shared<RtpProcess>(stream_id);
112 113 114
}

RtpProcessHelper::~RtpProcessHelper() {
115 116 117 118 119 120 121
    auto process = std::move(_process);
    try {
        // flush时,确保线程安全
        process->getOwnerPoller(MediaSource::NullMediaSource())->async([process]() { process->flush(); });
    } catch (...) {
        // 忽略getOwnerPoller可能抛出的异常
    }
122 123 124
}

void RtpProcessHelper::attachEvent() {
125
    //主要目的是close回调触发时能把对象从RtpSelector中删除
126
    _process->setDelegate(shared_from_this());
127 128
}

129
bool RtpProcessHelper::close(MediaSource &sender) {
130 131
    //此回调在其他线程触发
    auto parent = _parent.lock();
132
    if (!parent) {
133 134
        return false;
    }
135
    parent->delProcess(_stream_id, _process.get());
136
    WarnL << "close media: " << sender.getUrl();
137 138 139 140 141 142 143
    return true;
}

RtpProcess::Ptr &RtpProcessHelper::getProcess() {
    return _process;
}

144 145
}//namespace mediakit
#endif//defined(ENABLE_RTPPROXY)