RtspPlayer.cpp 28 KB
Newer Older
xiongziliang committed
1
/*
xiongziliang committed
2
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
xiongziliang committed
3 4 5
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
xiongziliang committed
6 7 8
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
xiongziliang committed
9
 */
xiongziliang committed
10

xzl committed
11 12 13
#include <set>
#include <cmath>
#include <algorithm>
xiongziliang committed
14
#include <iomanip>
xiongziliang committed
15
#include "Common/config.h"
xiongzilaing committed
16
#include "RtspPlayer.h"
17
#include "Util/MD5.h"
18
#include "Util/util.h"
xiongziliang committed
19
#include "Util/base64.h"
xzl committed
20
#include "Network/sockutil.h"
xiongziliang committed
21
using namespace toolkit;
xiongziliang committed
22
using namespace mediakit::Client;
xzl committed
23

xiongziliang committed
24
namespace mediakit {
xzl committed
25

26
enum PlayType {
27 28 29
    type_play = 0,
    type_pause,
    type_seek
30 31
};

32
RtspPlayer::RtspPlayer(const EventPoller::Ptr &poller) : TcpClient(poller){
33
    RtpReceiver::setPoolSize(64);
xzl committed
34 35
}
RtspPlayer::~RtspPlayer(void) {
36
    DebugL << endl;
xzl committed
37 38
}
void RtspPlayer::teardown(){
39
    if (alive()) {
40
        sendRtspRequest("TEARDOWN" ,_content_base);
41 42 43
        shutdown(SockException(Err_shutdown,"teardown"));
    }

44 45 46 47 48
    _md5_nonce.clear();
    _realm.clear();
    _sdp_track.clear();
    _session_id.clear();
    _content_base.clear();
49
    RtpReceiver::clear();
xiongziliang committed
50

51 52 53 54 55 56 57 58 59 60 61
    CLEAR_ARR(_rtp_sock);
    CLEAR_ARR(_rtcp_sock);
    CLEAR_ARR(_rtp_seq_start)
    CLEAR_ARR(_rtp_recv_count)
    CLEAR_ARR(_rtp_recv_count)
    CLEAR_ARR(_rtp_seq_now)

    _play_check_timer.reset();
    _rtp_check_timer.reset();
    _cseq_send = 1;
    _on_response = nullptr;
xzl committed
62
}
xiongziliang committed
63

xiongziliang committed
64
void RtspPlayer::play(const string &strUrl){
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
    RtspUrl url;
    if(!url.parse(strUrl)){
        onPlayResult_l(SockException(Err_other,StrPrinter << "illegal rtsp url:" << strUrl),false);
        return;
    }

    teardown();

    if (url._user.size()) {
        (*this)[kRtspUser] = url._user;
    }
    if (url._passwd.size()) {
        (*this)[kRtspPwd] = url._passwd;
        (*this)[kRtspPwdIsMD5] = false;
    }

81 82 83
    _play_url = url._url;
    _rtp_type = (Rtsp::eRtpType)(int)(*this)[kRtpType];
    DebugL << url._url << " " << (url._user.size() ? url._user : "null") << " " << (url._passwd.size() ? url._passwd : "null") << " " << _rtp_type;
84 85 86

    weak_ptr<RtspPlayer> weakSelf = dynamic_pointer_cast<RtspPlayer>(shared_from_this());
    float playTimeOutSec = (*this)[kTimeoutMS].as<int>() / 1000.0;
87
    _play_check_timer.reset(new Timer(playTimeOutSec, [weakSelf]() {
88 89 90 91 92 93
        auto strongSelf=weakSelf.lock();
        if(!strongSelf) {
            return false;
        }
        strongSelf->onPlayResult_l(SockException(Err_timeout,"play rtsp timeout"),false);
        return false;
94
    }, getPoller()));
95 96 97 98 99

    if(!(*this)[kNetAdapter].empty()){
        setNetAdapter((*this)[kNetAdapter]);
    }
    startConnect(url._host, url._port, playTimeOutSec);
100
}
101

102
void RtspPlayer::onConnect(const SockException &err){
103 104 105 106
    if(err.getErrCode() != Err_success) {
        onPlayResult_l(err,false);
        return;
    }
107
    sendOptions();
xzl committed
108 109
}

110
void RtspPlayer::onRecv(const Buffer::Ptr& buf) {
111
    if(_benchmark_mode && !_play_check_timer){
112
        //在性能测试模式下,如果rtsp握手完毕后,不再解析rtp包
113
        _rtp_recv_ticker.resetTime();
114 115
        return;
    }
116
    input(buf->data(), buf->size());
xzl committed
117
}
xiongziliang committed
118

xzl committed
119
void RtspPlayer::onErr(const SockException &ex) {
120
    //定时器_pPlayTimer为空后表明握手结束了
121
    onPlayResult_l(ex,!_play_check_timer);
xzl committed
122
}
xiongziliang committed
123

124 125
// from live555
bool RtspPlayer::handleAuthenticationFailure(const string &paramsStr) {
126
    if(!_realm.empty()){
127 128 129 130
        //已经认证过了
        return false;
    }

xiongziliang committed
131 132 133 134 135 136 137 138
    char *realm = new char[paramsStr.size()];
    char *nonce = new char[paramsStr.size()];
    char *stale = new char[paramsStr.size()];
    onceToken token(nullptr,[&](){
        delete[] realm;
        delete[] nonce;
        delete[] stale;
    });
139 140

    if (sscanf(paramsStr.data(), "Digest realm=\"%[^\"]\", nonce=\"%[^\"]\", stale=%[a-zA-Z]", realm, nonce, stale) == 3) {
141 142
        _realm = (const char *)realm;
        _md5_nonce = (const char *)nonce;
143 144 145
        return true;
    }
    if (sscanf(paramsStr.data(), "Digest realm=\"%[^\"]\", nonce=\"%[^\"]\"", realm, nonce) == 2) {
146 147
        _realm = (const char *)realm;
        _md5_nonce = (const char *)nonce;
148 149 150
        return true;
    }
    if (sscanf(paramsStr.data(), "Basic realm=\"%[^\"]\"", realm) == 1) {
151
        _realm = (const char *)realm;
152 153 154 155
        return true;
    }
    return false;
}
xiongziliang committed
156

157
bool RtspPlayer::handleResponse(const string &cmd, const Parser &parser){
158 159 160
    string authInfo = parser["WWW-Authenticate"];
    //发送DESCRIBE命令后的回复
    if ((parser.Url() == "401") && handleAuthenticationFailure(authInfo)) {
161
        sendOptions();
162
        return false;
163 164 165 166 167 168 169
    }
    if(parser.Url() == "302" || parser.Url() == "301"){
        auto newUrl = parser["Location"];
        if(newUrl.empty()){
            throw std::runtime_error("未找到Location字段(跳转url)");
        }
        play(newUrl);
170
        return false;
171 172
    }
    if (parser.Url() != "200") {
173
        throw std::runtime_error(StrPrinter << cmd << ":" << parser.Url() << " " << parser.Tail() << endl);
174
    }
175 176
    return true;
}
177

178 179 180 181 182
void RtspPlayer::handleResDESCRIBE(const Parser& parser) {
    if (!handleResponse("DESCRIBE", parser)) {
        return;
    }
    _content_base = parser["Content-Base"];
183 184
    if(_content_base.empty()){
        _content_base = _play_url;
185
    }
186 187
    if (_content_base.back() == '/') {
        _content_base.pop_back();
188 189
    }

190 191
    SdpParser sdpParser(parser.Content());
    //解析sdp
192
    _sdp_track = sdpParser.getAvailableTrack();
193
    auto title = sdpParser.getTrack(TrackTitle);
194
    bool is_play_back = false;
195
    if(title && title->_duration ){
196
        is_play_back = true;
xiongziliang committed
197
    }
xiongziliang committed
198

199
    if (_sdp_track.empty()) {
200 201 202 203 204
        throw std::runtime_error("无有效的Sdp Track");
    }
    if (!onCheckSDP(sdpParser.toString())) {
        throw std::runtime_error("onCheckSDP faied");
    }
xiongziliang committed
205

206
    sendSetup(0);
xzl committed
207
}
208

xiongziliang committed
209
//有必要的情况下创建udp端口
210
void RtspPlayer::createUdpSockIfNecessary(int track_idx){
211 212
    auto &rtpSockRef = _rtp_sock[track_idx];
    auto &rtcpSockRef = _rtcp_sock[track_idx];
213 214 215 216
    if (!rtpSockRef || !rtcpSockRef) {
        auto pr = makeSockPair(getPoller(), get_local_ip());
        rtpSockRef = pr.first;
        rtcpSockRef = pr.second;
217
    }
218 219
}

xzl committed
220
//发送SETUP命令
221 222 223
void RtspPlayer::sendSetup(unsigned int track_idx) {
    _on_response = std::bind(&RtspPlayer::handleResSETUP, this, placeholders::_1, track_idx);
    auto &track = _sdp_track[track_idx];
224 225
    auto baseUrl = _content_base + "/" + track->_control_surffix;
    switch (_rtp_type) {
226 227 228 229 230 231 232 233 234
        case Rtsp::RTP_TCP: {
            sendRtspRequest("SETUP",baseUrl,{"Transport",StrPrinter << "RTP/AVP/TCP;unicast;interleaved=" << track->_type * 2 << "-" << track->_type * 2 + 1});
        }
            break;
        case Rtsp::RTP_MULTICAST: {
            sendRtspRequest("SETUP",baseUrl,{"Transport","Transport: RTP/AVP;multicast"});
        }
            break;
        case Rtsp::RTP_UDP: {
235
            createUdpSockIfNecessary(track_idx);
236 237
            sendRtspRequest("SETUP", baseUrl, {"Transport",
                                               StrPrinter << "RTP/AVP;unicast;client_port="
238 239
                                                          << _rtp_sock[track_idx]->get_local_port() << "-"
                                                          << _rtcp_sock[track_idx]->get_local_port()});
240 241 242 243 244
        }
            break;
        default:
            break;
    }
xzl committed
245 246
}

247
void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int track_idx) {
248 249 250 251
    if (parser.Url() != "200") {
        throw std::runtime_error(
        StrPrinter << "SETUP:" << parser.Url() << " " << parser.Tail() << endl);
    }
252
    if (track_idx == 0) {
253 254 255
        _session_id = parser["Session"];
        _session_id.append(";");
        _session_id = FindField(_session_id.data(), nullptr, ";");
256 257 258 259
    }

    auto strTransport = parser["Transport"];
    if(strTransport.find("TCP") != string::npos || strTransport.find("interleaved") != string::npos){
260
        _rtp_type = Rtsp::RTP_TCP;
261
    }else if(strTransport.find("multicast") != string::npos){
262
        _rtp_type = Rtsp::RTP_MULTICAST;
263
    }else{
264
        _rtp_type = Rtsp::RTP_UDP;
265 266
    }

267
    RtspSplitter::enableRecvRtp(_rtp_type == Rtsp::RTP_TCP);
268

269
    if(_rtp_type == Rtsp::RTP_TCP)  {
270
        string interleaved = FindField( FindField((strTransport + ";").data(), "interleaved=", ";").data(), NULL, "-");
271
        _sdp_track[track_idx]->_interleaved = atoi(interleaved.data());
272
    }else{
273
        const char *strPos = (_rtp_type == Rtsp::RTP_MULTICAST ? "port=" : "server_port=") ;
274 275
        auto port_str = FindField((strTransport + ";").data(), strPos, ";");
        uint16_t rtp_port = atoi(FindField(port_str.data(), NULL, "-").data());
276
        uint16_t rtcp_port = atoi(FindField(port_str.data(), "-",NULL).data());
277 278
        auto &pRtpSockRef = _rtp_sock[track_idx];
        auto &pRtcpSockRef = _rtcp_sock[track_idx];
xiongziliang committed
279

280
        if (_rtp_type == Rtsp::RTP_MULTICAST) {
281 282
            //udp组播
            auto multiAddr = FindField((strTransport + ";").data(), "destination=", ";");
283
            pRtpSockRef.reset(new Socket(getPoller()));
284 285 286 287 288 289 290 291 292
            if (!pRtpSockRef->bindUdpSock(rtp_port, multiAddr.data())) {
                pRtpSockRef.reset();
                throw std::runtime_error("open udp sock err");
            }
            auto fd = pRtpSockRef->rawFD();
            if (-1 == SockUtil::joinMultiAddrFilter(fd, multiAddr.data(), get_peer_ip().data(),get_local_ip().data())) {
                SockUtil::joinMultiAddr(fd, multiAddr.data(),get_local_ip().data());
            }
        } else {
293
            createUdpSockIfNecessary(track_idx);
294 295 296 297 298 299 300 301 302 303
            //udp单播
            struct sockaddr_in rtpto;
            rtpto.sin_port = ntohs(rtp_port);
            rtpto.sin_family = AF_INET;
            rtpto.sin_addr.s_addr = inet_addr(get_peer_ip().data());
            pRtpSockRef->setSendPeerAddr((struct sockaddr *)&(rtpto));
            //发送rtp打洞包
            pRtpSockRef->send("\xce\xfa\xed\xfe", 4);

            //设置rtcp发送目标,为后续发送rtcp做准备
304 305 306 307
            rtpto.sin_port = ntohs(rtcp_port);
            rtpto.sin_family = AF_INET;
            rtpto.sin_addr.s_addr = inet_addr(get_peer_ip().data());
            pRtcpSockRef->setSendPeerAddr((struct sockaddr *)&(rtpto));
308
        }
309 310 311 312

        auto srcIP = inet_addr(get_peer_ip().data());
        weak_ptr<RtspPlayer> weakSelf = dynamic_pointer_cast<RtspPlayer>(shared_from_this());
        //设置rtp over udp接收回调处理函数
313
        pRtpSockRef->setOnRead([srcIP, track_idx, weakSelf](const Buffer::Ptr &buf, struct sockaddr *addr , int addr_len) {
314 315 316 317 318
            auto strongSelf = weakSelf.lock();
            if (!strongSelf) {
                return;
            }
            if (((struct sockaddr_in *) addr)->sin_addr.s_addr != srcIP) {
319
                WarnL << "收到其他地址的rtp数据:" << SockUtil::inet_ntoa(((struct sockaddr_in *) addr)->sin_addr);
320 321
                return;
            }
322 323
            strongSelf->handleOneRtp(track_idx, strongSelf->_sdp_track[track_idx]->_type,
                                     strongSelf->_sdp_track[track_idx]->_samplerate, (unsigned char *) buf->data(), buf->size());
324 325 326 327
        });

        if(pRtcpSockRef) {
            //设置rtcp over udp接收回调处理函数
328
            pRtcpSockRef->setOnRead([srcIP, track_idx, weakSelf](const Buffer::Ptr &buf, struct sockaddr *addr , int addr_len) {
329 330 331 332 333
                auto strongSelf = weakSelf.lock();
                if (!strongSelf) {
                    return;
                }
                if (((struct sockaddr_in *) addr)->sin_addr.s_addr != srcIP) {
334
                    WarnL << "收到其他地址的rtcp数据:" << SockUtil::inet_ntoa(((struct sockaddr_in *) addr)->sin_addr);
335 336
                    return;
                }
337
                strongSelf->onRtcpPacket(track_idx, strongSelf->_sdp_track[track_idx], (unsigned char *) buf->data(), buf->size());
338 339
            });
        }
340 341
    }

342
    if (track_idx < _sdp_track.size() - 1) {
343
        //需要继续发送SETUP命令
344
        sendSetup(track_idx + 1);
345 346 347 348
        return;
    }
    //所有setup命令发送完毕
    //发送play命令
349
    sendPause(type_play, 0);
xzl committed
350
}
351

