RtspPlayer.cpp 28.2 KB
Newer Older
xiongziliang committed
1
/*
xiongziliang committed
2
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
xiongziliang committed
3 4 5
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
xiongziliang committed
6 7 8
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
xiongziliang committed
9
 */
xiongziliang committed
10

xzl committed
11 12 13
#include <set>
#include <cmath>
#include <algorithm>
xiongziliang committed
14
#include <iomanip>
xiongziliang committed
15
#include "Common/config.h"
xiongzilaing committed
16
#include "RtspPlayer.h"
17
#include "Util/MD5.h"
18
#include "Util/util.h"
xiongziliang committed
19
#include "Util/base64.h"
xzl committed
20
#include "Network/sockutil.h"
xiongziliang committed
21
using namespace toolkit;
xiongziliang committed
22
using namespace mediakit::Client;
xzl committed
23

xiongziliang committed
24
namespace mediakit {
xzl committed
25

26
enum PlayType {
27 28 29
    type_play = 0,
    type_pause,
    type_seek
30 31
};

32
RtspPlayer::RtspPlayer(const EventPoller::Ptr &poller) : TcpClient(poller){
33
    RtpReceiver::setPoolSize(64);
xzl committed
34 35
}
RtspPlayer::~RtspPlayer(void) {
36
    DebugL << endl;
xzl committed
37 38
}
void RtspPlayer::teardown(){
39
    if (alive()) {
40
        sendRtspRequest("TEARDOWN" ,_content_base);
41 42 43
        shutdown(SockException(Err_shutdown,"teardown"));
    }

44 45 46 47 48
    _md5_nonce.clear();
    _realm.clear();
    _sdp_track.clear();
    _session_id.clear();
    _content_base.clear();
49
    RtpReceiver::clear();
xiongziliang committed
50

51 52 53 54 55 56 57 58 59 60 61
    CLEAR_ARR(_rtp_sock);
    CLEAR_ARR(_rtcp_sock);
    CLEAR_ARR(_rtp_seq_start)
    CLEAR_ARR(_rtp_recv_count)
    CLEAR_ARR(_rtp_recv_count)
    CLEAR_ARR(_rtp_seq_now)

    _play_check_timer.reset();
    _rtp_check_timer.reset();
    _cseq_send = 1;
    _on_response = nullptr;
xzl committed
62
}
xiongziliang committed
63

xiongziliang committed
64
void RtspPlayer::play(const string &strUrl){
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
    RtspUrl url;
    if(!url.parse(strUrl)){
        onPlayResult_l(SockException(Err_other,StrPrinter << "illegal rtsp url:" << strUrl),false);
        return;
    }

    teardown();

    if (url._user.size()) {
        (*this)[kRtspUser] = url._user;
    }
    if (url._passwd.size()) {
        (*this)[kRtspPwd] = url._passwd;
        (*this)[kRtspPwdIsMD5] = false;
    }

81 82 83
    _play_url = url._url;
    _rtp_type = (Rtsp::eRtpType)(int)(*this)[kRtpType];
    DebugL << url._url << " " << (url._user.size() ? url._user : "null") << " " << (url._passwd.size() ? url._passwd : "null") << " " << _rtp_type;
84 85 86

    weak_ptr<RtspPlayer> weakSelf = dynamic_pointer_cast<RtspPlayer>(shared_from_this());
    float playTimeOutSec = (*this)[kTimeoutMS].as<int>() / 1000.0;
87
    _play_check_timer.reset(new Timer(playTimeOutSec, [weakSelf]() {
88 89 90 91 92 93
        auto strongSelf=weakSelf.lock();
        if(!strongSelf) {
            return false;
        }
        strongSelf->onPlayResult_l(SockException(Err_timeout,"play rtsp timeout"),false);
        return false;
94
    }, getPoller()));
95 96 97 98 99

    if(!(*this)[kNetAdapter].empty()){
        setNetAdapter((*this)[kNetAdapter]);
    }
    startConnect(url._host, url._port, playTimeOutSec);
100
}
101

102
void RtspPlayer::onConnect(const SockException &err){
103 104 105 106
    if(err.getErrCode() != Err_success) {
        onPlayResult_l(err,false);
        return;
    }
107
    sendOptions();
xzl committed
108 109
}

110
void RtspPlayer::onRecv(const Buffer::Ptr& buf) {
111
    if(_benchmark_mode && !_play_check_timer){
112
        //在性能测试模式下,如果rtsp握手完毕后,不再解析rtp包
113
        _rtp_recv_ticker.resetTime();
114 115
        return;
    }
116
    input(buf->data(), buf->size());
xzl committed
117
}
xiongziliang committed
118

xzl committed
119
void RtspPlayer::onErr(const SockException &ex) {
120
    //定时器_pPlayTimer为空后表明握手结束了
121
    onPlayResult_l(ex,!_play_check_timer);
xzl committed
122
}
xiongziliang committed
123

124 125
// from live555
bool RtspPlayer::handleAuthenticationFailure(const string &paramsStr) {
126
    if(!_realm.empty()){
127 128 129 130
        //已经认证过了
        return false;
    }

xiongziliang committed
131 132 133 134 135 136 137 138
    char *realm = new char[paramsStr.size()];
    char *nonce = new char[paramsStr.size()];
    char *stale = new char[paramsStr.size()];
    onceToken token(nullptr,[&](){
        delete[] realm;
        delete[] nonce;
        delete[] stale;
    });
139 140

    if (sscanf(paramsStr.data(), "Digest realm=\"%[^\"]\", nonce=\"%[^\"]\", stale=%[a-zA-Z]", realm, nonce, stale) == 3) {
141 142
        _realm = (const char *)realm;
        _md5_nonce = (const char *)nonce;
143 144 145
        return true;
    }
    if (sscanf(paramsStr.data(), "Digest realm=\"%[^\"]\", nonce=\"%[^\"]\"", realm, nonce) == 2) {
146 147
        _realm = (const char *)realm;
        _md5_nonce = (const char *)nonce;
148 149 150
        return true;
    }
    if (sscanf(paramsStr.data(), "Basic realm=\"%[^\"]\"", realm) == 1) {
151
        _realm = (const char *)realm;
152 153 154 155
        return true;
    }
    return false;
}
xiongziliang committed
156

157
bool RtspPlayer::handleResponse(const string &cmd, const Parser &parser){
158 159 160
    string authInfo = parser["WWW-Authenticate"];
    //发送DESCRIBE命令后的回复
    if ((parser.Url() == "401") && handleAuthenticationFailure(authInfo)) {
161
        sendOptions();
162
        return false;
163 164 165 166 167 168 169
    }
    if(parser.Url() == "302" || parser.Url() == "301"){
        auto newUrl = parser["Location"];
        if(newUrl.empty()){
            throw std::runtime_error("未找到Location字段(跳转url)");
        }
        play(newUrl);
170
        return false;
171 172
    }
    if (parser.Url() != "200") {
173
        throw std::runtime_error(StrPrinter << cmd << ":" << parser.Url() << " " << parser.Tail() << endl);
174
    }
175 176
    return true;
}
177

178 179 180 181 182
void RtspPlayer::handleResDESCRIBE(const Parser& parser) {
    if (!handleResponse("DESCRIBE", parser)) {
        return;
    }
    _content_base = parser["Content-Base"];
183 184
    if(_content_base.empty()){
        _content_base = _play_url;
185
    }
186 187
    if (_content_base.back() == '/') {
        _content_base.pop_back();
188 189
    }

190 191
    SdpParser sdpParser(parser.Content());
    //解析sdp
192
    _sdp_track = sdpParser.getAvailableTrack();
193
    auto title = sdpParser.getTrack(TrackTitle);
194
    bool is_play_back = false;
195
    if(title && title->_duration ){
196
        is_play_back = true;
xiongziliang committed
197
    }
xiongziliang committed
198

199
    if (_sdp_track.empty()) {
200 201 202 203 204
        throw std::runtime_error("无有效的Sdp Track");
    }
    if (!onCheckSDP(sdpParser.toString())) {
        throw std::runtime_error("onCheckSDP faied");
    }
xiongziliang committed
205

206
    sendSetup(0);
xzl committed
207
}
208

xiongziliang committed
209
//有必要的情况下创建udp端口
210
void RtspPlayer::createUdpSockIfNecessary(int track_idx){
211 212
    auto &rtpSockRef = _rtp_sock[track_idx];
    auto &rtcpSockRef = _rtcp_sock[track_idx];
213
    if (!rtpSockRef || !rtcpSockRef) {
214 215
        std::pair<Socket::Ptr, Socket::Ptr> pr = std::make_pair(createSocket(), createSocket());
        makeSockPair(pr, get_local_ip());
216 217
        rtpSockRef = pr.first;
        rtcpSockRef = pr.second;
218
    }
219 220
}

xzl committed
221
//发送SETUP命令
222 223 224
void RtspPlayer::sendSetup(unsigned int track_idx) {
    _on_response = std::bind(&RtspPlayer::handleResSETUP, this, placeholders::_1, track_idx);
    auto &track = _sdp_track[track_idx];
225
    auto baseUrl = _content_base + "/" + track->_control_surffix;
226 227 228
    if (track->_control.find("://") != string::npos) {
        baseUrl = track->_control;
    }
229
    switch (_rtp_type) {
230 231 232 233 234 235 236 237 238
        case Rtsp::RTP_TCP: {
            sendRtspRequest("SETUP",baseUrl,{"Transport",StrPrinter << "RTP/AVP/TCP;unicast;interleaved=" << track->_type * 2 << "-" << track->_type * 2 + 1});
        }
            break;
        case Rtsp::RTP_MULTICAST: {
            sendRtspRequest("SETUP",baseUrl,{"Transport","Transport: RTP/AVP;multicast"});
        }
            break;
        case Rtsp::RTP_UDP: {
239
            createUdpSockIfNecessary(track_idx);
240 241
            sendRtspRequest("SETUP", baseUrl, {"Transport",
                                               StrPrinter << "RTP/AVP;unicast;client_port="
242 243
                                                          << _rtp_sock[track_idx]->get_local_port() << "-"
                                                          << _rtcp_sock[track_idx]->get_local_port()});
244 245 246 247 248
        }
            break;
        default:
            break;
    }
xzl committed
249 250
}

251
void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int track_idx) {
252 253 254 255
    if (parser.Url() != "200") {
        throw std::runtime_error(
        StrPrinter << "SETUP:" << parser.Url() << " " << parser.Tail() << endl);
    }
256
    if (track_idx == 0) {
257 258 259
        _session_id = parser["Session"];
        _session_id.append(";");
        _session_id = FindField(_session_id.data(), nullptr, ";");
260 261 262 263
    }

    auto strTransport = parser["Transport"];
    if(strTransport.find("TCP") != string::npos || strTransport.find("interleaved") != string::npos){
264
        _rtp_type = Rtsp::RTP_TCP;
265
    }else if(strTransport.find("multicast") != string::npos){
266
        _rtp_type = Rtsp::RTP_MULTICAST;
267
    }else{
268
        _rtp_type = Rtsp::RTP_UDP;
269 270
    }

271
    RtspSplitter::enableRecvRtp(_rtp_type == Rtsp::RTP_TCP);
272

273
    if(_rtp_type == Rtsp::RTP_TCP)  {
274
        string interleaved = FindField( FindField((strTransport + ";").data(), "interleaved=", ";").data(), NULL, "-");
275
        _sdp_track[track_idx]->_interleaved = atoi(interleaved.data());
276
    }else{
277
        const char *strPos = (_rtp_type == Rtsp::RTP_MULTICAST ? "port=" : "server_port=") ;
278 279
        auto port_str = FindField((strTransport + ";").data(), strPos, ";");
        uint16_t rtp_port = atoi(FindField(port_str.data(), NULL, "-").data());
280
        uint16_t rtcp_port = atoi(FindField(port_str.data(), "-",NULL).data());
281 282
        auto &pRtpSockRef = _rtp_sock[track_idx];
        auto &pRtcpSockRef = _rtcp_sock[track_idx];
xiongziliang committed
283

284
        if (_rtp_type == Rtsp::RTP_MULTICAST) {
285 286
            //udp组播
            auto multiAddr = FindField((strTransport + ";").data(), "destination=", ";");
287
            pRtpSockRef = createSocket();
288 289 290 291 292 293 294 295 296
            if (!pRtpSockRef->bindUdpSock(rtp_port, multiAddr.data())) {
                pRtpSockRef.reset();
                throw std::runtime_error("open udp sock err");
            }
            auto fd = pRtpSockRef->rawFD();
            if (-1 == SockUtil::joinMultiAddrFilter(fd, multiAddr.data(), get_peer_ip().data(),get_local_ip().data())) {
                SockUtil::joinMultiAddr(fd, multiAddr.data(),get_local_ip().data());
            }
        } else {
297
            createUdpSockIfNecessary(track_idx);
298 299 300 301 302 303 304 305 306 307
            //udp单播
            struct sockaddr_in rtpto;
            rtpto.sin_port = ntohs(rtp_port);
            rtpto.sin_family = AF_INET;
            rtpto.sin_addr.s_addr = inet_addr(get_peer_ip().data());
            pRtpSockRef->setSendPeerAddr((struct sockaddr *)&(rtpto));
            //发送rtp打洞包
            pRtpSockRef->send("\xce\xfa\xed\xfe", 4);

            //设置rtcp发送目标,为后续发送rtcp做准备
308 309 310 311
            rtpto.sin_port = ntohs(rtcp_port);
            rtpto.sin_family = AF_INET;
            rtpto.sin_addr.s_addr = inet_addr(get_peer_ip().data());
            pRtcpSockRef->setSendPeerAddr((struct sockaddr *)&(rtpto));
312
        }
313 314 315 316

        auto srcIP = inet_addr(get_peer_ip().data());
        weak_ptr<RtspPlayer> weakSelf = dynamic_pointer_cast<RtspPlayer>(shared_from_this());
        //设置rtp over udp接收回调处理函数
317
        pRtpSockRef->setOnRead([srcIP, track_idx, weakSelf](const Buffer::Ptr &buf, struct sockaddr *addr , int addr_len) {
318 319 320 321 322
            auto strongSelf = weakSelf.lock();
            if (!strongSelf) {
                return;
            }
            if (((struct sockaddr_in *) addr)->sin_addr.s_addr != srcIP) {
323
                WarnL << "收到其他地址的rtp数据:" << SockUtil::inet_ntoa(((struct sockaddr_in *) addr)->sin_addr);
324 325
                return;
            }
326 327
            strongSelf->handleOneRtp(track_idx, strongSelf->_sdp_track[track_idx]->_type,
                                     strongSelf->_sdp_track[track_idx]->_samplerate, (unsigned char *) buf->data(), buf->size());
328 329 330 331
        });

        if(pRtcpSockRef) {
            //设置rtcp over udp接收回调处理函数
332
            pRtcpSockRef->setOnRead([srcIP, track_idx, weakSelf](const Buffer::Ptr &buf, struct sockaddr *addr , int addr_len) {
333 334 335 336 337
                auto strongSelf = weakSelf.lock();
                if (!strongSelf) {
                    return;
                }
                if (((struct sockaddr_in *) addr)->sin_addr.s_addr != srcIP) {
338
                    WarnL << "收到其他地址的rtcp数据:" << SockUtil::inet_ntoa(((struct sockaddr_in *) addr)->sin_addr);
339 340
                    return;
                }
341
                strongSelf->onRtcpPacket(track_idx, strongSelf->_sdp_track[track_idx], (unsigned char *) buf->data(), buf->size());
342 343
            });
        }
344 345
    }

346
    if (track_idx < _sdp_track.size() - 1) {
347
        //需要继续发送SETUP命令
348
        sendSetup(track_idx + 1);
349 350 351 352
        return;
    }
    //所有setup命令发送完毕
    //发送play命令
353
    sendPause(type_play, 0);
xzl committed
354
}
355

