RtmpPlayer.cpp 13.1 KB
Newer Older
xiongziliang committed
1
/*
xiongziliang committed
2
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
xiongziliang committed
3 4 5
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
xiongziliang committed
6 7 8
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
xzl committed
9 10 11
 */

#include "RtmpPlayer.h"
xiongzilaing committed
12
#include "Rtmp/utils.h"
xzl committed
13 14
#include "Util/util.h"
#include "Util/onceToken.h"
xiongzilaing committed
15
#include "Thread/ThreadPool.h"
xiongziliang committed
16
using namespace toolkit;
xiongziliang committed
17
using namespace mediakit::Client;
xiongzilaing committed
18

xiongziliang committed
19
namespace mediakit {
xzl committed
20

xiongziliang committed
21
RtmpPlayer::RtmpPlayer(const EventPoller::Ptr &poller) : TcpClient(poller) {}
xzl committed
22 23

RtmpPlayer::~RtmpPlayer() {
24
    DebugL << endl;
xzl committed
25
}
26

xzl committed
27
void RtmpPlayer::teardown() {
28 29 30
    if (alive()) {
        shutdown(SockException(Err_shutdown,"teardown"));
    }
xiongziliang committed
31 32 33 34 35 36 37
    _app.clear();
    _stream_id.clear();
    _tc_url.clear();
    _beat_timer.reset();
    _play_timer.reset();
    _rtmp_recv_timer.reset();
    _seek_ms = 0;
38 39
    RtmpProtocol::reset();

xiongziliang committed
40 41
    CLEAR_ARR(_fist_stamp);
    CLEAR_ARR(_now_stamp);
42

xiongziliang committed
43 44 45 46
    lock_guard<recursive_mutex> lck(_mtx_on_result);
    _map_on_result.clear();
    lock_guard<recursive_mutex> lck2(_mtx_on_status);
    _deque_on_status.clear();
xzl committed
47
}
48

xiongziliang committed
49
void RtmpPlayer::play(const string &strUrl)  {
50
    teardown();
xiongziliang committed
51 52 53 54
    string host_url = FindField(strUrl.data(), "://", "/");
    _app = FindField(strUrl.data(), (host_url + "/").data(), "/");
    _stream_id = FindField(strUrl.data(), (host_url + "/" + _app + "/").data(), NULL);
    _tc_url = string("rtmp://") + host_url + "/" + _app;
xzl committed
55

xiongziliang committed
56 57
    if (!_app.size() || !_stream_id.size()) {
        onPlayResult_l(SockException(Err_other, "rtmp url非法"), false);
xzl committed
58 59
        return;
    }
xiongziliang committed
60
    DebugL << host_url << " " << _app << " " << _stream_id;
xzl committed
61

xiongziliang committed
62
    auto iPort = atoi(FindField(host_url.data(), ":", NULL).data());
63
    if (iPort <= 0) {
xzl committed
64
        //rtmp 默认端口1935
65 66
        iPort = 1935;
    } else {
xzl committed
67
        //服务器域名
xiongziliang committed
68
        host_url = FindField(host_url.data(), NULL, ":");
69
    }
xiongziliang committed
70
    if (!(*this)[kNetAdapter].empty()) {
71 72 73
        setNetAdapter((*this)[kNetAdapter]);
    }

xiongziliang committed
74 75 76 77 78
    weak_ptr<RtmpPlayer> weak_self = dynamic_pointer_cast<RtmpPlayer>(shared_from_this());
    float play_timeout_sec = (*this)[kTimeoutMS].as<int>() / 1000.0;
    _play_timer.reset(new Timer(play_timeout_sec, [weak_self]() {
        auto strong_self = weak_self.lock();
        if (!strong_self) {
79 80
            return false;
        }
xiongziliang committed
81
        strong_self->onPlayResult_l(SockException(Err_timeout, "play rtmp timeout"), false);
82
        return false;
xiongziliang committed
83
    }, getPoller()));
84 85

    _metadata_got = false;
xiongziliang committed
86
    startConnect(host_url, iPort, play_timeout_sec);
87
}
xiongziliang committed
88

89
void RtmpPlayer::onErr(const SockException &ex){
90
    //定时器_pPlayTimer为空后表明握手结束了
xiongziliang committed
91
    onPlayResult_l(ex, !_play_timer);
92 93
}

xiongziliang committed
94
void RtmpPlayer::onPlayResult_l(const SockException &ex, bool handshake_done) {
xiongziliang committed
95 96 97
    if (ex.getErrCode() == Err_shutdown) {
        //主动shutdown的,不触发回调
        return;
98 99
    }

xiongziliang committed
100
    WarnL << ex.getErrCode() << " " << ex.what();
xiongziliang committed
101
    if (!handshake_done) {
102
        //开始播放阶段
xiongziliang committed
103
        _play_timer.reset();
104 105
        //是否为性能测试模式
        _benchmark_mode = (*this)[Client::kBenchmarkMode].as<int>();
xiongziliang committed
106
        onPlayResult(ex);
107 108 109 110 111 112 113 114
    } else if (ex) {
        //播放成功后异常断开回调
        onShutdown(ex);
    } else {
        //恢复播放
        onResume();
    }

xiongziliang committed
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
    if (!ex) {
        //播放成功,恢复rtmp接收超时定时器
        _rtmp_recv_ticker.resetTime();
        int timeout_ms = (*this)[kMediaTimeoutMS].as<int>();
        weak_ptr<RtmpPlayer> weakSelf = dynamic_pointer_cast<RtmpPlayer>(shared_from_this());
        auto lam = [weakSelf, timeout_ms]() {
            auto strongSelf = weakSelf.lock();
            if (!strongSelf) {
                return false;
            }
            if (strongSelf->_rtmp_recv_ticker.elapsedTime() > timeout_ms) {
                //接收rtmp媒体数据超时
                SockException ex(Err_timeout, "receive rtmp timeout");
                strongSelf->onPlayResult_l(ex, true);
                return false;
            }
            return true;
        };
        //创建rtmp数据接收超时检测定时器
        _rtmp_recv_timer = std::make_shared<Timer>(timeout_ms / 2000.0, lam, getPoller());
    } else {
136 137
        teardown();
    }
138
}
xiongziliang committed
139

140
void RtmpPlayer::onConnect(const SockException &err){
xiongziliang committed
141
    if (err.getErrCode() != Err_success) {
142 143 144
        onPlayResult_l(err, false);
        return;
    }
xiongziliang committed
145 146 147 148
    weak_ptr<RtmpPlayer> weakSelf = dynamic_pointer_cast<RtmpPlayer>(shared_from_this());
    startClientSession([weakSelf]() {
        auto strongSelf = weakSelf.lock();
        if (!strongSelf) {
xzl committed
149 150
            return;
        }
151 152
        strongSelf->send_connect();
    });
xzl committed
153
}
xiongziliang committed
154

xiongziliang committed
155
void RtmpPlayer::onRecv(const Buffer::Ptr &buf){
156
    try {
xiongziliang committed
157
        if (_benchmark_mode && !_play_timer) {
158
            //在性能测试模式下,如果rtmp握手完毕后,不再解析rtmp包
xiongziliang committed
159
            _rtmp_recv_ticker.resetTime();
160 161
            return;
        }
xiongziliang committed
162
        onParseRtmp(buf->data(), buf->size());
163 164
    } catch (exception &e) {
        SockException ex(Err_other, e.what());
165
        //定时器_pPlayTimer为空后表明握手结束了
xiongziliang committed
166
        onPlayResult_l(ex, !_play_timer);
167
    }
xzl committed
168 169 170
}

void RtmpPlayer::pause(bool bPause) {
171
    send_pause(bPause);
xzl committed
172 173 174
}

inline void RtmpPlayer::send_connect() {
175
    AMFValue obj(AMF_OBJECT);
xiongziliang committed
176 177
    obj.set("app", _app);
    obj.set("tcUrl", _tc_url);
178 179 180 181 182 183
    //未使用代理
    obj.set("fpad", false);
    //参考librtmp,什么作用?
    obj.set("capabilities", 15);
    //SUPPORT_VID_CLIENT_SEEK 支持seek
    obj.set("videoFunction", 1);
xzl committed
184
    //只支持aac
xiongziliang committed
185
    obj.set("audioCodecs", (double) (0x0400));
xzl committed
186
    //只支持H264
xiongziliang committed
187
    obj.set("videoCodecs", (double) (0x0080));
188
    sendInvoke("connect", obj);
xiongziliang committed
189
    addOnResultCB([this](AMFDecoder &dec) {
190 191 192 193 194
        //TraceL << "connect result";
        dec.load<AMFValue>();
        auto val = dec.load<AMFValue>();
        auto level = val["level"].as_string();
        auto code = val["code"].as_string();
xiongziliang committed
195 196
        if (level != "status") {
            throw std::runtime_error(StrPrinter << "connect 失败:" << level << " " << code << endl);
197 198 199
        }
        send_createStream();
    });
xzl committed
200 201 202
}

inline void RtmpPlayer::send_createStream() {
203 204
    AMFValue obj(AMF_NULL);
    sendInvoke("createStream", obj);
xiongziliang committed
205
    addOnResultCB([this](AMFDecoder &dec) {
206 207
        //TraceL << "createStream result";
        dec.load<AMFValue>();
xiongziliang committed
208
        _stream_index = dec.load<int>();
209 210
        send_play();
    });
xzl committed
211 212 213
}

inline void RtmpPlayer::send_play() {
214
    AMFEncoder enc;
xiongziliang committed
215
    enc << "play" << ++_send_req_id << nullptr << _stream_id << (double) _stream_index;
216
    sendRequest(MSG_CMD, enc.data());
xiongziliang committed
217
    auto fun = [this](AMFValue &val) {
218 219 220
        //TraceL << "play onStatus";
        auto level = val["level"].as_string();
        auto code = val["code"].as_string();
xiongziliang committed
221 222
        if (level != "status") {
            throw std::runtime_error(StrPrinter << "play 失败:" << level << " " << code << endl);
223 224 225 226
        }
    };
    addOnStatusCB(fun);
    addOnStatusCB(fun);
xzl committed
227 228
}

xiongziliang committed
229
inline void RtmpPlayer::send_pause(bool pause) {
230
    AMFEncoder enc;
xiongziliang committed
231
    enc << "pause" << ++_send_req_id << nullptr << pause;
232
    sendRequest(MSG_CMD, enc.data());
xiongziliang committed
233
    auto fun = [this, pause](AMFValue &val) {
xzl committed
234 235 236
        //TraceL << "pause onStatus";
        auto level = val["level"].as_string();
        auto code = val["code"].as_string();
xiongziliang committed
237
        if (level != "status") {
xiongziliang committed
238
            if (!pause) {
xiongziliang committed
239
                throw std::runtime_error(StrPrinter << "pause 恢复播放失败:" << level << " " << code << endl);
xzl committed
240
            }
xiongziliang committed
241
        } else {
xiongziliang committed
242 243
            _paused = pause;
            if (!pause) {
244
                onPlayResult_l(SockException(Err_success, "resum rtmp success"), true);
xiongziliang committed
245
            } else {
xzl committed
246
                //暂停播放
xiongziliang committed
247
                _rtmp_recv_timer.reset();
xzl committed
248 249
            }
        }
250 251 252
    };
    addOnStatusCB(fun);

xiongziliang committed
253
    _beat_timer.reset();
xiongziliang committed
254
    if (pause) {
255
        weak_ptr<RtmpPlayer> weakSelf = dynamic_pointer_cast<RtmpPlayer>(shared_from_this());
xiongziliang committed
256
        _beat_timer.reset(new Timer((*this)[kBeatIntervalMS].as<int>() / 1000.0, [weakSelf]() {
257
            auto strongSelf = weakSelf.lock();
xiongziliang committed
258
            if (!strongSelf) {
259 260 261 262 263
                return false;
            }
            uint32_t timeStamp = ::time(NULL);
            strongSelf->sendUserControl(CONTROL_PING_REQUEST, timeStamp);
            return true;
xiongziliang committed
264
        }, getPoller()));
265
    }
xzl committed
266 267 268
}

void RtmpPlayer::onCmd_result(AMFDecoder &dec){
xiongziliang committed
269 270 271 272
    auto req_id = dec.load<int>();
    lock_guard<recursive_mutex> lck(_mtx_on_result);
    auto it = _map_on_result.find(req_id);
    if (it != _map_on_result.end()) {
273
        it->second(dec);
xiongziliang committed
274 275
        _map_on_result.erase(it);
    } else {
276 277
        WarnL << "unhandled _result";
    }
xzl committed
278
}
xiongziliang committed
279

xzl committed
280
void RtmpPlayer::onCmd_onStatus(AMFDecoder &dec) {
281
    AMFValue val;
xiongziliang committed
282
    while (true) {
283
        val = dec.load<AMFValue>();
xiongziliang committed
284
        if (val.type() == AMF_OBJECT) {
285 286 287
            break;
        }
    }
xiongziliang committed
288
    if (val.type() != AMF_OBJECT) {
289 290
        throw std::runtime_error("onStatus:the result object was not found");
    }
xiongziliang committed
291 292 293 294 295 296

    lock_guard<recursive_mutex> lck(_mtx_on_status);
    if (_deque_on_status.size()) {
        _deque_on_status.front()(val);
        _deque_on_status.pop_front();
    } else {
297 298
        auto level = val["level"];
        auto code = val["code"].as_string();
xiongziliang committed
299 300 301
        if (level.type() == AMF_STRING) {
            if (level.as_string() != "status") {
                throw std::runtime_error(StrPrinter << "onStatus 失败:" << level.as_string() << " " << code << endl);
302 303 304
            }
        }
        //WarnL << "unhandled onStatus:" << code;
xzl committed
305 306 307 308
    }
}

void RtmpPlayer::onCmd_onMetaData(AMFDecoder &dec) {
309 310
    //TraceL;
    auto val = dec.load<AMFValue>();
xiongziliang committed
311
    if (!onCheckMeta(val)) {
312 313 314
        throw std::runtime_error("onCheckMeta failed");
    }
    _metadata_got = true;
xzl committed
315 316
}

xiongziliang committed
317 318
void RtmpPlayer::onStreamDry(uint32_t stream_index) {
    //TraceL << stream_index;
xiongziliang committed
319
    onPlayResult_l(SockException(Err_other, "rtmp stream dry"), true);
xzl committed
320 321
}

xiongziliang committed
322
void RtmpPlayer::onMediaData_l(const RtmpPacket::Ptr &chunk_data) {
xiongziliang committed
323 324
    _rtmp_recv_ticker.resetTime();
    if (!_play_timer) {
325
        //已经触发了onPlayResult事件,直接触发onMediaData事件
xiongziliang committed
326
        onMediaData(chunk_data);
327 328 329
        return;
    }

xiongziliang committed
330
    if (chunk_data->isCfgFrame()) {
331
        //输入配置帧以便初始化完成各个track
xiongziliang committed
332
        onMediaData(chunk_data);
xiongziliang committed
333
    } else {
334
        //先触发onPlayResult事件,这个时候解码器才能初始化完毕
xiongziliang committed
335
        onPlayResult_l(SockException(Err_success, "play rtmp success"), false);
336
        //触发onPlayResult事件后,再把帧数据输入到解码器
xiongziliang committed
337
        onMediaData(chunk_data);
338
    }
339 340
}

xzl committed
341

xiongziliang committed
342
void RtmpPlayer::onRtmpChunk(RtmpPacket &chunk_data) {
343 344 345
    typedef void (RtmpPlayer::*rtmp_func_ptr)(AMFDecoder &dec);
    static unordered_map<string, rtmp_func_ptr> s_func_map;
    static onceToken token([]() {
xiongziliang committed
346 347 348 349 350
        s_func_map.emplace("_error", &RtmpPlayer::onCmd_result);
        s_func_map.emplace("_result", &RtmpPlayer::onCmd_result);
        s_func_map.emplace("onStatus", &RtmpPlayer::onCmd_onStatus);
        s_func_map.emplace("onMetaData", &RtmpPlayer::onCmd_onMetaData);
    });
351

xiongziliang committed
352
    switch (chunk_data.type_id) {
353 354 355 356
        case MSG_CMD:
        case MSG_CMD3:
        case MSG_DATA:
        case MSG_DATA3: {
xiongziliang committed
357
            AMFDecoder dec(chunk_data.buffer, 0);
358 359
            std::string type = dec.load<std::string>();
            auto it = s_func_map.find(type);
xiongziliang committed
360
            if (it != s_func_map.end()) {
361 362
                auto fun = it->second;
                (this->*fun)(dec);
xiongziliang committed
363
            } else {
364 365 366
                WarnL << "can not support cmd:" << type;
            }
            break;
xiongziliang committed
367 368
        }

369 370
        case MSG_AUDIO:
        case MSG_VIDEO: {
xiongziliang committed
371
            auto idx = chunk_data.type_id % 2;
xiongziliang committed
372
            if (_now_stamp_ticker[idx].elapsedTime() > 500) {
373
                //计算播放进度时间轴用
xiongziliang committed
374
                _now_stamp[idx] = chunk_data.time_stamp;
xzl committed
375
            }
xiongziliang committed
376 377
            if (!_metadata_got) {
                if (!onCheckMeta(TitleMeta().getMetadata())) {
378 379 380 381
                    throw std::runtime_error("onCheckMeta failed");
                }
                _metadata_got = true;
            }
xiongziliang committed
382
            onMediaData_l(std::make_shared<RtmpPacket>(std::move(chunk_data)));
383 384
            break;
        }
xiongziliang committed
385 386 387

        default: break;
    }
xzl committed
388 389
}

390
uint32_t RtmpPlayer::getProgressMilliSecond() const{
xiongziliang committed
391 392 393
    uint32_t stamp[2] = {0, 0};
    for (auto i = 0; i < 2; i++) {
        stamp[i] = _now_stamp[i] - _fist_stamp[i];
xzl committed
394
    }
xiongziliang committed
395
    return _seek_ms + MAX(stamp[0], stamp[1]);
xzl committed
396
}
xiongziliang committed
397

398
void RtmpPlayer::seekToMilliSecond(uint32_t seekMS){
xiongziliang committed
399
    if (_paused) {
xzl committed
400 401 402
        pause(false);
    }
    AMFEncoder enc;
xiongziliang committed
403
    enc << "seek" << ++_send_req_id << nullptr << seekMS * 1.0;
xzl committed
404
    sendRequest(MSG_CMD, enc.data());
xiongziliang committed
405
    addOnStatusCB([this, seekMS](AMFValue &val) {
xzl committed
406
        //TraceL << "seek result";
xiongziliang committed
407 408
        _now_stamp_ticker[0].resetTime();
        _now_stamp_ticker[1].resetTime();
409
        int iTimeInc = seekMS - getProgressMilliSecond();
xiongziliang committed
410 411 412
        for (auto i = 0; i < 2; i++) {
            _fist_stamp[i] = _now_stamp[i] + iTimeInc;
            _now_stamp[i] = _fist_stamp[i];
xzl committed
413
        }
xiongziliang committed
414
        _seek_ms = seekMS;
xzl committed
415 416 417
    });
}

xiongziliang committed
418
} /* namespace mediakit */