352
void RtspPlayer::sendDescribe() {
353
    //发送DESCRIBE命令后处理函数:handleResDESCRIBE
354 355
    _on_response = std::bind(&RtspPlayer::handleResDESCRIBE, this, placeholders::_1);
    sendRtspRequest("DESCRIBE", _play_url, {"Accept", "application/sdp"});
356 357
}

358
void RtspPlayer::sendOptions(){
359
    _on_response = [this](const Parser& parser){
360 361
        if (!handleResponse("OPTIONS", parser)) {
            return;
362 363 364 365 366 367 368 369 370 371 372
        }
        //获取服务器支持的命令
        _supported_cmd.clear();
        auto public_val = split(parser["Public"],",");
        for(auto &cmd : public_val){
            trim(cmd);
            _supported_cmd.emplace(cmd);
        }
        //发送Describe请求,获取sdp
        sendDescribe();
    };
373
    sendRtspRequest("OPTIONS", _play_url);
374 375 376
}

void RtspPlayer::sendKeepAlive(){
377
    _on_response = [this](const Parser& parser){};
378 379
    if(_supported_cmd.find("GET_PARAMETER") != _supported_cmd.end()){
        //支持GET_PARAMETER,用此命令保活
380
        sendRtspRequest("GET_PARAMETER", _play_url);
381 382
    }else{
        //不支持GET_PARAMETER,用OPTIONS命令保活
383
        sendRtspRequest("OPTIONS", _play_url);
384
    }
385 386
}