356
void RtspPlayer::sendDescribe() {
357
    //发送DESCRIBE命令后处理函数:handleResDESCRIBE
358 359
    _on_response = std::bind(&RtspPlayer::handleResDESCRIBE, this, placeholders::_1);
    sendRtspRequest("DESCRIBE", _play_url, {"Accept", "application/sdp"});
360 361
}

362
void RtspPlayer::sendOptions(){
363
    _on_response = [this](const Parser& parser){
364 365
        if (!handleResponse("OPTIONS", parser)) {
            return;
366 367 368 369 370 371 372 373 374 375 376
        }
        //获取服务器支持的命令
        _supported_cmd.clear();
        auto public_val = split(parser["Public"],",");
        for(auto &cmd : public_val){
            trim(cmd);
            _supported_cmd.emplace(cmd);
        }
        //发送Describe请求,获取sdp
        sendDescribe();
    };
377
    sendRtspRequest("OPTIONS", _play_url);
378 379 380
}

void RtspPlayer::sendKeepAlive(){
381
    _on_response = [this](const Parser& parser){};
382 383
    if(_supported_cmd.find("GET_PARAMETER") != _supported_cmd.end()){
        //支持GET_PARAMETER,用此命令保活
384
        sendRtspRequest("GET_PARAMETER", _play_url);
385 386
    }else{
        //不支持GET_PARAMETER,用OPTIONS命令保活
387
        sendRtspRequest("OPTIONS", _play_url);
388
    }
389 390
}

