RtpSession.cpp 5.72 KB
Newer Older
Gemfield committed
1
/*
xiongziliang committed
2
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
3
 *
4
 * This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit).
5
 *
xiongziliang committed
6 7 8
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
9 10 11
 */

#if defined(ENABLE_RTPPROXY)
Gemfield committed
12 13
#include "RtpSession.h"
#include "RtpSelector.h"
14
#include "Network/TcpServer.h"
15
#include "Rtsp/RtpReceiver.h"
16
namespace mediakit{
Gemfield committed
17

18 19 20 21 22 23
const string RtpSession::kStreamID = "stream_id";

void RtpSession::attachServer(const TcpServer &server) {
    _stream_id = const_cast<TcpServer &>(server)[kStreamID];
}

Gemfield committed
24 25 26 27 28 29 30
RtpSession::RtpSession(const Socket::Ptr &sock) : TcpSession(sock) {
    DebugP(this);
    socklen_t addr_len = sizeof(addr);
    getpeername(sock->rawFD(), &addr, &addr_len);
}
RtpSession::~RtpSession() {
    DebugP(this);
31
    if(_process){
32
        RtpSelector::Instance().delProcess(_stream_id,_process.get());
Gemfield committed
33 34 35 36 37 38 39 40 41 42 43 44 45 46
    }
}

void RtpSession::onRecv(const Buffer::Ptr &data) {
    try {
        RtpSplitter::input(data->data(), data->size());
    } catch (SockException &ex) {
        shutdown(ex);
    } catch (std::exception &ex) {
        shutdown(SockException(Err_other, ex.what()));
    }
}

void RtpSession::onError(const SockException &err) {
47
    WarnL << _stream_id << " " << err.what();
Gemfield committed
48 49 50 51 52 53 54 55 56 57 58 59
}

void RtpSession::onManager() {
    if(_process && !_process->alive()){
        shutdown(SockException(Err_timeout, "receive rtp timeout"));
    }

    if(!_process && _ticker.createdTime() > 10 * 1000){
        shutdown(SockException(Err_timeout, "illegal connection"));
    }
}

60
void RtpSession::onRtpPacket(const char *data, size_t len) {
61 62 63 64 65 66 67 68 69
    if (_search_rtp) {
        //搜索上下文期间,数据丢弃
        if (_search_rtp_finished) {
            //下个包开始就是正确的rtp包了
            _search_rtp_finished = false;
            _search_rtp = false;
        }
        return;
    }
xiongziliang committed
70
    if (len > 1024 * 10) {
71 72 73
        _search_rtp = true;
        WarnL << "rtp包长度异常(" << len << "),发送端可能缓存溢出并覆盖,开始搜索ssrc以便恢复上下文";
        return;
74
    }
75
    if (!_process) {
76
        if (!RtpSelector::getSSRC(data, len, _ssrc)) {
77 78
            return;
        }
79 80
        if (_stream_id.empty()) {
            //未指定流id就使用ssrc为流id
81
            _stream_id = printSSRC(_ssrc);
82 83 84
        }
        //tcp情况下,一个tcp链接只可能是一路流,不需要通过多个ssrc来区分,所以不需要频繁getProcess
        _process = RtpSelector::Instance().getProcess(_stream_id, true);
85
        _process->setListener(dynamic_pointer_cast<RtpSession>(shared_from_this()));
Gemfield committed
86
    }
87 88 89 90 91 92 93 94
    try {
        _process->inputRtp(false, getSock(), data, len, &addr);
    } catch (RtpReceiver::BadRtpException &ex) {
        WarnL << ex.what() << ",开始搜索ssrc以便恢复上下文";
        _search_rtp = true;
    } catch (...) {
        throw;
    }
Gemfield committed
95 96
    _ticker.resetTime();
}
97

98 99
bool RtpSession::close(MediaSource &sender, bool force) {
    //此回调在其他线程触发
xiongziliang committed
100
    if(!_process || (!force && _process->getTotalReaderCount())){
101 102 103 104 105 106 107 108 109
        return false;
    }
    string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
    safeShutdown(SockException(Err_shutdown,err));
    return true;
}

int RtpSession::totalReaderCount(MediaSource &sender) {
    //此回调在其他线程触发
xiongziliang committed
110
    return _process ? _process->getTotalReaderCount() : sender.totalReaderCount();
111 112
}

113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
static const char *findSSRC(const char *data, ssize_t len, uint32_t ssrc) {
    //rtp前面必须预留两个字节的长度字段
    for (ssize_t i = 2; i <= len - 4; ++i) {
        auto ptr = (const uint8_t *) data + i;
        if (ptr[0] == (ssrc >> 24) && ptr[1] == ((ssrc >> 16) & 0xFF) &&
            ptr[2] == ((ssrc >> 8) & 0xFF) && ptr[3] == (ssrc & 0xFF)) {
            return (const char *) ptr;
        }
    }
    return nullptr;
}

//rtp长度到ssrc间的长度固定为10
static size_t constexpr kSSRCOffset = 2 + 4 + 4;

const char *RtpSession::onSearchPacketTail(const char *data, size_t len) {
    if (!_search_rtp) {
        //tcp上下文正常,不用搜索ssrc
        return RtpSplitter::onSearchPacketTail(data, len);
    }
    if (!_process) {
        throw SockException(Err_shutdown, "ssrc未获取到,无法通过ssrc恢复tcp上下文");
    }
    //搜索第一个rtp的ssrc
    auto ssrc_ptr0 = findSSRC(data, len, _ssrc);
    if (!ssrc_ptr0) {
        //未搜索到任意rtp,返回数据不够
        return nullptr;
    }
    //这两个字节是第一个rtp的长度字段
    auto rtp_len_ptr = (ssrc_ptr0 - kSSRCOffset);
    auto rtp_len = ((uint8_t *)rtp_len_ptr)[0] << 8 | ((uint8_t *)rtp_len_ptr)[1];

    //搜索第二个rtp的ssrc
    auto ssrc_ptr1 = findSSRC(ssrc_ptr0 + rtp_len, data + (ssize_t) len - ssrc_ptr0 - rtp_len, _ssrc);
    if (!ssrc_ptr1) {
        //未搜索到第二个rtp,返回数据不够
        return nullptr;
    }

    //两个ssrc的间隔正好等于rtp的长度(外加rtp长度字段),那么说明找到rtp
    auto ssrc_offset = ssrc_ptr1 - ssrc_ptr0;
    if (ssrc_offset == rtp_len + 2 || ssrc_offset == rtp_len + 4) {
        InfoL << "rtp搜索成功,tcp上下文恢复成功,丢弃的rtp残余数据为:" << rtp_len_ptr - data;
        _search_rtp_finished = true;
        //前面的数据都需要丢弃,这个是rtp的起始
        return rtp_len_ptr;
    }
    //第一个rtp长度不匹配,说明第一个找到的ssrc不是rtp,丢弃之,我们从第二个ssrc所在rtp开始搜索
    return ssrc_ptr1 - kSSRCOffset;
}

165 166
}//namespace mediakit
#endif//defined(ENABLE_RTPPROXY)