387
void RtspPlayer::sendPause(int type , uint32_t seekMS){
388
    _on_response = std::bind(&RtspPlayer::handleResPAUSE, this, placeholders::_1, type);
389 390 391
    //开启或暂停rtsp
    switch (type){
        case type_pause:
392
            sendRtspRequest("PAUSE", _content_base);
393 394
            break;
        case type_play:
395
            sendRtspRequest("PLAY", _content_base);
396 397
            break;
        case type_seek:
398
            sendRtspRequest("PLAY", _content_base, {"Range",StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) << seekMS / 1000.0 << "-"});
399 400 401
            break;
        default:
            WarnL << "unknown type : " << type;
402
            _on_response = nullptr;
403 404
            break;
    }
xzl committed
405
}
xiongziliang committed
406

407 408
void RtspPlayer::pause(bool pause_flag) {
    sendPause(pause_flag ? type_pause : type_seek, getProgressMilliSecond());
xzl committed
409 410
}

411
void RtspPlayer::handleResPAUSE(const Parser& parser,int type) {
412 413 414 415 416 417 418
    if (parser.Url() != "200") {
        switch (type) {
            case type_pause:
                WarnL << "Pause failed:" << parser.Url() << " " << parser.Tail() << endl;
                break;
            case type_play:
                WarnL << "Play failed:" << parser.Url() << " " << parser.Tail() << endl;
419
                onPlayResult_l(SockException(Err_shutdown, StrPrinter << "rtsp play failed:" << parser.Url() << " " << parser.Tail() ), !_play_check_timer);
420 421 422 423 424 425 426 427 428 429
                break;
            case type_seek:
                WarnL << "Seek failed:" << parser.Url() << " " << parser.Tail() << endl;
                break;
        }
        return;
    }

    if (type == type_pause) {
        //暂停成功!
430
        _rtp_check_timer.reset();
431 432 433 434 435 436 437 438 439 440 441 442 443 444 445
        return;
    }

    //play或seek成功
    uint32_t iSeekTo = 0;
    //修正时间轴
    auto strRange = parser["Range"];
    if (strRange.size()) {
        auto strStart = FindField(strRange.data(), "npt=", "-");
        if (strStart == "now") {
            strStart = "0";
        }
        iSeekTo = 1000 * atof(strStart.data());
        DebugL << "seekTo(ms):" << iSeekTo;
    }
446 447

    onPlayResult_l(SockException(Err_success, type == type_seek ? "resum rtsp success" : "rtsp play success"), !_play_check_timer);
xzl committed
448 449
}