391
void RtspPlayer::sendPause(int type , uint32_t seekMS){
392
    _on_response = std::bind(&RtspPlayer::handleResPAUSE, this, placeholders::_1, type);
393 394 395
    //开启或暂停rtsp
    switch (type){
        case type_pause:
396
            sendRtspRequest("PAUSE", _content_base);
397 398
            break;
        case type_play:
399
            sendRtspRequest("PLAY", _content_base);
400 401
            break;
        case type_seek:
402
            sendRtspRequest("PLAY", _content_base, {"Range",StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) << seekMS / 1000.0 << "-"});
403 404 405
            break;
        default:
            WarnL << "unknown type : " << type;
406
            _on_response = nullptr;
407 408
            break;
    }
xzl committed
409
}
xiongziliang committed
410

411 412
void RtspPlayer::pause(bool pause_flag) {
    sendPause(pause_flag ? type_pause : type_seek, getProgressMilliSecond());
xzl committed
413 414
}

415
void RtspPlayer::handleResPAUSE(const Parser& parser,int type) {
416 417 418 419 420 421 422
    if (parser.Url() != "200") {
        switch (type) {
            case type_pause:
                WarnL << "Pause failed:" << parser.Url() << " " << parser.Tail() << endl;
                break;
            case type_play:
                WarnL << "Play failed:" << parser.Url() << " " << parser.Tail() << endl;
423
                onPlayResult_l(SockException(Err_shutdown, StrPrinter << "rtsp play failed:" << parser.Url() << " " << parser.Tail() ), !_play_check_timer);
424 425 426 427 428 429 430 431 432 433
                break;
            case type_seek:
                WarnL << "Seek failed:" << parser.Url() << " " << parser.Tail() << endl;
                break;
        }
        return;
    }

    if (type == type_pause) {
        //暂停成功!
434
        _rtp_check_timer.reset();
435 436 437 438 439 440 441 442 443 444 445 446 447 448 449
        return;
    }

    //play或seek成功
    uint32_t iSeekTo = 0;
    //修正时间轴
    auto strRange = parser["Range"];
    if (strRange.size()) {
        auto strStart = FindField(strRange.data(), "npt=", "-");
        if (strStart == "now") {
            strStart = "0";
        }
        iSeekTo = 1000 * atof(strStart.data());
        DebugL << "seekTo(ms):" << iSeekTo;
    }
450 451

    onPlayResult_l(SockException(Err_success, type == type_seek ? "resum rtsp success" : "rtsp play success"), !_play_check_timer);
xzl committed
452 453
}

