RtmpPlayer.cpp 12.9 KB
Newer Older
xiongziliang committed
1
/*
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
 */

#include "RtmpPlayer.h"
xiongzilaing committed
12
#include "Rtmp/utils.h"
xzl committed
13 14
#include "Util/util.h"
#include "Util/onceToken.h"
xiongzilaing committed
15
#include "Thread/ThreadPool.h"
xiongziliang committed
16
using namespace toolkit;
xiongziliang committed
17
using namespace mediakit::Client;
xiongzilaing committed
18

xiongziliang committed
19
namespace mediakit {
xzl committed
20

21
/**
 * Constructs the player; the poller is forwarded unchanged to the TcpClient
 * base-class constructor.
 * @param poller event poller handed to TcpClient.
 */
RtmpPlayer::RtmpPlayer(const EventPoller::Ptr &poller)
    : TcpClient(poller) {}

/**
 * Destructor: only emits an (empty) debug log line so that the destruction of
 * the player is visible in the log.
 */
RtmpPlayer::~RtmpPlayer() {
    DebugL << endl;
}
27

xzl committed
28
void RtmpPlayer::teardown() {
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
    if (alive()) {
        shutdown(SockException(Err_shutdown,"teardown"));
    }
    _strApp.clear();
    _strStream.clear();
    _strTcUrl.clear();
    _pBeatTimer.reset();
    _pPlayTimer.reset();
    _pMediaTimer.reset();
    _iSeekTo = 0;
    RtmpProtocol::reset();

    CLEAR_ARR(_aiFistStamp);
    CLEAR_ARR(_aiNowStamp);

    lock_guard<recursive_mutex> lck(_mtxOnResultCB);
    _mapOnResultCB.clear();
    lock_guard<recursive_mutex> lck2(_mtxOnStatusCB);
    _dqOnStatusCB.clear();
xzl committed
48
}
49

xiongziliang committed
50
void RtmpPlayer::play(const string &strUrl)  {
51 52 53
    teardown();
    string strHost = FindField(strUrl.data(), "://", "/");
    _strApp = 	FindField(strUrl.data(), (strHost + "/").data(), "/");
xiongziliang committed
54
    _strStream = FindField(strUrl.data(), (strHost + "/" + _strApp + "/").data(), NULL);
55
    _strTcUrl = string("rtmp://") + strHost + "/" + _strApp;
xzl committed
56

57
    if (!_strApp.size() || !_strStream.size()) {
58
        onPlayResult_l(SockException(Err_other,"rtmp url非法"),false);
xzl committed
59 60
        return;
    }
61
    DebugL << strHost << " " << _strApp << " " << _strStream;
xzl committed
62

63 64
    auto iPort = atoi(FindField(strHost.data(), ":", NULL).data());
    if (iPort <= 0) {
xzl committed
65
        //rtmp 默认端口1935
66 67
        iPort = 1935;
    } else {
xzl committed
68
        //服务器域名
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
        strHost = FindField(strHost.data(), NULL, ":");
    }
    if(!(*this)[kNetAdapter].empty()){
        setNetAdapter((*this)[kNetAdapter]);
    }

    weak_ptr<RtmpPlayer> weakSelf= dynamic_pointer_cast<RtmpPlayer>(shared_from_this());
    float playTimeOutSec = (*this)[kTimeoutMS].as<int>() / 1000.0;
    _pPlayTimer.reset( new Timer(playTimeOutSec, [weakSelf]() {
        auto strongSelf=weakSelf.lock();
        if(!strongSelf) {
            return false;
        }
        strongSelf->onPlayResult_l(SockException(Err_timeout,"play rtmp timeout"),false);
        return false;
    },getPoller()));

    _metadata_got = false;
    startConnect(strHost, iPort , playTimeOutSec);
88 89
}
void RtmpPlayer::onErr(const SockException &ex){
90
    //定时器_pPlayTimer为空后表明握手结束了
91
    onPlayResult_l(ex, !_pPlayTimer);
92 93
}

94
void RtmpPlayer::onPlayResult_l(const SockException &ex , bool handshakeCompleted) {
95
    WarnL << ex.getErrCode() << " " << ex.what();
96 97

    if(!ex){
98
        //播放成功,恢复rtmp接收超时定时器
99 100 101
        _mediaTicker.resetTime();
        weak_ptr<RtmpPlayer> weakSelf = dynamic_pointer_cast<RtmpPlayer>(shared_from_this());
        int timeoutMS = (*this)[kMediaTimeoutMS].as<int>();
102
        //创建rtmp数据接收超时检测定时器
103 104 105 106 107 108
        _pMediaTimer.reset( new Timer(timeoutMS / 2000.0, [weakSelf,timeoutMS]() {
            auto strongSelf=weakSelf.lock();
            if(!strongSelf) {
                return false;
            }
            if(strongSelf->_mediaTicker.elapsedTime()> timeoutMS) {
109 110
                //接收rtmp媒体数据超时
                strongSelf->onPlayResult_l(SockException(Err_timeout,"receive rtmp timeout"),true);
111 112 113 114 115 116
                return false;
            }
            return true;
        },getPoller()));
    }

117 118 119 120
    if (!handshakeCompleted) {
        //开始播放阶段
        _pPlayTimer.reset();
        onPlayResult(ex);
121 122
        //是否为性能测试模式
        _benchmark_mode = (*this)[Client::kBenchmarkMode].as<int>();
123 124 125 126 127 128 129 130 131 132 133
    } else if (ex) {
        //播放成功后异常断开回调
        onShutdown(ex);
    } else {
        //恢复播放
        onResume();
    }

    if(ex){
        teardown();
    }
134 135
}
void RtmpPlayer::onConnect(const SockException &err){
136 137 138 139 140 141
    if(err.getErrCode() != Err_success) {
        onPlayResult_l(err, false);
        return;
    }
    weak_ptr<RtmpPlayer> weakSelf= dynamic_pointer_cast<RtmpPlayer>(shared_from_this());
    startClientSession([weakSelf](){
xzl committed
142
        auto strongSelf=weakSelf.lock();
143
        if(!strongSelf) {
xzl committed
144 145
            return;
        }
146 147
        strongSelf->send_connect();
    });
xzl committed
148
}
149
/**
 * Socket data callback: feeds the received bytes to the rtmp parser.
 * Any std::exception raised while parsing is converted into a play
 * result / shutdown via onPlayResult_l().
 * @param pBuf received data.
 */
void RtmpPlayer::onRecv(const Buffer::Ptr &pBuf){
    try {
        const bool skipParsing = _benchmark_mode && !_pPlayTimer;
        if (skipParsing) {
            // Benchmark mode: once the play handshake is over, rtmp packets
            // are no longer parsed; only the receive ticker is refreshed.
            _mediaTicker.resetTime();
            return;
        }
        onParseRtmp(pBuf->data(), pBuf->size());
    } catch (exception &err) {
        SockException sockErr(Err_other, err.what());
        // An empty _pPlayTimer means the play handshake has already finished.
        onPlayResult_l(sockErr, !_pPlayTimer);
    }
}

void RtmpPlayer::pause(bool bPause) {
165
    send_pause(bPause);
xzl committed
166 167 168
}

inline void RtmpPlayer::send_connect() {
169 170 171 172 173 174 175 176 177
    AMFValue obj(AMF_OBJECT);
    obj.set("app", _strApp);
    obj.set("tcUrl", _strTcUrl);
    //未使用代理
    obj.set("fpad", false);
    //参考librtmp,什么作用?
    obj.set("capabilities", 15);
    //SUPPORT_VID_CLIENT_SEEK 支持seek
    obj.set("videoFunction", 1);
xzl committed
178
    //只支持aac
179
    obj.set("audioCodecs", (double)(0x0400));
xzl committed
180
    //只支持H264
181
    obj.set("videoCodecs", (double)(0x0080));
182 183 184 185 186 187 188 189 190 191 192 193
    sendInvoke("connect", obj);
    addOnResultCB([this](AMFDecoder &dec){
        //TraceL << "connect result";
        dec.load<AMFValue>();
        auto val = dec.load<AMFValue>();
        auto level = val["level"].as_string();
        auto code = val["code"].as_string();
        if(level != "status"){
            throw std::runtime_error(StrPrinter <<"connect 失败:" << level << " " << code << endl);
        }
        send_createStream();
    });
xzl committed
194 195 196
}

inline void RtmpPlayer::send_createStream() {
197 198 199 200 201 202 203 204
    AMFValue obj(AMF_NULL);
    sendInvoke("createStream", obj);
    addOnResultCB([this](AMFDecoder &dec){
        //TraceL << "createStream result";
        dec.load<AMFValue>();
        _ui32StreamId = dec.load<int>();
        send_play();
    });
xzl committed
205 206 207
}

inline void RtmpPlayer::send_play() {
208 209 210 211 212 213 214 215 216 217 218 219 220
    AMFEncoder enc;
    enc << "play" << ++_iReqID  << nullptr << _strStream << (double)_ui32StreamId;
    sendRequest(MSG_CMD, enc.data());
    auto fun = [this](AMFValue &val){
        //TraceL << "play onStatus";
        auto level = val["level"].as_string();
        auto code = val["code"].as_string();
        if(level != "status"){
            throw std::runtime_error(StrPrinter <<"play 失败:" << level << " " << code << endl);
        }
    };
    addOnStatusCB(fun);
    addOnStatusCB(fun);
xzl committed
221 222 223
}

inline void RtmpPlayer::send_pause(bool bPause) {
224 225 226 227
    AMFEncoder enc;
    enc << "pause" << ++_iReqID  << nullptr << bPause;
    sendRequest(MSG_CMD, enc.data());
    auto fun = [this,bPause](AMFValue &val){
xzl committed
228 229 230 231 232
        //TraceL << "pause onStatus";
        auto level = val["level"].as_string();
        auto code = val["code"].as_string();
        if(level != "status") {
            if(!bPause){
233
                throw std::runtime_error(StrPrinter <<"pause 恢复播放失败:" << level << " " << code << endl);
xzl committed
234 235
            }
        }else{
236
            _bPaused = bPause;
xzl committed
237
            if(!bPause){
238
                onPlayResult_l(SockException(Err_success, "resum rtmp success"), true);
xzl committed
239 240
            }else{
                //暂停播放
241
                _pMediaTimer.reset();
xzl committed
242 243
            }
        }
244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259
    };
    addOnStatusCB(fun);

    _pBeatTimer.reset();
    if(bPause){
        weak_ptr<RtmpPlayer> weakSelf = dynamic_pointer_cast<RtmpPlayer>(shared_from_this());
        _pBeatTimer.reset(new Timer((*this)[kBeatIntervalMS].as<int>() / 1000.0,[weakSelf](){
            auto strongSelf = weakSelf.lock();
            if (!strongSelf){
                return false;
            }
            uint32_t timeStamp = ::time(NULL);
            strongSelf->sendUserControl(CONTROL_PING_REQUEST, timeStamp);
            return true;
        },getPoller()));
    }
xzl committed
260 261 262
}

/**
 * Handles a "_result" / "_error" reply: looks up the callback registered for
 * the request id carried in the message and runs it once.
 *
 * @param dec decoder positioned at the request id; after the id has been read
 *            it is handed to the callback.
 *
 * The callback is moved out of the map and its entry erased BEFORE it is
 * invoked. The callbacks registered in this file call send_createStream() /
 * send_play(), which add new entries to the same map while the callback is
 * running; invoking through the stored entry and erasing by iterator
 * afterwards would rely on that iterator surviving the insertion and on the
 * running functor not being destroyed by a concurrent clear().
 * Exceptions thrown by the callback propagate to the caller.
 */
void RtmpPlayer::onCmd_result(AMFDecoder &dec){
    const auto reqId = dec.load<int>();

    // Recursive mutex: still held while the callback runs, as before, so the
    // callback may register further result callbacks from the same thread.
    lock_guard<recursive_mutex> lck(_mtxOnResultCB);
    auto it = _mapOnResultCB.find(reqId);
    if (it == _mapOnResultCB.end()) {
        WarnL << "unhandled _result";
        return;
    }

    auto handler = std::move(it->second);
    _mapOnResultCB.erase(it);
    handler(dec);
}
/**
 * Handles an "onStatus" message.
 *
 * Values are read from the decoder until the first AMF object is found; that
 * object is the status info. If a status callback is pending, the oldest one
 * consumes the object. Otherwise the object is checked here and a level other
 * than "status" is reported by throwing std::runtime_error.
 *
 * @param dec decoder positioned after the command name.
 *
 * The pending callback is moved out of the queue and popped BEFORE it is
 * invoked, so a callback that ends up clearing the queue (e.g. through
 * teardown()) can neither destroy the functor that is currently running nor
 * cause pop_front() to be called on an empty deque.
 * Exceptions thrown by the callback propagate to the caller.
 */
void RtmpPlayer::onCmd_onStatus(AMFDecoder &dec) {
    // Skip everything up to the first object. Termination for a message
    // without any object relies on dec.load() failing when it runs out of
    // data (presumably by throwing -- not visible from here).
    AMFValue val;
    do {
        val = dec.load<AMFValue>();
    } while (val.type() != AMF_OBJECT);

    lock_guard<recursive_mutex> lck(_mtxOnStatusCB);
    if (!_dqOnStatusCB.empty()) {
        auto handler = std::move(_dqOnStatusCB.front());
        _dqOnStatusCB.pop_front();
        handler(val);
        return;
    }

    // Nobody is waiting for this status: only fail on a non-"status" level.
    auto level = val["level"];
    auto code = val["code"].as_string();
    if (level.type() == AMF_STRING && level.as_string() != "status") {
        throw std::runtime_error(StrPrinter <<"onStatus 失败:" << level.as_string() << " " << code << endl);
    }
}

/**
 * Handles an "onMetaData" message: the metadata value is passed to
 * onCheckMeta(); a rejection is reported by throwing std::runtime_error,
 * otherwise the metadata is marked as received.
 * @param dec decoder positioned at the metadata value.
 */
void RtmpPlayer::onCmd_onMetaData(AMFDecoder &dec) {
    auto metadata = dec.load<AMFValue>();
    const bool accepted = onCheckMeta(metadata);
    if (!accepted) {
        throw std::runtime_error("onCheckMeta failed");
    }
    _metadata_got = true;
}

void RtmpPlayer::onStreamDry(uint32_t ui32StreamId) {
311 312
    //TraceL << ui32StreamId;
    onPlayResult_l(SockException(Err_other,"rtmp stream dry"), true);
xzl committed
313 314
}

315
/**
 * Receives every audio/video packet before it is handed to onMediaData().
 *
 * While the play result has not been reported yet (_pPlayTimer still set),
 * config frames are passed straight through, whereas the first non-config
 * frame triggers the successful play result first and is delivered after it.
 *
 * @param packet the rtmp media packet.
 */
void RtmpPlayer::onMediaData_l(const RtmpPacket::Ptr &packet) {
    _mediaTicker.resetTime();

    // Play result still pending and this is real media data: report success
    // first (per the original note, the decoder is only fully initialised at
    // that point), then feed the frame.
    if (_pPlayTimer && !packet->isCfgFrame()) {
        onPlayResult_l(SockException(Err_success,"play rtmp success"), false);
    }

    // In every case the packet itself is delivered afterwards.
    onMediaData(packet);
}

xzl committed
334 335

/**
 * Dispatches one complete rtmp message.
 *
 * Command/data messages are decoded and routed by command name to the
 * matching onCmd_* member; unknown names are logged. Audio/video messages
 * update the progress timestamps, make sure metadata has been checked
 * (falling back to TitleMeta's metadata) and are then forwarded to
 * onMediaData_l(). Other message types are ignored.
 *
 * @param chunkData the message; for audio/video it is moved from.
 */
void RtmpPlayer::onRtmpChunk(RtmpPacket &chunkData) {
    using CmdHandler = void (RtmpPlayer::*)(AMFDecoder &dec);
    // Command name -> member function handling it ("_error" shares the
    // "_result" handler).
    static const unordered_map<string, CmdHandler> s_cmd_handlers = {
        {"_error", &RtmpPlayer::onCmd_result},
        {"_result", &RtmpPlayer::onCmd_result},
        {"onStatus", &RtmpPlayer::onCmd_onStatus},
        {"onMetaData", &RtmpPlayer::onCmd_onMetaData},
    };

    switch (chunkData.typeId) {
        case MSG_CMD:
        case MSG_CMD3:
        case MSG_DATA:
        case MSG_DATA3: {
            AMFDecoder dec(chunkData.strBuf, 0);
            std::string cmdName = dec.load<std::string>();
            auto found = s_cmd_handlers.find(cmdName);
            if (found == s_cmd_handlers.end()) {
                WarnL << "can not support cmd:" << cmdName;
                break;
            }
            (this->*(found->second))(dec);
            break;
        }
        case MSG_AUDIO:
        case MSG_VIDEO: {
            // One slot per media type, selected by the parity of the type id.
            auto slot = chunkData.typeId % 2;
            if (_aNowStampTicker[slot].elapsedTime() > 500) {
                // Used to compute the playback progress time line.
                _aiNowStamp[slot] = chunkData.timeStamp;
            }
            if (!_metadata_got) {
                // No onMetaData seen so far: check against the default one.
                if (!onCheckMeta(TitleMeta().getMetadata())) {
                    throw std::runtime_error("onCheckMeta failed");
                }
                _metadata_got = true;
            }
            onMediaData_l(std::make_shared<RtmpPacket>(std::move(chunkData)));
            break;
        }
        default:
            // Unhandled message types are silently dropped.
            break;
    }
}

383
uint32_t RtmpPlayer::getProgressMilliSecond() const{
384
    uint32_t iTime[2] = {0,0};
xzl committed
385
    for(auto i = 0 ;i < 2 ;i++){
386
        iTime[i] = _aiNowStamp[i] - _aiFistStamp[i];
xzl committed
387
    }
388
    return _iSeekTo + MAX(iTime[0],iTime[1]);
xzl committed
389
}
390
void RtmpPlayer::seekToMilliSecond(uint32_t seekMS){
391
    if (_bPaused) {
xzl committed
392 393 394
        pause(false);
    }
    AMFEncoder enc;
395
    enc << "seek" << ++_iReqID << nullptr << seekMS * 1.0;
xzl committed
396
    sendRequest(MSG_CMD, enc.data());
397
    addOnStatusCB([this,seekMS](AMFValue &val) {
xzl committed
398
        //TraceL << "seek result";
399 400
        _aNowStampTicker[0].resetTime();
        _aNowStampTicker[1].resetTime();
401
        int iTimeInc = seekMS - getProgressMilliSecond();
xzl committed
402
        for(auto i = 0 ;i < 2 ;i++){
403 404
            _aiFistStamp[i] = _aiNowStamp[i] + iTimeInc;
            _aiNowStamp[i] = _aiFistStamp[i];
xzl committed
405
        }
406
        _iSeekTo = seekMS;
xzl committed
407 408 409 410
    });

}

xiongziliang committed
411
} /* namespace mediakit */
xzl committed
412