450 451
void RtspPlayer::onWholeRtspPacket(Parser &parser) {
    try {
452 453 454 455
        decltype(_on_response) func;
        _on_response.swap(func);
        if(func){
            func(parser);
456 457 458
        }
        parser.Clear();
    } catch (std::exception &err) {
459
        //定时器_pPlayTimer为空后表明握手结束了
460
        onPlayResult_l(SockException(Err_other, err.what()),!_play_check_timer);
xzl committed
461 462 463
    }
}

464 465 466
void RtspPlayer::onRtpPacket(const char *data, uint64_t len) {
    int trackIdx = -1;
    uint8_t interleaved = data[1];
467
    if(interleaved %2 == 0){
468
        trackIdx = getTrackIndexByInterleaved(interleaved);
xiongziliang committed
469
        handleOneRtp(trackIdx, _sdp_track[trackIdx]->_type, _sdp_track[trackIdx]->_samplerate, (unsigned char *)data + 4, len - 4);
470 471
    }else{
        trackIdx = getTrackIndexByInterleaved(interleaved - 1);
472
        onRtcpPacket(trackIdx, _sdp_track[trackIdx], (unsigned char *) data + 4, len - 4);
473
    }
xzl committed
474 475
}

xiongziliang committed
476
//此处预留rtcp处理函数
477
void RtspPlayer::onRtcpPacket(int track_idx, SdpTrack::Ptr &track, unsigned char *data, unsigned int len){}
xiongziliang committed
478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536