454 455
void RtspPlayer::onWholeRtspPacket(Parser &parser) {
    try {
456 457 458 459
        decltype(_on_response) func;
        _on_response.swap(func);
        if(func){
            func(parser);
460 461 462
        }
        parser.Clear();
    } catch (std::exception &err) {
463
        //定时器_pPlayTimer为空后表明握手结束了
464
        onPlayResult_l(SockException(Err_other, err.what()),!_play_check_timer);
xzl committed
465 466 467
    }
}

468 469 470
void RtspPlayer::onRtpPacket(const char *data, uint64_t len) {
    int trackIdx = -1;
    uint8_t interleaved = data[1];
471
    if(interleaved %2 == 0){
472
        trackIdx = getTrackIndexByInterleaved(interleaved);
xiongziliang committed
473
        handleOneRtp(trackIdx, _sdp_track[trackIdx]->_type, _sdp_track[trackIdx]->_samplerate, (unsigned char *)data + 4, len - 4);
474 475
    }else{
        trackIdx = getTrackIndexByInterleaved(interleaved - 1);
476
        onRtcpPacket(trackIdx, _sdp_track[trackIdx], (unsigned char *) data + 4, len - 4);
477
    }
xzl committed
478 479
}

xiongziliang committed
480
//此处预留rtcp处理函数
481
void RtspPlayer::onRtcpPacket(int track_idx, SdpTrack::Ptr &track, unsigned char *data, unsigned int len){}
xiongziliang committed
482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540

