RtmpPusher.cpp 9.16 KB
Newer Older
xiongziliang committed
1
/*
xiongziliang committed
2
 * MIT License
xzl committed
3
 *
xiongziliang committed
4
 * Copyright (c) 2016-2019 xiongziliang <771730766@qq.com>
xiongziliang committed
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
xzl committed
25 26 27
 */
#include "RtmpPusher.h"
#include "Rtmp/utils.h"
xiongzilaing committed
28 29 30
#include "Util/util.h"
#include "Util/onceToken.h"
#include "Thread/ThreadPool.h"
xiongziliang committed
31
using namespace toolkit;
xiongziliang committed
32
using namespace mediakit::Client;
xzl committed
33

xiongziliang committed
34
namespace mediakit {
xzl committed
35

36
/**
 * Create a pusher that runs on the given poller.
 * @param poller event poller forwarded to the TcpClient base class
 * @param src    RTMP media source to push; remembered in _pMediaSrc and
 *               resolved later when the media transfer starts
 */
RtmpPusher::RtmpPusher(const EventPoller::Ptr &poller, const RtmpMediaSource::Ptr &src)
	: TcpClient(poller) {
	_pMediaSrc = src;
}

/**
 * Run teardown() and then write a debug log line.
 */
RtmpPusher::~RtmpPusher() {
	teardown();
	DebugL << endl;
}
void RtmpPusher::teardown() {
	if (alive()) {
46 47 48
		_strApp.clear();
		_strStream.clear();
		_strTcUrl.clear();
xzl committed
49
		{
50 51
			lock_guard<recursive_mutex> lck(_mtxOnResultCB);
			_mapOnResultCB.clear();
xzl committed
52
		}
xzl committed
53
        {
54 55
            lock_guard<recursive_mutex> lck(_mtxOnStatusCB);
            _dqOnStatusCB.clear();
xzl committed
56
        }
57
		_pPublishTimer.reset();
xiongziliang committed
58
        reset();
xiongziliang committed
59
        shutdown(SockException(Err_shutdown,"teardown"));
xzl committed
60 61 62
	}
}

63 64
void RtmpPusher::onPublishResult(const SockException &ex,bool handshakeCompleted) {
	if(!handshakeCompleted){
65 66 67 68 69 70 71 72 73 74
		//播放结果回调
		_pPublishTimer.reset();
		if(_onPublished){
			_onPublished(ex);
		}
	} else {
		//播放成功后异常断开回调
		if(_onShutdown){
			_onShutdown(ex);
		}
xiongziliang committed
75 76 77 78 79 80 81
	}

	if(ex){
		teardown();
	}
}

xiongziliang committed
82
void RtmpPusher::publish(const string &strUrl)  {
xzl committed
83
	teardown();
xiongziliang committed
84 85 86
	string strHost = FindField(strUrl.data(), "://", "/");
	_strApp = 	FindField(strUrl.data(), (strHost + "/").data(), "/");
    _strStream = FindField(strUrl.data(), (strHost + "/" + _strApp + "/").data(), NULL);
87
    _strTcUrl = string("rtmp://") + strHost + "/" + _strApp;
xzl committed
88

89
    if (!_strApp.size() || !_strStream.size()) {
90
        onPublishResult(SockException(Err_other,"rtmp url非法"),false);
xzl committed
91 92
        return;
    }
93
	DebugL << strHost << " " << _strApp << " " << _strStream;
xzl committed
94

xiongziliang committed
95
	auto iPort = atoi(FindField(strHost.data(), ":", NULL).data());
xzl committed
96 97 98 99 100
	if (iPort <= 0) {
        //rtmp 默认端口1935
		iPort = 1935;
	} else {
        //服务器域名
xiongziliang committed
101
		strHost = FindField(strHost.data(), NULL, ":");
xzl committed
102
	}
xiongziliang committed
103 104

    weak_ptr<RtmpPusher> weakSelf = dynamic_pointer_cast<RtmpPusher>(shared_from_this());
105 106
    float publishTimeOutSec = (*this)[kTimeoutMS].as<int>() / 1000.0;
    _pPublishTimer.reset( new Timer(publishTimeOutSec,  [weakSelf]() {
xiongziliang committed
107 108 109 110
        auto strongSelf=weakSelf.lock();
        if(!strongSelf) {
            return false;
        }
111
        strongSelf->onPublishResult(SockException(Err_timeout,"publish rtmp timeout"), false);
xiongziliang committed
112 113 114 115 116 117 118
        return false;
    },getPoller()));

    if(!(*this)[kNetAdapter].empty()){
        setNetAdapter((*this)[kNetAdapter]);
    }

xzl committed
119 120 121 122
	startConnect(strHost, iPort);
}

void RtmpPusher::onErr(const SockException &ex){
123 124
	//定时器_pPublishTimer为空后表明握手结束了
	onPublishResult(ex,!_pPublishTimer);
xzl committed
125 126
}
void RtmpPusher::onConnect(const SockException &err){
xiongziliang committed
127
	if(err) {
128
		onPublishResult(err,false);
xzl committed
129 130
		return;
	}
131 132 133
	//推流器不需要多大的接收缓存,节省内存占用
	_sock->setReadBuffer(std::make_shared<BufferRaw>(1 * 1024));

xzl committed
134 135 136 137 138 139
	weak_ptr<RtmpPusher> weakSelf = dynamic_pointer_cast<RtmpPusher>(shared_from_this());
	startClientSession([weakSelf](){
        auto strongSelf=weakSelf.lock();
        if(!strongSelf) {
            return;
        }
xiongziliang committed
140 141

        strongSelf->sendChunkSize(60000);
xzl committed
142 143 144
        strongSelf->send_connect();
	});
}
145
/**
 * Data-received callback: feed the bytes to the RTMP parser.
 * Any exception escaping the parser (including ones thrown by the command
 * handlers it dispatches to) is converted into an Err_other publish result.
 * @param pBuf received data
 */
void RtmpPusher::onRecv(const Buffer::Ptr &pBuf){
	try {
		onParseRtmp(pBuf->data(), pBuf->size());
	} catch (const exception &e) {
		// An empty _pPublishTimer means the handshake has already finished.
		const bool handshakeCompleted = !_pPublishTimer;
		onPublishResult(SockException(Err_other, e.what()), handshakeCompleted);
	}
}


inline void RtmpPusher::send_connect() {
	AMFValue obj(AMF_OBJECT);
158
	obj.set("app", _strApp);
xzl committed
159
	obj.set("type", "nonprivate");
160 161
	obj.set("tcUrl", _strTcUrl);
	obj.set("swfUrl", _strTcUrl);
xzl committed
162 163 164 165 166 167 168 169
	sendInvoke("connect", obj);
	addOnResultCB([this](AMFDecoder &dec){
		//TraceL << "connect result";
		dec.load<AMFValue>();
		auto val = dec.load<AMFValue>();
		auto level = val["level"].as_string();
		auto code = val["code"].as_string();
		if(level != "status"){
170
			throw std::runtime_error(StrPrinter <<"connect 失败:" << level << " " << code << endl);
xzl committed
171 172 173 174 175 176 177 178 179 180 181
		}
		send_createStream();
	});
}

inline void RtmpPusher::send_createStream() {
	AMFValue obj(AMF_NULL);
	sendInvoke("createStream", obj);
	addOnResultCB([this](AMFDecoder &dec){
		//TraceL << "createStream result";
		dec.load<AMFValue>();
182
		_ui32StreamId = dec.load<int>();
xzl committed
183 184 185 186 187
		send_publish();
	});
}
inline void RtmpPusher::send_publish() {
	AMFEncoder enc;
188
	enc << "publish" << ++_iReqID << nullptr << _strStream << _strApp ;
xzl committed
189 190 191 192 193 194
	sendRequest(MSG_CMD, enc.data());

	addOnStatusCB([this](AMFValue &val) {
		auto level = val["level"].as_string();
		auto code = val["code"].as_string();
		if(level != "status") {
195
			throw std::runtime_error(StrPrinter <<"publish 失败:" << level << " " << code << endl);
xzl committed
196 197 198 199 200 201 202
		}
		//start send media
		send_metaData();
	});
}

inline void RtmpPusher::send_metaData(){
203
    auto src = _pMediaSrc.lock();
xzl committed
204
    if (!src) {
205
        throw std::runtime_error("the media source was released");
xzl committed
206
    }
207

xzl committed
208 209 210 211
    AMFEncoder enc;
    enc << "@setDataFrame" << "onMetaData" <<  src->getMetaData();
    sendRequest(MSG_DATA, enc.data());
    
xiongziliang committed
212
    src->getConfigFrame([&](const RtmpPacket::Ptr &pkt){
213
        sendRtmp(pkt->typeId, _ui32StreamId, pkt, pkt->timeStamp, pkt->chunkId );
xzl committed
214 215
    });
    
216
    _pRtmpReader = src->getRing()->attach(getPoller());
xzl committed
217
    weak_ptr<RtmpPusher> weakSelf = dynamic_pointer_cast<RtmpPusher>(shared_from_this());
218
    _pRtmpReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt){
xzl committed
219 220 221 222
    	auto strongSelf = weakSelf.lock();
    	if(!strongSelf) {
    		return;
    	}
223
    	strongSelf->sendRtmp(pkt->typeId, strongSelf->_ui32StreamId, pkt, pkt->timeStamp, pkt->chunkId);
xzl committed
224
    });
