RtmpSession.cpp 15.8 KB
Newer Older
xiongziliang committed
1
/*
xiongziliang committed
2
 * MIT License
xzl committed
3
 *
xiongziliang committed
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
 * Copyright (c) 2016 xiongziliang <771730766@qq.com>
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
xzl committed
25 26 27 28 29 30 31 32 33 34 35
 */


#include "RtmpSession.h"
#include "Util/onceToken.h"

namespace ZL {
namespace Rtmp {

unordered_map<string, RtmpSession::rtmpCMDHandle> RtmpSession::g_mapCmd;
RtmpSession::RtmpSession(const std::shared_ptr<ThreadPool> &pTh, const Socket::Ptr &pSock) :
36
		TcpSession(pTh, pSock) {
xzl committed
37 38 39 40 41 42
	static onceToken token([]() {
		g_mapCmd.emplace("connect",&RtmpSession::onCmd_connect);
		g_mapCmd.emplace("createStream",&RtmpSession::onCmd_createStream);
		g_mapCmd.emplace("publish",&RtmpSession::onCmd_publish);
		g_mapCmd.emplace("deleteStream",&RtmpSession::onCmd_deleteStream);
		g_mapCmd.emplace("play",&RtmpSession::onCmd_play);
43
		g_mapCmd.emplace("play2",&RtmpSession::onCmd_play2);
xzl committed
44 45
		g_mapCmd.emplace("seek",&RtmpSession::onCmd_seek);
		g_mapCmd.emplace("pause",&RtmpSession::onCmd_pause);}, []() {});
46
	DebugL << get_peer_ip();
xzl committed
47 48 49
}

RtmpSession::~RtmpSession() {
50
    DebugL << get_peer_ip();
xzl committed
51 52 53
}

void RtmpSession::onError(const SockException& err) {
54
	DebugL << err.what();
55 56

    //流量统计事件广播
57 58
    GET_CONFIG_AND_REGISTER(uint32_t,iFlowThreshold,Broadcast::kFlowThreshold);

59
    if(m_ui64TotalBytes > iFlowThreshold * 1024){
60
        NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport,m_mediaInfo,m_ui64TotalBytes,*this);
61
    }
xzl committed
62 63 64 65 66
}

void RtmpSession::onManager() {
	if (m_ticker.createdTime() > 10 * 1000) {
		if (!m_pRingReader && !m_pPublisherSrc) {
67
			WarnL << "非法链接:" << get_peer_ip();
xzl committed
68 69 70 71 72 73
			shutdown();
		}
	}
	if (m_pPublisherSrc) {
		//publisher
		if (m_ticker.elapsedTime() > 10 * 1000) {
74
			WarnL << "数据接收超时:" << get_peer_ip();
xzl committed
75 76 77 78 79
			shutdown();
		}
	}
}

80
void RtmpSession::onRecv(const Buffer::Ptr &pBuf) {
xzl committed
81 82
	m_ticker.resetTime();
	try {
83
        m_ui64TotalBytes += pBuf->size();
xzl committed
84 85 86 87 88 89 90 91 92
		onParseRtmp(pBuf->data(), pBuf->size());
	} catch (exception &e) {
		WarnL << e.what();
		shutdown();
	}
}

void RtmpSession::onCmd_connect(AMFDecoder &dec) {
	auto params = dec.load<AMFValue>();
xiongziliang committed
93 94 95 96 97 98
	double amfVer = 0;
	AMFValue objectEncoding = params["objectEncoding"];
	if(objectEncoding){
		amfVer = objectEncoding.as_number();
	}
	///////////set chunk size////////////////
771730766@qq.com committed
99
	sendChunkSize(60000);
xiongziliang committed
100 101 102 103 104
	////////////window Acknowledgement size/////
	sendAcknowledgementSize(5000000);
	///////////set peerBandwidth////////////////
	sendPeerBandwidth(5000000);

105 106 107 108
    m_mediaInfo.m_app = params["app"].as_string();
    m_strTcUrl = params["tcUrl"].as_string();
    if(m_strTcUrl.empty()){
        //defaultVhost:默认vhost
109
        m_strTcUrl = string(RTMP_SCHEMA) + "://" + DEFAULT_VHOST + "/" + m_mediaInfo.m_app;
110
    }
xzl committed
111 112
	bool ok = true; //(app == APP_NAME);
	AMFValue version(AMF_OBJECT);
113 114
	version.set("fmsVer", "FMS/3,0,1,123");
	version.set("capabilities", 31.0);
xzl committed
115 116 117 118
	AMFValue status(AMF_OBJECT);
	status.set("level", ok ? "status" : "error");
	status.set("code", ok ? "NetConnection.Connect.Success" : "NetConnection.Connect.InvalidApp");
	status.set("description", ok ? "Connection succeeded." : "InvalidApp.");
xiongziliang committed
119
	status.set("objectEncoding", amfVer);
xzl committed
120 121
	sendReply(ok ? "_result" : "_error", version, status);
	if (!ok) {
122
		throw std::runtime_error("Unsupported application: " + m_mediaInfo.m_app);
xzl committed
123 124
	}

xiongziliang committed
125 126 127
	AMFEncoder invoke;
	invoke << "onBWDone" << 0.0 << nullptr;
	sendResponse(MSG_CMD, invoke.data());
xzl committed
128 129 130 131 132 133 134 135
}

void RtmpSession::onCmd_createStream(AMFDecoder &dec) {
	sendReply("_result", nullptr, double(STREAM_MEDIA));
}

void RtmpSession::onCmd_publish(AMFDecoder &dec) {
	dec.load<AMFValue>();/* NULL */
136
    m_mediaInfo.parse(m_strTcUrl + "/" + dec.load<std::string>());
137

138
    auto onRes = [this](const string &err){
139 140 141 142 143
        auto src = dynamic_pointer_cast<RtmpMediaSource>(MediaSource::find(RTMP_SCHEMA,
                                                                           m_mediaInfo.m_vhost,
                                                                           m_mediaInfo.m_app,
                                                                           m_mediaInfo.m_streamid,
                                                                           false));
144
        bool authSuccess = err.empty();
145 146 147 148
        bool ok = (!src && !m_pPublisherSrc && authSuccess);
        AMFValue status(AMF_OBJECT);
        status.set("level", ok ? "status" : "error");
        status.set("code", ok ? "NetStream.Publish.Start" : (authSuccess ? "NetStream.Publish.BadName" : "NetStream.Publish.BadAuth"));
149
        status.set("description", ok ? "Started publishing stream." : (authSuccess ? "Already publishing." : err.data()));
150 151 152 153
        status.set("clientid", "0");
        sendReply("onStatus", nullptr, status);
        if (!ok) {
            WarnL << "onPublish:"
154
                  << (authSuccess ? "Already publishing:" : err.data()) << " "
155 156 157 158 159 160 161 162
                  << m_mediaInfo.m_vhost << " "
                  << m_mediaInfo.m_app << " "
                  << m_mediaInfo.m_streamid << endl;
            shutdown();
            return;
        }
        m_bPublisherSrcRegisted = false;
        m_pPublisherSrc.reset(new RtmpToRtspMediaSource(m_mediaInfo.m_vhost,m_mediaInfo.m_app,m_mediaInfo.m_streamid));
163
        m_pPublisherSrc->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
164 165 166
    };

    weak_ptr<RtmpSession> weakSelf = dynamic_pointer_cast<RtmpSession>(shared_from_this());
167
    Broadcast::AuthInvoker invoker = [weakSelf,onRes](const string &err){
168 169 170 171
        auto strongSelf = weakSelf.lock();
        if(!strongSelf){
            return;
        }
172
        strongSelf->async([weakSelf,onRes,err](){
173 174 175 176
            auto strongSelf = weakSelf.lock();
            if(!strongSelf){
                return;
            }
177
            onRes(err);
178 179 180 181
        });
    };
    auto flag = NoticeCenter::Instance().emitEvent(Config::Broadcast::kBroadcastRtmpPublish,
                                                   m_mediaInfo,
182 183
                                                   invoker,
                                                   *this);
184 185
    if(!flag){
        //该事件无人监听,默认鉴权成功
186
        onRes("");
187
    }
xzl committed
188 189 190 191 192 193 194 195 196 197 198
}

void RtmpSession::onCmd_deleteStream(AMFDecoder &dec) {
	AMFValue status(AMF_OBJECT);
	status.set("level", "status");
	status.set("code", "NetStream.Unpublish.Success");
	status.set("description", "Stop publishing.");
	sendReply("onStatus", nullptr, status);
	throw std::runtime_error(StrPrinter << "Stop publishing." << endl);
}

199
void  RtmpSession::doPlay(AMFDecoder &dec){
200
    auto onRes = [this](const string &err) {
201 202 203 204 205
        auto src = dynamic_pointer_cast<RtmpMediaSource>(MediaSource::find(RTMP_SCHEMA,
                                                                           m_mediaInfo.m_vhost,
                                                                           m_mediaInfo.m_app,
                                                                           m_mediaInfo.m_streamid,
                                                                           true));
206
        bool authSuccess = err.empty();
207 208 209 210 211 212 213 214 215 216 217 218
        bool ok = (src.operator bool() && authSuccess);
        if(ok){
            ok = ok && src->ready();
        }
        if (ok) {
            //stream begin
           sendUserControl(CONTROL_STREAM_BEGIN, STREAM_MEDIA);
        }
        // onStatus(NetStream.Play.Reset)
        AMFValue status(AMF_OBJECT);
        status.set("level", ok ? "status" : "error");
        status.set("code", ok ? "NetStream.Play.Reset" : (authSuccess ? "NetStream.Play.StreamNotFound" : "NetStream.Play.BadAuth"));
219
        status.set("description", ok ? "Resetting and playing." : (authSuccess ? "No such stream." : err.data()));
220 221 222 223 224
        status.set("details", m_mediaInfo.m_streamid);
        status.set("clientid", "0");
        sendReply("onStatus", nullptr, status);
        if (!ok) {
            WarnL << "onPlayed:"
225
                  << (authSuccess ? "No such stream:" : err.data()) << " "
226 227 228 229 230 231 232
                  << m_mediaInfo.m_vhost << " "
                  << m_mediaInfo.m_app << " "
                  << m_mediaInfo.m_streamid
                  << endl;
            shutdown();
            return;
        }
xiongziliang committed
233

234 235 236 237 238 239 240 241
        // onStatus(NetStream.Play.Start)
        status.clear();
        status.set("level", "status");
        status.set("code", "NetStream.Play.Start");
        status.set("description", "Started playing.");
        status.set("details", m_mediaInfo.m_streamid);
        status.set("clientid", "0");
        sendReply("onStatus", nullptr, status);
xzl committed
242

243 244 245 246
        // |RtmpSampleAccess(true, true)
        AMFEncoder invoke;
        invoke << "|RtmpSampleAccess" << true << true;
        sendResponse(MSG_DATA, invoke.data());
xzl committed
247

248 249 250 251 252 253
        //onStatus(NetStream.Data.Start)
        invoke.clear();
        AMFValue obj(AMF_OBJECT);
        obj.set("code", "NetStream.Data.Start");
        invoke << "onStatus" << obj;
        sendResponse(MSG_DATA, invoke.data());
xiongziliang committed
254

255 256 257 258 259 260 261 262
        //onStatus(NetStream.Play.PublishNotify)
        status.clear();
        status.set("level", "status");
        status.set("code", "NetStream.Play.PublishNotify");
        status.set("description", "Now published.");
        status.set("details", m_mediaInfo.m_streamid);
        status.set("clientid", "0");
        sendReply("onStatus", nullptr, status);
xiongziliang committed
263

264 265 266 267
        // onMetaData
        invoke.clear();
        invoke << "onMetaData" << src->getMetaData();
        sendResponse(MSG_DATA, invoke.data());
xzl committed
268

269 270 271 272
        src->getConfigFrame([&](const RtmpPacket::Ptr &pkt) {
            //DebugL<<"send initial frame";
            onSendMedia(pkt);
        });
xzl committed
273

274 275
        m_pRingReader = src->getRing()->attach();
        weak_ptr<RtmpSession> weakSelf = dynamic_pointer_cast<RtmpSession>(shared_from_this());
276
        SockUtil::setNoDelay(_sock->rawFD(), false);
277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301
        m_pRingReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt) {
            auto strongSelf = weakSelf.lock();
            if (!strongSelf) {
                return;
            }
            strongSelf->async([weakSelf, pkt]() {
                auto strongSelf = weakSelf.lock();
                if (!strongSelf) {
                    return;
                }
                strongSelf->onSendMedia(pkt);
            });
        });
        m_pRingReader->setDetachCB([weakSelf]() {
            auto strongSelf = weakSelf.lock();
            if (!strongSelf) {
                return;
            }
            strongSelf->safeShutdown();
        });
        m_pPlayerSrc = src;
        if (src->getRing()->readerCount() == 1) {
            src->seekTo(0);
        }
    };