#if 0
//改代码提取自FFmpeg,参考之
// Receiver Report
    avio_w8(pb, (RTP_VERSION << 6) + 1); /* 1 report block */
    avio_w8(pb, RTCP_RR);
    avio_wb16(pb, 7); /* length in words - 1 */
    // our own SSRC: we use the server's SSRC + 1 to avoid conflicts
    avio_wb32(pb, s->ssrc + 1);
    avio_wb32(pb, s->ssrc); // server SSRC
    // some placeholders we should really fill...
    // RFC 1889/p64
    extended_max          = stats->cycles + stats->max_seq;
    expected              = extended_max - stats->base_seq;
    lost                  = expected - stats->received;
    lost                  = FFMIN(lost, 0xffffff); // clamp it since it's only 24 bits...
    expected_interval     = expected - stats->expected_prior;
    stats->expected_prior = expected;
    received_interval     = stats->received - stats->received_prior;
    stats->received_prior = stats->received;
    lost_interval         = expected_interval - received_interval;
    if (expected_interval == 0 || lost_interval <= 0)
        fraction = 0;
    else
        fraction = (lost_interval << 8) / expected_interval;

    fraction = (fraction << 24) | lost;

    avio_wb32(pb, fraction); /* 8 bits of fraction, 24 bits of total packets lost */
    avio_wb32(pb, extended_max); /* max sequence received */
    avio_wb32(pb, stats->jitter >> 4); /* jitter */

    if (s->last_rtcp_ntp_time == AV_NOPTS_VALUE) {
        avio_wb32(pb, 0); /* last SR timestamp */
        avio_wb32(pb, 0); /* delay since last SR */
    } else {
        uint32_t middle_32_bits   = s->last_rtcp_ntp_time >> 16; // this is valid, right? do we need to handle 64 bit values special?
        uint32_t delay_since_last = av_rescale(av_gettime_relative() - s->last_rtcp_reception_time,
                                               65536, AV_TIME_BASE);

        avio_wb32(pb, middle_32_bits); /* last SR timestamp */
        avio_wb32(pb, delay_since_last); /* delay since last SR */
    }

    // CNAME
    avio_w8(pb, (RTP_VERSION << 6) + 1); /* 1 report block */
    avio_w8(pb, RTCP_SDES);
    len = strlen(s->hostname);
    avio_wb16(pb, (7 + len + 3) / 4); /* length in words - 1 */
    avio_wb32(pb, s->ssrc + 1);
    avio_w8(pb, 0x01);
    avio_w8(pb, len);
    avio_write(pb, s->hostname, len);
    avio_w8(pb, 0); /* END */
    // padding
    for (len = (7 + len) % 4; len % 4; len++)
        avio_w8(pb, 0);
