RtpProcess.cpp 8.55 KB
Newer Older
Gemfield committed
1
/*
xiongziliang committed
2
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
3 4 5
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
xiongziliang committed
6 7 8
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
9
 */
Gemfield committed
10

11
#if defined(ENABLE_RTPPROXY)
Gemfield committed
12 13
#include "RtpProcess.h"
#include "Util/File.h"
14
#include "Http/HttpTSPlayer.h"
15
#define RTP_APP_NAME "rtp"
Gemfield committed
16

17
namespace mediakit{
Gemfield committed
18 19

/**
 * Render a socket address as the text "ip:port".
 * The address is reinterpreted as an IPv4 `sockaddr_in`; the port is converted
 * from network to host byte order before printing.
 * @param addr address to print
 * @return formatted "ip:port" string
 */
static string printAddress(const struct sockaddr *addr){
    struct sockaddr_in *in4 = (struct sockaddr_in *) addr;
    return StrPrinter << SockUtil::inet_ntoa(in4->sin_addr) << ":" << ntohs(in4->sin_port);
}

23
/**
 * Build an RTP processor for one stream.
 * Fills in the media info (schema/vhost/app/stream id), optionally opens three
 * dump files ("<stream_id>.rtp", ".mp2" and ".video") inside the configured dump
 * directory, and creates the RTP decoder whose output frames are forwarded to
 * onRtpDecode().
 * @param stream_id identifier stored as the stream id of this processor
 */
RtpProcess::RtpProcess(const string &stream_id) {
    _media_info._schema = RTP_APP_NAME;
    _media_info._vhost = DEFAULT_VHOST;
    _media_info._app = RTP_APP_NAME;
    _media_info._streamid = stream_id;

    GET_CONFIG(string,dump_dir,RtpProxy::kDumpDir);

    // Opens "<stream_id><suffix>" under dump_dir and stores it in `slot`.
    // The slot is left untouched when dumping is disabled (empty dump_dir)
    // or when the file cannot be created.
    auto open_dump = [&](std::shared_ptr<FILE> &slot, const char *suffix) {
        if (dump_dir.empty()) {
            return;
        }
        FILE *fp = File::create_file(File::absolutePath(_media_info._streamid + suffix, dump_dir).data(), "wb");
        if (!fp) {
            return;
        }
        // The file is closed automatically once the last owner releases it.
        slot.reset(fp, [](FILE *file) {
            fclose(file);
        });
    };

    open_dump(_save_file_rtp, ".rtp");
    open_dump(_save_file_ps, ".mp2");
    open_dump(_save_file_video, ".video");

    // Every frame produced by the RTP decoder is handed to onRtpDecode().
    auto on_frame = [this](const Frame::Ptr &frame) {
        onRtpDecode((uint8_t *) frame->data(), frame->size(), frame->dts());
    };
    _rtp_decoder = std::make_shared<CommonRtpDecoder>(CodecInvalid, 256 * 1024);
    _rtp_decoder->addDelegate(std::make_shared<FrameWriterInterfaceHelper>(on_frame));
}

/**
 * Tear down the processor: log the disconnect together with a duration in
 * seconds, broadcast a flow report when the received byte count exceeds the
 * configured threshold (threshold * 1024), and release the stored peer address.
 */
RtpProcess::~RtpProcess() {
    // NOTE(review): presumably the span between creation of the ticker and its
    // last reset (i.e. the last received frame) - confirm against Ticker.
    uint64_t duration = (_last_rtp_time.createdTime() - _last_rtp_time.elapsedTime()) / 1000;
    WarnP(this) << "RTP推流器(" << _media_info._vhost << "/" << _media_info._app << "/" << _media_info._streamid
                << ")断开,耗时(s):" << duration;

    // Broadcast the traffic statistics event.
    GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold);
    if (_total_bytes > iFlowThreshold * 1024) {
        NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, false, static_cast<SockInfo &>(*this));
    }

    // Deleting a null pointer is a no-op, so no explicit check is needed.
    delete _addr;
    _addr = nullptr;
}

82
/**
 * Feed one RTP packet into the processor.
 * On the first call the sender address and socket are remembered and the
 * publish authorization is requested via emitOnPublish().
 * @param sock socket the packet arrived on (stored on the first call)
 * @param data packet bytes
 * @param data_len number of bytes in `data`
 * @param addr sender address of the packet
 * @param dts_out optional output; when non-null it receives the current `_dts`
 * @return false when no muxer exists or the source check fails, otherwise
 *         the result of handleOneRtp()
 */