225
    _pRtmpReader->setDetachCB([weakSelf](){
xzl committed
226 227
        auto strongSelf = weakSelf.lock();
        if(strongSelf){
228
            strongSelf->onPublishResult(SockException(Err_other,"媒体源被释放"), !strongSelf->_pPublishTimer);
xzl committed
229 230
        }
    });
231
    onPublishResult(SockException(Err_success,"success"), false);
232 233
	//提升发送性能
	setSocketFlags();
xzl committed
234
}
235 236 237 238 239 240 241 242 243 244

/**
 * Tune the socket for sending performance.
 * Nothing is changed when the General::kUltraLowDelay setting is enabled;
 * otherwise FLAG_MORE is added to the default socket flags and
 * SockUtil::setNoDelay() is called with false.
 */
void RtmpPusher::setSocketFlags(){
	GET_CONFIG(bool,ultraLowDelay,General::kUltraLowDelay);
	if (ultraLowDelay) {
		return;
	}
	// Improve sending performance.
	(*this) << SocketFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
	SockUtil::setNoDelay(_sock->rawFD(), false);
}

xzl committed
245 246
/**
 * Handle a "_result" / "_error" reply: look up the handler registered for the
 * request id carried in the reply and run it once.
 *
 * The handler is copied out and its entry erased BEFORE it is invoked.
 * Handlers registered in this file register further handlers while they run
 * (connect -> createStream -> ...), which inserts into _mapOnResultCB, and an
 * error path could clear the container altogether. Either may invalidate the
 * iterator found below (e.g. a rehash if the container is a hash map), so the
 * iterator must not be used after the handler has been called.
 *
 * @param dec decoder positioned at the request id of the reply; passed on to
 *            the handler to read the remaining values. Exceptions thrown by
 *            the handler propagate to the caller.
 */