302

303
    weak_ptr<RtmpSession> weakSelf = dynamic_pointer_cast<RtmpSession>(shared_from_this());
304
    Broadcast::AuthInvoker invoker = [weakSelf,onRes](const string &err){
305 306 307 308
        auto strongSelf = weakSelf.lock();
        if(!strongSelf){
            return;
        }
309
        strongSelf->async([weakSelf,onRes,err](){
310 311 312 313
            auto strongSelf = weakSelf.lock();
            if(!strongSelf){
                return;
            }
314
            onRes(err);
315 316
        });
    };
317
    auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,m_mediaInfo,invoker,*this);
318 319
    if(!flag){
        //该事件无人监听,默认不鉴权
320
        onRes("");
321
    }
322 323
}
void RtmpSession::onCmd_play2(AMFDecoder &dec) {
324
	doPlay(dec);
325 326 327
}
void RtmpSession::onCmd_play(AMFDecoder &dec) {
	dec.load<AMFValue>();/* NULL */
328
    m_mediaInfo.parse(m_strTcUrl + "/" + dec.load<std::string>());
329
	doPlay(dec);
xzl committed
330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350
}

void RtmpSession::onCmd_pause(AMFDecoder &dec) {
	dec.load<AMFValue>();/* NULL */
	bool paused = dec.load<bool>();
	TraceL << paused;
	AMFValue status(AMF_OBJECT);
	status.set("level", "status");
	status.set("code", paused ? "NetStream.Pause.Notify" : "NetStream.Unpause.Notify");
	status.set("description", paused ? "Paused stream." : "Unpaused stream.");
	sendReply("onStatus", nullptr, status);
//streamBegin
	sendUserControl(paused ? CONTROL_STREAM_EOF : CONTROL_STREAM_BEGIN,
	STREAM_MEDIA);
	if (!m_pRingReader) {
		throw std::runtime_error("Rtmp not started yet!");
	}
	if (paused) {
		m_pRingReader->setReadCB(nullptr);
	} else {
		weak_ptr<RtmpSession> weakSelf = dynamic_pointer_cast<RtmpSession>(shared_from_this());
xiongziliang committed
351
		m_pRingReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt) {
xzl committed
352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393
			auto strongSelf = weakSelf.lock();
			if(!strongSelf) {
				return;
			}
			strongSelf->async([weakSelf,pkt]() {
				auto strongSelf = weakSelf.lock();
				if(!strongSelf) {
					return;
				}
				strongSelf->onSendMedia(pkt);
			});
		});
	}
}