bool RtpProcess::inputRtp(const Socket::Ptr &sock, const char *data, int data_len,const struct sockaddr *addr,uint32_t *dts_out) {
    GET_CONFIG(bool,check_source,RtpProxy::kCheckSource);

    if (_addr == nullptr) {
        // First packet: bind this processor to the sender.
        _sock = sock;
        _addr = new struct sockaddr;
        memcpy(_addr, addr, sizeof(struct sockaddr));
        DebugP(this) << "bind to address:" << printAddress(_addr);
        // Publish authentication.
        emitOnPublish();
    }

    if (!_muxer) {
        // No muxer was created: publishing is not permitted.
        return false;
    }

    // Verify that the packet comes from the address bound on the first packet.
    if (check_source) {
        const bool same_peer = memcmp(_addr, addr, sizeof(struct sockaddr)) == 0;
        if (!same_peer) {
            DebugP(this) << "address dismatch:" << printAddress(addr) << " != " << printAddress(_addr);
            return false;
        }
    }

    _total_bytes += data_len;
    const bool handled = handleOneRtp(0, TrackVideo, 90000, (unsigned char *) data, data_len);
    if (dts_out != nullptr) {
        *dts_out = _dts;
    }
    return handled;
}

112 113
//判断是否为ts负载
static inline bool checkTS(const uint8_t *packet, int bytes){
114
    return bytes % TS_PACKET_SIZE == 0 && packet[0] == TS_SYNC_BYTE;
115 116
}

Gemfield committed
117
/**
 * Handle one RTP packet after sorting.
 * Warns when the sequence number does not directly follow the previous one
 * (skipped while the stored sequence is still 0), optionally appends the packet
 * to the RTP dump file, then passes it to the RTP decoder.
 * @param rtp the RTP packet
 */
void RtpProcess::onRtpSorted(const RtpPacket::Ptr &rtp, int) {
    const bool has_previous = _sequence != 0;
    if (has_previous && rtp->sequence != (uint16_t)(_sequence + 1)) {
        WarnP(this) << "rtp丢包:" << rtp->sequence << " != " << _sequence << "+1" << ",公网环境下请使用tcp方式推流";
    }
    _sequence = rtp->sequence;

    if (FILE *dump = _save_file_rtp.get()) {
        // Dump record: 2-byte big-endian length followed by the packet with its
        // first 4 bytes skipped.
        const size_t body_len = rtp->size() - 4;
        const uint16_t len_be = htons((uint16_t) body_len);
        fwrite((uint8_t *) &len_be, 2, 1, dump);
        fwrite((uint8_t *) rtp->data() + 4, body_len, 1, dump);
    }

    _rtp_decoder->inputRtp(rtp);
}

131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
/**
 * Feed the buffered bytes to the PS/TS decoder and report how far it got.
 * @param packet start of the buffered data
 * @param bytes number of buffered bytes
 * @return `packet` advanced by the decoder's positive return value, or nullptr
 *         when the decoder returned a non-positive value or threw
 */
const char *RtpProcess::onSearchPacketTail(const char *packet,int bytes){
    try {
        auto consumed = _decoder->input((uint8_t *) packet, bytes);
        return consumed > 0 ? packet + consumed : nullptr;
    } catch (const std::exception &ex) {
        // Parsing failed: log the buffer for diagnosis.
        InfoL << "解析ps或ts异常: bytes=" << bytes
              << " ,exception=" << ex.what()
              << " ,hex=" << hexdump((uint8_t *) packet, bytes);
    }
    return nullptr;
}

146
void RtpProcess::onRtpDecode(const uint8_t *packet, int bytes, uint32_t timestamp) {
Gemfield committed
147 148 149 150
    if(_save_file_ps){
        fwrite((uint8_t *)packet,bytes, 1, _save_file_ps.get());
    }

151
    if (!_decoder) {
xiongziliang committed
152
        //创建解码器
153
        if (checkTS(packet, bytes)) {
xiongziliang committed
154
            //猜测是ts负载
155
            InfoP(this) << "judged to be TS";
156
            _decoder = DecoderImp::createDecoder(DecoderImp::decoder_ts, this);
157
        } else {
xiongziliang committed
158
            //猜测是ps负载
159
            InfoP(this) << "judged to be PS";
160
            _decoder = DecoderImp::createDecoder(DecoderImp::decoder_ps, this);
xiongziliang committed
161 162 163
        }
    }

164
    if (_decoder) {
165
        HttpRequestSplitter::input((char *) packet, bytes);
Gemfield committed
166 167 168
    }
}

169
/**
 * Accept one frame: reset the last-RTP ticker, remember the frame's dts,
 * optionally append video frames to the video dump file, and forward the
 * frame to the muxer.
 * @param frame the frame to forward
 */