#if 0
//改代码提取自FFmpeg,参考之
// Receiver Report
    avio_w8(pb, (RTP_VERSION << 6) + 1); /* 1 report block */
    avio_w8(pb, RTCP_RR);
    avio_wb16(pb, 7); /* length in words - 1 */
    // our own SSRC: we use the server's SSRC + 1 to avoid conflicts
    avio_wb32(pb, s->ssrc + 1);
    avio_wb32(pb, s->ssrc); // server SSRC
    // some placeholders we should really fill...
    // RFC 1889/p64
    extended_max          = stats->cycles + stats->max_seq;
    expected              = extended_max - stats->base_seq;
    lost                  = expected - stats->received;
    lost                  = FFMIN(lost, 0xffffff); // clamp it since it's only 24 bits...
    expected_interval     = expected - stats->expected_prior;
    stats->expected_prior = expected;
    received_interval     = stats->received - stats->received_prior;
    stats->received_prior = stats->received;
    lost_interval         = expected_interval - received_interval;
    if (expected_interval == 0 || lost_interval <= 0)
        fraction = 0;
    else
        fraction = (lost_interval << 8) / expected_interval;

    fraction = (fraction << 24) | lost;

    avio_wb32(pb, fraction); /* 8 bits of fraction, 24 bits of total packets lost */
    avio_wb32(pb, extended_max); /* max sequence received */
    avio_wb32(pb, stats->jitter >> 4); /* jitter */

    if (s->last_rtcp_ntp_time == AV_NOPTS_VALUE) {
        avio_wb32(pb, 0); /* last SR timestamp */
        avio_wb32(pb, 0); /* delay since last SR */
    } else {
        uint32_t middle_32_bits   = s->last_rtcp_ntp_time >> 16; // this is valid, right? do we need to handle 64 bit values special?
        uint32_t delay_since_last = av_rescale(av_gettime_relative() - s->last_rtcp_reception_time,
                                               65536, AV_TIME_BASE);

        avio_wb32(pb, middle_32_bits); /* last SR timestamp */
        avio_wb32(pb, delay_since_last); /* delay since last SR */
    }

    // CNAME
    avio_w8(pb, (RTP_VERSION << 6) + 1); /* 1 report block */
    avio_w8(pb, RTCP_SDES);
    len = strlen(s->hostname);
    avio_wb16(pb, (7 + len + 3) / 4); /* length in words - 1 */
    avio_wb32(pb, s->ssrc + 1);
    avio_w8(pb, 0x01);
    avio_w8(pb, len);
    avio_write(pb, s->hostname, len);
    avio_w8(pb, 0); /* END */
    // padding
    for (len = (7 + len) % 4; len % 4; len++)
        avio_w8(pb, 0);
