RtmpProtocol.cpp 21 KB
Newer Older
xiongziliang committed
1
/*
xiongziliang committed
2
 * MIT License
xzl committed
3
 *
xiongziliang committed
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
 * Copyright (c) 2016 xiongziliang <771730766@qq.com>
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
xzl committed
25 26
 */
#include "RtmpProtocol.h"
xiongzilaing committed
27 28
#include "Rtsp/Rtsp.h"
#include "Rtmp/utils.h"
xzl committed
29 30
#include "Util/util.h"
#include "Util/onceToken.h"
xiongzilaing committed
31
#include "Thread/ThreadPool.h"
xzl committed
32 33
using namespace ZL::Util;

34
#ifdef ENABLE_OPENSSL
35
#include "Util/SSLBox.h"
36
#include <openssl/hmac.h>
37 38
#include <openssl/opensslv.h>

39 40
static string openssl_HMACsha256(const void *key,unsigned int key_len,
								 const void *data,unsigned int data_len){
41
	std::shared_ptr<char> out(new char[32],[](char *ptr){delete [] ptr;});
42
	unsigned int out_len;
43

44 45
#if defined(OPENSSL_VERSION_NUMBER) && (OPENSSL_VERSION_NUMBER > 0x10100000L)
    //openssl 1.1.0新增api,老版本api作废
46 47 48 49 50 51 52 53
	HMAC_CTX *ctx = HMAC_CTX_new();
	HMAC_CTX_reset(ctx);
	HMAC_Init_ex(ctx, key, key_len, EVP_sha256(), NULL);
	HMAC_Update(ctx, (unsigned char*)data, data_len);
	HMAC_Final(ctx, (unsigned char *)out.get(), &out_len);
	HMAC_CTX_reset(ctx);
	HMAC_CTX_free(ctx);
#else
54 55 56 57
	HMAC_CTX ctx;
	HMAC_CTX_init(&ctx);
	HMAC_Init_ex(&ctx, key, key_len, EVP_sha256(), NULL);
	HMAC_Update(&ctx, (unsigned char*)data, data_len);
58
	HMAC_Final(&ctx, (unsigned char *)out.get(), &out_len);
59
	HMAC_CTX_cleanup(&ctx);
60
#endif //defined(OPENSSL_VERSION_NUMBER) && (OPENSSL_VERSION_NUMBER > 0x10100000L)
61
	return string(out.get(),out_len);
62 63 64 65 66 67 68 69 70 71 72 73 74
}
#endif //ENABLE_OPENSSL


#define C1_DIGEST_SIZE 32
#define C1_KEY_SIZE 128
#define C1_SCHEMA_SIZE 764
#define C1_HANDSHARK_SIZE (RANDOM_LEN + 8)
#define C1_FPKEY_SIZE 30
#define S1_FMS_KEY_SIZE 36
#define S2_FMS_KEY_SIZE 68
#define C1_OFFSET_SIZE 4