void  RtpProcess::inputFrame(const Frame::Ptr &frame){
    _last_rtp_time.resetTime();
    _dts = frame->dts();

    const bool dump_video = _save_file_video && frame->getTrackType() == TrackVideo;
    if (dump_video) {
        fwrite((uint8_t *) frame->data(), frame->size(), 1, _save_file_video.get());
    }

    _muxer->inputFrame(frame);
}

178 179
/**
 * Forward a track to the muxer.
 * @param track the track to register
 */
void  RtpProcess::addTrack(const Track::Ptr & track){
    _muxer->addTrack(track);
}

bool RtpProcess::alive() {
183
    GET_CONFIG(int,timeoutSec,RtpProxy::kTimeoutSec)
Gemfield committed
184 185 186 187 188 189
    if(_last_rtp_time.elapsedTime() / 1000 < timeoutSec){
        return true;
    }
    return false;
}

190 191 192 193 194 195 196 197 198 199
/**
 * Invoke the detach callback, if one has been installed with setOnDetach().
 */
void RtpProcess::onDetach(){
    if (!_on_detach) {
        return;
    }
    _on_detach();
}

/**
 * Install the callback that onDetach() invokes.
 * @param cb callback to store (copied)
 */
void RtpProcess::setOnDetach(const function<void()> &cb) {
    _on_detach = cb;
}

xiongziliang committed
200 201 202
/**
 * Get the sender's IP address as text.
 * The stored address is reinterpreted as an IPv4 `sockaddr_in`.
 * @return the peer IP, or "0.0.0.0" when no address has been stored yet
 */
string RtpProcess::get_peer_ip() {
    if (!_addr) {
        return "0.0.0.0";
    }
    struct sockaddr_in *peer = (struct sockaddr_in *) _addr;
    return SockUtil::inet_ntoa(peer->sin_addr);
}

uint16_t RtpProcess::get_peer_port() {
208 209 210
    if(!_addr){
        return 0;
    }
Gemfield committed
211 212
    return ntohs(((struct sockaddr_in *) _addr)->sin_port);
}
213

xiongziliang committed
214
/**
 * Get the local IP address of the stored socket.
 * @return the socket's local IP, or "0.0.0.0" when no socket has been stored
 */
string RtpProcess::get_local_ip() {
    if (!_sock) {
        return "0.0.0.0";
    }
    return _sock->get_local_ip();
}

uint16_t RtpProcess::get_local_port() {
222 223 224
    if(_sock){
       return _sock->get_local_port();
    }
225 226 227 228 229 230 231
    return 0;
}

/**
 * Get the identifier of this processor.
 * @return a copy of the stream id held in the media info
 */
string RtpProcess::getIdentifier() const {
    return _media_info._streamid;
}

232
int RtpProcess::totalReaderCount(){
233
    return _muxer ? _muxer->totalReaderCount() : 0;
234 235 236
}

void RtpProcess::setListener(const std::weak_ptr<MediaSourceEvent> &listener){
237 238 239 240 241 242 243 244 245
    if(_muxer){
        _muxer->setMediaListener(listener);
    }else{
        _listener = listener;
    }
}

/**
 * Broadcast the publish-authorization event for this stream.
 * The invoker handed to listeners creates the muxer when called with an empty
 * error string and logs a refusal otherwise. When the broadcast returns a falsy
 * result, the invoker is called directly with an empty error and the configured
 * HLS/MP4 switches.
 */
void RtpProcess::emitOnPublish() {
    // Capture only a weak reference so the callback does not keep this object alive.
    weak_ptr<RtpProcess> weak_self = shared_from_this();
    Broadcast::PublishAuthInvoker invoker = [weak_self](const string &err, bool enableHls, bool enableMP4) {
        auto strongSelf = weak_self.lock();
        if (!strongSelf) {
            // The processor is already gone; nothing to do.
            return;
        }
        if (!err.empty()) {
            WarnP(strongSelf) << "禁止RTP推流:" << err;
            return;
        }
        auto &info = strongSelf->_media_info;
        strongSelf->_muxer = std::make_shared<MultiMediaSourceMuxer>(info._vhost,
                                                                     info._app,
                                                                     info._streamid, 0,
                                                                     true, true, enableHls, enableMP4);
        // Apply the listener that setListener() stored before the muxer existed.
        strongSelf->_muxer->setMediaListener(strongSelf->_listener);
        InfoP(strongSelf) << "允许RTP推流";
    };

    // Fire the publish-authentication event.
    auto handled = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, _media_info, invoker, static_cast<SockInfo &>(*this));
    if (handled) {
        return;
    }
    // Nobody listens for the event: skip authentication and allow publishing.
    GET_CONFIG(bool, toHls, General::kPublishToHls);
    GET_CONFIG(bool, toMP4, General::kPublishToMP4);
    invoker("", toHls, toMP4);
}

273 274
}//namespace mediakit
#endif//defined(ENABLE_RTPPROXY)