#endif

537
void RtspPlayer::sendReceiverReport(bool over_tcp, int track_idx){
xiongziliang committed
538 539 540
    static const char s_cname[] = "ZLMediaKitRtsp";
    uint8_t aui8Rtcp[4 + 32 + 10 + sizeof(s_cname) + 1] = {0};
    uint8_t *pui8Rtcp_RR = aui8Rtcp + 4, *pui8Rtcp_SDES = pui8Rtcp_RR + 32;
541 542
    auto &track = _sdp_track[track_idx];
    auto &counter = _rtcp_counter[track_idx];
xiongziliang committed
543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567

    aui8Rtcp[0] = '$';
    aui8Rtcp[1] = track->_interleaved + 1;
    aui8Rtcp[2] = (sizeof(aui8Rtcp) - 4) >>  8;
    aui8Rtcp[3] = (sizeof(aui8Rtcp) - 4) & 0xFF;

    pui8Rtcp_RR[0] = 0x81;/* 1 report block */
    pui8Rtcp_RR[1] = 0xC9;//RTCP_RR
    pui8Rtcp_RR[2] = 0x00;
    pui8Rtcp_RR[3] = 0x07;/* length in words - 1 */

    uint32_t ssrc=htonl(track->_ssrc + 1);
    // our own SSRC: we use the server's SSRC + 1 to avoid conflicts
    memcpy(&pui8Rtcp_RR[4], &ssrc, 4);
    ssrc=htonl(track->_ssrc);
    // server SSRC
    memcpy(&pui8Rtcp_RR[8], &ssrc, 4);

    //FIXME: 8 bits of fraction, 24 bits of total packets lost
    pui8Rtcp_RR[12] = 0x00;
    pui8Rtcp_RR[13] = 0x00;
    pui8Rtcp_RR[14] = 0x00;
    pui8Rtcp_RR[15] = 0x00;

    //FIXME: max sequence received
568
    int cycleCount = getCycleCount(track_idx);
xiongziliang committed
569 570 571 572 573
    pui8Rtcp_RR[16] = cycleCount >> 8;
    pui8Rtcp_RR[17] = cycleCount & 0xFF;
    pui8Rtcp_RR[18] = counter.pktCnt >> 8;
    pui8Rtcp_RR[19] = counter.pktCnt & 0xFF;

574
    uint32_t  jitter = htonl(getJitterSize(track_idx));
xiongziliang committed
575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595
    //FIXME: jitter
    memcpy(pui8Rtcp_RR + 20, &jitter , 4);
    /* last SR timestamp */
    memcpy(pui8Rtcp_RR + 24, &counter.lastTimeStamp, 4);
    uint32_t msInc = htonl(ntohl(counter.timeStamp) - ntohl(counter.lastTimeStamp));
    /* delay since last SR */
    memcpy(pui8Rtcp_RR + 28, &msInc, 4);

    // CNAME
    pui8Rtcp_SDES[0] = 0x81;
    pui8Rtcp_SDES[1] = 0xCA;
    pui8Rtcp_SDES[2] = 0x00;
    pui8Rtcp_SDES[3] = 0x06;

    memcpy(&pui8Rtcp_SDES[4], &ssrc, 4);

    pui8Rtcp_SDES[8] = 0x01;
    pui8Rtcp_SDES[9] = 0x0f;
    memcpy(&pui8Rtcp_SDES[10], s_cname, sizeof(s_cname));
    pui8Rtcp_SDES[10 + sizeof(s_cname)] = 0x00;

596
    if (over_tcp) {
xiongziliang committed
597
        send(obtainBuffer((char *) aui8Rtcp, sizeof(aui8Rtcp)));
598 599
    } else if (_rtcp_sock[track_idx]) {
        _rtcp_sock[track_idx]->send((char *) aui8Rtcp + 4, sizeof(aui8Rtcp) - 4);
xiongziliang committed
600
    }
601
}
xzl committed
602