#endif

541
void RtspPlayer::sendReceiverReport(bool over_tcp, int track_idx){
xiongziliang committed
542 543 544
    static const char s_cname[] = "ZLMediaKitRtsp";
    uint8_t aui8Rtcp[4 + 32 + 10 + sizeof(s_cname) + 1] = {0};
    uint8_t *pui8Rtcp_RR = aui8Rtcp + 4, *pui8Rtcp_SDES = pui8Rtcp_RR + 32;
545 546
    auto &track = _sdp_track[track_idx];
    auto &counter = _rtcp_counter[track_idx];
xiongziliang committed
547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571

    aui8Rtcp[0] = '$';
    aui8Rtcp[1] = track->_interleaved + 1;
    aui8Rtcp[2] = (sizeof(aui8Rtcp) - 4) >>  8;
    aui8Rtcp[3] = (sizeof(aui8Rtcp) - 4) & 0xFF;

    pui8Rtcp_RR[0] = 0x81;/* 1 report block */
    pui8Rtcp_RR[1] = 0xC9;//RTCP_RR
    pui8Rtcp_RR[2] = 0x00;
    pui8Rtcp_RR[3] = 0x07;/* length in words - 1 */

    uint32_t ssrc=htonl(track->_ssrc + 1);
    // our own SSRC: we use the server's SSRC + 1 to avoid conflicts
    memcpy(&pui8Rtcp_RR[4], &ssrc, 4);
    ssrc=htonl(track->_ssrc);
    // server SSRC
    memcpy(&pui8Rtcp_RR[8], &ssrc, 4);

    //FIXME: 8 bits of fraction, 24 bits of total packets lost
    pui8Rtcp_RR[12] = 0x00;
    pui8Rtcp_RR[13] = 0x00;
    pui8Rtcp_RR[14] = 0x00;
    pui8Rtcp_RR[15] = 0x00;

    //FIXME: max sequence received
572
    int cycleCount = getCycleCount(track_idx);
xiongziliang committed
573 574 575 576 577
    pui8Rtcp_RR[16] = cycleCount >> 8;
    pui8Rtcp_RR[17] = cycleCount & 0xFF;
    pui8Rtcp_RR[18] = counter.pktCnt >> 8;
    pui8Rtcp_RR[19] = counter.pktCnt & 0xFF;

578
    uint32_t  jitter = htonl(getJitterSize(track_idx));
xiongziliang committed
579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599
    //FIXME: jitter
    memcpy(pui8Rtcp_RR + 20, &jitter , 4);
    /* last SR timestamp */
    memcpy(pui8Rtcp_RR + 24, &counter.lastTimeStamp, 4);
    uint32_t msInc = htonl(ntohl(counter.timeStamp) - ntohl(counter.lastTimeStamp));
    /* delay since last SR */
    memcpy(pui8Rtcp_RR + 28, &msInc, 4);

    // CNAME
    pui8Rtcp_SDES[0] = 0x81;
    pui8Rtcp_SDES[1] = 0xCA;
    pui8Rtcp_SDES[2] = 0x00;
    pui8Rtcp_SDES[3] = 0x06;

    memcpy(&pui8Rtcp_SDES[4], &ssrc, 4);

    pui8Rtcp_SDES[8] = 0x01;
    pui8Rtcp_SDES[9] = 0x0f;
    memcpy(&pui8Rtcp_SDES[10], s_cname, sizeof(s_cname));
    pui8Rtcp_SDES[10 + sizeof(s_cname)] = 0x00;

600
    if (over_tcp) {
xiongziliang committed
601
        send(obtainBuffer((char *) aui8Rtcp, sizeof(aui8Rtcp)));
602 603
    } else if (_rtcp_sock[track_idx]) {
        _rtcp_sock[track_idx]->send((char *) aui8Rtcp + 4, sizeof(aui8Rtcp) - 4);
xiongziliang committed
604
    }
605
}
xzl committed
606

