RtpProcess.cpp 7.15 KB
Newer Older
Gemfield committed
1
/*
xiongziliang committed
2
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
3 4 5
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
xiongziliang committed
6 7 8
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
9
 */
Gemfield committed
10

11
#if defined(ENABLE_RTPPROXY)
xiongziliang committed
12
#include "GB28181Process.h"
Gemfield committed
13
#include "RtpProcess.h"
xiongziliang committed
14
#include "RtpSplitter.h"
Gemfield committed
15
#include "Util/File.h"
16
#include "Http/HttpTSPlayer.h"
xiongziliang committed
17

18
#define RTP_APP_NAME "rtp"
Gemfield committed
19

xiongziliang committed
20
namespace mediakit {
Gemfield committed
21

xiongziliang committed
22
// Render a socket address as "ip:port" for log output.
// The address is read through a sockaddr_in view, so it is treated as IPv4.
static string printAddress(const struct sockaddr *addr) {
    struct sockaddr_in *in4 = (struct sockaddr_in *) addr;
    return StrPrinter << SockUtil::inet_ntoa(in4->sin_addr) << ":" << ntohs(in4->sin_port);
}

26
// Build the media info for stream `stream_id` (schema/app "rtp", default vhost) and,
// when a dump directory is configured, open "<stream_id>.rtp" and "<stream_id>.video"
// inside it for writing. A file that cannot be created is simply skipped.
RtpProcess::RtpProcess(const string &stream_id) {
    _media_info._schema = RTP_APP_NAME;
    _media_info._vhost = DEFAULT_VHOST;
    _media_info._app = RTP_APP_NAME;
    _media_info._streamid = stream_id;

    GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir);
    if (dump_dir.empty()) {
        // Dumping is disabled, no files to open.
        return;
    }

    // Deleter handed to the file holders so the FILE is closed when released.
    auto close_file = [](FILE *file) {
        fclose(file);
    };

    if (FILE *rtp_file = File::create_file(File::absolutePath(_media_info._streamid + ".rtp", dump_dir).data(), "wb")) {
        _save_file_rtp.reset(rtp_file, close_file);
    }

    if (FILE *video_file = File::create_file(File::absolutePath(_media_info._streamid + ".video", dump_dir).data(), "wb")) {
        _save_file_video.reset(video_file, close_file);
    }
}

// Log the disconnect, broadcast a flow report when enough bytes were received,
// and release the stored peer address.
RtpProcess::~RtpProcess() {
    // Seconds derived from the ticker's createdTime() minus elapsedTime();
    // presumably the span between creation and the last received frame.
    uint64_t duration = (_last_frame_time.createdTime() - _last_frame_time.elapsedTime()) / 1000;
    WarnP(this) << "RTP推流器("
                << _media_info._vhost << "/"
                << _media_info._app << "/"
                << _media_info._streamid
                << ")断开,耗时(s):" << duration;

    // Broadcast the flow statistics event.
    GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold);
    // Widen before multiplying: a 32-bit "threshold * 1024" wraps around for
    // thresholds of 4194304 or more, which would make the comparison meaningless.
    if (_total_bytes > static_cast<uint64_t>(iFlowThreshold) * 1024) {
        NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, false, static_cast<SockInfo &>(*this));
    }

    // Deleting a null pointer is a no-op, so no guard is required.
    delete _addr;
    _addr = nullptr;
}

xiongziliang committed
72 73
// Feed one RTP packet into this stream.
//   is_udp  - forwarded unchanged to the RTP processor
//   sock    - socket the packet arrived on; remembered on the first packet
//   data    - packet bytes
//   len     - number of bytes in `data`
//   addr    - sender address; remembered on the first packet and optionally checked afterwards
//   dts_out - optional; when non-null it receives the current value of _dts
// Returns false when publishing is not permitted, when the sender does not match the
// remembered address (with source checking enabled), or when the packet is discarded;
// otherwise returns the result of the RTP processor.
bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data, int len, const struct sockaddr *addr, uint32_t *dts_out) {
    GET_CONFIG(bool, check_source, RtpProxy::kCheckSource);

    // First packet: remember where it came from, then run publish authentication.
    if (!_addr) {
        _addr = new struct sockaddr;
        _sock = sock;
        memcpy(_addr, addr, sizeof(struct sockaddr));
        DebugP(this) << "bind to address:" << printAddress(_addr);
        emitOnPublish();
    }

    // No muxer: there is no permission to publish.
    if (!_muxer) {
        return false;
    }

    // Optionally reject packets whose sender differs from the remembered one.
    if (check_source) {
        const bool same_source = memcmp(_addr, addr, sizeof(struct sockaddr)) == 0;
        if (!same_source) {
            DebugP(this) << "address dismatch:" << printAddress(addr) << " != " << printAddress(_addr);
            return false;
        }
    }

    _total_bytes += len;

    if (_save_file_rtp) {
        // Dump format: 16-bit length in network byte order, followed by the packet bytes.
        FILE *dump = _save_file_rtp.get();
        uint16_t be_len = htons((uint16_t) len);
        fwrite((uint8_t *) &be_len, 2, 1, dump);
        fwrite((uint8_t *) data, len, 1, dump);
    }

    if (!_process) {
        _process = std::make_shared<GB28181Process>(_media_info, this);
    }

    GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir);
    // Nobody is watching, no timestamp is requested and no debug file is exported:
    // the data can be dropped right away.
    const bool can_discard = !_muxer->isEnabled() && !dts_out && dump_dir.empty();
    if (can_discard) {
        _last_frame_time.resetTime();
        return false;
    }

    bool accepted = false;
    if (_process) {
        accepted = _process->inputRtp(is_udp, data, len);
    }
    if (dts_out) {
        *dts_out = _dts;
    }
    return accepted;
}