xiongziliang committed
603
void RtspPlayer::onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx){
604
    //统计丢包率
605 606 607
    if (_rtp_seq_start[trackidx] == 0 || rtppt->sequence < _rtp_seq_start[trackidx]) {
        _rtp_seq_start[trackidx] = rtppt->sequence;
        _rtp_recv_count[trackidx] = 0;
608
    }
609 610 611
    _rtp_recv_count[trackidx] ++;
    _rtp_seq_now[trackidx] = rtppt->sequence;
    _stamp[trackidx] = rtppt->timeStamp;
612
    //计算相对时间戳
613
    onRecvRTP_l(rtppt, _sdp_track[trackidx]);
xzl committed
614
}
xiongziliang committed
615

616
float RtspPlayer::getPacketLossRate(TrackType type) const{
617 618 619 620 621 622 623
    try {
        auto track_idx = getTrackIndexByTrackType(type);
        if (_rtp_seq_now[track_idx] - _rtp_seq_start[track_idx] + 1 == 0) {
            return 0;
        }
        return 1.0 - (double) _rtp_recv_count[track_idx] / (_rtp_seq_now[track_idx] - _rtp_seq_start[track_idx] + 1);
    } catch (...) {
624 625
        uint64_t totalRecv = 0;
        uint64_t totalSend = 0;
626 627 628
        for (unsigned int i = 0; i < _sdp_track.size(); i++) {
            totalRecv += _rtp_recv_count[i];
            totalSend += (_rtp_seq_now[i] - _rtp_seq_start[i] + 1);
629
        }
630
        if (totalSend == 0) {
631 632
            return 0;
        }
633
        return 1.0 - (double) totalRecv / totalSend;
634
    }
xzl committed
635 636
}

637
uint32_t RtspPlayer::getProgressMilliSecond() const{
638
    return MAX(_stamp[0],_stamp[1]);
xzl committed
639
}
xiongziliang committed
640

641
void RtspPlayer::seekToMilliSecond(uint32_t ms) {
642
    sendPause(type_seek,ms);
xzl committed
643
}
644

645
void RtspPlayer::sendRtspRequest(const string &cmd, const string &url, const std::initializer_list<string> &header) {
646 647 648 649 650 651 652 653 654 655 656
    string key;
    StrCaseMap header_map;
    int i = 0;
    for(auto &val : header){
        if(++i % 2 == 0){
            header_map.emplace(key,val);
        }else{
            key = val;
        }
    }
    sendRtspRequest(cmd,url,header_map);
xiongziliang committed
657
}
xiongziliang committed
658

659
void RtspPlayer::sendRtspRequest(const string &cmd, const string &url,const StrCaseMap &header_const) {
660
    auto header = header_const;
661
    header.emplace("CSeq",StrPrinter << _cseq_send++);
xiongziliang committed
662
    header.emplace("User-Agent",SERVER_NAME);
663

664 665
    if(!_session_id.empty()){
        header.emplace("Session", _session_id);
666 667
    }

668 669
    if(!_realm.empty() && !(*this)[kRtspUser].empty()){
        if(!_md5_nonce.empty()){
670 671 672 673 674 675 676 677 678 679 680
            //MD5认证
            /*
            response计算方法如下:
            RTSP客户端应该使用username + password并计算response如下:
            (1)当password为MD5编码,则
                response = md5( password:nonce:md5(public_method:url)  );
            (2)当password为ANSI字符串,则
                response= md5( md5(username:realm:password):nonce:md5(public_method:url) );
             */
            string encrypted_pwd = (*this)[kRtspPwd];
            if(!(*this)[kRtspPwdIsMD5].as<bool>()){
681
                encrypted_pwd = MD5((*this)[kRtspUser] + ":" + _realm + ":" + encrypted_pwd).hexdigest();
682
            }
683
            auto response = MD5(encrypted_pwd + ":" + _md5_nonce + ":" + MD5(cmd + ":" + url).hexdigest()).hexdigest();
684 685 686
            _StrPrinter printer;
            printer << "Digest ";
            printer << "username=\"" << (*this)[kRtspUser] << "\", ";
687 688
            printer << "realm=\"" << _realm << "\", ";
            printer << "nonce=\"" << _md5_nonce << "\", ";
689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705
            printer << "uri=\"" << url << "\", ";
            printer << "response=\"" << response << "\"";
            header.emplace("Authorization",printer);
        }else if(!(*this)[kRtspPwdIsMD5].as<bool>()){
            //base64认证
            string authStr = StrPrinter << (*this)[kRtspUser] << ":" << (*this)[kRtspPwd];
            char authStrBase64[1024] = {0};
            av_base64_encode(authStrBase64,sizeof(authStrBase64),(uint8_t *)authStr.data(),authStr.size());
            header.emplace("Authorization",StrPrinter << "Basic " << authStrBase64 );
        }
    }

    _StrPrinter printer;
    printer << cmd << " " << url << " RTSP/1.0\r\n";
    for (auto &pr : header){
        printer << pr.first << ": " << pr.second << "\r\n";
    }
xiongziliang committed
706
    SockSender::send(printer << "\r\n");
707 708
}