xiongziliang committed
607
void RtspPlayer::onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx){
608
    //统计丢包率
609 610 611
    if (_rtp_seq_start[trackidx] == 0 || rtppt->sequence < _rtp_seq_start[trackidx]) {
        _rtp_seq_start[trackidx] = rtppt->sequence;
        _rtp_recv_count[trackidx] = 0;
612
    }
613 614 615
    _rtp_recv_count[trackidx] ++;
    _rtp_seq_now[trackidx] = rtppt->sequence;
    _stamp[trackidx] = rtppt->timeStamp;
616
    //计算相对时间戳
617
    onRecvRTP_l(rtppt, _sdp_track[trackidx]);
xzl committed
618
}
xiongziliang committed
619

620
float RtspPlayer::getPacketLossRate(TrackType type) const{
621 622 623 624 625 626 627
    try {
        auto track_idx = getTrackIndexByTrackType(type);
        if (_rtp_seq_now[track_idx] - _rtp_seq_start[track_idx] + 1 == 0) {
            return 0;
        }
        return 1.0 - (double) _rtp_recv_count[track_idx] / (_rtp_seq_now[track_idx] - _rtp_seq_start[track_idx] + 1);
    } catch (...) {
628 629
        uint64_t totalRecv = 0;
        uint64_t totalSend = 0;
630 631 632
        for (unsigned int i = 0; i < _sdp_track.size(); i++) {
            totalRecv += _rtp_recv_count[i];
            totalSend += (_rtp_seq_now[i] - _rtp_seq_start[i] + 1);
633
        }
634
        if (totalSend == 0) {
635 636
            return 0;
        }
637
        return 1.0 - (double) totalRecv / totalSend;
638
    }
xzl committed
639 640
}

641
uint32_t RtspPlayer::getProgressMilliSecond() const{
642
    return MAX(_stamp[0],_stamp[1]);
xzl committed
643
}
xiongziliang committed
644

645
void RtspPlayer::seekToMilliSecond(uint32_t ms) {
646
    sendPause(type_seek,ms);
xzl committed
647
}
648

649
void RtspPlayer::sendRtspRequest(const string &cmd, const string &url, const std::initializer_list<string> &header) {
650 651 652 653 654 655 656 657 658 659 660
    string key;
    StrCaseMap header_map;
    int i = 0;
    for(auto &val : header){
        if(++i % 2 == 0){
            header_map.emplace(key,val);
        }else{
            key = val;
        }
    }
    sendRtspRequest(cmd,url,header_map);
xiongziliang committed
661
}
xiongziliang committed
662

663
void RtspPlayer::sendRtspRequest(const string &cmd, const string &url,const StrCaseMap &header_const) {
664
    auto header = header_const;
665
    header.emplace("CSeq",StrPrinter << _cseq_send++);
xiongziliang committed
666
    header.emplace("User-Agent",SERVER_NAME);
667

668 669
    if(!_session_id.empty()){
        header.emplace("Session", _session_id);
670 671
    }

672 673
    if(!_realm.empty() && !(*this)[kRtspUser].empty()){
        if(!_md5_nonce.empty()){
674 675 676 677 678 679 680 681 682 683 684
            //MD5认证
            /*
            response计算方法如下:
            RTSP客户端应该使用username + password并计算response如下:
            (1)当password为MD5编码,则
                response = md5( password:nonce:md5(public_method:url)  );
            (2)当password为ANSI字符串,则
                response= md5( md5(username:realm:password):nonce:md5(public_method:url) );
             */
            string encrypted_pwd = (*this)[kRtspPwd];
            if(!(*this)[kRtspPwdIsMD5].as<bool>()){
685
                encrypted_pwd = MD5((*this)[kRtspUser] + ":" + _realm + ":" + encrypted_pwd).hexdigest();
686
            }
687
            auto response = MD5(encrypted_pwd + ":" + _md5_nonce + ":" + MD5(cmd + ":" + url).hexdigest()).hexdigest();
688 689 690
            _StrPrinter printer;
            printer << "Digest ";
            printer << "username=\"" << (*this)[kRtspUser] << "\", ";
691 692
            printer << "realm=\"" << _realm << "\", ";
            printer << "nonce=\"" << _md5_nonce << "\", ";
693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709
            printer << "uri=\"" << url << "\", ";
            printer << "response=\"" << response << "\"";
            header.emplace("Authorization",printer);
        }else if(!(*this)[kRtspPwdIsMD5].as<bool>()){
            //base64认证
            string authStr = StrPrinter << (*this)[kRtspUser] << ":" << (*this)[kRtspPwd];
            char authStrBase64[1024] = {0};
            av_base64_encode(authStrBase64,sizeof(authStrBase64),(uint8_t *)authStr.data(),authStr.size());
            header.emplace("Authorization",StrPrinter << "Basic " << authStrBase64 );
        }
    }

    _StrPrinter printer;
    printer << cmd << " " << url << " RTSP/1.0\r\n";
    for (auto &pr : header){
        printer << pr.first << ": " << pr.second << "\r\n";
    }
710 711
    printer << "\r\n";
    SockSender::send(std::move(printer));
712 713
}

