WebRtcTransport.cpp 42.3 KB
Newer Older
xiongziliang committed
1
/*
xiongziliang committed
2 3 4 5 6 7 8 9 10
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
 *
 * This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit).
 *
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
 */

11 12 13 14
#include <iostream>
#include <srtp2/srtp.h>

#include "RtpExt.h"
15
#include "Rtcp/Rtcp.h"
16
#include "Rtcp/RtcpFCI.h"
xiongziliang committed
17
#include "Rtsp/RtpReceiver.h"
18
#include "WebRtcTransport.h"
19

20 21 22
#include "WebRtcEchoTest.h"
#include "WebRtcPlayer.h"
#include "WebRtcPusher.h"
23

24
// Added to a media type value to form the rtp ssrc written into the answer sdp.
#define RTP_SSRC_OFFSET 1
// Added to that rtp ssrc to form the matching rtx ssrc.
#define RTX_SSRC_OFFSET 2
// Fixed identifiers written into the ssrc attributes of the answer sdp.
#define RTP_CNAME   "zlmediakit-rtp"
#define RTP_LABEL   "zlmediakit-label"
#define RTP_MSLABEL "zlmediakit-mslabel"
// msid is "<mslabel> <label>".
#define RTP_MSID    RTP_MSLABEL " " RTP_LABEL
xiongziliang committed
30

夏楚 committed
31
using namespace std;
32 33