709
void RtspPlayer::onRecvRTP_l(const RtpPacket::Ptr &rtp, const SdpTrack::Ptr &track) {
710
    _rtp_recv_ticker.resetTime();
711
    onRecvRTP(rtp, track);
xiongziliang committed
712

713 714 715 716
    int track_idx = getTrackIndexByTrackType(rtp->type);
    RtcpCounter &counter = _rtcp_counter[track_idx];
    counter.pktCnt = rtp->sequence;
    auto &ticker = _rtcp_send_ticker[track_idx];
xiongziliang committed
717 718 719 720
    if (ticker.elapsedTime() > 5 * 1000) {
        //send rtcp every 5 second
        counter.lastTimeStamp = counter.timeStamp;
        //直接保存网络字节序
721
        memcpy(&counter.timeStamp, rtp->data() + 8, 4);
722
        if (counter.lastTimeStamp != 0) {
723
            sendReceiverReport(_rtp_type == Rtsp::RTP_TCP, track_idx);
xiongziliang committed
724 725
            ticker.resetTime();
        }
726 727

        //有些rtsp服务器需要rtcp保活,有些需要发送信令保活
728
        if (track_idx == 0) {
729 730 731
            //只需要发送一次心跳信令包
            sendKeepAlive();
        }
xiongziliang committed
732
    }
733
}
xiongziliang committed
734

xiongziliang committed
735
void RtspPlayer::onPlayResult_l(const SockException &ex , bool handshake_done) {
xiongziliang committed
736 737 738
    if (ex.getErrCode() == Err_shutdown) {
        //主动shutdown的,不触发回调
        return;
739 740
    }

xiongziliang committed
741
    WarnL << ex.getErrCode() << " " << ex.what();
xiongziliang committed
742
    if (!handshake_done) {
743
        //开始播放阶段
744
        _play_check_timer.reset();
745
        onPlayResult(ex);
746 747
        //是否为性能测试模式
        _benchmark_mode = (*this)[Client::kBenchmarkMode].as<int>();
748 749 750 751 752 753
    } else if (ex) {
        //播放成功后异常断开回调
        onShutdown(ex);
    } else {
        //恢复播放
        onResume();
754 755
    }

xiongziliang committed
756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775
    if (!ex) {
        //播放成功,恢复rtp接收超时定时器
        _rtp_recv_ticker.resetTime();
        int timeoutMS = (*this)[kMediaTimeoutMS].as<int>();
        weak_ptr<RtspPlayer> weakSelf = dynamic_pointer_cast<RtspPlayer>(shared_from_this());
        auto lam = [weakSelf, timeoutMS]() {
            auto strongSelf = weakSelf.lock();
            if (!strongSelf) {
                return false;
            }
            if (strongSelf->_rtp_recv_ticker.elapsedTime() > timeoutMS) {
                //接收rtp媒体数据包超时
                strongSelf->onPlayResult_l(SockException(Err_timeout, "receive rtp timeout"), true);
                return false;
            }
            return true;
        };
        //创建rtp数据接收超时检测定时器
        _rtp_check_timer = std::make_shared<Timer>(timeoutMS / 2000.0, lam, getPoller());
    } else {
776 777
        teardown();
    }
778 779
}

780 781 782
int RtspPlayer::getTrackIndexByInterleaved(int interleaved) const {
    for (unsigned int i = 0; i < _sdp_track.size(); i++) {
        if (_sdp_track[i]->_interleaved == interleaved) {
783 784 785
            return i;
        }
    }
786
    if (_sdp_track.size() == 1) {
787 788
        return 0;
    }
789
    throw SockException(Err_shutdown, StrPrinter << "no such track with interleaved:" << interleaved);
xiongziliang committed
790 791
}

792
int RtspPlayer::getTrackIndexByTrackType(TrackType track_type) const {
793
    for (unsigned int i = 0; i < _sdp_track.size(); i++) {
794
        if (_sdp_track[i]->_type == track_type) {
795 796 797
            return i;
        }
    }
798
    if (_sdp_track.size() == 1) {
799 800
        return 0;
    }
801
    throw SockException(Err_shutdown, StrPrinter << "no such track with type:" << (int) track_type);
802 803
}

xiongziliang committed
804
} /* namespace mediakit */