/*
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
 *
 * This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit).
 *
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
 */

#if defined(ENABLE_RTPPROXY)
Gemfield committed
12 13
#include "RtpSession.h"
#include "RtpSelector.h"
14
#include "Network/TcpServer.h"
15
#include "Rtsp/Rtsp.h"
16
#include "Rtsp/RtpReceiver.h"
mtdxc committed
17
#include "Common/config.h"
夏楚 committed
18 19 20 21

using namespace std;
using namespace toolkit;

22
namespace mediakit{
Gemfield committed
23

24
// Keys under which setParams() looks up the stream id, the ssrc and the
// only-audio flag in the mINI object it is given.
const string RtpSession::kStreamID("stream_id");
const string RtpSession::kSSRC("ssrc");
const string RtpSession::kOnlyAudio("only_audio");
27

ziyue committed
28
void RtpSession::attachServer(const Server &server) {
29 30 31 32 33 34
    setParams(const_cast<Server &>(server));
}

// Copies the per-session settings out of `ini`:
//   kOnlyAudio -> _only_audio
//   kSSRC      -> _ssrc
//   kStreamID  -> _stream_id
// The three lookups are independent of each other.
void RtpSession::setParams(mINI &ini) {
    _only_audio = ini[kOnlyAudio];
    _ssrc = ini[kSSRC];
    _stream_id = ini[kStreamID];
}

38
// Creates a session bound to `sock`.
// Records the peer address of the socket in _addr and whether the socket is
// UDP in _is_udp. For UDP sockets the receive buffer is enlarged.
RtpSession::RtpSession(const Socket::Ptr &sock) : Session(sock) {
    DebugP(this);
    // Start from an all-zero address so that a failing getpeername() leaves
    // _addr with a well-defined value instead of indeterminate bytes; _addr is
    // later handed to the rtp process on every packet.
    _addr = {};
    socklen_t addr_len = sizeof(_addr);
    if (getpeername(sock->rawFD(), (struct sockaddr *)&_addr, &addr_len) != 0) {
        WarnP(this) << "getpeername failed, peer address is unknown";
    }
    _is_udp = sock->sockType() == SockNum::Sock_UDP;
    if (_is_udp) {
        // Set the read buffer of the udp socket to 4 MiB.
        SockUtil::setRecvBuf(getSock()->rawFD(), 4 * 1024 * 1024);
    }
}
48

Gemfield committed
49 50
// On destruction, removes the rtp process of this session (if one was ever
// obtained) from the RtpSelector singleton.
RtpSession::~RtpSession() {
    DebugP(this);
    if (!_process) {
        // No process was created for this session, nothing to unregister.
        return;
    }
    RtpSelector::Instance().delProcess(_stream_id, _process.get());
}

// Entry point for data received on the socket.
// Non-UDP data is fed to RtpSplitter::input(); a UDP buffer is passed
// directly to onRtpPacket().
void RtpSession::onRecv(const Buffer::Ptr &data) {
    if (!_is_udp) {
        RtpSplitter::input(data->data(), data->size());
        return;
    }
    onRtpPacket(data->data(), data->size());
}

void RtpSession::onError(const SockException &err) {
ziyue committed
65
    WarnP(this) << _stream_id << " " << err.what();
Gemfield committed
66 67 68 69 70 71 72 73 74 75 76 77
}

void RtpSession::onManager() {
    if(_process && !_process->alive()){
        shutdown(SockException(Err_timeout, "receive rtp timeout"));
    }

    if(!_process && _ticker.createdTime() > 10 * 1000){
        shutdown(SockException(Err_timeout, "illegal connection"));
    }
}

78
void RtpSession::onRtpPacket(const char *data, size_t len) {
79 80 81 82 83 84 85 86 87
    if (_delay_close) {
        // 正在延时关闭中,忽略所有数据
        return;
    }
    if (!isRtp(data, len)) {
        // 忽略非rtp数据
        WarnP(this) << "Not rtp packet";
        return;
    }
88 89 90 91 92 93 94 95 96 97
    if (!_is_udp) {
        if (_search_rtp) {
            //搜索上下文期间,数据丢弃
            if (_search_rtp_finished) {
                //下个包开始就是正确的rtp包了
                _search_rtp_finished = false;
                _search_rtp = false;
            }
            return;
        }
monktan committed
98 99
        GET_CONFIG(uint32_t, rtpMaxSize, Rtp::kRtpMaxSize);
        if (len > 1024 * rtpMaxSize) {
100 101 102
            _search_rtp = true;
            WarnL << "rtp包长度异常(" << len << "),发送端可能缓存溢出并覆盖,开始搜索ssrc以便恢复上下文";
            return;
103
        }
104
    }
105
    if (!_process) {
106 107
        //未设置ssrc时,尝试获取ssrc
        if (!_ssrc && !RtpSelector::getSSRC(data, len, _ssrc)) {
108 109
            return;
        }
110 111
        if (_stream_id.empty()) {
            //未指定流id就使用ssrc为流id
112
            _stream_id = printSSRC(_ssrc);
113
        }
114 115 116 117 118 119 120 121 122 123 124 125
        try {
            _process = RtpSelector::Instance().getProcess(_stream_id, true);
        } catch (RtpSelector::ProcessExisted &ex) {
            if (!_is_udp) {
                // tcp情况下立即断开连接
                throw;
            }
            // udp情况下延时断开连接(等待超时自动关闭),防止频繁创建销毁RtpSession对象
            WarnP(this) << ex.what();
            _delay_close = true;
            return;
        }
126
        _process->setOnlyAudio(_only_audio);
127
        _process->setDelegate(dynamic_pointer_cast<RtpSession>(shared_from_this()));
Gemfield committed
128
    }
129
    try {
130 131 132
        uint32_t rtp_ssrc = 0;
        RtpSelector::getSSRC(data, len, rtp_ssrc);
        if (rtp_ssrc != _ssrc) {
133
            WarnP(this) << "ssrc mismatched, rtp dropped: " << rtp_ssrc << " != " << _ssrc;
134 135
            return;
        }
xiongziliang committed
136
        _process->inputRtp(false, getSock(), data, len, (struct sockaddr *)&_addr);
ziyue committed
137
    } catch (RtpTrack::BadRtpException &ex) {
138 139 140 141 142 143
        if (!_is_udp) {
            WarnL << ex.what() << ",开始搜索ssrc以便恢复上下文";
            _search_rtp = true;
        } else {
            throw;
        }
144 145 146
    } catch (...) {
        throw;
    }
Gemfield committed
147 148
    _ticker.resetTime();
}
149

150
// Closes the session on behalf of `sender`.
// According to the original note this callback is triggered on another
// thread; the shutdown goes through safeShutdown().
// Always returns true.
bool RtpSession::close(MediaSource &sender) {
    const string reason = StrPrinter << "close media: " << sender.getUrl();
    safeShutdown(SockException(Err_shutdown, reason));
    return true;
}

157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
// Searches `data[0, len)` for the four bytes of `ssrc` in big-endian order.
// The scan starts at offset 2 because two bytes must be reserved in front of
// the rtp data for the length field.
// Returns a pointer to the first match, or nullptr when there is none
// (including when `len` is too small to contain a match).
static const char *findSSRC(const char *data, ssize_t len, uint32_t ssrc) {
    const uint8_t wanted[4] = {
        static_cast<uint8_t>(ssrc >> 24),
        static_cast<uint8_t>((ssrc >> 16) & 0xFF),
        static_cast<uint8_t>((ssrc >> 8) & 0xFF),
        static_cast<uint8_t>(ssrc & 0xFF),
    };
    const uint8_t *bytes = (const uint8_t *) data;
    // Last offset at which four bytes still fit into the buffer.
    const ssize_t last_start = len - 4;
    ssize_t pos = 2;
    while (pos <= last_start) {
        const uint8_t *cur = bytes + pos;
        const bool hit = cur[0] == wanted[0] && cur[1] == wanted[1] &&
                         cur[2] == wanted[2] && cur[3] == wanted[3];
        if (hit) {
            return (const char *) cur;
        }
        ++pos;
    }
    return nullptr;
}

// The distance from the rtp length field to the ssrc is fixed at 10 bytes:
// the 2-byte length field plus the 4 + 4 rtp header bytes in front of the ssrc.
static size_t constexpr kSSRCOffset = 2 + 4 + 4;

// Locates the packet boundary inside the `len` bytes at `data`.
//
// While the tcp context is intact (_search_rtp is false) this defers to
// RtpSplitter::onSearchPacketTail(). Otherwise it tries to recover the
// context by looking for two occurrences of _ssrc whose distance matches the
// length field stored in front of the first one.
//
// Returns
//  - nullptr when more data is needed,
//  - the position of the length field of the recovered rtp packet (all data
//    in front of it is to be discarded), or
//  - the presumed packet start derived from the second ssrc match when the
//    first match turned out not to belong to an rtp packet.
//
// Throws SockException(Err_shutdown) when no rtp process exists yet, i.e. the
// ssrc needed for the search is not known.
const char *RtpSession::onSearchPacketTail(const char *data, size_t len) {
    if (!_search_rtp) {
        // The tcp context is fine, no ssrc search required.
        return RtpSplitter::onSearchPacketTail(data, len);
    }
    if (!_process) {
        throw SockException(Err_shutdown, "ssrc未获取到,无法通过ssrc恢复tcp上下文");
    }
    // An ssrc that belongs to a packet fully contained in this buffer lies at
    // offset kSSRCOffset or later. Smaller buffers cannot contain one.
    if (len < kSSRCOffset + 4) {
        return nullptr;
    }
    // Search for the ssrc of the first rtp packet. findSSRC() starts matching
    // at offset 2 of the range it is given, so skipping kSSRCOffset - 2 bytes
    // makes offset kSSRCOffset the earliest possible match. A match closer to
    // the buffer start would place the length field (ssrc - kSSRCOffset) in
    // front of `data`, causing an out-of-bounds read and a return value that
    // precedes the buffer.
    const ssize_t skip = (ssize_t) kSSRCOffset - 2;
    auto ssrc_ptr0 = findSSRC(data + skip, (ssize_t) len - skip, _ssrc);
    if (!ssrc_ptr0) {
        // No rtp packet found at all: report that more data is needed.
        return nullptr;
    }
    // These two bytes are the length field of the first rtp packet.
    auto rtp_len_ptr = (ssrc_ptr0 - kSSRCOffset);
    auto rtp_len = ((const uint8_t *) rtp_len_ptr)[0] << 8 | ((const uint8_t *) rtp_len_ptr)[1];

    // Bytes available from (first ssrc + rtp_len) to the end of the buffer.
    // findSSRC() needs at least 2 + 4 of them to be able to match.
    const ssize_t remaining = data + (ssize_t) len - ssrc_ptr0 - rtp_len;
    if (remaining < 2 + 4) {
        // The second rtp packet is not (completely) in the buffer yet.
        return nullptr;
    }
    // Search for the ssrc of the second rtp packet.
    auto ssrc_ptr1 = findSSRC(ssrc_ptr0 + rtp_len, remaining, _ssrc);
    if (!ssrc_ptr1) {
        // Second rtp packet not found: report that more data is needed.
        return nullptr;
    }

    // When the distance between both ssrc values equals the rtp length (plus
    // the rtp length field), an rtp packet has been found.
    auto ssrc_offset = ssrc_ptr1 - ssrc_ptr0;
    if (ssrc_offset == rtp_len + 2 || ssrc_offset == rtp_len + 4) {
        InfoL << "rtp搜索成功,tcp上下文恢复成功,丢弃的rtp残余数据为:" << rtp_len_ptr - data;
        _search_rtp_finished = true;
        if (rtp_len_ptr == data) {
            // Stop searching, otherwise this would loop forever.
            _search_rtp = false;
        }
        // Everything in front must be discarded; this is where the rtp starts.
        return rtp_len_ptr;
    }
    // The length of the first packet does not match, so the first ssrc match
    // was not an rtp packet. Discard it and continue the search from the
    // packet that contains the second ssrc.
    return ssrc_ptr1 - kSSRCOffset;
}

213 214
}//namespace mediakit
#endif//defined(ENABLE_RTPPROXY)