void RtmpPusher::onCmd_result(AMFDecoder &dec){
	auto iReqId = dec.load<int>();
	lock_guard<recursive_mutex> lck(_mtxOnResultCB);
	auto it = _mapOnResultCB.find(iReqId);
	if (it == _mapOnResultCB.end()) {
		WarnL << "unhandled _result";
		return;
	}

	// Detach the handler from the container first, then run it.
	auto handler = it->second;
	_mapOnResultCB.erase(it);
	handler(dec);
}
/**
 * Handle an "onStatus" message.
 *
 * Values are read from the decoder until an AMF object is found. If a status
 * handler is queued, the oldest one is run with that object and then removed.
 * With no handler queued, the object is checked here: a string "level" other
 * than "status" raises std::runtime_error.
 *
 * @param dec decoder positioned after the command name
 */
void RtmpPusher::onCmd_onStatus(AMFDecoder &dec) {
	// Skip leading values until the info object shows up. The loop has no
	// other exit, so running out of data has to be signalled by load()
	// itself (presumably by throwing; not visible from this file).
	AMFValue val;
	do {
		val = dec.load<AMFValue>();
	} while (val.type() != AMF_OBJECT);

	// Kept from the original as a defensive check; the loop above only ends
	// once an object has been read.
	if (val.type() != AMF_OBJECT) {
		throw std::runtime_error("onStatus:the result object was not found");
	}

	lock_guard<recursive_mutex> lck(_mtxOnStatusCB);
	if (!_dqOnStatusCB.empty()) {
		_dqOnStatusCB.front()(val);
		_dqOnStatusCB.pop_front();
		return;
	}

	// No handler is waiting: validate the status here.
	auto level = val["level"];
	auto code = val["code"].as_string();
	if (level.type() == AMF_STRING && level.as_string() != "status") {
		throw std::runtime_error(StrPrinter <<"onStatus 失败:" << level.as_string() << " " << code << endl);
	}
}

/**
 * Dispatch a complete RTMP message.
 * Only command messages (MSG_CMD / MSG_CMD3) are handled: the command name is
 * decoded and routed to the matching member handler. Unknown commands are
 * logged; every other message type is ignored.
 * @param chunkData the received message
 */
void RtmpPusher::onRtmpChunk(RtmpPacket &chunkData) {
	if (chunkData.typeId != MSG_CMD && chunkData.typeId != MSG_CMD3) {
		// Not a command message: nothing to do.
		return;
	}

	using rtmpCMDHandle = void (RtmpPusher::*)(AMFDecoder &dec);
	// Command name -> handler, built once on first use.
	static const unordered_map<string, rtmpCMDHandle> s_cmdHandlers = {
		{"_error", &RtmpPusher::onCmd_result},
		{"_result", &RtmpPusher::onCmd_result},
		{"onStatus", &RtmpPusher::onCmd_onStatus},
	};

	AMFDecoder dec(chunkData.strBuf, 0);
	std::string type = dec.load<std::string>();
	auto it = s_cmdHandlers.find(type);
	if (it == s_cmdHandlers.end()) {
		WarnL << "can not support cmd:" << type;
		return;
	}
	(this->*(it->second))(dec);
}


xiongziliang committed
313
} /* namespace mediakit */
xzl committed
314