void RtmpSession::setMetaData(AMFDecoder &dec) {
	if (!m_pPublisherSrc) {
		throw std::runtime_error("not a publisher");
	}
	std::string type = dec.load<std::string>();
	if (type != "onMetaData") {
		throw std::runtime_error("can only set metadata");
	}
	m_pPublisherSrc->onGetMetaData(dec.load<AMFValue>());
}

void RtmpSession::onProcessCmd(AMFDecoder &dec) {
	std::string method = dec.load<std::string>();
	auto it = g_mapCmd.find(method);
	if (it == g_mapCmd.end()) {
		TraceL << "can not support cmd:" << method;
		return;
	}
	m_dNowReqID = dec.load<double>();
	auto fun = it->second;
	(this->*fun)(dec);
}

void RtmpSession::onRtmpChunk(RtmpPacket &chunkData) {
	switch (chunkData.typeId) {
	case MSG_CMD:
	case MSG_CMD3: {
xiongziliang committed
394
		AMFDecoder dec(chunkData.strBuf, chunkData.typeId == MSG_CMD3 ? 1 : 0);
xzl committed
395 396 397 398 399 400
		onProcessCmd(dec);
	}
		break;

	case MSG_DATA:
	case MSG_DATA3: {
xiongziliang committed
401
		AMFDecoder dec(chunkData.strBuf, chunkData.typeId == MSG_CMD3 ? 1 : 0);
xzl committed
402 403 404 405 406 407 408 409 410 411 412 413
		std::string type = dec.load<std::string>();
		TraceL << "notify:" << type;
		if (type == "@setDataFrame") {
			setMetaData(dec);
		}
	}
		break;
	case MSG_AUDIO:
	case MSG_VIDEO: {
		if (!m_pPublisherSrc) {
			throw std::runtime_error("Not a rtmp publisher!");
		}
xiongziliang committed
414
		m_pPublisherSrc->onGetMedia(std::make_shared<RtmpPacket>(chunkData));
xzl committed
415 416 417 418
		if(!m_bPublisherSrcRegisted && m_pPublisherSrc->ready()){
			m_bPublisherSrcRegisted = true;
			m_pPublisherSrc->regist();
		}
xzl committed
419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442
	}
		break;
	default:
		WarnL << "unhandled message:" << (int) chunkData.typeId << hexdump(chunkData.strBuf.data(), chunkData.strBuf.size());
		break;
	}
}