xiongziliang committed
119 120
// Receive a frame: restart the frame timer, record the frame's dts,
// append video frames to the video dump file when one is open, and pass the
// frame on to the muxer.
void RtpProcess::inputFrame(const Frame::Ptr &frame) {
    _last_frame_time.resetTime();
    _dts = frame->dts();

    const bool dump_video = _save_file_video && frame->getTrackType() == TrackVideo;
    if (dump_video) {
        fwrite((uint8_t *) frame->data(), frame->size(), 1, _save_file_video.get());
    }

    _muxer->inputFrame(frame);
}

xiongziliang committed
128
// Forward a newly announced track to the muxer.
void RtpProcess::addTrack(const Track::Ptr &track) {
    _muxer->addTrack(track);
}

132 133 134 135
// Tell the muxer that all tracks have been added.
void RtpProcess::addTrackCompleted() {
    _muxer->addTrackCompleted();
}

Gemfield committed
136
bool RtpProcess::alive() {
xiongziliang committed
137 138
    GET_CONFIG(int, timeoutSec, RtpProxy::kTimeoutSec)
    if (_last_frame_time.elapsedTime() / 1000 < timeoutSec) {
Gemfield committed
139 140 141 142 143
        return true;
    }
    return false;
}

xiongziliang committed
144 145
// Invoke the detach callback, if one has been registered.
void RtpProcess::onDetach() {
    if (!_on_detach) {
        return;
    }
    _on_detach();
}

// Register the callback that onDetach() invokes; replaces any previous one.
void RtpProcess::setOnDetach(const function<void()> &cb) {
    _on_detach = cb;
}

xiongziliang committed
154
// IP of the remembered peer address (read as IPv4), or "0.0.0.0" when no
// address has been recorded yet.
string RtpProcess::get_peer_ip() {
    if (!_addr) {
        return "0.0.0.0";
    }
    struct sockaddr_in *in4 = (struct sockaddr_in *) _addr;
    return SockUtil::inet_ntoa(in4->sin_addr);
}

uint16_t RtpProcess::get_peer_port() {
xiongziliang committed
162
    if (!_addr) {
163 164
        return 0;
    }
Gemfield committed
165 166
    return ntohs(((struct sockaddr_in *) _addr)->sin_port);
}
167

xiongziliang committed
168
// Local IP reported by the remembered socket, or "0.0.0.0" when there is none.
string RtpProcess::get_local_ip() {
    if (!_sock) {
        return "0.0.0.0";
    }
    return _sock->get_local_ip();
}

uint16_t RtpProcess::get_local_port() {
xiongziliang committed
176 177
    if (_sock) {
        return _sock->get_local_port();
178
    }
179 180 181
    return 0;
}

xiongziliang committed
182
// The identifier of this object is the stream id of its media info.
string RtpProcess::getIdentifier() const {
    return _media_info._streamid;
}

xiongziliang committed
186
int RtpProcess::totalReaderCount() {
187
    return _muxer ? _muxer->totalReaderCount() : 0;
188 189
}

xiongziliang committed
190
void RtpProcess::setListener(const std::weak_ptr<MediaSourceEvent> &listener) {
191
    setDelegate(listener);
192 193 194 195
}

void RtpProcess::emitOnPublish() {
    weak_ptr<RtpProcess> weak_self = shared_from_this();
196
    Broadcast::PublishAuthInvoker invoker = [weak_self](const string &err, bool enableHls, bool enableMP4) {
197 198 199 200 201 202 203 204
        auto strongSelf = weak_self.lock();
        if (!strongSelf) {
            return;
        }
        if (err.empty()) {
            strongSelf->_muxer = std::make_shared<MultiMediaSourceMuxer>(strongSelf->_media_info._vhost,
                                                                         strongSelf->_media_info._app,
                                                                         strongSelf->_media_info._streamid, 0,
205
                                                                         true, true, enableHls, enableMP4);
206
            strongSelf->_muxer->setMediaListener(strongSelf);
207 208 209 210 211 212 213 214
            InfoP(strongSelf) << "允许RTP推流";
        } else {
            WarnP(strongSelf) << "禁止RTP推流:" << err;
        }
    };

    //触发推流鉴权事件
    auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, _media_info, invoker, static_cast<SockInfo &>(*this));
xiongziliang committed
215
    if (!flag) {
216 217 218
        //该事件无人监听,默认不鉴权
        GET_CONFIG(bool, toHls, General::kPublishToHls);
        GET_CONFIG(bool, toMP4, General::kPublishToMP4);
219
        invoker("", toHls, toMP4);
220
    }
221 222
}

223 224 225 226 227 228 229 230 231 232 233 234
// The origin of this stream is always an RTP push; `sender` is not consulted.
MediaOriginType RtpProcess::getOriginType(MediaSource &sender) const {
    return MediaOriginType::rtp_push;
}

// Origin URL: the full URL stored in the media info; `sender` is not consulted.
string RtpProcess::getOriginUrl(MediaSource &sender) const {
    return _media_info._full_url;
}

// Origin socket info: this object itself, returned as a shared pointer.
// shared_from_this() is called through a non-const pointer, hence the const_cast.
std::shared_ptr<SockInfo> RtpProcess::getOriginSock(MediaSource &sender) const {
    RtpProcess *self = const_cast<RtpProcess *>(this);
    return self->shared_from_this();
}

235 236
}//namespace mediakit
#endif//defined(ENABLE_RTPPROXY)