namespace mediakit {
夏楚 committed
34

35
// RTC配置项目
36
namespace Rtc {
xia-chu committed
37
#define RTC_FIELD "rtc."
38 39 40 41 42 43 44 45
// rtp和rtcp接受超时时间
const string kTimeOutSec = RTC_FIELD "timeoutSec";
// 服务器外网ip
const string kExternIP = RTC_FIELD "externIP";
// 设置remb比特率,非0时关闭twcc并开启remb。该设置在rtc推流时有效,可以控制推流画质
const string kRembBitRate = RTC_FIELD "rembBitRate";
// webrtc单端口udp服务器
const string kPort = RTC_FIELD "port";
xia-chu committed
46 47 48 49

static onceToken token([]() {
    mINI::Instance()[kTimeOutSec] = 15;
    mINI::Instance()[kExternIP] = "";
50
    mINI::Instance()[kRembBitRate] = 0;
51
    mINI::Instance()[kPort] = 8000;
xia-chu committed
52 53
});

54
} // namespace RTC
xia-chu committed
55

56
// Process-wide counter used to build unique transport identifiers.
static std::atomic<uint64_t> s_key(0);
57

58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
// Rewrites, in place, every entry of `v` that starts with "$": the entry is
// passed to toolkit::getEnv() and replaced by the result, or removed from the
// list when the result is empty. Other entries are left untouched.
static void translateIPFromEnv(std::vector<std::string> &v) {
    auto it = v.begin();
    while (it != v.end()) {
        if (!start_with(*it, "$")) {
            ++it;
            continue;
        }
        auto resolved = toolkit::getEnv(*it);
        if (resolved.empty()) {
            // erase() hands back the iterator of the next element
            it = v.erase(it);
            continue;
        }
        *it = resolved;
        ++it;
    }
}

ziyue committed
73
// Stores the poller, sizes the packet pool and assigns the next "zlm_<n>" identifier.
WebRtcTransport::WebRtcTransport(const EventPoller::Ptr &poller) {
    _packet_pool.setSize(64);
    _identifier = "zlm_" + std::to_string(++s_key);
    _poller = poller;
}

79
void WebRtcTransport::onCreate() {
80
    _dtls_transport = std::make_shared<RTC::DtlsTransport>(_poller, this);
81
    _ice_server = std::make_shared<RTC::IceServer>(this, _identifier, makeRandStr(24));
xia-chu committed
82 83
}

84
void WebRtcTransport::onDestory() {
85 86 87
#ifdef ENABLE_SCTP
    _sctp = nullptr;
#endif
ziyue committed
88 89
    _dtls_transport = nullptr;
    _ice_server = nullptr;
90
}
91

92
// Returns the poller handed to the constructor.
const EventPoller::Ptr &WebRtcTransport::getPoller() const {
    return this->_poller;
}

96 97
// Returns the "zlm_<n>" identifier assigned in the constructor.
const string &WebRtcTransport::getIdentifier() const {
    return this->_identifier;
}

ziyue committed
100 101
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

102 103 104
// Ice server callback: forwards the bytes of a stun packet to sendSockData() for `tuple`.
void WebRtcTransport::OnIceServerSendStunPacket(
    const RTC::IceServer *iceServer, const RTC::StunPacket *packet, RTC::TransportTuple *tuple) {
    auto *bytes = (char *)packet->GetData();
    sendSockData(bytes, packet->GetSize(), tuple);
}

// Ice server callback for a newly selected tuple.
// Only issues an info-level log statement without message text.
void WebRtcTransport::OnIceServerSelectedTuple(const RTC::IceServer *iceServer, RTC::TransportTuple *tuple) {
    InfoL;
}

// Ice server callback for the "connected" state.
// Only issues an info-level log statement without message text.
void WebRtcTransport::OnIceServerConnected(const RTC::IceServer *iceServer) {
    InfoL;
}

// Ice server callback for the "completed" state: starts the dtls transport.
// The dtls role follows the first media of the answer sdp: passive -> SERVER,
// anything else -> CLIENT.
void WebRtcTransport::OnIceServerCompleted(const RTC::IceServer *iceServer) {
    InfoL;
    const bool act_as_server = (_answer_sdp->media[0].role == DtlsRole::passive);
    _dtls_transport->Run(act_as_server ? RTC::DtlsTransport::Role::SERVER : RTC::DtlsTransport::Role::CLIENT);
}

// Ice server callback for the "disconnected" state.
// Only issues an info-level log statement without message text.
void WebRtcTransport::OnIceServerDisconnected(const RTC::IceServer *iceServer) {
    InfoL;
}

//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

void WebRtcTransport::OnDtlsTransportConnected(
131 132
    const RTC::DtlsTransport *dtlsTransport, RTC::SrtpSession::CryptoSuite srtpCryptoSuite, uint8_t *srtpLocalKey,
    size_t srtpLocalKeyLen, uint8_t *srtpRemoteKey, size_t srtpRemoteKeyLen, std::string &remoteCert) {
ziyue committed
133
    InfoL;
134 135 136 137
    _srtp_session_send = std::make_shared<RTC::SrtpSession>(
        RTC::SrtpSession::Type::OUTBOUND, srtpCryptoSuite, srtpLocalKey, srtpLocalKeyLen);
    _srtp_session_recv = std::make_shared<RTC::SrtpSession>(
        RTC::SrtpSession::Type::INBOUND, srtpCryptoSuite, srtpRemoteKey, srtpRemoteKeyLen);
138 139 140 141
#ifdef ENABLE_SCTP
    _sctp = std::make_shared<RTC::SctpAssociationImp>(getPoller(), this, 128, 128, 262144, true);
    _sctp->TransportConnected();
#endif
ziyue committed
142
    onStartWebRTC();
ziyue committed
143 144
}

145 146
// Dtls callback: sends dtls output bytes through sendSockData() with no explicit
// tuple, so the tuple selected by the ice server is used.
void WebRtcTransport::OnDtlsTransportSendData(
    const RTC::DtlsTransport *dtlsTransport, const uint8_t *data, size_t len) {
    auto *bytes = (char *)data;
    sendSockData(bytes, len, nullptr);
}

xia-chu committed
150 151 152 153 154 155 156 157 158 159 160 161 162 163
// Dtls callback for the "connecting" state.
// Only issues an info-level log statement without message text.
void WebRtcTransport::OnDtlsTransportConnecting(const RTC::DtlsTransport *dtlsTransport) {
    InfoL;
}

// Dtls callback for a failed transport: shuts this transport down with Err_shutdown.
void WebRtcTransport::OnDtlsTransportFailed(const RTC::DtlsTransport *dtlsTransport) {
    InfoL;
    SockException reason(Err_shutdown, "dtls transport failed");
    onShutdown(reason);
}

// Dtls callback for a closed transport: shuts this transport down with Err_shutdown.
void WebRtcTransport::OnDtlsTransportClosed(const RTC::DtlsTransport *dtlsTransport) {
    InfoL;
    SockException reason(Err_shutdown, "dtls close notify received");
    onShutdown(reason);
}

164 165
// Dtls callback for decrypted application data.
// Without sctp support the payload is only hex-dumped to the info log;
// with sctp support it is handed to the sctp association.
void WebRtcTransport::OnDtlsTransportApplicationDataReceived(
    const RTC::DtlsTransport *dtlsTransport, const uint8_t *data, size_t len) {
#ifndef ENABLE_SCTP
    InfoL << hexdump(data, len);
#else
    _sctp->ProcessSctpData(data, len);
#endif
}
172 173 174

//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#ifdef ENABLE_SCTP
175
// Sctp callback for the "connecting" state; only issues a trace-level log statement.
void WebRtcTransport::OnSctpAssociationConnecting(RTC::SctpAssociation *sctpAssociation) {
    TraceL;
}

179
// Sctp callback for the "connected" state; logs the transport identifier at info level.
void WebRtcTransport::OnSctpAssociationConnected(RTC::SctpAssociation *sctpAssociation) {
    const auto &id = getIdentifier();
    InfoL << id;
}

183
// Sctp callback for a failed association; logs the transport identifier at warn level.
void WebRtcTransport::OnSctpAssociationFailed(RTC::SctpAssociation *sctpAssociation) {
    const auto &id = getIdentifier();
    WarnL << id;
}

187
// Sctp callback for a closed association; logs the transport identifier at info level.
void WebRtcTransport::OnSctpAssociationClosed(RTC::SctpAssociation *sctpAssociation) {
    const auto &id = getIdentifier();
    InfoL << id;
}

191 192
// Sctp callback: outgoing sctp bytes are sent as dtls application data.
void WebRtcTransport::OnSctpAssociationSendData(
    RTC::SctpAssociation *sctpAssociation, const uint8_t *data, size_t len) {
    auto &dtls = *_dtls_transport;
    dtls.SendApplicationData(data, len);
}

196 197
// Sctp callback for a received message: logs it, then echoes the same payload
// back on the same stream id with the same ppid.
void WebRtcTransport::OnSctpAssociationMessageReceived(
    RTC::SctpAssociation *sctpAssociation, uint16_t streamId, uint32_t ppid, const uint8_t *msg, size_t len) {
    const string payload((char *)msg, len);
    InfoL << getIdentifier() << " " << streamId << " " << ppid << " " << len << " " << payload;

    // echo the data back
    RTC::SctpStreamParameters echo_params;
    echo_params.streamId = streamId;
    _sctp->SendSctpMessage(echo_params, ppid, msg, len);
}
#endif
ziyue committed
205 206
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

207
void WebRtcTransport::sendSockData(const char *buf, size_t len, RTC::TransportTuple *tuple) {
ziyue committed
208
    auto pkt = _packet_pool.obtain2();
209 210
    pkt->assign(buf, len);
    onSendSockData(std::move(pkt), true, tuple ? tuple : _ice_server->GetSelectedTuple());
ziyue committed
211 212
}

213 214
// Returns the tuple currently selected by the ice server.
RTC::TransportTuple *WebRtcTransport::getSelectedTuple() const {
    auto *selected = _ice_server->GetSelectedTuple();
    return selected;
}

217
void WebRtcTransport::sendRtcpRemb(uint32_t ssrc, size_t bit_rate) {
218
    auto remb = FCI_REMB::create({ ssrc }, (uint32_t)bit_rate);
219 220 221
    auto fb = RtcpFB::create(PSFBType::RTCP_PSFB_REMB, remb.data(), remb.size());
    fb->ssrc = htonl(0);
    fb->ssrc_media = htonl(ssrc);
222
    sendRtcpPacket((char *)fb.get(), fb->getSize(), true);
223 224 225 226 227 228
}

void WebRtcTransport::sendRtcpPli(uint32_t ssrc) {
    auto pli = RtcpFB::create(PSFBType::RTCP_PSFB_PLI);
    pli->ssrc = htonl(0);
    pli->ssrc_media = htonl(ssrc);
229
    sendRtcpPacket((char *)pli.get(), pli->getSize(), true);
230 231
}

232
// Looks up, among the local dtls fingerprints of `transport`, the one whose
// algorithm matches `algorithm_str` and returns its value.
// Throws std::invalid_argument when no local fingerprint uses that algorithm.
string getFingerprint(const string &algorithm_str, const std::shared_ptr<RTC::DtlsTransport> &transport) {
    const auto wanted = RTC::DtlsTransport::GetFingerprintAlgorithm(algorithm_str);
    for (auto &candidate : transport->GetLocalFingerprints()) {
        if (candidate.algorithm != wanted) {
            continue;
        }
        return candidate.value;
    }
    throw std::invalid_argument(StrPrinter << "不支持的加密算法:" << algorithm_str);
}

242 243
void WebRtcTransport::setRemoteDtlsFingerprint(const RtcSession &remote) {
    // 设置远端dtls签名
ziyue committed
244
    RTC::DtlsTransport::Fingerprint remote_fingerprint;
245 246
    remote_fingerprint.algorithm
        = RTC::DtlsTransport::GetFingerprintAlgorithm(_offer_sdp->media[0].fingerprint.algorithm);
ziyue committed
247 248 249 250
    remote_fingerprint.value = _offer_sdp->media[0].fingerprint.hash;
    _dtls_transport->SetRemoteFingerprint(remote_fingerprint);
}

251
// Applies configuration-driven settings to `configure`.
// twcc is enabled only while the configured remb bit rate is 0: once remb is
// turned on twcc is switched off, because remb has no effect while twcc is on.
void WebRtcTransport::onRtcConfigure(RtcConfigure &configure) const {
    GET_CONFIG(size_t, remb_bit_rate, Rtc::kRembBitRate);
    const bool remb_enabled = (remb_bit_rate != 0);
    configure.enableTWCC(!remb_enabled);
}

257
std::string WebRtcTransport::getAnswerSdp(const string &offer) {
258 259 260 261
    try {
        //// 解析offer sdp ////
        _offer_sdp = std::make_shared<RtcSession>();
        _offer_sdp->loadFrom(offer);
ziyue committed
262
        onCheckSdp(SdpType::offer, *_offer_sdp);
263
        _offer_sdp->checkValid();
264 265 266 267 268 269 270
        setRemoteDtlsFingerprint(*_offer_sdp);

        //// sdp 配置 ////
        SdpAttrFingerprint fingerprint;
        fingerprint.algorithm = _offer_sdp->media[0].fingerprint.algorithm;
        fingerprint.hash = getFingerprint(fingerprint.algorithm, _dtls_transport);
        RtcConfigure configure;
271 272
        configure.setDefaultSetting(
            _ice_server->GetUsernameFragment(), _ice_server->GetPassword(), RtpDirection::sendrecv, fingerprint);
273 274 275 276
        onRtcConfigure(configure);

        //// 生成answer sdp ////
        _answer_sdp = configure.createAnswer(*_offer_sdp);
277
        onCheckSdp(SdpType::answer, *_answer_sdp);
278
        _answer_sdp->checkValid();
279 280 281 282 283
        return _answer_sdp->toString();
    } catch (exception &ex) {
        onShutdown(SockException(Err_shutdown, ex.what()));
        throw;
    }
ziyue committed
284 285
}

286
// True when the first byte of `buf` lies strictly between 19 and 64, i.e. in 20..63.
// Reads exactly one byte; the caller must supply at least one.
static bool is_dtls(char *buf) {
    const char first = buf[0];
    if (first <= 19) {
        return false;
    }
    return first < 64;
}

290
static bool is_rtp(char *buf) {
291
    RtpHeader *header = (RtpHeader *)buf;
292 293 294
    return ((header->pt < 64) || (header->pt >= 96));
}

295
static bool is_rtcp(char *buf) {
296
    RtpHeader *header = (RtpHeader *)buf;
297 298 299
    return ((header->pt >= 64) && (header->pt < 96));
}

300
// Textual form of the peer address held by `tuple`, as produced by SockUtil::inet_ntoa().
static string getPeerAddress(RTC::TransportTuple *tuple) {
    auto text = SockUtil::inet_ntoa(tuple);
    return text;
}

304
void WebRtcTransport::inputSockData(char *buf, int len, RTC::TransportTuple *tuple) {
305 306
    if (RTC::StunPacket::IsStun((const uint8_t *)buf, len)) {
        std::unique_ptr<RTC::StunPacket> packet(RTC::StunPacket::Parse((const uint8_t *)buf, len));
Johnny committed
307
        if (!packet) {
308 309 310
            WarnL << "parse stun error" << std::endl;
            return;
        }
Johnny committed
311
        _ice_server->ProcessStunPacket(packet.get(), tuple);
312 313
        return;
    }
ziyue committed
314
    if (is_dtls(buf)) {
315
        _dtls_transport->ProcessDtlsData((uint8_t *)buf, len);
316 317
        return;
    }
ziyue committed
318
    if (is_rtp(buf)) {
ziyue committed
319 320 321 322
        if (!_srtp_session_recv) {
            WarnL << "received rtp packet when dtls not completed from:" << getPeerAddress(tuple);
            return;
        }
323
        if (_srtp_session_recv->DecryptSrtp((uint8_t *)buf, &len)) {
324
            onRtp(buf, len, _ticker.createdTime());
ziyue committed
325
        }
ziyue committed
326 327 328
        return;
    }
    if (is_rtcp(buf)) {
ziyue committed
329 330 331 332
        if (!_srtp_session_recv) {
            WarnL << "received rtcp packet when dtls not completed from:" << getPeerAddress(tuple);
            return;
        }
333
        if (_srtp_session_recv->DecryptSrtcp((uint8_t *)buf, &len)) {
ziyue committed
334 335
            onRtcp(buf, len);
        }
ziyue committed
336 337
        return;
    }
338 339
}

340
void WebRtcTransport::sendRtpPacket(const char *buf, int len, bool flush, void *ctx) {
ziyue committed
341
    if (_srtp_session_send) {
ziyue committed
342
        auto pkt = _packet_pool.obtain2();
343 344
        // 预留rtx加入的两个字节
        pkt->setCapacity((size_t)len + SRTP_MAX_TRAILER_LEN + 2);
345 346 347 348 349
        pkt->assign(buf, len);
        onBeforeEncryptRtp(pkt->data(), len, ctx);
        if (_srtp_session_send->EncryptRtp(reinterpret_cast<uint8_t *>(pkt->data()), &len)) {
            pkt->setSize(len);
            onSendSockData(std::move(pkt), flush);
350
        }
351 352 353
    }
}

354
void WebRtcTransport::sendRtcpPacket(const char *buf, int len, bool flush, void *ctx) {
xiongziliang committed
355
    if (_srtp_session_send) {
ziyue committed
356
        auto pkt = _packet_pool.obtain2();
357 358
        // 预留rtx加入的两个字节
        pkt->setCapacity((size_t)len + SRTP_MAX_TRAILER_LEN + 2);
359 360 361 362 363
        pkt->assign(buf, len);
        onBeforeEncryptRtcp(pkt->data(), len, ctx);
        if (_srtp_session_send->EncryptRtcp(reinterpret_cast<uint8_t *>(pkt->data()), &len)) {
            pkt->setSize(len);
            onSendSockData(std::move(pkt), flush);
364
        }
xiongziliang committed
365 366 367
    }
}

368 369
///////////////////////////////////////////////////////////////////////////////////

370
void WebRtcTransportImp::onCreate() {
xia-chu committed
371
    WebRtcTransport::onCreate();
372 373
    registerSelf();

374
    weak_ptr<WebRtcTransportImp> weak_self = static_pointer_cast<WebRtcTransportImp>(shared_from_this());
375
    GET_CONFIG(float, timeoutSec, Rtc::kTimeOutSec);
376 377 378 379 380 381 382 383
    _timer = std::make_shared<Timer>(
        timeoutSec / 2,
        [weak_self]() {
            auto strong_self = weak_self.lock();
            if (!strong_self) {
                return false;
            }
            if (strong_self->_alive_ticker.elapsedTime() > timeoutSec * 1000) {
384
                strong_self->onShutdown(SockException(Err_timeout, "接受rtp/rtcp/datachannel超时"));
385 386 387 388
            }
            return true;
        },
        getPoller());
389

390
    _twcc_ctx.setOnSendTwccCB([this](uint32_t ssrc, string fci) { onSendTwcc(ssrc, fci); });
391 392
}

393 394 395 396 397 398 399 400 401
// Delegates to the base handler. With sctp compiled in, a session whose answer
// sdp only carries a datachannel additionally counts this data as a sign of
// life and resets the liveness ticker.
void WebRtcTransportImp::OnDtlsTransportApplicationDataReceived(
    const RTC::DtlsTransport *dtlsTransport, const uint8_t *data, size_t len) {
    WebRtcTransport::OnDtlsTransportApplicationDataReceived(dtlsTransport, data, len);
#ifdef ENABLE_SCTP
    const bool datachannel_only = _answer_sdp->isOnlyDatachannel();
    if (datachannel_only) {
        _alive_ticker.resetTime();
    }
#endif
}

402 403
// Forwards the poller to the base class and logs the new identifier at info level.
WebRtcTransportImp::WebRtcTransportImp(const EventPoller::Ptr &poller)
    : WebRtcTransport(poller) {
    const auto &id = getIdentifier();
    InfoL << id;
}
xia-chu committed
406

xia-chu committed
407
// Logs the identifier of the transport being destroyed at info level.
WebRtcTransportImp::~WebRtcTransportImp() {
    const auto &id = getIdentifier();
    InfoL << id;
}

ziyue committed
411 412
void WebRtcTransportImp::onDestory() {
    WebRtcTransport::onDestory();
413
    unregisterSelf();
414 415
}

416
// Sends `buf` through the currently selected session; `tuple` is not used here.
// Without a selected session the data is dropped and its size is logged as a warning.
void WebRtcTransportImp::onSendSockData(Buffer::Ptr buf, bool flush, RTC::TransportTuple *tuple) {
    if (_selected_session) {
        // the flush flag lets the rtp packets of one frame go out in a single batch,
        // which improves network io performance
        _selected_session->setSendFlushFlag(flush);
        _selected_session->send(std::move(buf));
        return;
    }
    WarnL << "send data failed:" << buf->size();
}

xiongziliang committed
426 427
///////////////////////////////////////////////////////////////////

428
bool WebRtcTransportImp::canSendRtp() const {
429
    for (auto &m : _answer_sdp->media) {
430 431 432 433 434
        if (m.direction == RtpDirection::sendrecv || m.direction == RtpDirection::sendonly) {
            return true;
        }
    }
    return false;
xiongziliang committed
435 436
}

437
bool WebRtcTransportImp::canRecvRtp() const {
438
    for (auto &m : _answer_sdp->media) {
439 440 441 442 443
        if (m.direction == RtpDirection::sendrecv || m.direction == RtpDirection::recvonly) {
            return true;
        }
    }
    return false;
xiongziliang committed
444 445
}

ziyue committed
446
void WebRtcTransportImp::onStartWebRTC() {
447
    // 获取ssrc和pt相关信息,届时收到rtp和rtcp时分别可以根据pt和ssrc找到相关的信息
448
    for (auto &m_answer : _answer_sdp->media) {
449 450 451
        if (m_answer.type == TrackApplication) {
            continue;
        }
452
        auto m_offer = _offer_sdp->getMedia(m_answer.type);
ziyue committed
453
        auto track = std::make_shared<MediaTrack>();
xiongziliang committed
454

ziyue committed
455 456 457 458 459
        track->media = &m_answer;
        track->answer_ssrc_rtp = m_answer.getRtpSSRC();
        track->answer_ssrc_rtx = m_answer.getRtxSSRC();
        track->offer_ssrc_rtp = m_offer->getRtpSSRC();
        track->offer_ssrc_rtx = m_offer->getRtxSSRC();
460
        track->plan_rtp = &m_answer.plan[0];
ziyue committed
461
        track->plan_rtx = m_answer.getRelatedRtxPlan(track->plan_rtp->pt);
462
        track->rtcp_context_send = std::make_shared<RtcpContextForSend>();
xiongziliang committed
463

464
        // rtp track type --> MediaTrack
465
        if (m_answer.direction == RtpDirection::sendonly || m_answer.direction == RtpDirection::sendrecv) {
466
            // 该类型的track 才支持发送
467 468
            _type_to_track[m_answer.type] = track;
        }
469
        // send ssrc --> MediaTrack
ziyue committed
470 471
        _ssrc_to_track[track->answer_ssrc_rtp] = track;
        _ssrc_to_track[track->answer_ssrc_rtx] = track;
xiongziliang committed
472

473
        // recv ssrc --> MediaTrack
ziyue committed
474 475
        _ssrc_to_track[track->offer_ssrc_rtp] = track;
        _ssrc_to_track[track->offer_ssrc_rtx] = track;
xiongziliang committed
476

477 478 479
        // rtp pt --> MediaTrack
        _pt_to_track.emplace(
            track->plan_rtp->pt, std::unique_ptr<WrappedMediaTrack>(new WrappedRtpTrack(track, _twcc_ctx, *this)));
ziyue committed
480
        if (track->plan_rtx) {
481
            // rtx pt --> MediaTrack
wxf committed
482
            _pt_to_track.emplace(track->plan_rtx->pt, std::unique_ptr<WrappedMediaTrack>(new WrappedRtxTrack(track)));
xiongziliang committed
483
        }
484
        // 记录rtp ext类型与id的关系,方便接收或发送rtp时修改rtp ext id
485 486 487
        track->rtp_ext_ctx = std::make_shared<RtpExtContext>(*m_offer);
        weak_ptr<MediaTrack> weak_track = track;
        track->rtp_ext_ctx->setOnGetRtp([this, weak_track](uint8_t pt, uint32_t ssrc, const string &rid) {
488
            // ssrc --> MediaTrack
489 490 491
            auto track = weak_track.lock();
            assert(track);
            _ssrc_to_track[ssrc] = std::move(track);
492
            InfoL << "get rtp, pt:" << (int)pt << ", ssrc:" << ssrc << ", rid:" << rid;
493 494 495 496
        });

        size_t index = 0;
        for (auto &ssrc : m_offer->rtp_ssrc_sim) {
497
            // 记录ssrc对应的MediaTrack
498 499
            _ssrc_to_track[ssrc.ssrc] = track;
            if (m_offer->rtp_rids.size() > index) {
500
                // 支持firefox的simulcast, 提前映射好ssrc和rid的关系
501
                track->rtp_ext_ctx->setRid(ssrc.ssrc, m_offer->rtp_rids[index]);
502 503 504 505 506
            } else {
                // SDP munging没有rid, 它通过group-ssrc:SIM给出ssrc列表;
                // 系统又要有rid,这里手工生成rid,并为其绑定ssrc
                std::string rid = "r" + std::to_string(index);
                track->rtp_ext_ctx->setRid(ssrc.ssrc, rid);
507
                if (ssrc.rtx_ssrc) {
508
                    track->rtp_ext_ctx->setRid(ssrc.rtx_ssrc, rid);
509
                }
510
            }
511
            ++index;
xia-chu committed
512
        }
xiongziliang committed
513
    }
xiongziliang committed
514 515
}

516
// Post-processes the answer sdp.
// 1. Every media gets its address/port (and rtcp address/port) rewritten to the
//    first configured external ip, or the local ip when none is configured, and
//    the configured rtc port; the sdp origin address is set to the same address.
// 2. When this side can send rtp, every non-application media that has no ssrc
//    yet receives a generated ssrc entry (plus an rtx ssrc when the first plan
//    has a related rtx plan).
void WebRtcTransportImp::onCheckAnswer(RtcSession &sdp) {
    GET_CONFIG_FUNC(std::vector<std::string>, extern_ips, Rtc::kExternIP, [](string str) {
        std::vector<std::string> ips;
        if (!str.empty()) {
            ips = split(str, ",");
        }
        translateIPFromEnv(ips);
        return ips;
    });

    // rewrite ip and port of the answer sdp
    for (auto &media : sdp.media) {
        media.addr.reset();
        media.addr.address = extern_ips.empty() ? SockUtil::get_local_ip() : extern_ips[0];
        media.rtcp_addr.reset();
        media.rtcp_addr.address = media.addr.address;

        GET_CONFIG(uint16_t, local_port, Rtc::kPort);
        media.rtcp_addr.port = local_port;
        media.port = media.rtcp_addr.port;
        sdp.origin.address = media.addr.address;
    }

    // the ssrc of rtp we send only matters when we send at all
    if (!canSendRtp()) {
        return;
    }

    for (auto &media : sdp.media) {
        // skip the datachannel and media whose ssrc was generated already
        if (media.type == TrackApplication || !media.rtp_rtx_ssrc.empty()) {
            continue;
        }
        // add ssrc information to the answer sdp
        media.rtp_rtx_ssrc.emplace_back();
        auto &entry = media.rtp_rtx_ssrc.back();
        // the value is arbitrary: outgoing rtp is rewritten to carry it
        entry.ssrc = media.type + RTP_SSRC_OFFSET;
        entry.cname = RTP_CNAME;
        entry.label = RTP_LABEL;
        entry.mslabel = RTP_MSLABEL;
        entry.msid = RTP_MSID;

        if (media.getRelatedRtxPlan(media.plan[0].pt)) {
            // rtx ssrc
            entry.rtx_ssrc = entry.ssrc + RTX_SSRC_OFFSET;
        }
    }
}

568 569
// Sdp hook: an answer is passed to onCheckAnswer(), an offer is left untouched.
// Any other type is not expected and trips an assert.
void WebRtcTransportImp::onCheckSdp(SdpType type, RtcSession &sdp) {
    if (type == SdpType::answer) {
        onCheckAnswer(sdp);
        return;
    }
    if (type == SdpType::offer) {
        return;
    }
    // unreachable
    assert(0);
}

581 582
// Creates a "host" ice candidate for component 1 (the rtp port) with the given
// address, port, priority and transport protocol. The foundation is
// "<proto>candidate".
SdpAttrCandidate::Ptr
makeIceCandidate(std::string ip, uint16_t port, uint32_t priority = 100, std::string proto = "udp") {
    auto result = std::make_shared<SdpAttrCandidate>();
    result->type = "host";
    // rtp port
    result->component = 1;
    // with a single candidate the priority value does not matter
    result->priority = priority;
    result->port = port;
    result->address = std::move(ip);
    result->foundation = proto + "candidate";
    result->transport = std::move(proto);
    return result;
}

596 597
// Extends the base configuration with the ice candidates of the receive port.
// Without configured external ips a single candidate for the local ip is added
// with priority 120. Otherwise one candidate per external ip is added, in list
// order, with priorities descending in steps of 10 from 100 + 10 * count.
void WebRtcTransportImp::onRtcConfigure(RtcConfigure &configure) const {
    WebRtcTransport::onRtcConfigure(configure);

    GET_CONFIG(uint16_t, local_port, Rtc::kPort);
    GET_CONFIG_FUNC(std::vector<std::string>, extern_ips, Rtc::kExternIP, [](string str) {
        std::vector<std::string> ips;
        if (!str.empty()) {
            ips = split(str, ",");
        }
        translateIPFromEnv(ips);
        return ips;
    });

    if (extern_ips.empty()) {
        configure.addCandidate(*makeIceCandidate(SockUtil::get_local_ip(), local_port, 120, "udp"));
        return;
    }

    const uint32_t step = 10;
    uint32_t priority = 100 + step * extern_ips.size();
    for (const auto &ip : extern_ips) {
        configure.addCandidate(*makeIceCandidate(ip, local_port, priority, "udp"));
        priority -= step;
    }
}

xiongziliang committed
622 623
///////////////////////////////////////////////////////////////////

624 625 626
class RtpChannel
    : public RtpTrackImp
    , public std::enable_shared_from_this<RtpChannel> {
xiongziliang committed
627
public:
628 629 630
    RtpChannel(EventPoller::Ptr poller, RtpTrackImp::OnSorted cb, function<void(const FCI_NACK &nack)> on_nack) {
        _poller = std::move(poller);
        _on_nack = std::move(on_nack);
ziyue committed
631
        setOnSorted(std::move(cb));
632

633
        _nack_ctx.setOnNack([this](const FCI_NACK &nack) { onNack(nack); });
xiongziliang committed
634 635
    }

636
    ~RtpChannel() override = default;
xiongziliang committed
637

638 639
    RtpPacket::Ptr inputRtp(TrackType type, int sample_rate, uint8_t *ptr, size_t len, bool is_rtx) {
        auto rtp = RtpTrack::inputRtp(type, sample_rate, ptr, len);
640 641 642 643 644 645
        if (!rtp) {
            return rtp;
        }
        auto seq = rtp->getSeq();
        _nack_ctx.received(seq, is_rtx);
        if (!is_rtx) {
646
            // 统计rtp接受情况,便于生成nack rtcp包
647
            _rtcp_context.onRtp(seq, rtp->getStamp(), rtp->ntp_stamp, sample_rate, len);
648
        }
649
        return rtp;
xiongziliang committed
650 651
    }

652
    Buffer::Ptr createRtcpRR(RtcpHeader *sr, uint32_t ssrc) {
ziyue committed
653
        _rtcp_context.onRtcp(sr);
ziyue committed
654
        return _rtcp_context.createRtcpRR(ssrc, getSSRC());
xiongziliang committed
655 656
    }

657
    float getLossRate() {
658
        auto expected = _rtcp_context.getExpectedPacketsInterval();
659
        if (!expected) {
660
            return -1;
661
        }
662
        return _rtcp_context.geLostInterval() * 100 / expected;
663 664 665
    }

private:
666
    void starNackTimer() {
667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688
        if (_delay_task) {
            return;
        }
        weak_ptr<RtpChannel> weak_self = shared_from_this();
        _delay_task = _poller->doDelayTask(10, [weak_self]() -> uint64_t {
            auto strong_self = weak_self.lock();
            if (!strong_self) {
                return 0;
            }
            auto ret = strong_self->_nack_ctx.reSendNack();
            if (!ret) {
                strong_self->_delay_task = nullptr;
            }
            return ret;
        });
    }

    void onNack(const FCI_NACK &nack) {
        _on_nack(nack);
        starNackTimer();
    }

xiongziliang committed
689
private:
ziyue committed
690
    NackContext _nack_ctx;
691
    RtcpContextForRecv _rtcp_context;
692
    EventPoller::Ptr _poller;
夏楚 committed
693
    EventPoller::DelayTask::Ptr _delay_task;
694
    function<void(const FCI_NACK &nack)> _on_nack;
xiongziliang committed
695 696
};

697
std::shared_ptr<RtpChannel> MediaTrack::getRtpChannel(uint32_t ssrc) const {
ziyue committed
698
    auto it_chn = rtp_channel.find(rtp_ext_ctx->getRid(ssrc));
699 700 701 702 703 704
    if (it_chn == rtp_channel.end()) {
        return nullptr;
    }
    return it_chn->second;
}

705
// Loss rate of the first ssrc, in map order, that has an rtp channel and whose
// track's media type equals `type`; -1 when no such ssrc exists.
float WebRtcTransportImp::getLossRate(TrackType type) {
    for (auto &entry : _ssrc_to_track) {
        auto &track = entry.second;
        auto channel = track->getRtpChannel(entry.first);
        if (!channel) {
            continue;
        }
        if (!track->media || track->media->type != type) {
            continue;
        }
        return channel->getLossRate();
    }
    return -1;
}

ziyue committed
719
void WebRtcTransportImp::onRtcp(const char *buf, size_t len) {
xia-chu committed
720
    _bytes_usage += len;
721
    auto rtcps = RtcpHeader::loadFromBytes((char *)buf, len);
xiongziliang committed
722
    for (auto rtcp : rtcps) {
723 724
        switch ((RtcpType)rtcp->pt) {
        case RtcpType::RTCP_SR: {
725
            _alive_ticker.resetTime();
726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751
            // 对方汇报rtp发送情况
            RtcpSR *sr = (RtcpSR *)rtcp;
            auto it = _ssrc_to_track.find(sr->ssrc);
            if (it != _ssrc_to_track.end()) {
                auto &track = it->second;
                auto rtp_chn = track->getRtpChannel(sr->ssrc);
                if (!rtp_chn) {
                    WarnL << "未识别的sr rtcp包:" << rtcp->dumpString();
                } else {
                    // InfoL << "接收丢包率,ssrc:" << sr->ssrc << ",loss rate(%):" << rtp_chn->getLossRate();
                    // 设置rtp时间戳与ntp时间戳的对应关系
                    rtp_chn->setNtpStamp(sr->rtpts, sr->getNtpUnixStampMS());
                    auto rr = rtp_chn->createRtcpRR(sr, track->answer_ssrc_rtp);
                    sendRtcpPacket(rr->data(), rr->size(), true);
                }
            } else {
                WarnL << "未识别的sr rtcp包:" << rtcp->dumpString();
            }
            break;
        }
        case RtcpType::RTCP_RR: {
            _alive_ticker.resetTime();
            // 对方汇报rtp接收情况
            RtcpRR *rr = (RtcpRR *)rtcp;
            for (auto item : rr->getItemList()) {
                auto it = _ssrc_to_track.find(item->ssrc);
752
                if (it != _ssrc_to_track.end()) {
ziyue committed
753
                    auto &track = it->second;
754 755 756
                    track->rtcp_context_send->onRtcp(rtcp);
                    auto sr = track->rtcp_context_send->createRtcpSR(track->answer_ssrc_rtp);
                    sendRtcpPacket(sr->data(), sr->size(), true);
xia-chu committed
757
                } else {
758
                    WarnL << "未识别的rr rtcp包:" << rtcp->dumpString();
xiongziliang committed
759 760
                }
            }
761 762 763 764 765 766 767 768 769 770
            break;
        }
        case RtcpType::RTCP_BYE: {
            // 对方汇报停止发送rtp
            RtcpBye *bye = (RtcpBye *)rtcp;
            for (auto ssrc : bye->getSSRC()) {
                auto it = _ssrc_to_track.find(*ssrc);
                if (it == _ssrc_to_track.end()) {
                    WarnL << "未识别的bye rtcp包:" << rtcp->dumpString();
                    continue;
xiongziliang committed
771
                }
772 773
                _ssrc_to_track.erase(it);
            }
774
            onRtcpBye();
775 776 777 778 779 780
            onShutdown(SockException(Err_eof, "rtcp bye message received"));
            break;
        }
        case RtcpType::RTCP_PSFB:
        case RtcpType::RTCP_RTPFB: {
            if ((RtcpType)rtcp->pt == RtcpType::RTCP_PSFB) {
xiongziliang committed
781 782
                break;
            }
783 784 785 786 787 788 789 790
            // RTPFB
            switch ((RTPFBType)rtcp->report_count) {
            case RTPFBType::RTCP_RTPFB_NACK: {
                RtcpFB *fb = (RtcpFB *)rtcp;
                auto it = _ssrc_to_track.find(fb->ssrc_media);
                if (it == _ssrc_to_track.end()) {
                    WarnL << "未识别的 rtcp包:" << rtcp->dumpString();
                    return;
xia-chu committed
791
                }
792 793 794 795 796 797
                auto &track = it->second;
                auto &fci = fb->getFci<FCI_NACK>();
                track->nack_list.forEach(fci, [&](const RtpPacket::Ptr &rtp) {
                    // rtp重传
                    onSendRtp(rtp, true, true);
                });
xiongziliang committed
798 799
                break;
            }
800
            default:
xia-chu committed
801 802
                break;
            }
803 804 805 806 807
            break;
        }
        case RtcpType::RTCP_XR: {
            RtcpXRRRTR *xr = (RtcpXRRRTR *)rtcp;
            if (xr->bt != 4) {
808 809
                break;
            }
810 811 812 813 814 815 816 817 818 819 820 821 822 823
            auto it = _ssrc_to_track.find(xr->ssrc);
            if (it == _ssrc_to_track.end()) {
                WarnL << "未识别的 rtcp包:" << rtcp->dumpString();
                return;
            }
            auto &track = it->second;
            track->rtcp_context_send->onRtcp(rtcp);
            auto xrdlrr = track->rtcp_context_send->createRtcpXRDLRR(track->answer_ssrc_rtp, track->answer_ssrc_rtp);
            sendRtcpPacket(xrdlrr->data(), xrdlrr->size(), true);

            break;
        }
        default:
            break;
xiongziliang committed
824 825 826 827
        }
    }
}

828 829
///////////////////////////////////////////////////////////////////

830
void WebRtcTransportImp::createRtpChannel(const string &rid, uint32_t ssrc, MediaTrack &track) {
831
    // rid --> RtpReceiverImp
832
    auto &ref = track.rtp_channel[rid];
833
    weak_ptr<WebRtcTransportImp> weak_self = dynamic_pointer_cast<WebRtcTransportImp>(shared_from_this());
834 835 836 837 838 839 840 841 842
    ref = std::make_shared<RtpChannel>(
        getPoller(), [&track, this, rid](RtpPacket::Ptr rtp) mutable { onSortedRtp(track, rid, std::move(rtp)); },
        [&track, weak_self, ssrc](const FCI_NACK &nack) mutable {
            // nack发送可能由定时器异步触发
            auto strong_self = weak_self.lock();
            if (strong_self) {
                strong_self->onSendNack(track, nack, ssrc);
            }
        });
843
    InfoL << "create rtp receiver of ssrc:" << ssrc << ", rid:" << rid << ", codec:" << track.plan_rtp->codec;
844
}
845

846 847 848 849
/// Reset the alive ticker.
void WebRtcTransportImp::updateTicker() {
    _alive_ticker.resetTime();
}

850
void WebRtcTransportImp::onRtp(const char *buf, size_t len, uint64_t stamp_ms) {
851 852 853
    _bytes_usage += len;
    _alive_ticker.resetTime();

854 855
    RtpHeader *rtp = (RtpHeader *)buf;
    // 根据接收到的rtp的pt信息,找到该流的信息
856 857
    auto it = _pt_to_track.find(rtp->pt);
    if (it == _pt_to_track.end()) {
858
        WarnL << "unknown rtp pt:" << (int)rtp->pt;
xiongziliang committed
859 860
        return;
    }
861 862 863 864
    it->second->inputRtp(buf, len, stamp_ms, rtp);
}

void WrappedRtpTrack::inputRtp(const char *buf, size_t len, uint64_t stamp_ms, RtpHeader *rtp) {
865 866 867 868 869 870 871
#if 0
    auto seq = ntohs(rtp->seq);
    if (track->media->type == TrackVideo && seq % 100 == 0) {
        //此处模拟接受丢包
        return;
    }
#endif
872

873 874
    auto ssrc = ntohl(rtp->ssrc);

875
    // 修改ext id至统一
876
    string rid;
877
    auto twcc_ext = track->rtp_ext_ctx->changeRtpExtId(rtp, true, &rid, RtpExtType::transport_cc);
878 879

    if (twcc_ext) {
880
        _twcc_ctx.onRtp(ssrc, twcc_ext.getTransportCCSeq(), stamp_ms);
881
    }
882

ziyue committed
883
    auto &ref = track->rtp_channel[rid];
884
    if (!ref) {
885
        _transport.createRtpChannel(rid, ssrc, *track);
886 887
    }

888 889
    // 解析并排序rtp
    ref->inputRtp(track->media->type, track->plan_rtp->sample_rate, (uint8_t *)buf, len, false);
890 891 892
}

void WrappedRtxTrack::inputRtp(const char *buf, size_t len, uint64_t stamp_ms, RtpHeader *rtp) {
893
    // 修改ext id至统一
894 895 896 897 898
    string rid;
    track->rtp_ext_ctx->changeRtpExtId(rtp, true, &rid, RtpExtType::transport_cc);

    auto &ref = track->rtp_channel[rid];
    if (!ref) {
899 900 901
        // 再接收到对应的rtp前,丢弃rtx包
        WarnL << "unknown rtx rtp, rid:" << rid << ", ssrc:" << ntohl(rtp->ssrc) << ", codec:" << track->plan_rtp->codec
              << ", seq:" << ntohs(rtp->seq);
902
        return;
xia-chu committed
903 904
    }

905 906
    // 这里是rtx重传包
    //  https://datatracker.ietf.org/doc/html/rfc4588#section-4
907 908 909 910 911
    auto payload = rtp->getPayloadData();
    auto size = rtp->getPayloadSize(len);
    if (size < 2) {
        return;
    }
912

913
    // 前两个字节是原始的rtp的seq
914
    auto origin_seq = payload[0] << 8 | payload[1];
915
    // rtx 转换为 rtp
ziyue committed
916
    rtp->pt = track->plan_rtp->pt;
917
    rtp->seq = htons(origin_seq);
ziyue committed
918
    rtp->ssrc = htonl(ref->getSSRC());
919

920
    memmove((uint8_t *)buf + 2, buf, payload - (uint8_t *)buf);
921 922
    buf += 2;
    len -= 2;
923
    ref->inputRtp(track->media->type, track->plan_rtp->sample_rate, (uint8_t *)buf, len, true);
xiongziliang committed
924 925
}

ziyue committed
926
/**
 * Build an RTPFB NACK rtcp packet and send it.
 * @param track track whose answer_ssrc_rtp is used as the sender ssrc
 * @param nack  nack FCI to carry
 * @param ssrc  media ssrc the nack refers to
 */
void WebRtcTransportImp::onSendNack(MediaTrack &track, const FCI_NACK &nack, uint32_t ssrc) {
    auto fb = RtcpFB::create(RTPFBType::RTCP_RTPFB_NACK, &nack, FCI_NACK::kSize);
    fb->ssrc = htonl(track.answer_ssrc_rtp);
    fb->ssrc_media = htonl(ssrc);
    sendRtcpPacket(reinterpret_cast<char *>(fb.get()), fb->getSize(), true);
}

933 934
void WebRtcTransportImp::onSendTwcc(uint32_t ssrc, const string &twcc_fci) {
    auto rtcp = RtcpFB::create(RTPFBType::RTCP_RTPFB_TWCC, twcc_fci.data(), twcc_fci.size());
夏楚 committed
935
    rtcp->ssrc = htonl(0);
936
    rtcp->ssrc_media = htonl(ssrc);
937
    sendRtcpPacket((char *)rtcp.get(), rtcp->getSize(), true);
938 939
}

xiongziliang committed
940
///////////////////////////////////////////////////////////////////
941

ziyue committed
942 943
/**
 * Called with rtp packets after sorting. For video tracks a PLI (and
 * optionally a REMB) is sent when the pli ticker has exceeded 2000, then the
 * packet is passed on to onRecvRtp().
 * @param track track the packet belongs to
 * @param rid   rid of the stream
 * @param rtp   the sorted rtp packet
 */
void WebRtcTransportImp::onSortedRtp(MediaTrack &track, const string &rid, RtpPacket::Ptr rtp) {
    const bool pli_due = track.media->type == TrackVideo && _pli_ticker.elapsedTime() > 2000;
    if (pli_due) {
        // Periodically send a pli to request a key frame, which helps non-rtc protocols
        _pli_ticker.resetTime();
        const auto ssrc = rtp->getSSRC();
        sendRtcpPli(ssrc);

        // If remb is enabled, send a remb packet to adjust the bit rate
        GET_CONFIG(size_t, remb_bit_rate, Rtc::kRembBitRate);
        if (remb_bit_rate && _answer_sdp->supportRtcpFb(SdpConst::kRembRtcpFb)) {
            sendRtcpRemb(ssrc, remb_bit_rate);
        }
    }

    onRecvRtp(track, rid, std::move(rtp));
}

958
///////////////////////////////////////////////////////////////////
959

960
/**
 * Send an rtp packet to the peer.
 * @param rtp   packet to send; the leading RtpPacket::kRtpTcpHeaderSize bytes are skipped
 * @param flush forwarded to sendRtpPacket()
 * @param rtx   true when the packet is a retransmission; retransmissions are
 *              neither counted in the send statistics nor cached in the nack list
 */
void WebRtcTransportImp::onSendRtp(const RtpPacket::Ptr &rtp, bool flush, bool rtx) {
    auto &track = _type_to_track[rtp->type];
    if (!track) {
        // Ignore: the peer does not support this codec type
        return;
    }

    const auto rtp_size = rtp->size() - RtpPacket::kRtpTcpHeaderSize;
    if (!rtx) {
        // Collect rtp send statistics, needed for the sr report
        track->rtcp_context_send->onRtp(rtp->getSeq(), rtp->getStamp(), rtp->ntp_stamp, rtp->sample_rate, rtp_size);
        track->nack_list.pushBack(rtp);
#if 0
        // Simulate packet loss on the sending side
        if (rtp->type == TrackVideo && rtp->getSeq() % 100 == 0) {
            return;
        }
#endif
    }
    // else: this is an rtx retransmission
    // TraceL << "send rtx rtp:" << rtp->getSeq();

    // The context is consumed by onBeforeEncryptRtp()
    pair<bool /*rtx*/, MediaTrack *> ctx { rtx, track.get() };
    sendRtpPacket(rtp->data() + RtpPacket::kRtpTcpHeaderSize, rtp_size, flush, &ctx);
    _bytes_usage += rtp_size;
}
xia-chu committed
986

987
void WebRtcTransportImp::onBeforeEncryptRtp(const char *buf, int &len, void *ctx) {
988 989
    auto pr = (pair<bool /*rtx*/, MediaTrack *> *)ctx;
    auto header = (RtpHeader *)buf;
990

ziyue committed
991
    if (!pr->first || !pr->second->plan_rtx) {
992
        // 普通的rtp,或者不支持rtx, 修改目标pt和ssrc
ziyue committed
993
        pr->second->rtp_ext_ctx->changeRtpExtId(header, false);
ziyue committed
994
        header->pt = pr->second->plan_rtp->pt;
xiongziliang committed
995
        header->ssrc = htonl(pr->second->answer_ssrc_rtp);
ziyue committed
996
    } else {
997
        // 重传的rtp, rtx
ziyue committed
998
        pr->second->rtp_ext_ctx->changeRtpExtId(header, false);
ziyue committed
999
        header->pt = pr->second->plan_rtx->pt;
xiongziliang committed
1000
        if (pr->second->answer_ssrc_rtx) {
1001
            // 有rtx单独的ssrc,有些情况下,浏览器支持rtx,但是未指定rtx单独的ssrc
xiongziliang committed
1002
            header->ssrc = htonl(pr->second->answer_ssrc_rtx);
1003
        } else {
1004
            // 未单独指定rtx的ssrc,那么使用rtp的ssrc
1005
            header->ssrc = htonl(pr->second->answer_ssrc_rtp);
ziyue committed
1006
        }
1007

ziyue committed
1008
        auto origin_seq = ntohs(header->seq);
1009
        // seq跟原来的不一样
ziyue committed
1010 1011 1012
        header->seq = htons(_rtx_seq[pr->second->media->type]);
        ++_rtx_seq[pr->second->media->type];

ziyue committed
1013 1014 1015
        auto payload = header->getPayloadData();
        auto payload_size = header->getPayloadSize(len);
        if (payload_size) {
1016 1017
            // rtp负载后移两个字节,这两个字节用于存放osn
            // https://datatracker.ietf.org/doc/html/rfc4588#section-4
ziyue committed
1018 1019 1020 1021 1022 1023
            memmove(payload + 2, payload, payload_size);
        }
        payload[0] = origin_seq >> 8;
        payload[1] = origin_seq & 0xFF;
        len += 2;
    }
1024 1025
}

1026
void WebRtcTransportImp::onShutdown(const SockException &ex) {
1027
    WarnL << ex.what();
1028
    unrefSelf();
1029 1030 1031 1032 1033
    for (auto &pr : _history_sessions) {
        auto session = pr.second.lock();
        if (session) {
            session->shutdown(ex);
        }
1034
    }
xia-chu committed
1035 1036
}

1037
/**
 * Select `session` as the current session of this transport.
 * The session is also recorded in the history map, and the self reference
 * is dropped afterwards.
 * @param session the newly selected session
 */
void WebRtcTransportImp::setSession(Session::Ptr session) {
    _history_sessions.emplace(session.get(), session);

    const auto &previous = _selected_session;
    if (previous) {
        // A session was selected before, so the peer address has changed
        InfoL << "rtc network changed: " << previous->get_peer_ip() << ":" << previous->get_peer_port() << " -> "
              << session->get_peer_ip() << ":" << session->get_peer_port() << ", id:" << getIdentifier();
    }

    _selected_session = std::move(session);
    unrefSelf();
}

1048
/// @return the currently selected session (set by setSession()).
const Session::Ptr &WebRtcTransportImp::getSession() const {
    return _selected_session;
}

1052
uint64_t WebRtcTransportImp::getBytesUsage() const {
1053
    return _bytes_usage;
1054 1055
}

1056
uint64_t WebRtcTransportImp::getDuration() const {
1057
    return _alive_ticker.createdTime() / 1000;
1058 1059
}

1060 1061
/// Hook for a received rtcp bye; intentionally does nothing here.
void WebRtcTransportImp::onRtcpBye() {
}

1062
/////////////////////////////////////////////////////////////////////////////////////////////
1063 1064

void WebRtcTransportImp::registerSelf() {
1065
    _self = static_pointer_cast<WebRtcTransportImp>(shared_from_this());
ziyue committed
1066
    WebRtcTransportManager::Instance().addItem(getIdentifier(), _self);
1067 1068
}

1069 1070 1071 1072 1073 1074
/// Drop the strong self reference taken by registerSelf().
void WebRtcTransportImp::unrefSelf() {
    _self = nullptr;
}

void WebRtcTransportImp::unregisterSelf() {
    unrefSelf();
ziyue committed
1075
    WebRtcTransportManager::Instance().removeItem(getIdentifier());
1076 1077
}

ziyue committed
1078
/// @return the process-wide WebRtcTransportManager, created on first use.
WebRtcTransportManager &WebRtcTransportManager::Instance() {
    static WebRtcTransportManager instance;
    return instance;
}
ziyue committed
1082

1083
/**
 * Register (or replace) the transport stored under `key`.
 * @param key identifier of the transport
 * @param ptr transport to store
 */
void WebRtcTransportManager::addItem(const string &key, const WebRtcTransportImp::Ptr &ptr) {
    lock_guard<mutex> guard(_mtx);
    _map[key] = ptr;
}
ziyue committed
1087

1088 1089 1090
/**
 * Look up a transport by its identifier.
 * @param key identifier of the transport
 * @return the transport, or an empty pointer when the key is empty, unknown,
 *         or the stored reference can no longer be locked
 */
WebRtcTransportImp::Ptr WebRtcTransportManager::getItem(const string &key) {
    WebRtcTransportImp::Ptr found;
    if (!key.empty()) {
        lock_guard<mutex> guard(_mtx);
        auto it = _map.find(key);
        if (it != _map.end()) {
            found = it->second.lock();
        }
    }
    return found;
}
ziyue committed
1099

1100
void WebRtcTransportManager::removeItem(const string &key) {
1101 1102
    lock_guard<mutex> lck(_mtx);
    _map.erase(key);
1103
}
1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116

//////////////////////////////////////////////////////////////////////////////////////////////

/// @return the process-wide WebRtcPluginManager, created on first use.
WebRtcPluginManager &WebRtcPluginManager::Instance() {
    static WebRtcPluginManager instance;
    return instance;
}

void WebRtcPluginManager::registerPlugin(const string &type, Plugin cb) {
    lock_guard<mutex> lck(_mtx_creator);
    _map_creator[type] = std::move(cb);
}

1117 1118
/**
 * Dispatch an sdp offer to the plugin registered for `type`.
 * @param sender session the request came from
 * @param type   plugin type, as passed to registerPlugin()
 * @param offer  offer sdp
 * @param args   request arguments forwarded to the plugin
 * @param cb     completion callback; invoked with a WebRtcException when no
 *               plugin is registered for `type`
 */
void WebRtcPluginManager::getAnswerSdp(
    Session &sender, const string &type, const string &offer, const WebRtcArgs &args, const onCreateRtc &cb) {
    // Copy the plugin out while holding the lock and invoke it afterwards.
    // Plugins and `cb` are arbitrary user code: running them under
    // _mtx_creator would serialize all negotiations and would deadlock if
    // they re-entered registerPlugin()/getAnswerSdp().
    Plugin plugin;
    bool found = false;
    {
        lock_guard<mutex> lck(_mtx_creator);
        auto it = _map_creator.find(type);
        if (it != _map_creator.end()) {
            plugin = it->second;
            found = true;
        }
    }

    if (!found) {
        cb(WebRtcException(SockException(Err_other, "the type can not supported")));
        return;
    }
    plugin(sender, offer, args, cb);
}

1128 1129
/// "echo" plugin: create a WebRtcEchoTest transport on a poller from the
/// pool and hand it to `cb`. `sender`, `offer` and `args` are not used.
void echo_plugin(
    Session &sender, const string &offer, const WebRtcArgs &args, const WebRtcPluginManager::onCreateRtc &cb) {
    auto rtc = WebRtcEchoTest::create(EventPollerPool::Instance().getPoller());
    cb(*rtc);
}

1133 1134
/**
 * "push" plugin: publish a stream over webrtc.
 * Emits the publish authentication event; once authenticated (or when nobody
 * listens for the event) it finds or creates the rtsp media source named by
 * args["url"], creates a WebRtcPusher for it and passes the pusher to `cb`.
 * Failures are reported to `cb` as a WebRtcException.
 */
void push_plugin(
    Session &sender, const string &offer_sdp, const WebRtcArgs &args, const WebRtcPluginManager::onCreateRtc &cb) {
    MediaInfo info(args["url"]);
    Broadcast::PublishAuthInvoker invoker = [cb, offer_sdp,
                                             info](const string &err, const ProtocolOption &option) mutable {
        if (!err.empty()) {
            // Authentication failed
            cb(WebRtcException(SockException(Err_other, err)));
            return;
        }

        RtspMediaSourceImp::Ptr push_src;
        std::shared_ptr<void> push_src_ownership;
        auto src = MediaSource::find(RTSP_SCHEMA, info._vhost, info._app, info._streamid);

        if (src) {
            // The stream already exists: try to resume pushing after a disconnect.
            // This only works when the source was produced by an rtsp push and
            // its ownership can be acquired.
            push_src = dynamic_pointer_cast<RtspMediaSourceImp>(src);
            if (push_src) {
                push_src_ownership = push_src->getOwnership();
            }
            if (!push_src_ownership) {
                cb(WebRtcException(SockException(Err_other, "already publishing")));
                return;
            }
        } else {
            // No such stream yet: create a fresh source and own it
            push_src = std::make_shared<RtspMediaSourceImp>(info._vhost, info._app, info._streamid);
            push_src_ownership = push_src->getOwnership();
            push_src->setProtocolOption(option);
        }

        auto rtc
            = WebRtcPusher::create(EventPollerPool::Instance().getPoller(), push_src, push_src_ownership, info, option);
        push_src->setListener(rtc);
        cb(*rtc);
    };

    // Pushing requires authentication
    const auto listened = NoticeCenter::Instance().emitEvent(
        Broadcast::kBroadcastMediaPublish, MediaOriginType::rtc_push, info, invoker, static_cast<SockInfo &>(sender));
    if (!listened) {
        // Nobody listens for the event: no authentication by default
        invoker("", ProtocolOption());
    }
}

1191 1192
/**
 * "play" plugin: play a stream over webrtc.
 * Emits the play authentication event; once authenticated (or when nobody
 * listens for the event) it asynchronously looks up the rtsp source named by
 * args["url"], creates a WebRtcPlayer for it and passes the player to `cb`.
 * Failures are reported to `cb` as a WebRtcException.
 */
void play_plugin(
    Session &sender, const string &offer_sdp, const WebRtcArgs &args, const WebRtcPluginManager::onCreateRtc &cb) {
    MediaInfo info(args["url"]);
    auto session_ptr = sender.shared_from_this();
    Broadcast::AuthInvoker invoker = [cb, offer_sdp, info, session_ptr](const string &err) mutable {
        if (!err.empty()) {
            // Authentication failed
            cb(WebRtcException(SockException(Err_other, err)));
            return;
        }

        // webrtc plays from the rtsp source
        info._schema = RTSP_SCHEMA;
        auto on_found = [cb, info](const MediaSource::Ptr &src_in) mutable {
            auto src = dynamic_pointer_cast<RtspMediaSource>(src_in);
            if (!src) {
                cb(WebRtcException(SockException(Err_other, "stream not found")));
                return;
            }
            // Restore the schema to rtc so that hooks can tell which play protocol is used
            info._schema = RTC_SCHEMA;
            auto rtc = WebRtcPlayer::create(EventPollerPool::Instance().getPoller(), src, info);
            cb(*rtc);
        };
        MediaSource::findAsync(info, session_ptr, std::move(on_found));
    };

    // Broadcast the generic play-url authentication event
    const auto listened = NoticeCenter::Instance().emitEvent(
        Broadcast::kBroadcastMediaPlayed, info, invoker, static_cast<SockInfo &>(sender));
    if (!listened) {
        // Nobody listens for the event: no authentication by default
        invoker("");
    }
}

1225
// Register the built-in plugins during static initialization
static onceToken s_rtc_auto_register([]() {
    auto &manager = WebRtcPluginManager::Instance();
    manager.registerPlugin("echo", echo_plugin);
    manager.registerPlugin("push", push_plugin);
    manager.registerPlugin("play", play_plugin);
});
1230 1231

}// namespace mediakit