RtmpPlayer.cpp 12.1 KB
Newer Older
xiongziliang committed
1
/*
xiongziliang committed
2
 * MIT License
xzl committed
3
 *
xiongziliang committed
4
 * Copyright (c) 2016-2019 xiongziliang <771730766@qq.com>
xiongziliang committed
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
xzl committed
25 26 27
 */

#include "RtmpPlayer.h"
xiongzilaing committed
28
#include "Rtmp/utils.h"
xzl committed
29 30
#include "Util/util.h"
#include "Util/onceToken.h"
xiongzilaing committed
31
#include "Thread/ThreadPool.h"
xiongziliang committed
32
using namespace toolkit;
xiongziliang committed
33
using namespace mediakit::Client;
xiongzilaing committed
34

xiongziliang committed
35
namespace mediakit {
xzl committed
36 37

unordered_map<string, RtmpPlayer::rtmpCMDHandle> RtmpPlayer::g_mapCmd;
38
RtmpPlayer::RtmpPlayer(const EventPoller::Ptr &poller) : TcpClient(poller) {
xzl committed
39 40 41 42 43 44 45 46 47 48 49 50 51 52
	static onceToken token([]() {
		g_mapCmd.emplace("_error",&RtmpPlayer::onCmd_result);
		g_mapCmd.emplace("_result",&RtmpPlayer::onCmd_result);
		g_mapCmd.emplace("onStatus",&RtmpPlayer::onCmd_onStatus);
		g_mapCmd.emplace("onMetaData",&RtmpPlayer::onCmd_onMetaData);
		}, []() {});

}

RtmpPlayer::~RtmpPlayer() {
	DebugL << endl;
}
void RtmpPlayer::teardown() {
	if (alive()) {
53 54 55
		_strApp.clear();
		_strStream.clear();
		_strTcUrl.clear();
56 57

		{
58 59
			lock_guard<recursive_mutex> lck(_mtxOnResultCB);
			_mapOnResultCB.clear();
60
		}
xzl committed
61
        {
62 63
            lock_guard<recursive_mutex> lck(_mtxOnStatusCB);
            _dqOnStatusCB.clear();
xzl committed
64
        }
65 66 67
		_pBeatTimer.reset();
		_pPlayTimer.reset();
		_pMediaTimer.reset();
68 69 70
        _iSeekTo = 0;
        CLEAR_ARR(_aiFistStamp);
        CLEAR_ARR(_aiNowStamp);
xiongziliang committed
71
        reset();
xiongziliang committed
72
        shutdown(SockException(Err_shutdown,"teardown"));
xzl committed
73 74
	}
}
xiongziliang committed
75
void RtmpPlayer::play(const string &strUrl)  {
xzl committed
76
	teardown();
xiongziliang committed
77 78 79
	string strHost = FindField(strUrl.data(), "://", "/");
	_strApp = 	FindField(strUrl.data(), (strHost + "/").data(), "/");
    _strStream = FindField(strUrl.data(), (strHost + "/" + _strApp + "/").data(), NULL);
80
    _strTcUrl = string("rtmp://") + strHost + "/" + _strApp;
xzl committed
81

82
    if (!_strApp.size() || !_strStream.size()) {
83
        onPlayResult_l(SockException(Err_other,"rtmp url非法"));
xzl committed
84 85
        return;
    }
86
	DebugL << strHost << " " << _strApp << " " << _strStream;
xzl committed
87

xiongziliang committed
88
	auto iPort = atoi(FindField(strHost.data(), ":", NULL).data());
xzl committed
89 90 91 92 93
	if (iPort <= 0) {
        //rtmp 默认端口1935
		iPort = 1935;
	} else {
        //服务器域名
xiongziliang committed
94
		strHost = FindField(strHost.data(), NULL, ":");
xzl committed
95
	}
xiongziliang committed
96 97
	if(!(*this)[kNetAdapter].empty()){
		setNetAdapter((*this)[kNetAdapter]);
98
	}
xzl committed
99 100

	weak_ptr<RtmpPlayer> weakSelf= dynamic_pointer_cast<RtmpPlayer>(shared_from_this());
xiongziliang committed
101
	float playTimeOutSec = (*this)[kTimeoutMS].as<int>() / 1000.0;
102
	_pPlayTimer.reset( new Timer(playTimeOutSec, [weakSelf]() {
xzl committed
103 104 105 106
		auto strongSelf=weakSelf.lock();
		if(!strongSelf) {
			return false;
		}
107
		strongSelf->onPlayResult_l(SockException(Err_timeout,"play rtmp timeout"));
xzl committed
108
		return false;
109
	},getPoller()));
110

111
	_metadata_got = false;
112 113 114
	startConnect(strHost, iPort , playTimeOutSec);
}
void RtmpPlayer::onErr(const SockException &ex){
115 116 117 118 119
	onPlayResult_l(ex);
}

void RtmpPlayer::onPlayResult_l(const SockException &ex) {
	WarnL << ex.getErrCode() << " " << ex.what();
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139

    if(!ex){
        //恢复rtmp接收超时定时器
        _mediaTicker.resetTime();
        weak_ptr<RtmpPlayer> weakSelf = dynamic_pointer_cast<RtmpPlayer>(shared_from_this());
        int timeoutMS = (*this)[kMediaTimeoutMS].as<int>();
        _pMediaTimer.reset( new Timer(timeoutMS / 2000.0, [weakSelf,timeoutMS]() {
            auto strongSelf=weakSelf.lock();
            if(!strongSelf) {
                return false;
            }
            if(strongSelf->_mediaTicker.elapsedTime()> timeoutMS) {
                //recv media timeout!
                strongSelf->onPlayResult_l(SockException(Err_timeout,"recv rtmp timeout"));
                return false;
            }
            return true;
        },getPoller()));
    }

140
	if (_pPlayTimer) {
141
	    //开始播放阶段
142 143
		_pPlayTimer.reset();
		onPlayResult(ex);
144 145 146 147 148 149 150 151 152 153
	}else {
	    //播放中途阶段
        if (ex) {
            //播放成功后异常断开回调
            onShutdown(ex);
        }else{
            //恢复播放
            onResume();
        }
    }
154 155 156 157

	if(ex){
		teardown();
	}
158 159 160 161 162 163 164
}
void RtmpPlayer::onConnect(const SockException &err){
	if(err.getErrCode()!=Err_success) {
		onPlayResult_l(err);
		return;
	}
	weak_ptr<RtmpPlayer> weakSelf= dynamic_pointer_cast<RtmpPlayer>(shared_from_this());
xzl committed
165 166 167 168 169 170 171 172
	startClientSession([weakSelf](){
        auto strongSelf=weakSelf.lock();
		if(!strongSelf) {
            return;
        }
		strongSelf->send_connect();
	});
}
173
void RtmpPlayer::onRecv(const Buffer::Ptr &pBuf){
xzl committed
174 175 176 177
	try {
		onParseRtmp(pBuf->data(), pBuf->size());
	} catch (exception &e) {
		SockException ex(Err_other, e.what());
178
		onPlayResult_l(ex);
xzl committed
179 180 181 182 183 184 185 186 187
	}
}

void RtmpPlayer::pause(bool bPause) {
	send_pause(bPause);
}

inline void RtmpPlayer::send_connect() {
	AMFValue obj(AMF_OBJECT);
188 189
	obj.set("app", _strApp);
	obj.set("tcUrl", _strTcUrl);
190
	//未使用代理
xzl committed
191
	obj.set("fpad", false);
192
	//参考librtmp,什么作用?
xzl committed
193
	obj.set("capabilities", 15);
194
	//SUPPORT_VID_CLIENT_SEEK 支持seek
xzl committed
195 196
	obj.set("videoFunction", 1);
    //只支持aac
197
    obj.set("audioCodecs", (double)(0x0400));
xzl committed
198
    //只支持H264
199
    obj.set("videoCodecs", (double)(0x0080));
xzl committed
200 201 202 203 204 205 206 207
	sendInvoke("connect", obj);
	addOnResultCB([this](AMFDecoder &dec){
		//TraceL << "connect result";
		dec.load<AMFValue>();
		auto val = dec.load<AMFValue>();
		auto level = val["level"].as_string();
		auto code = val["code"].as_string();
		if(level != "status"){
208
			throw std::runtime_error(StrPrinter <<"connect 失败:" << level << " " << code << endl);
xzl committed
209 210 211 212 213 214 215 216 217 218 219
		}
		send_createStream();
	});
}

inline void RtmpPlayer::send_createStream() {
	AMFValue obj(AMF_NULL);
	sendInvoke("createStream", obj);
	addOnResultCB([this](AMFDecoder &dec){
		//TraceL << "createStream result";
		dec.load<AMFValue>();
220
		_ui32StreamId = dec.load<int>();
xzl committed
221 222 223 224 225 226
		send_play();
	});
}

inline void RtmpPlayer::send_play() {
	AMFEncoder enc;
227
	enc << "play" << ++_iReqID  << nullptr << _strStream << (double)_ui32StreamId;
xzl committed
228 229 230 231 232 233
	sendRequest(MSG_CMD, enc.data());
	auto fun = [this](AMFValue &val){
		//TraceL << "play onStatus";
		auto level = val["level"].as_string();
		auto code = val["code"].as_string();
		if(level != "status"){
234
			throw std::runtime_error(StrPrinter <<"play 失败:" << level << " " << code << endl);
xzl committed
235 236 237 238 239 240 241 242
		}
	};
	addOnStatusCB(fun);
	addOnStatusCB(fun);
}

inline void RtmpPlayer::send_pause(bool bPause) {
	AMFEncoder enc;
243
	enc << "pause" << ++_iReqID  << nullptr << bPause;
xzl committed
244 245 246 247 248 249 250
	sendRequest(MSG_CMD, enc.data());
	auto fun = [this,bPause](AMFValue &val){
        //TraceL << "pause onStatus";
        auto level = val["level"].as_string();
        auto code = val["code"].as_string();
        if(level != "status") {
            if(!bPause){
251
                throw std::runtime_error(StrPrinter <<"pause 恢复播放失败:" << level << " " << code << endl);
xzl committed
252 253
            }
        }else{
254
            _bPaused = bPause;
xzl committed
255
            if(!bPause){
256
                onPlayResult_l(SockException(Err_success, "rtmp resum success"));
xzl committed
257 258
            }else{
                //暂停播放
259
                _pMediaTimer.reset();
xzl committed
260 261 262 263 264
            }
        }
	};
	addOnStatusCB(fun);

265
	_pBeatTimer.reset();
xzl committed
266 267
	if(bPause){
		weak_ptr<RtmpPlayer> weakSelf = dynamic_pointer_cast<RtmpPlayer>(shared_from_this());
268
		_pBeatTimer.reset(new Timer((*this)[kBeatIntervalMS].as<int>() / 1000.0,[weakSelf](){
xzl committed
269 270 271 272 273 274 275
			auto strongSelf = weakSelf.lock();
			if (!strongSelf){
				return false;
			}
			uint32_t timeStamp = ::time(NULL);
			strongSelf->sendUserControl(CONTROL_PING_REQUEST, timeStamp);
			return true;
276
		},getPoller()));
xzl committed
277 278 279 280 281
	}
}

void RtmpPlayer::onCmd_result(AMFDecoder &dec){
	auto iReqId = dec.load<int>();
282 283 284
	lock_guard<recursive_mutex> lck(_mtxOnResultCB);
	auto it = _mapOnResultCB.find(iReqId);
	if(it != _mapOnResultCB.end()){
xzl committed
285
		it->second(dec);
286
		_mapOnResultCB.erase(it);
xzl committed
287 288 289 290 291 292 293 294 295 296 297 298 299
	}else{
		WarnL << "unhandled _result";
	}
}
void RtmpPlayer::onCmd_onStatus(AMFDecoder &dec) {
	AMFValue val;
	while(true){
		val = dec.load<AMFValue>();
		if(val.type() == AMF_OBJECT){
			break;
		}
	}
	if(val.type() != AMF_OBJECT){
300
		throw std::runtime_error("onStatus:the result object was not found");
xzl committed
301 302
	}
    
303 304 305 306
    lock_guard<recursive_mutex> lck(_mtxOnStatusCB);
	if(_dqOnStatusCB.size()){
		_dqOnStatusCB.front()(val);
		_dqOnStatusCB.pop_front();
xzl committed
307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322
	}else{
		auto level = val["level"];
		auto code = val["code"].as_string();
		if(level.type() == AMF_STRING){
			if(level.as_string() != "status"){
				throw std::runtime_error(StrPrinter <<"onStatus 失败:" << level.as_string() << " " << code << endl);
			}
		}
		//WarnL << "unhandled onStatus:" << code;
    }
}

void RtmpPlayer::onCmd_onMetaData(AMFDecoder &dec) {
	//TraceL;
	auto val = dec.load<AMFValue>();
	if(!onCheckMeta(val)){
323
		throw std::runtime_error("onCheckMeta failed");
xzl committed
324
	}
325
	_metadata_got = true;
xzl committed
326 327 328 329
}

void RtmpPlayer::onStreamDry(uint32_t ui32StreamId) {
	//TraceL << ui32StreamId;
330
	onPlayResult_l(SockException(Err_other,"rtmp stream dry"));
xzl committed
331 332
}

333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351
void RtmpPlayer::onMediaData_l(const RtmpPacket::Ptr &packet) {
	_mediaTicker.resetTime();
	if(!_pPlayTimer){
		//已经触发了onPlayResult事件,直接触发onMediaData事件
		onMediaData(packet);
		return;
	}

	if(packet->isCfgFrame()){
		//输入配置帧以便初始化完成各个track
		onMediaData(packet);
	}else{
		//先触发onPlayResult事件,这个时候解码器才能初始化完毕
		onPlayResult_l(SockException(Err_success,"play rtmp success"));
		//触发onPlayResult事件后,再把帧数据输入到解码器
		onMediaData(packet);
	}
}

xzl committed
352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372

void RtmpPlayer::onRtmpChunk(RtmpPacket &chunkData) {
	switch (chunkData.typeId) {
		case MSG_CMD:
		case MSG_CMD3:
		case MSG_DATA:
		case MSG_DATA3: {
			AMFDecoder dec(chunkData.strBuf, 0);
			std::string type = dec.load<std::string>();
			auto it = g_mapCmd.find(type);
			if(it != g_mapCmd.end()){
				auto fun = it->second;
				(this->*fun)(dec);
			}else{
				WarnL << "can not support cmd:" << type;
			}
		}
			break;
		case MSG_AUDIO:
		case MSG_VIDEO: {
            auto idx = chunkData.typeId%2;
373
            if (_aNowStampTicker[idx].elapsedTime() > 500) {
374
				//计算播放进度时间轴用
375
                _aiNowStamp[idx] = chunkData.timeStamp;
xzl committed
376
            }
377 378 379 380
			if(!_metadata_got){
				if(!onCheckMeta(TitleMeta().getMetadata())){
					throw std::runtime_error("onCheckMeta failed");
				}
381
				_metadata_got = true;
382
			}
383
			onMediaData_l(std::make_shared<RtmpPacket>(std::move(chunkData)));
xzl committed
384 385 386 387 388 389 390 391
		}
			break;
		default:
			//WarnL << "unhandled message:" << (int) chunkData.typeId << hexdump(chunkData.strBuf.data(), chunkData.strBuf.size());
			break;
		}
}

392 393
uint32_t RtmpPlayer::getProgressMilliSecond() const{
	uint32_t iTime[2] = {0,0};
xzl committed
394
    for(auto i = 0 ;i < 2 ;i++){
395
        iTime[i] = _aiNowStamp[i] - _aiFistStamp[i];
xzl committed
396
    }
397
    return _iSeekTo + MAX(iTime[0],iTime[1]);
xzl committed
398
}
399
void RtmpPlayer::seekToMilliSecond(uint32_t seekMS){
400
    if (_bPaused) {
xzl committed
401 402 403
        pause(false);
    }
    AMFEncoder enc;
404
    enc << "seek" << ++_iReqID << nullptr << seekMS * 1.0;
xzl committed
405
    sendRequest(MSG_CMD, enc.data());
406
    addOnStatusCB([this,seekMS](AMFValue &val) {
xzl committed
407
        //TraceL << "seek result";
408 409
        _aNowStampTicker[0].resetTime();
        _aNowStampTicker[1].resetTime();
410
		int iTimeInc = seekMS - getProgressMilliSecond();
xzl committed
411
        for(auto i = 0 ;i < 2 ;i++){
412 413
            _aiFistStamp[i] = _aiNowStamp[i] + iTimeInc;
            _aiNowStamp[i] = _aiFistStamp[i];
xzl committed
414
        }
415
        _iSeekTo = seekMS;
xzl committed
416 417 418 419
    });

}

xiongziliang committed
420
} /* namespace mediakit */
xzl committed
421