xzl committed
75 76 77 78 79 80 81 82 83
namespace ZL {
namespace Rtmp {

RtmpProtocol::RtmpProtocol() {
	m_nextHandle = [this](){
		handle_C0C1();
	};
}
RtmpProtocol::~RtmpProtocol() {
xiongziliang committed
84
	reset();
xzl committed
85
}
xiongziliang committed
86
void RtmpProtocol::reset() {
xzl committed
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
	////////////ChunkSize////////////
	m_iChunkLenIn = DEFAULT_CHUNK_LEN;
	m_iChunkLenOut = DEFAULT_CHUNK_LEN;
	////////////Acknowledgement////////////
	m_ui32ByteSent = 0;
	m_ui32LastSent = 0;
	m_ui32WinSize = 0;
	///////////PeerBandwidth///////////
	m_ui32Bandwidth = 2500000;
	m_ui8LimitType = 2;
	////////////Chunk////////////
	m_mapChunkData.clear();
	m_iNowStreamID = 0;
	m_iNowChunkID = 0;
	//////////Invoke Request//////////
	m_iReqID = 0;
	//////////Rtmp parser//////////
	m_strRcvBuf.clear();
	m_ui32StreamId = STREAM_CONTROL;
	m_nextHandle = [this]() {
		handle_C0C1();
	};
}

void RtmpProtocol::sendAcknowledgement(uint32_t ui32Size) {
	std::string control;
	uint32_t stream = htonl(ui32Size);
	control.append((char *) &stream, 4);
	sendRequest(MSG_ACK, control);
}

void RtmpProtocol::sendAcknowledgementSize(uint32_t ui32Size) {
	uint32_t windowSize = htonl(ui32Size);
	std::string set_windowSize((char *) &windowSize, 4);
	sendRequest(MSG_WIN_SIZE, set_windowSize);
}

void RtmpProtocol::sendPeerBandwidth(uint32_t ui32Size) {
	uint32_t peerBandwidth = htonl(ui32Size);
	std::string set_peerBandwidth((char *) &peerBandwidth, 4);
	set_peerBandwidth.push_back((char) 0x02);
	sendRequest(MSG_SET_PEER_BW, set_peerBandwidth);
}

void RtmpProtocol::sendChunkSize(uint32_t ui32Size) {
	uint32_t len = htonl(ui32Size);
	std::string set_chunk((char *) &len, 4);
	sendRequest(MSG_SET_CHUNK, set_chunk);
	m_iChunkLenOut = ui32Size;
}

void RtmpProtocol::sendPingRequest(uint32_t ui32TimeStamp) {
	sendUserControl(CONTROL_PING_REQUEST, ui32TimeStamp);
}

void RtmpProtocol::sendPingResponse(uint32_t ui32TimeStamp) {
	sendUserControl(CONTROL_PING_RESPONSE, ui32TimeStamp);
}

void RtmpProtocol::sendSetBufferLength(uint32_t ui32StreamId,
		uint32_t ui32Length) {
	std::string control;
	ui32StreamId = htonl(ui32StreamId);
	control.append((char *) &ui32StreamId, 4);
	ui32Length = htonl(ui32Length);
	control.append((char *) &ui32Length, 4);
	sendUserControl(CONTROL_SETBUFFER, control);
}

void RtmpProtocol::sendUserControl(uint16_t ui16EventType,
		uint32_t ui32EventData) {
	std::string control;
	uint16_t type = htons(ui16EventType);
	control.append((char *) &type, 2);
	uint32_t stream = htonl(ui32EventData);
	control.append((char *) &stream, 4);
	sendRequest(MSG_USER_CONTROL, control);
}

void RtmpProtocol::sendUserControl(uint16_t ui16EventType,
		const string& strEventData) {
	std::string control;
	uint16_t type = htons(ui16EventType);
	control.append((char *) &type, 2);
	control.append(strEventData);
	sendRequest(MSG_USER_CONTROL, control);
}

void RtmpProtocol::sendResponse(int iType, const string& str) {
xiongziliang committed
176 177 178 179
	if(!m_bDataStarted && (iType == MSG_DATA)){
		m_bDataStarted =  true;
	}
	sendRtmp(iType, m_iNowStreamID, str, 0, m_bDataStarted ? CHUNK_CLIENT_REQUEST_AFTER : CHUNK_CLIENT_REQUEST_BEFORE);
xzl committed
180 181 182 183 184 185 186 187 188 189 190 191 192
}

void RtmpProtocol::sendInvoke(const string& strCmd, const AMFValue& val) {
	AMFEncoder enc;
	enc << strCmd << ++m_iReqID << val;
	sendRequest(MSG_CMD, enc.data());
}

void RtmpProtocol::sendRequest(int iCmd, const string& str) {
	sendRtmp(iCmd, m_ui32StreamId, str, 0, CHUNK_SERVER_REQUEST);
}

void RtmpProtocol::sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId,
193
		const std::string& strBuf, uint32_t ui32TimeStamp, int iChunkId) {
xzl committed
194
	if (iChunkId < 2 || iChunkId > 63) {
195
		auto strErr = StrPrinter << "不支持发送该类型的块流 ID:" << iChunkId << endl;
xzl committed
196 197 198 199 200 201 202 203 204 205
		throw std::runtime_error(strErr);
	}

	bool bExtStamp = ui32TimeStamp >= 0xFFFFFF;
	RtmpHeader header;
	header.flags = (iChunkId & 0x3f) | (0 << 6);
	header.typeId = ui8Type;
	set_be24(header.timeStamp, bExtStamp ? 0xFFFFFF : ui32TimeStamp);
	set_be24(header.bodySize, strBuf.size());
	set_le32(header.streamId, ui32StreamId);
771730766@qq.com committed
206 207

	//估算rtmp包数据大小
208
	uint32_t capacity = ((bExtStamp ? 5 : 1) * (1 + (strBuf.size() / m_iChunkLenOut))) + strBuf.size() + sizeof(header);
771730766@qq.com committed
209
	uint32_t totalSize = 0;
xiongziliang committed
210
	BufferRaw::Ptr buffer = obtainBuffer();
771730766@qq.com committed
211 212 213 214
	buffer->setCapacity(capacity);
	memcpy(buffer->data() + totalSize,(char *) &header, sizeof(header));
	totalSize += sizeof(header);

xzl committed
215 216 217 218 219 220 221 222 223
	char acExtStamp[4];
	if (bExtStamp) {
		//扩展时间戳
		set_be32(acExtStamp, ui32TimeStamp);
	}
	size_t pos = 0;
	while (pos < strBuf.size()) {
		if (pos) {
			uint8_t flags = (iChunkId & 0x3f) | (3 << 6);
771730766@qq.com committed
224 225
			memcpy(buffer->data() + totalSize,&flags, 1);
			totalSize += 1;
xzl committed
226 227 228
		}
		if (bExtStamp) {
			//扩展时间戳
771730766@qq.com committed
229 230
			memcpy(buffer->data() + totalSize,acExtStamp, 4);
			totalSize += 4;
xzl committed
231 232
		}
		size_t chunk = min(m_iChunkLenOut, strBuf.size() - pos);
771730766@qq.com committed
233 234
		memcpy(buffer->data() + totalSize,strBuf.data() + pos, chunk);
		totalSize += chunk;
xzl committed
235 236
		pos += chunk;
	}
771730766@qq.com committed
237
    buffer->setSize(totalSize);
238
	onSendRawData(buffer);
771730766@qq.com committed
239
	m_ui32ByteSent += totalSize;
xzl committed
240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255
	if (m_ui32WinSize > 0 && m_ui32ByteSent - m_ui32LastSent >= m_ui32WinSize) {
		m_ui32LastSent = m_ui32ByteSent;
		sendAcknowledgement(m_ui32ByteSent);
	}
}

void RtmpProtocol::onParseRtmp(const char *pcRawData, int iSize) {
	m_strRcvBuf.append(pcRawData, iSize);
	auto cb = m_nextHandle;
	cb();
}

////for client////
void RtmpProtocol::startClientSession(const function<void()> &callBack) {
	//发送 C0C1
	char handshake_head = HANDSHAKE_PLAINTEXT;
256
	onSendRawData(obtainBuffer(&handshake_head, 1));
257
	RtmpHandshake c1(0);
258
	onSendRawData(obtainBuffer((char *) (&c1), sizeof(c1)));
xzl committed
259 260 261 262 263 264
	m_nextHandle = [this,callBack]() {
		//等待 S0+S1+S2
		handle_S0S1S2(callBack);
	};
}
void RtmpProtocol::handle_S0S1S2(const function<void()> &callBack) {
265
	if (m_strRcvBuf.size() < 1 + 2 * C1_HANDSHARK_SIZE) {
xzl committed
266 267 268 269 270 271 272 273
		//数据不够
		return;
	}
	if (m_strRcvBuf[0] != HANDSHAKE_PLAINTEXT) {
		throw std::runtime_error("only plaintext[0x03] handshake supported");
	}
	//发送 C2
	const char *pcC2 = m_strRcvBuf.data() + 1;
274
	onSendRawData(obtainBuffer(pcC2, C1_HANDSHARK_SIZE));
275
	m_strRcvBuf.erase(0, 1 + 2 * C1_HANDSHARK_SIZE);
xzl committed
276 277 278 279 280 281 282 283 284
	//握手结束
	m_nextHandle = [this]() {
		//握手结束并且开始进入解析命令模式
		handle_rtmp();
	};
	callBack();
}
////for server ////
void RtmpProtocol::handle_C0C1() {
285
	if (m_strRcvBuf.size() < 1 + C1_HANDSHARK_SIZE) {
xzl committed
286 287 288 289 290 291
		//need more data!
		return;
	}
	if (m_strRcvBuf[0] != HANDSHAKE_PLAINTEXT) {
		throw std::runtime_error("only plaintext[0x03] handshake supported");
	}
292 293 294 295 296 297 298 299 300 301 302 303 304 305 306
	if(memcmp(m_strRcvBuf.c_str() + 5,"\x00\x00\x00\x00",4) ==0 ){
		//simple handsharke
		handle_C1_simple();
	}else{
#ifdef ENABLE_OPENSSL
		//complex handsharke
		handle_C1_complex();
#else
		WarnL << "未打开ENABLE_OPENSSL宏,复杂握手采用简单方式处理!";
		handle_C1_simple();
#endif//ENABLE_OPENSSL
	}
	m_strRcvBuf.erase(0, 1 + C1_HANDSHARK_SIZE);
}
void RtmpProtocol::handle_C1_simple(){
xiongziliang committed
307
	//发送S0
308
	char handshake_head = HANDSHAKE_PLAINTEXT;
309
	onSendRawData(obtainBuffer(&handshake_head, 1));
xiongziliang committed
310
	//发送S1
311
	RtmpHandshake s1(0);
312
	onSendRawData(obtainBuffer((char *) &s1, C1_HANDSHARK_SIZE));
xiongziliang committed
313
	//发送S2
314
	onSendRawData(obtainBuffer(m_strRcvBuf.c_str() + 1, C1_HANDSHARK_SIZE));
xzl committed
315 316 317 318 319
	//等待C2
	m_nextHandle = [this]() {
		handle_C2();
	};
}
320
#ifdef ENABLE_OPENSSL
321
void RtmpProtocol::handle_C1_complex(){
322
	//参考自:http://blog.csdn.net/win_lin/article/details/13006803
323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364
	//skip c0,time,version
	const char *c1_start = m_strRcvBuf.data() + 1;
	const char *schema_start = c1_start + 8;
	char *digest_start;
	try{
		/* c1s1 schema0
		time: 4bytes
		version: 4bytes
		key: 764bytes
		digest: 764bytes
		 */
		auto digest = get_C1_digest((uint8_t *)schema_start + C1_SCHEMA_SIZE,&digest_start);
		string c1_joined(c1_start,C1_HANDSHARK_SIZE);
		c1_joined.erase(digest_start - c1_start , C1_DIGEST_SIZE );
		check_C1_Digest(digest,c1_joined);

		send_complex_S0S1S2(0,digest);
		InfoL << "schema0";
	}catch(std::exception &ex){
		//貌似flash从来都不用schema1
		WarnL << "try rtmp complex schema0 failed:" <<  ex.what();
		try{
			/* c1s1 schema1
			time: 4bytes
			version: 4bytes
			digest: 764bytes
			key: 764bytes
			 */
			auto digest = get_C1_digest((uint8_t *)schema_start,&digest_start);
			string c1_joined(c1_start,C1_HANDSHARK_SIZE);
			c1_joined.erase(digest_start - c1_start , C1_DIGEST_SIZE );
			check_C1_Digest(digest,c1_joined);

			send_complex_S0S1S2(1,digest);
			InfoL << "schema1";
		}catch(std::exception &ex){
			WarnL << "try rtmp complex schema1 failed:" <<  ex.what();
			handle_C1_simple();
		}
	}
}

365 366 367
#if !defined(u_int8_t)
#define u_int8_t unsigned char
#endif // !defined(u_int8_t)
368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393

static u_int8_t FMSKey[] = {
    0x47, 0x65, 0x6e, 0x75, 0x69, 0x6e, 0x65, 0x20,
    0x41, 0x64, 0x6f, 0x62, 0x65, 0x20, 0x46, 0x6c,
    0x61, 0x73, 0x68, 0x20, 0x4d, 0x65, 0x64, 0x69,
    0x61, 0x20, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72,
    0x20, 0x30, 0x30, 0x31, // Genuine Adobe Flash Media Server 001
    0xf0, 0xee, 0xc2, 0x4a, 0x80, 0x68, 0xbe, 0xe8,
    0x2e, 0x00, 0xd0, 0xd1, 0x02, 0x9e, 0x7e, 0x57,
    0x6e, 0xec, 0x5d, 0x2d, 0x29, 0x80, 0x6f, 0xab,
    0x93, 0xb8, 0xe6, 0x36, 0xcf, 0xeb, 0x31, 0xae
}; // 68

static u_int8_t FPKey[] = {
    0x47, 0x65, 0x6E, 0x75, 0x69, 0x6E, 0x65, 0x20,
    0x41, 0x64, 0x6F, 0x62, 0x65, 0x20, 0x46, 0x6C,
    0x61, 0x73, 0x68, 0x20, 0x50, 0x6C, 0x61, 0x79,
    0x65, 0x72, 0x20, 0x30, 0x30, 0x31, // Genuine Adobe Flash Player 001
    0xF0, 0xEE, 0xC2, 0x4A, 0x80, 0x68, 0xBE, 0xE8,
    0x2E, 0x00, 0xD0, 0xD1, 0x02, 0x9E, 0x7E, 0x57,
    0x6E, 0xEC, 0x5D, 0x2D, 0x29, 0x80, 0x6F, 0xAB,
    0x93, 0xB8, 0xE6, 0x36, 0xCF, 0xEB, 0x31, 0xAE
}; // 62
void RtmpProtocol::check_C1_Digest(const string &digest,const string &data){
	auto sha256 = openssl_HMACsha256(FPKey,C1_FPKEY_SIZE,data.data(),data.size());
	if(sha256 != digest){
394
		throw std::runtime_error("digest mismatched");
395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432
	}else{
		InfoL << "check rtmp complex handshark success!";
	}
}
string RtmpProtocol::get_C1_digest(const uint8_t *ptr,char **digestPos){
	/* 764bytes digest结构
	offset: 4bytes
	random-data: (offset)bytes
	digest-data: 32bytes
	random-data: (764-4-offset-32)bytes
	 */
	int offset = 0;
	for(int i=0;i<C1_OFFSET_SIZE;++i){
		offset += ptr[i];
	}
	offset %= (C1_SCHEMA_SIZE - C1_DIGEST_SIZE - C1_OFFSET_SIZE);
	*digestPos = (char *)ptr + C1_OFFSET_SIZE + offset;
	string digest(*digestPos,C1_DIGEST_SIZE);
	//DebugL << "digest offset:" << offset << ",digest:" << hexdump(digest.data(),digest.size());
	return digest;
}
string RtmpProtocol::get_C1_key(const uint8_t *ptr){
	/* 764bytes key结构
	random-data: (offset)bytes
	key-data: 128bytes
	random-data: (764-offset-128-4)bytes
	offset: 4bytes
	 */
	int offset = 0;
	for(int i = C1_SCHEMA_SIZE - C1_OFFSET_SIZE;i< C1_SCHEMA_SIZE;++i){
		offset += ptr[i];
	}
	offset %= (C1_SCHEMA_SIZE - C1_KEY_SIZE - C1_OFFSET_SIZE);
	string key((char *)ptr + offset,C1_KEY_SIZE);
	//DebugL << "key offset:" << offset << ",key:" << hexdump(key.data(),key.size());
	return key;
}
void RtmpProtocol::send_complex_S0S1S2(int schemeType,const string &digest){
433
	//S1S2计算参考自:https://github.com/hitYangfei/golang/blob/master/rtmpserver.go
434 435
	//发送S0
	char handshake_head = HANDSHAKE_PLAINTEXT;
436
	onSendRawData(obtainBuffer(&handshake_head, 1));
437
	//S1
438
	RtmpHandshake s1(0);
439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462
	memcpy(s1.zero,"\x04\x05\x00\x01",4);
	char *digestPos;
	if(schemeType == 0){
		/* c1s1 schema0
		time: 4bytes
		version: 4bytes
		key: 764bytes
		digest: 764bytes
		 */
		get_C1_digest(s1.random + C1_SCHEMA_SIZE,&digestPos);
	}else{
		/* c1s1 schema1
		time: 4bytes
		version: 4bytes
		digest: 764bytes
		key: 764bytes
		 */
		get_C1_digest(s1.random,&digestPos);
	}
	char *s1_start = (char *)&s1;
	string s1_joined(s1_start,sizeof(s1));
	s1_joined.erase(digestPos - s1_start,C1_DIGEST_SIZE);
	string s1_digest = openssl_HMACsha256(FMSKey,S1_FMS_KEY_SIZE,s1_joined.data(),s1_joined.size());
	memcpy(digestPos,s1_digest.data(),s1_digest.size());
463
	onSendRawData(obtainBuffer((char *) &s1, sizeof(s1)));
xzl committed
464

465 466 467 468 469 470
	//S2
	string s2_key = openssl_HMACsha256(FMSKey,S2_FMS_KEY_SIZE,digest.data(),digest.size());
	RtmpHandshake s2(0);
	s2.random_generate((char *)&s2,8);
	string s2_digest = openssl_HMACsha256(s2_key.data(),s2_key.size(),&s2,sizeof(s2) - C1_DIGEST_SIZE);
	memcpy((char *)&s2 + C1_HANDSHARK_SIZE - C1_DIGEST_SIZE,s2_digest.data(),C1_DIGEST_SIZE);
471
	onSendRawData(obtainBuffer((char *)&s2, sizeof(s2)));
472 473 474 475 476
	//等待C2
	m_nextHandle = [this]() {
		handle_C2();
	};
}
477
#endif //ENABLE_OPENSSL
xzl committed
478
void RtmpProtocol::handle_C2() {
479
	if (m_strRcvBuf.size() < C1_HANDSHARK_SIZE) {
xzl committed
480 481 482
		//need more data!
		return;
	}
483
	m_strRcvBuf.erase(0, C1_HANDSHARK_SIZE);
xzl committed
484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499
	//握手结束,进入命令模式
	if (!m_strRcvBuf.empty()) {
		handle_rtmp();
	}
	m_nextHandle = [this]() {
		handle_rtmp();
	};
}

void RtmpProtocol::handle_rtmp() {
	while (!m_strRcvBuf.empty()) {
		uint8_t flags = m_strRcvBuf[0];
		int iOffset = 0;
		static const size_t HEADER_LENGTH[] = { 12, 8, 4, 1 };
		size_t iHeaderLen = HEADER_LENGTH[flags >> 6];
		m_iNowChunkID = flags & 0x3f;
xzl committed
500 501 502 503
        if(m_iNowChunkID >10){
            int i=0;
            i++;
        }
xzl committed
504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540
		switch (m_iNowChunkID) {
		case 0: {
			//0 值表示二字节形式,并且 ID 范围 64 - 319
			//(第二个字节 + 64)。
			if (m_strRcvBuf.size() < 2) {
				//need more data
				return;
			}
			m_iNowChunkID = 64 + (uint8_t) (m_strRcvBuf[1]);
			iOffset = 1;
		}
			break;
		case 1: {
			//1 值表示三字节形式,并且 ID 范围为 64 - 65599
			//((第三个字节) * 256 + 第二个字节 + 64)。
			if (m_strRcvBuf.size() < 3) {
				//need more data
				return;
			}
			m_iNowChunkID = 64 + ((uint8_t) (m_strRcvBuf[2]) << 8) + (uint8_t) (m_strRcvBuf[1]);
			iOffset = 2;
		}
			break;
		default:
			//带有 2 值的块流 ID 被保留,用于下层协议控制消息和命令。
			break;
		}

		if (m_strRcvBuf.size() < iHeaderLen + iOffset) {
			//need more data
			return;
		}
		RtmpHeader &header = *((RtmpHeader *) (m_strRcvBuf.data() + iOffset));
		auto &chunkData = m_mapChunkData[m_iNowChunkID];
		chunkData.chunkId = m_iNowChunkID;
		switch (iHeaderLen) {
		case 12:
xzl committed
541
            chunkData.hasAbsStamp = true;
xzl committed
542 543 544 545 546
			chunkData.streamId = load_le32(header.streamId);
		case 8:
			chunkData.bodySize = load_be24(header.bodySize);
			chunkData.typeId = header.typeId;
		case 4:
xzl committed
547
			chunkData.deltaStamp = load_be24(header.timeStamp);
xzl committed
548
            chunkData.hasExtStamp = chunkData.deltaStamp == 0xFFFFFF;
xzl committed
549
		}
xzl committed
550
		
xzl committed
551
        if (chunkData.hasExtStamp) {
xzl committed
552 553 554 555
			if (m_strRcvBuf.size() < iHeaderLen + iOffset + 4) {
				//need more data
				return;
			}
xzl committed
556
            chunkData.deltaStamp = load_be32(m_strRcvBuf.data() + iOffset + iHeaderLen);
xzl committed
557 558
			iOffset += 4;
		}
xzl committed
559 560
		
        if (chunkData.bodySize < chunkData.strBuf.size()) {
xzl committed
561 562
			throw std::runtime_error("非法的bodySize");
		}
xzl committed
563
        
xzl committed
564 565 566 567 568
		auto iMore = min(m_iChunkLenIn, chunkData.bodySize - chunkData.strBuf.size());
		if (m_strRcvBuf.size() < iHeaderLen + iOffset + iMore) {
			//need more data
			return;
		}
xzl committed
569 570
		
        chunkData.strBuf.append(m_strRcvBuf, iHeaderLen + iOffset, iMore);
xzl committed
571
		m_strRcvBuf.erase(0, iHeaderLen + iOffset + iMore);
xzl committed
572
        
xzl committed
573
		if (chunkData.strBuf.size() == chunkData.bodySize) {
xzl committed
574
            //frame is ready
xzl committed
575
            m_iNowStreamID = chunkData.streamId;
xzl committed
576
            chunkData.timeStamp = chunkData.deltaStamp + (chunkData.hasAbsStamp ? 0 : chunkData.timeStamp);
xzl committed
577
            
578 579 580
			if(chunkData.bodySize){
				handle_rtmpChunk(chunkData);
			}
xzl committed
581
			chunkData.strBuf.clear();
xzl committed
582 583 584
            chunkData.hasAbsStamp = false;
            chunkData.hasExtStamp = false;
            chunkData.deltaStamp = 0;
xzl committed
585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689
		}
	}
}

void RtmpProtocol::handle_rtmpChunk(RtmpPacket& chunkData) {
	switch (chunkData.typeId) {
		case MSG_ACK: {
			if (chunkData.strBuf.size() < 4) {
				throw std::runtime_error("MSG_ACK: Not enough data");
			}
			//auto bytePeerRecv = load_be32(&chunkData.strBuf[0]);
			//TraceL << "MSG_ACK:" << bytePeerRecv;
		}
			break;
		case MSG_SET_CHUNK: {
			if (chunkData.strBuf.size() < 4) {
				throw std::runtime_error("MSG_SET_CHUNK :Not enough data");
			}
			m_iChunkLenIn = load_be32(&chunkData.strBuf[0]);
			TraceL << "MSG_SET_CHUNK:" << m_iChunkLenIn;
		}
			break;
		case MSG_USER_CONTROL: {
			//user control message
			if (chunkData.strBuf.size() < 2) {
				throw std::runtime_error("MSG_USER_CONTROL: Not enough data.");
			}
			uint16_t event_type = load_be16(&chunkData.strBuf[0]);
			chunkData.strBuf.erase(0, 2);
			switch (event_type) {
			case CONTROL_PING_REQUEST: {
					if (chunkData.strBuf.size() < 4) {
						throw std::runtime_error("CONTROL_PING_REQUEST: Not enough data.");
					}
					uint32_t timeStamp = load_be32(&chunkData.strBuf[0]);
					//TraceL << "CONTROL_PING_REQUEST:" << timeStamp;
					sendUserControl(CONTROL_PING_RESPONSE, timeStamp);
				}
					break;
			case CONTROL_PING_RESPONSE: {
				if (chunkData.strBuf.size() < 4) {
					throw std::runtime_error("CONTROL_PING_RESPONSE: Not enough data.");
				}
				//uint32_t timeStamp = load_be32(&chunkData.strBuf[0]);
				//TraceL << "CONTROL_PING_RESPONSE:" << timeStamp;
			}
				break;
			case CONTROL_STREAM_BEGIN: {
				//开始播放
				if (chunkData.strBuf.size() < 4) {
					throw std::runtime_error("CONTROL_STREAM_BEGIN: Not enough data.");
				}
				uint32_t stramId = load_be32(&chunkData.strBuf[0]);
				onStreamBegin(stramId);
				TraceL << "CONTROL_STREAM_BEGIN:" << stramId;
			}
				break;

			case CONTROL_STREAM_EOF: {
				//暂停
				if (chunkData.strBuf.size() < 4) {
					throw std::runtime_error("CONTROL_STREAM_EOF: Not enough data.");
				}
				uint32_t stramId = load_be32(&chunkData.strBuf[0]);
				onStreamEof(stramId);
				TraceL << "CONTROL_STREAM_EOF:" << stramId;
			}
				break;
			case CONTROL_STREAM_DRY: {
				//停止播放
				if (chunkData.strBuf.size() < 4) {
					throw std::runtime_error("CONTROL_STREAM_DRY: Not enough data.");
				}
				uint32_t stramId = load_be32(&chunkData.strBuf[0]);
				onStreamDry(stramId);
				TraceL << "CONTROL_STREAM_DRY:" << stramId;
			}
				break;
			default:
				//WarnL << "unhandled user control:" << event_type;
				break;
			}
		}
			break;

		case MSG_WIN_SIZE: {
			m_ui32WinSize = load_be32(&chunkData.strBuf[0]);
			TraceL << "MSG_WIN_SIZE:" << m_ui32WinSize;
		}
			break;
		case MSG_SET_PEER_BW: {
			m_ui32Bandwidth = load_be32(&chunkData.strBuf[0]);
			m_ui8LimitType =  chunkData.strBuf[4];
			TraceL << "MSG_SET_PEER_BW:" << m_ui32WinSize;
		}
			break;
		case MSG_AGGREGATE:
			throw std::runtime_error("streaming FLV not supported");
			break;
		default:
			onRtmpChunk(chunkData);
			break;
		}
}

xiongziliang committed
690 691 692 693
BufferRaw::Ptr RtmpProtocol::obtainBuffer() {
    return std::make_shared<BufferRaw>() ;//_bufferPool.obtain();
}

694 695 696 697 698 699
BufferRaw::Ptr RtmpProtocol::obtainBuffer(const void *data, int len) {
	auto buffer = obtainBuffer();
	buffer->assign((const char *)data,len);
	return buffer;
}

xzl committed
700 701
} /* namespace Rtmp */
} /* namespace ZL */