714
void RtspPlayer::onRecvRTP_l(const RtpPacket::Ptr &rtp, const SdpTrack::Ptr &track) {
715
    _rtp_recv_ticker.resetTime();
716
    onRecvRTP(rtp, track);
xiongziliang committed
717

718 719 720 721
    int track_idx = getTrackIndexByTrackType(rtp->type);
    RtcpCounter &counter = _rtcp_counter[track_idx];
    counter.pktCnt = rtp->sequence;
    auto &ticker = _rtcp_send_ticker[track_idx];
xiongziliang committed
722 723 724 725
    if (ticker.elapsedTime() > 5 * 1000) {
        //send rtcp every 5 second
        counter.lastTimeStamp = counter.timeStamp;
        //直接保存网络字节序
726
        memcpy(&counter.timeStamp, rtp->data() + 8, 4);
727
        if (counter.lastTimeStamp != 0) {
728
            sendReceiverReport(_rtp_type == Rtsp::RTP_TCP, track_idx);
xiongziliang committed
729 730
            ticker.resetTime();
        }
731 732

        //有些rtsp服务器需要rtcp保活,有些需要发送信令保活
733
        if (track_idx == 0) {
734 735 736
            //只需要发送一次心跳信令包
            sendKeepAlive();
        }
xiongziliang committed
737
    }
738
}
xiongziliang committed
739

xiongziliang committed
740
void RtspPlayer::onPlayResult_l(const SockException &ex , bool handshake_done) {
xiongziliang committed
741 742 743
    if (ex.getErrCode() == Err_shutdown) {
        //主动shutdown的,不触发回调
        return;
744 745
    }

xiongziliang committed
746
    WarnL << ex.getErrCode() << " " << ex.what();
xiongziliang committed
747
    if (!handshake_done) {
748
        //开始播放阶段
749
        _play_check_timer.reset();
750
        onPlayResult(ex);
751 752
        //是否为性能测试模式
        _benchmark_mode = (*this)[Client::kBenchmarkMode].as<int>();
753 754 755 756 757 758
    } else if (ex) {
        //播放成功后异常断开回调
        onShutdown(ex);
    } else {
        //恢复播放
        onResume();
759 760
    }

xiongziliang committed
761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780
    if (!ex) {
        //播放成功,恢复rtp接收超时定时器
        _rtp_recv_ticker.resetTime();
        int timeoutMS = (*this)[kMediaTimeoutMS].as<int>();
        weak_ptr<RtspPlayer> weakSelf = dynamic_pointer_cast<RtspPlayer>(shared_from_this());
        auto lam = [weakSelf, timeoutMS]() {
            auto strongSelf = weakSelf.lock();
            if (!strongSelf) {
                return false;
            }
            if (strongSelf->_rtp_recv_ticker.elapsedTime() > timeoutMS) {
                //接收rtp媒体数据包超时
                strongSelf->onPlayResult_l(SockException(Err_timeout, "receive rtp timeout"), true);
                return false;
            }
            return true;
        };
        //创建rtp数据接收超时检测定时器
        _rtp_check_timer = std::make_shared<Timer>(timeoutMS / 2000.0, lam, getPoller());
    } else {
781 782
        teardown();
    }
783 784
}

785 786 787
int RtspPlayer::getTrackIndexByInterleaved(int interleaved) const {
    for (unsigned int i = 0; i < _sdp_track.size(); i++) {
        if (_sdp_track[i]->_interleaved == interleaved) {
788 789 790
            return i;
        }
    }
791
    if (_sdp_track.size() == 1) {
792 793
        return 0;
    }
794
    throw SockException(Err_shutdown, StrPrinter << "no such track with interleaved:" << interleaved);
xiongziliang committed
795 796
}

797
int RtspPlayer::getTrackIndexByTrackType(TrackType track_type) const {
798
    for (unsigned int i = 0; i < _sdp_track.size(); i++) {
799
        if (_sdp_track[i]->_type == track_type) {
800 801 802
            return i;
        }
    }
803
    if (_sdp_track.size() == 1) {
804 805
        return 0;
    }
806
    throw SockException(Err_shutdown, StrPrinter << "no such track with type:" << (int) track_type);
807 808
}

xiongziliang committed
809
} /* namespace mediakit */