void RtmpSession::onCmd_seek(AMFDecoder &dec) {
    dec.load<AMFValue>();/* NULL */
    auto milliSeconds = dec.load<AMFValue>().as_number();
    InfoL << "rtmp seekTo:" << milliSeconds/1000.0;
    auto stongSrc = m_pPlayerSrc.lock();
    if (stongSrc) {
        stongSrc->seekTo(milliSeconds);
    }
	AMFValue status(AMF_OBJECT);
	AMFEncoder invoke;
	status.set("level", "status");
	status.set("code", "NetStream.Seek.Notify");
	status.set("description", "Seeking.");
	sendReply("onStatus", nullptr, status);
}

xiongziliang committed
443 444 445
void RtmpSession::onSendMedia(const RtmpPacket::Ptr &pkt) {
	auto modifiedStamp = pkt->timeStamp;
	auto &firstStamp = m_aui32FirstStamp[pkt->typeId % 2];
446 447 448 449 450 451 452 453 454 455 456
	if(!firstStamp){
		firstStamp = modifiedStamp;
	}
	if(modifiedStamp >= firstStamp){
		//计算时间戳增量
		modifiedStamp -= firstStamp;
	}else{
		//发生回环,重新计算时间戳增量
		CLEAR_ARR(m_aui32FirstStamp);
		modifiedStamp = 0;
	}
771730766@qq.com committed
457
	sendRtmp(pkt->typeId, pkt->streamId, pkt->strBuf, modifiedStamp, pkt->chunkId , true);
xzl committed
458 459 460 461
}

} /* namespace Rtmp */
} /* namespace ZL */