RtspSession.cpp 36.4 KB
Newer Older
xiongziliang committed
1
/*
xiongziliang committed
2
 * MIT License
xzl committed
3
 *
xiongziliang committed
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
 * Copyright (c) 2016 xiongziliang <771730766@qq.com>
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
xzl committed
25
 */
26

xiongzilaing committed
27
#include <atomic>
xiongziliang committed
28
#include "Common/config.h"
xiongzilaing committed
29
#include "UDPServer.h"
xzl committed
30
#include "RtspSession.h"
xiongzilaing committed
31
#include "Util/mini.h"
32
#include "Util/MD5.h"
xiongziliang committed
33
#include "Util/base64.h"
xiongzilaing committed
34
#include "Util/onceToken.h"
xzl committed
35 36
#include "Util/TimeTicker.h"
#include "Util/NoticeCenter.h"
xiongzilaing committed
37
#include "Network/sockutil.h"
xzl committed
38

xiongziliang committed
39 40
using namespace std;
using namespace toolkit;
xzl committed
41

xiongziliang committed
42
namespace mediakit {
xzl committed
43

44 45
static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE;

xzl committed
46 47 48 49 50 51 52 53 54 55 56 57
string dateHeader() {
	char buf[200];
	time_t tt = time(NULL);
	strftime(buf, sizeof buf, "Date: %a, %b %d %Y %H:%M:%S GMT\r\n", gmtime(&tt));
	return buf;
}

unordered_map<string, weak_ptr<RtspSession> > RtspSession::g_mapGetter;
unordered_map<void *, std::shared_ptr<RtspSession> > RtspSession::g_mapPostter;
recursive_mutex RtspSession::g_mtxGetter; //对quicktime上锁保护
recursive_mutex RtspSession::g_mtxPostter; //对quicktime上锁保护
RtspSession::RtspSession(const std::shared_ptr<ThreadPool> &pTh, const Socket::Ptr &pSock) :
58
		TcpSession(pTh, pSock), _pSender(pSock) {
xiongziliang committed
59 60 61 62
	//设置10秒发送缓存
	pSock->setSendBufSecond(10);
	//设置15秒发送超时时间
	pSock->setSendTimeOutSecond(15);
xzl committed
63

64
	DebugL <<  get_peer_ip();
xzl committed
65 66 67
}

RtspSession::~RtspSession() {
68 69
	if (_onDestory) {
		_onDestory();
xzl committed
70
	}
71
    DebugL <<  get_peer_ip();
xzl committed
72 73 74
}

void RtspSession::shutdown(){
xiongziliang committed
75 76 77
	shutdown_l(true);
}
void RtspSession::shutdown_l(bool close){
78
	if (_sock) {
xiongziliang committed
79
		_sock->emitErr(SockException(Err_other, "self shutdown"),close);
xzl committed
80
	}
81
	if (_bBase64need && !_sock) {
xzl committed
82 83 84 85
		//quickTime http postter,and self is detached from tcpServer
		lock_guard<recursive_mutex> lock(g_mtxPostter);
		g_mapPostter.erase(this);
	}
86 87 88
	if (_pBrdcaster) {
		_pBrdcaster->setDetachCB(this, nullptr);
		_pBrdcaster.reset();
xzl committed
89
	}
90 91
	if (_pRtpReader) {
		_pRtpReader.reset();
xzl committed
92 93 94 95 96
	}
}

void RtspSession::onError(const SockException& err) {
	TraceL << err.getErrCode() << " " << err.what();
97
	if (_bListenPeerUdpData) {
xiongziliang committed
98
		//取消UDP端口监听
99
		UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this);
100
		_bListenPeerUdpData = false;
xzl committed
101
	}
102
	if (!_bBase64need && _strSessionCookie.size() != 0) {
xzl committed
103 104
		//quickTime http getter
		lock_guard<recursive_mutex> lock(g_mtxGetter);
105
		g_mapGetter.erase(_strSessionCookie);
xzl committed
106 107
	}

108
	if (_bBase64need && err.getErrCode() == Err_eof) {
xzl committed
109
		//quickTime http postter,正在发送rtp; QuickTime只是断开了请求连接,请继续发送rtp
110
		_sock = nullptr;
xzl committed
111 112
		lock_guard<recursive_mutex> lock(g_mtxPostter);
		//为了保证脱离TCPServer后还能正常运作,需要保持本对象的强引用
xiongziliang committed
113 114 115 116
		try {
			g_mapPostter.emplace(this, dynamic_pointer_cast<RtspSession>(shared_from_this()));
		}catch (std::exception &ex){
		}
117
		TraceL << "quickTime will not send request any more!";
xzl committed
118
	}
119 120

    //流量统计事件广播
121
    GET_CONFIG_AND_REGISTER(uint32_t,iFlowThreshold,Broadcast::kFlowThreshold);
122
    if(_ui64TotalBytes > iFlowThreshold * 1024){
123
        NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport,
124 125 126
										   _mediaInfo,
										   _ui64TotalBytes,
										   _ticker.createdTime()/1000,
127
										   *this);
128
    }
xzl committed
129 130 131
}

void RtspSession::onManager() {
132 133
	if (_ticker.createdTime() > 15 * 1000) {
		if (_strSession.size() == 0) {
134
			WarnL << "非法链接:" << get_peer_ip();
xzl committed
135 136 137 138
			shutdown();
			return;
		}
	}
139
	if (_rtpType != PlayerBase::RTP_TCP && _ticker.elapsedTime() > 15 * 1000) {
140
		WarnL << "RTSP会话超时:" << get_peer_ip();
xzl committed
141 142 143
		shutdown();
		return;
	}
144 145 146 147 148 149 150

    if(_delayTask){
        if(time(NULL) > _iTaskTimeLine){
            _delayTask();
            _delayTask = nullptr;
        }
    }
xzl committed
151 152
}

xiongziliang committed
153

154
int64_t RtspSession::onRecvHeader(const char *header,uint64_t len) {
155
    char tmp[2 * 1024];
156
    _pcBuf = tmp;
157

158 159 160
	_parser.Parse(header); //rtsp请求解析
	string strCmd = _parser.Method(); //提取出请求命令字
	_iCseq = atoi(_parser["CSeq"].data());
xzl committed
161

xiongziliang committed
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
	typedef bool (RtspSession::*rtspCMDHandle)();
	static unordered_map<string, rtspCMDHandle> g_mapCmd;
	static onceToken token( []() {
		g_mapCmd.emplace("OPTIONS",&RtspSession::handleReq_Options);
		g_mapCmd.emplace("DESCRIBE",&RtspSession::handleReq_Describe);
		g_mapCmd.emplace("SETUP",&RtspSession::handleReq_Setup);
		g_mapCmd.emplace("PLAY",&RtspSession::handleReq_Play);
		g_mapCmd.emplace("PAUSE",&RtspSession::handleReq_Pause);
		g_mapCmd.emplace("TEARDOWN",&RtspSession::handleReq_Teardown);
		g_mapCmd.emplace("GET",&RtspSession::handleReq_Get);
		g_mapCmd.emplace("POST",&RtspSession::handleReq_Post);
		g_mapCmd.emplace("SET_PARAMETER",&RtspSession::handleReq_SET_PARAMETER);
		g_mapCmd.emplace("GET_PARAMETER",&RtspSession::handleReq_SET_PARAMETER);
	}, []() {});

xzl committed
177 178 179
	auto it = g_mapCmd.find(strCmd);
	if (it != g_mapCmd.end()) {
		auto fun = it->second;
xiongziliang committed
180 181 182 183
		if(!(this->*fun)()){
		    shutdown();
		}
	} else{
xzl committed
184 185 186
		shutdown();
		WarnL << "cmd=" << strCmd;
	}
xiongziliang committed
187

188
    _parser.Clear();
xiongziliang committed
189 190 191 192 193
    return 0;
}


void RtspSession::onRecv(const Buffer::Ptr &pBuf) {
194 195 196
	_ticker.resetTime();
    _ui64TotalBytes += pBuf->size();
    if (_bBase64need) {
xiongziliang committed
197
		//quicktime 加密后的rtsp请求,需要解密
198 199
		auto str = decodeBase64(string(pBuf->data(),pBuf->size()));
		inputRtspOrRtcp(str.data(),str.size());
xiongziliang committed
200
	} else {
201
        inputRtspOrRtcp(pBuf->data(),pBuf->size());
xiongziliang committed
202 203 204
	}
}

205
void RtspSession::inputRtspOrRtcp(const char *data,uint64_t len) {
206
	if(data[0] == '$' && _rtpType == PlayerBase::RTP_TCP){
xiongziliang committed
207 208 209
		//这是rtcp
		return;
	}
210
    input(data,len);
xzl committed
211 212 213 214
}

bool RtspSession::handleReq_Options() {
//支持这些命令
215 216 217 218 219 220 221 222 223 224
	int n = sprintf(_pcBuf,
					"RTSP/1.0 200 OK\r\n"
					"CSeq: %d\r\n"
					"Server: %s-%0.2f(build in %s)\r\n"
					"%s"
					"Public: OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY,"
					" PAUSE, SET_PARAMETER, GET_PARAMETER\r\n\r\n",
					_iCseq, SERVER_NAME,
					RTSP_VERSION, RTSP_BUILDTIME,
					dateHeader().data());
225
	SocketHelper::send(_pcBuf, n);
xzl committed
226 227 228 229
	return true;
}

bool RtspSession::handleReq_Describe() {
230 231
    {
        //解析url获取媒体名称
232 233
        _strUrl = _parser.Url();
        _mediaInfo.parse(_parser.FullUrl());
234 235
    }

236 237 238 239 240 241 242 243 244
	weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());\
	auto parserCopy = _parser;
    findStream([weakSelf,parserCopy](bool success){
    	auto strongSelf = weakSelf.lock();
    	if(!strongSelf){
			return;
    	}
    	//恢复现场
		strongSelf->_parser = parserCopy;
xiongziliang committed
245 246 247
		char tmp[2 * 1024];
		strongSelf->_pcBuf = tmp;

248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274
    	if(!success){
			//未找到相应的MediaSource
			strongSelf->send_StreamNotFound();
			strongSelf->shutdown();
			return;
    	}
		//该请求中的认证信息
		auto authorization = strongSelf->_parser["Authorization"];
		onGetRealm invoker = [weakSelf,authorization](const string &realm){
			if(realm.empty()){
				//无需认证,回复sdp
				onAuthSuccess(weakSelf);
				return;
			}
			//该流需要认证
			onAuthUser(weakSelf,realm,authorization);
		};

		//广播是否需要认证事件
		if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnGetRtspRealm,
											   strongSelf->_mediaInfo,
											   invoker,
											   *strongSelf)){
			//无人监听此事件,说明无需认证
			invoker("");
		}
    });
275
    return true;
xzl committed
276
}
277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299
void RtspSession::onAuthSuccess(const weak_ptr<RtspSession> &weakSelf) {
    auto strongSelf = weakSelf.lock();
    if(!strongSelf){
        //本对象已销毁
        return;
    }
    strongSelf->async([weakSelf](){
        auto strongSelf = weakSelf.lock();
        if(!strongSelf){
            //本对象已销毁
            return;
        }
        char response[2 * 1024];
        int n = sprintf(response,
                        "RTSP/1.0 200 OK\r\n"
                        "CSeq: %d\r\n"
                        "Server: %s-%0.2f(build in %s)\r\n"
                        "%s"
                        "x-Accept-Retransmit: our-retransmit\r\n"
                        "x-Accept-Dynamic-Rate: 1\r\n"
                        "Content-Base: %s/\r\n"
                        "Content-Type: application/sdp\r\n"
                        "Content-Length: %d\r\n\r\n%s",
300
                        strongSelf->_iCseq, SERVER_NAME,
301
                        RTSP_VERSION, RTSP_BUILDTIME,
302 303
                        dateHeader().data(), strongSelf->_strUrl.data(),
                        (int) strongSelf->_strSdp.length(), strongSelf->_strSdp.data());
304
        strongSelf->SocketHelper::send(response, n);
305 306 307 308 309 310 311 312 313 314 315 316 317 318
    });
}
void RtspSession::onAuthFailed(const weak_ptr<RtspSession> &weakSelf,const string &realm) {
    auto strongSelf = weakSelf.lock();
    if(!strongSelf){
        //本对象已销毁
        return;
    }
    strongSelf->async([weakSelf,realm]() {
        auto strongSelf = weakSelf.lock();
        if (!strongSelf) {
            //本对象已销毁
            return;
        }
xzl committed
319

320 321
        int n;
        char response[2 * 1024];
xiongziliang committed
322
        GET_CONFIG_AND_REGISTER(bool,authBasic,Rtsp::kAuthBasic);
323 324
        if (!authBasic) {
            //我们需要客户端优先以md5方式认证
325
            strongSelf->_strNonce = makeRandStr(32);
326 327 328 329 330
            n = sprintf(response,
                        "RTSP/1.0 401 Unauthorized\r\n"
                        "CSeq: %d\r\n"
                        "Server: %s-%0.2f(build in %s)\r\n"
                        "%s"
331
                        "WWW-Authenticate: Digest realm=\"%s\",nonce=\"%s\"\r\n\r\n",
332
                        strongSelf->_iCseq, SERVER_NAME,
333
                        RTSP_VERSION, RTSP_BUILDTIME,
334
                        dateHeader().data(), realm.data(), strongSelf->_strNonce.data());
335 336 337 338 339 340 341
        }else {
            //当然我们也支持base64认证,但是我们不建议这样做
            n = sprintf(response,
                        "RTSP/1.0 401 Unauthorized\r\n"
                        "CSeq: %d\r\n"
                        "Server: %s-%0.2f(build in %s)\r\n"
                        "%s"
342
                        "WWW-Authenticate: Basic realm=\"%s\"\r\n\r\n",
343
                        strongSelf->_iCseq, SERVER_NAME,
344 345 346
                        RTSP_VERSION, RTSP_BUILDTIME,
                        dateHeader().data(), realm.data());
        }
347
        strongSelf->SocketHelper::send(response, n);
348 349 350 351 352 353 354
    });
}

void RtspSession::onAuthBasic(const weak_ptr<RtspSession> &weakSelf,const string &realm,const string &strBase64){
    //base64认证
    char user_pwd_buf[512];
    av_base64_decode((uint8_t *)user_pwd_buf,strBase64.data(),strBase64.size());
355
    auto user_pwd_vec = split(user_pwd_buf,":");
356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371
    if(user_pwd_vec.size() < 2){
        //认证信息格式不合法,回复401 Unauthorized
        onAuthFailed(weakSelf,realm);
        return;
    }
    auto user = user_pwd_vec[0];
    auto pwd = user_pwd_vec[1];
    onAuth invoker = [pwd,realm,weakSelf](bool encrypted,const string &good_pwd){
        if(!encrypted && pwd == good_pwd){
            //提供的是明文密码且匹配正确
            onAuthSuccess(weakSelf);
        }else{
            //密码错误
            onAuthFailed(weakSelf,realm);
        }
    };
372 373 374 375 376 377 378

    auto strongSelf = weakSelf.lock();
    if(!strongSelf){
        //本对象已销毁
        return;
    }

379
    //此时必须提供明文密码
380
    if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,strongSelf->_mediaInfo,user, true,invoker,*strongSelf)){
381 382 383 384 385 386 387
        //表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之
        WarnL << "请监听kBroadcastOnRtspAuth事件!";
        //但是我们还是忽略认证以便完成播放
        //我们输入的密码是明文
        invoker(false,pwd);
    }
}
388

389 390 391 392 393 394 395 396 397 398
void RtspSession::onAuthDigest(const weak_ptr<RtspSession> &weakSelf,const string &realm,const string &strMd5){
    auto strongSelf = weakSelf.lock();
    if(!strongSelf){
        return;
    }

	DebugL << strMd5;
    auto mapTmp = Parser::parseArgs(strMd5,",","=");
    decltype(mapTmp) map;
    for(auto &pr : mapTmp){
399
        map[trim(string(pr.first)," \"")] = trim(pr.second," \"");
400 401 402 403 404 405 406 407 408
    }
    //check realm
    if(realm != map["realm"]){
        TraceL << "realm not mached:" << realm << "," << map["realm"];
        onAuthFailed(weakSelf,realm);
        return ;
    }
    //check nonce
    auto nonce = map["nonce"];
409 410
    if(strongSelf->_strNonce != nonce){
        TraceL << "nonce not mached:" << nonce << "," << strongSelf->_strNonce;
411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460
        onAuthFailed(weakSelf,realm);
        return ;
    }
    //check username and uri
    auto username = map["username"];
    auto uri = map["uri"];
    auto response = map["response"];
    if(username.empty() || uri.empty() || response.empty()){
        TraceL << "username/uri/response empty:" << username << "," << uri << "," << response;
        onAuthFailed(weakSelf,realm);
        return ;
    }

    auto realInvoker = [weakSelf,realm,nonce,uri,username,response](bool ignoreAuth,bool encrypted,const string &good_pwd){
        if(ignoreAuth){
            //忽略认证
            onAuthSuccess(weakSelf);
            TraceL << "auth ignored";
            return;
        }
        /*
        response计算方法如下:
        RTSP客户端应该使用username + password并计算response如下:
        (1)当password为MD5编码,则
            response = md5( password:nonce:md5(public_method:url)  );
        (2)当password为ANSI字符串,则
            response= md5( md5(username:realm:password):nonce:md5(public_method:url) );
         */
        auto encrypted_pwd = good_pwd;
        if(!encrypted){
            //提供的是明文密码
            encrypted_pwd = MD5(username+ ":" + realm + ":" + good_pwd).hexdigest();
        }

        auto good_response = MD5( encrypted_pwd + ":" + nonce + ":" + MD5(string("DESCRIBE") + ":" + uri).hexdigest()).hexdigest();
        if(strcasecmp(good_response.data(),response.data()) == 0){
            //认证成功!md5不区分大小写
            onAuthSuccess(weakSelf);
            TraceL << "onAuthSuccess";
        }else{
            //认证失败!
            onAuthFailed(weakSelf,realm);
            TraceL << "onAuthFailed";
        }
    };
    onAuth invoker = [realInvoker](bool encrypted,const string &good_pwd){
        realInvoker(false,encrypted,good_pwd);
    };

    //此时可以提供明文或md5加密的密码
461
    if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,strongSelf->_mediaInfo,username, false,invoker,*strongSelf)){
462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488
        //表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之
        WarnL << "请监听kBroadcastOnRtspAuth事件!";
        //但是我们还是忽略认证以便完成播放
        realInvoker(true,true,"");
    }
}

void RtspSession::onAuthUser(const weak_ptr<RtspSession> &weakSelf,const string &realm,const string &authorization){
    //请求中包含认证信息
    auto authType = FindField(authorization.data(),NULL," ");
	auto authStr = FindField(authorization.data()," ",NULL);
    if(authType.empty() || authStr.empty()){
        //认证信息格式不合法,回复401 Unauthorized
        onAuthFailed(weakSelf,realm);
        return;
    }
    if(authType == "Basic"){
        //base64认证,需要明文密码
        onAuthBasic(weakSelf,realm,authStr);
    }else if(authType == "Digest"){
        //md5认证
        onAuthDigest(weakSelf,realm,authStr);
    }else{
        //其他认证方式?不支持!
        onAuthFailed(weakSelf,realm);
    }
}
xzl committed
489
inline void RtspSession::send_StreamNotFound() {
490 491 492 493 494 495 496 497 498
	int n = sprintf(_pcBuf,
					"RTSP/1.0 404 Stream Not Found\r\n"
					"CSeq: %d\r\n"
					"Server: %s-%0.2f(build in %s)\r\n"
					"%s"
					"Connection: Close\r\n\r\n",
					_iCseq, SERVER_NAME,
					RTSP_VERSION, RTSP_BUILDTIME,
					dateHeader().data());
499
	SocketHelper::send(_pcBuf, n);
xzl committed
500 501
}
inline void RtspSession::send_UnsupportedTransport() {
502 503 504 505 506 507 508 509 510
	int n = sprintf(_pcBuf,
					"RTSP/1.0 461 Unsupported Transport\r\n"
					"CSeq: %d\r\n"
					"Server: %s-%0.2f(build in %s)\r\n"
					"%s"
					"Connection: Close\r\n\r\n",
					_iCseq, SERVER_NAME,
					RTSP_VERSION, RTSP_BUILDTIME,
					dateHeader().data());
511
	SocketHelper::send(_pcBuf, n);
xzl committed
512 513 514
}

inline void RtspSession::send_SessionNotFound() {
515 516 517 518 519 520 521 522 523
	int n = sprintf(_pcBuf,
					"RTSP/1.0 454 Session Not Found\r\n"
					"CSeq: %d\r\n"
					"Server: %s-%0.2f(build in %s)\r\n"
					"%s"
					"Connection: Close\r\n\r\n",
					_iCseq, SERVER_NAME,
					RTSP_VERSION, RTSP_BUILDTIME,
					dateHeader().data());
524
	SocketHelper::send(_pcBuf, n);
xzl committed
525

xiongziliang committed
526
	/*40 Method Not Allowed*/
xzl committed
527 528 529 530

}
bool RtspSession::handleReq_Setup() {
//处理setup命令,该函数可能进入多次
531
    auto controlSuffix = _parser.FullUrl().substr(1 + _parser.FullUrl().rfind('/'));
532
	int trackIdx = getTrackIndexByControlSuffix(controlSuffix);
xzl committed
533 534 535 536
	if (trackIdx == -1) {
		//未找到相应track
		return false;
	}
xiongziliang committed
537
	SdpTrack::Ptr &trackRef = _aTrackInfo[trackIdx];
538
	if (trackRef->_inited) {
xzl committed
539 540 541
		//已经初始化过该Track
		return false;
	}
542
	trackRef->_inited = true; //现在初始化
xzl committed
543

544 545 546
	if(!_bSetUped){
		_bSetUped = true;
		auto strTransport = _parser["Transport"];
xzl committed
547
		if(strTransport.find("TCP") != string::npos){
548
			_rtpType = PlayerBase::RTP_TCP;
xzl committed
549
		}else if(strTransport.find("multicast") != string::npos){
550
			_rtpType = PlayerBase::RTP_MULTICAST;
xzl committed
551
		}else{
552
			_rtpType = PlayerBase::RTP_UDP;
xzl committed
553 554 555
		}
	}

556
	switch (_rtpType) {
xzl committed
557
	case PlayerBase::RTP_TCP: {
558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573
		int iLen = sprintf(_pcBuf,
						   "RTSP/1.0 200 OK\r\n"
						   "CSeq: %d\r\n"
						   "Server: %s-%0.2f(build in %s)\r\n"
						   "%s"
						   "Transport: RTP/AVP/TCP;unicast;"
						   "interleaved=%d-%d;ssrc=%s;mode=play\r\n"
						   "Session: %s\r\n"
						   "x-Transport-Options: late-tolerance=1.400000\r\n"
						   "x-Dynamic-Rate: 1\r\n\r\n",
						   _iCseq, SERVER_NAME,
						   RTSP_VERSION, RTSP_BUILDTIME,
						   dateHeader().data(), trackRef->_type * 2,
						   trackRef->_type * 2 + 1,
						   printSSRC(trackRef->_ssrc).data(),
						   _strSession.data());
574
		SocketHelper::send(_pcBuf, iLen);
xzl committed
575 576 577
	}
		break;
	case PlayerBase::RTP_UDP: {
xiongziliang committed
578
		//我们用trackIdx区分rtp和rtcp包
579
		auto pSockRtp = UDPServer::Instance().getSock(get_local_ip().data(),2*trackIdx);
xiongziliang committed
580 581 582 583 584 585
		if (!pSockRtp) {
			//分配端口失败
			WarnL << "分配rtp端口失败";
			send_NotAcceptable();
			return false;
		}
586
		auto pSockRtcp = UDPServer::Instance().getSock(get_local_ip().data(),2*trackIdx + 1 ,pSockRtp->get_local_port() + 1);
xiongziliang committed
587
		if (!pSockRtcp) {
xzl committed
588
			//分配端口失败
xiongziliang committed
589
			WarnL << "分配rtcp端口失败";
xzl committed
590 591 592
			send_NotAcceptable();
			return false;
		}
593
		_apUdpSock[trackIdx] = pSockRtp;
xiongziliang committed
594
		//设置客户端内网端口信息
595
		string strClientPort = FindField(_parser["Transport"].data(), "client_port=", NULL);
xzl committed
596 597 598 599
		uint16_t ui16PeerPort = atoi( FindField(strClientPort.data(), NULL, "-").data());
		struct sockaddr_in peerAddr;
		peerAddr.sin_family = AF_INET;
		peerAddr.sin_port = htons(ui16PeerPort);
600
		peerAddr.sin_addr.s_addr = inet_addr(get_peer_ip().data());
xzl committed
601
		bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero);
602
		_apPeerUdpAddr[trackIdx].reset((struct sockaddr *) (new struct sockaddr_in(peerAddr)));
xiongziliang committed
603 604
		//尝试获取客户端nat映射地址
		startListenPeerUdpData();
xzl committed
605
		//InfoL << "分配端口:" << srv_port;
606 607 608 609 610 611 612 613 614 615 616 617 618 619
		int n = sprintf(_pcBuf,
						"RTSP/1.0 200 OK\r\n"
						"CSeq: %d\r\n"
						"Server: %s-%0.2f(build in %s)\r\n"
						"%s"
						"Transport: RTP/AVP/UDP;unicast;"
						"client_port=%s;server_port=%d-%d;ssrc=%s;mode=play\r\n"
						"Session: %s\r\n\r\n",
						_iCseq, SERVER_NAME,
						RTSP_VERSION, RTSP_BUILDTIME,
						dateHeader().data(), strClientPort.data(),
						pSockRtp->get_local_port(), pSockRtcp->get_local_port(),
						printSSRC(trackRef->_ssrc).data(),
						_strSession.data());
620
		SocketHelper::send(_pcBuf, n);
xzl committed
621 622 623
	}
		break;
	case PlayerBase::RTP_MULTICAST: {
624 625 626
		if(!_pBrdcaster){
			_pBrdcaster = RtpBroadCaster::get(get_local_ip(),_mediaInfo._vhost, _mediaInfo._app, _mediaInfo._streamid);
			if (!_pBrdcaster) {
xzl committed
627 628 629 630
				send_NotAcceptable();
				return false;
			}
			weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
631
			_pBrdcaster->setDetachCB(this, [weakSelf]() {
xzl committed
632 633 634 635 636 637 638
				auto strongSelf = weakSelf.lock();
				if(!strongSelf) {
					return;
				}
				strongSelf->safeShutdown();
			});
		}
639
		int iSrvPort = _pBrdcaster->getPort(trackRef->_type);
xiongziliang committed
640
		//我们用trackIdx区分rtp和rtcp包
641
		auto pSockRtcp = UDPServer::Instance().getSock(get_local_ip().data(),2*trackIdx + 1,iSrvPort + 1);
xiongziliang committed
642 643 644 645 646 647 648
		if (!pSockRtcp) {
			//分配端口失败
			WarnL << "分配rtcp端口失败";
			send_NotAcceptable();
			return false;
		}
		startListenPeerUdpData();
649
        GET_CONFIG_AND_REGISTER(uint32_t,udpTTL,MultiCast::kUdpTTL);
650 651 652 653 654 655 656 657 658 659 660 661 662 663
		int n = sprintf(_pcBuf,
						"RTSP/1.0 200 OK\r\n"
						"CSeq: %d\r\n"
						"Server: %s-%0.2f(build in %s)\r\n"
						"%s"
						"Transport: RTP/AVP;multicast;destination=%s;"
						"source=%s;port=%d-%d;ttl=%d;ssrc=%s\r\n"
						"Session: %s\r\n\r\n",
						_iCseq, SERVER_NAME,
						RTSP_VERSION, RTSP_BUILDTIME,
						dateHeader().data(), _pBrdcaster->getIP().data(),
						get_local_ip().data(), iSrvPort, pSockRtcp->get_local_port(),
						udpTTL, printSSRC(trackRef->_ssrc).data(),
						_strSession.data());
664
		SocketHelper::send(_pcBuf, n);
xzl committed
665 666 667 668 669 670 671 672 673
	}
		break;
	default:
		break;
	}
	return true;
}

bool RtspSession::handleReq_Play() {
xiongziliang committed
674
	if (_aTrackInfo.empty() || _parser["Session"] != _strSession) {
xzl committed
675 676 677
		send_SessionNotFound();
		return false;
	}
678
	auto strRange = _parser["Range"];
679
    auto onRes = [this,strRange](const string &err){
680
        bool authSuccess = err.empty();
681
        char response[2 * 1024];
682
        _pcBuf = response;
xiongziliang committed
683
        if(!authSuccess){
684
            //第一次play是播放,否则是恢复播放。只对播放鉴权
685
            int n = sprintf(_pcBuf,
686 687 688
                            "RTSP/1.0 401 Unauthorized\r\n"
                            "CSeq: %d\r\n"
                            "Server: %s-%0.2f(build in %s)\r\n"
689 690 691
                            "%s"
                            "Content-Type: text/plain\r\n"
                            "Content-Length: %d\r\n\r\n%s",
692
                            _iCseq, SERVER_NAME,
693 694
                            RTSP_VERSION, RTSP_BUILDTIME,
                            dateHeader().data(),(int)err.size(),err.data());
695
			SocketHelper::send(_pcBuf,n);
696 697 698
            shutdown();
            return;
        }
xzl committed
699

700
        auto pMediaSrc = _pMediaSrc.lock();
701 702 703 704
        if(!pMediaSrc){
        	send_StreamNotFound();
        	shutdown();
			return;
705
        }
706

707 708 709
        bool useBuf = true;
		_enableSendRtp = false;

710
		if (strRange.size() && !_bFirstPlay) {
xiongziliang committed
711
            //这个是seek操作
712 713 714 715 716 717
			auto strStart = FindField(strRange.data(), "npt=", "-");
			if (strStart == "now") {
				strStart = "0";
			}
			auto iStartTime = 1000 * atof(strStart.data());
			InfoL << "rtsp seekTo(ms):" << iStartTime;
718 719
			useBuf = !pMediaSrc->seekTo(iStartTime);
		}else if(pMediaSrc->getRing()->readerCount() == 0){
720 721 722 723 724 725 726 727 728 729 730 731 732
			//第一个消费者
			pMediaSrc->seekTo(0);
		}
		_bFirstPlay = false;
		int iLen = sprintf(_pcBuf,
						   "RTSP/1.0 200 OK\r\n"
						   "CSeq: %d\r\n"
						   "Server: %s-%0.2f(build in %s)\r\n"
						   "%s"
						   "Session: %s\r\n"
						   "Range: npt=%.2f-\r\n"
						   "RTP-Info: ", _iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME,
						   dateHeader().data(), _strSession.data(), pMediaSrc->getTimeStamp(TrackInvalid) / 1000.0);
733

xiongziliang committed
734
		for(auto &track : _aTrackInfo){
735
			if (track->_inited == false) {
xiongziliang committed
736 737 738 739
				//还有track没有setup
				shutdown();
				return;
			}
740 741 742 743
			track->_ssrc = pMediaSrc->getSsrc(track->_type);
			track->_seq = pMediaSrc->getSeqence(track->_type);
			track->_time_stamp = pMediaSrc->getTimeStamp(track->_type);

744 745 746 747
			iLen += sprintf(_pcBuf + iLen, "url=%s/%s;seq=%d;rtptime=%u,",
							_strUrl.data(),
							track->_control_surffix.data(),
							track->_seq,
748
							track->_time_stamp * (track->_samplerate / 1000));
xiongziliang committed
749 750
		}

751
        iLen -= 1;
752 753 754
        (_pcBuf)[iLen] = '\0';
        iLen += sprintf(_pcBuf + iLen, "\r\n\r\n");
		SocketHelper::send(_pcBuf, iLen);
755

756 757
		_enableSendRtp = true;

758 759
		//提高发送性能
		(*this) << SocketFlags(kSockFlags);
760
		SockUtil::setNoDelay(_pSender->rawFD(),false);
xiongziliang committed
761

762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790
		if (!_pRtpReader && _rtpType != PlayerBase::RTP_MULTICAST) {
			weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
			_pRtpReader = pMediaSrc->getRing()->attach(useBuf);
			_pRtpReader->setDetachCB([weakSelf]() {
				auto strongSelf = weakSelf.lock();
				if(!strongSelf) {
					return;
				}
				strongSelf->safeShutdown();
			});
			_pRtpReader->setReadCB([weakSelf](const RtpPacket::Ptr &pack) {
				auto strongSelf = weakSelf.lock();
				if(!strongSelf) {
					return;
				}
				if(!strongSelf->_enableSendRtp) {
					return;
				}
				strongSelf->async([weakSelf,pack](){
					auto strongSelf = weakSelf.lock();
					if(!strongSelf) {
						return;
					}
					if(strongSelf->_enableSendRtp) {
						strongSelf->sendRtpPacket(pack);
					}
				});
			});
		}
791
    };
xzl committed
792

793
    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
794
    Broadcast::AuthInvoker invoker = [weakSelf,onRes](const string &err){
795 796 797
        auto strongSelf = weakSelf.lock();
        if(!strongSelf){
            return;
xzl committed
798
        }
799
        strongSelf->async([weakSelf,onRes,err](){
800 801 802 803
            auto strongSelf = weakSelf.lock();
            if(!strongSelf){
                return;
            }
804
            onRes(err);
805 806
        });
    };
xiongziliang committed
807 808 809 810 811 812 813 814 815
    if(_bFirstPlay){
        //第一次收到play命令,需要鉴权
        auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,_mediaInfo,invoker,*this);
        if(!flag){
            //该事件无人监听,默认不鉴权
            onRes("");
        }
    }else{
        //后面是seek或恢复命令,不需要鉴权
816
        onRes("");
817
    }
xzl committed
818 819 820 821
	return true;
}

bool RtspSession::handleReq_Pause() {
822
	if (_parser["Session"] != _strSession) {
xzl committed
823 824 825
		send_SessionNotFound();
		return false;
	}
826 827 828 829 830 831 832
	int n = sprintf(_pcBuf,
					"RTSP/1.0 200 OK\r\n"
					"CSeq: %d\r\n"
					"Server: %s-%0.2f(build in %s)\r\n"
					"%s"
					"Session: %s\r\n\r\n", _iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME,
					dateHeader().data(), _strSession.data());
833
	SocketHelper::send(_pcBuf, n);
834
	_enableSendRtp = false;
xzl committed
835 836 837 838
	return true;
}

bool RtspSession::handleReq_Teardown() {
839 840 841 842 843 844 845
	int n = sprintf(_pcBuf,
					"RTSP/1.0 200 OK\r\n"
					"CSeq: %d\r\n"
					"Server: %s-%0.2f(build in %s)\r\n"
					"%s"
					"Session: %s\r\n\r\n", _iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME,
					dateHeader().data(), _strSession.data());
846
	SocketHelper::send(_pcBuf, n);
xzl committed
847 848 849 850 851
	TraceL << "播放器断开连接!";
	return false;
}

bool RtspSession::handleReq_Get() {
852
	_strSessionCookie = _parser["x-sessioncookie"];
853 854 855 856 857 858 859 860
	int n = sprintf(_pcBuf,
					"HTTP/1.0 200 OK\r\n"
					"%s"
					"Connection: close\r\n"
					"Cache-Control: no-store\r\n"
					"Pragma: no-cache\r\n"
					"Content-Type: application/x-rtsp-tunnelled\r\n\r\n",
					dateHeader().data());
xzl committed
861 862
//注册GET
	lock_guard<recursive_mutex> lock(g_mtxGetter);
863 864 865
	g_mapGetter[_strSessionCookie] = dynamic_pointer_cast<RtspSession>(shared_from_this());
	//InfoL << _strSessionCookie;
	SocketHelper::send(_pcBuf, n);
xzl committed
866 867 868 869 870 871
	return true;

}

bool RtspSession::handleReq_Post() {
	lock_guard<recursive_mutex> lock(g_mtxGetter);
872
	string sessioncookie = _parser["x-sessioncookie"];
xzl committed
873 874 875
//Poster 找到 Getter
	auto it = g_mapGetter.find(sessioncookie);
	if (it == g_mapGetter.end()) {
xiongziliang committed
876
		//WarnL << sessioncookie;
xzl committed
877 878
		return false;
	}
879
	_bBase64need = true;
xzl committed
880 881 882 883 884
//Poster 找到Getter的SOCK
	auto strongSession = it->second.lock();
	g_mapGetter.erase(sessioncookie);
	if (!strongSession) {
		send_SessionNotFound();
xiongziliang committed
885
		//WarnL;
xzl committed
886 887 888 889 890 891 892 893
		return false;
	}
	initSender(strongSession);
	return true;
}

bool RtspSession::handleReq_SET_PARAMETER() {
	//TraceL<<endl;
894 895 896 897 898 899 900
	int n = sprintf(_pcBuf,
					"RTSP/1.0 200 OK\r\n"
					"CSeq: %d\r\n"
					"Server: %s-%0.2f(build in %s)\r\n"
					"%s"
					"Session: %s\r\n\r\n", _iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME,
					dateHeader().data(), _strSession.data());
901
	SocketHelper::send(_pcBuf, n);
xzl committed
902 903 904 905
	return true;
}

inline void RtspSession::send_NotAcceptable() {
906 907 908 909 910 911 912
	int n = sprintf(_pcBuf,
					"RTSP/1.0 406 Not Acceptable\r\n"
					"CSeq: %d\r\n"
					"Server: %s-%0.2f(build in %s)\r\n"
					"%s"
					"Connection: Close\r\n\r\n", _iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME,
					dateHeader().data());
913
	SocketHelper::send(_pcBuf, n);
xzl committed
914 915

}
916

917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980
void RtspSession::doDelay(int delaySec, const std::function<void()> &fun) {
    if(_delayTask){
        _delayTask();
    }
    _delayTask = fun;
    _iTaskTimeLine = time(NULL) + delaySec;
}

void RtspSession::cancelDelyaTask(){
    _delayTask = nullptr;
}

void RtspSession::findStream(const function<void(bool)> &cb) {
	bool success = findStream();
	if (success) {
		cb(true);
		return;
	}

	weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
	auto task_id = this;
	auto media_info = _mediaInfo;

	auto onRegist = [task_id, weakSelf, media_info, cb](BroadcastMediaChangedArgs) {
		if (bRegist &&
			schema == media_info._schema &&
			vhost == media_info._vhost &&
			app == media_info._app &&
			stream == media_info._streamid) {
			//播发器请求的rtmp流终于注册上了
			auto strongSelf = weakSelf.lock();
			if (!strongSelf) {
				return;
			}
			//切换到自己的线程再回复
			//如果触发 kBroadcastMediaChanged 事件的线程与本RtspSession绑定的线程相同,
			//那么strongSelf->async操作可能是同步操作,
			//通过指定参数may_sync为false确保 NoticeCenter::delListener操作延后执行,
			//以便防止遍历事件监听对象map时做删除操作
			strongSelf->async([task_id, weakSelf, media_info, cb]() {
				auto strongSelf = weakSelf.lock();
				if (!strongSelf) {
					return;
				}
				DebugL << "收到rtsp注册事件,回复播放器:" << media_info._schema << "/" << media_info._vhost << "/"
					   << media_info._app << "/" << media_info._streamid;
				cb(strongSelf->findStream());
				//取消延时任务,防止多次回复
				strongSelf->cancelDelyaTask();

				//取消事件监听
				//在事件触发时不能在当前线程移除事件监听,否则会导致遍历map时做删除操作导致程序崩溃
				NoticeCenter::Instance().delListener(task_id, Broadcast::kBroadcastMediaChanged);
			}, false);
		}
	};

	NoticeCenter::Instance().addListener(task_id, Broadcast::kBroadcastMediaChanged, onRegist);
	//5秒后执行失败回调
	doDelay(5, [cb]() {
		cb(false);
	});
}

xzl committed
981
inline bool RtspSession::findStream() {
982
	RtspMediaSource::Ptr pMediaSrc =
983
    dynamic_pointer_cast<RtspMediaSource>( MediaSource::find(RTSP_SCHEMA,_mediaInfo._vhost, _mediaInfo._app,_mediaInfo._streamid) );
xzl committed
984
	if (!pMediaSrc) {
985
		WarnL << "No such stream:" <<  _mediaInfo._vhost << " " <<  _mediaInfo._app << " " << _mediaInfo._streamid;
xzl committed
986 987
		return false;
	}
988
	_strSdp = pMediaSrc->getSdp();
xiongziliang committed
989 990 991
	_sdpAttr.load(_strSdp);
	_aTrackInfo = _sdpAttr.getAvailableTrack();

xiongziliang committed
992
	if (_aTrackInfo.empty()) {
xzl committed
993 994
		return false;
	}
995 996
	_strSession = makeRandStr(12);
	_pMediaSrc = pMediaSrc;
xzl committed
997

xiongziliang committed
998
	for(auto &track : _aTrackInfo){
999 1000
		track->_ssrc = pMediaSrc->getSsrc(track->_type);
		track->_seq = pMediaSrc->getSeqence(track->_type);
1001
		track->_time_stamp = pMediaSrc->getTimeStamp(track->_type);
xzl committed
1002 1003 1004 1005
	}
	return true;
}

1006 1007

inline void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) {
xzl committed
1008
	//InfoL<<(int)pkt.Interleaved;
1009
	switch (_rtpType) {
xzl committed
1010
	case PlayerBase::RTP_TCP: {
1011 1012
        BufferRtp::Ptr buffer(new BufferRtp(pkt));
		send(buffer);
xzl committed
1013 1014
#ifdef RTSP_SEND_RTCP
		int iTrackIndex = getTrackIndexByTrackId(pkt.interleaved / 2);
1015
		RtcpCounter &counter = _aRtcpCnt[iTrackIndex];
xzl committed
1016 1017
		counter.pktCnt += 1;
		counter.octCount += (pkt.length - 12);
1018 1019
		auto &_ticker = _aRtcpTicker[iTrackIndex];
		if (_ticker.elapsedTime() > 5 * 1000) {
xzl committed
1020
			//send rtcp every 5 second
1021
			_ticker.resetTime();
xzl committed
1022 1023 1024 1025 1026 1027 1028
			counter.timeStamp = pkt.timeStamp;
			sendRTCP();
		}
#endif
	}
		break;
	case PlayerBase::RTP_UDP: {
xiongziliang committed
1029
		int iTrackIndex = getTrackIndexByTrackType(pkt->type);
1030
		auto pSock = _apUdpSock[iTrackIndex].lock();
xzl committed
1031 1032 1033 1034
		if (!pSock) {
			shutdown();
			return;
		}
1035
		auto peerAddr = _apPeerUdpAddr[iTrackIndex];
xzl committed
1036 1037 1038
		if (!peerAddr) {
			return;
		}
1039
        BufferRtp::Ptr buffer(new BufferRtp(pkt,4));
1040
        _ui64TotalBytes += buffer->size();
1041
        pSock->send(buffer,kSockFlags, peerAddr.get());
xzl committed
1042 1043 1044 1045 1046 1047 1048
	}
		break;
	default:
		break;
	}
}

1049
inline void RtspSession::onRcvPeerUdpData(int iTrackIdx, const Buffer::Ptr &pBuf, const struct sockaddr& addr) {
xiongziliang committed
1050 1051
	if(iTrackIdx % 2 == 0){
		//这是rtp探测包
1052
		if(!_bGotAllPeerUdp){
xiongziliang committed
1053
			//还没有获取完整的rtp探测包
1054
			if(SockUtil::in_same_lan(get_local_ip().data(),get_peer_ip().data())){
xiongziliang committed
1055
				//在内网中,客户端上报的端口号是真实的,所以我们忽略udp打洞包
1056
				_bGotAllPeerUdp = true;
xiongziliang committed
1057 1058 1059
				return;
			}
			//设置真实的客户端nat映射端口号
1060 1061 1062
			_apPeerUdpAddr[iTrackIdx / 2].reset(new struct sockaddr(addr));
			_abGotPeerUdp[iTrackIdx / 2] = true;
			_bGotAllPeerUdp = true;//先假设获取到完整的rtp探测包
xiongziliang committed
1063
			for (unsigned int i = 0; i < _aTrackInfo.size(); i++) {
1064
				if (!_abGotPeerUdp[i]) {
xiongziliang committed
1065
					//还有track没获取到rtp探测包
1066
					_bGotAllPeerUdp = false;
xiongziliang committed
1067 1068 1069
					break;
				}
			}
xzl committed
1070
		}
xiongziliang committed
1071 1072
	}else{
		//这是rtcp心跳包,说明播放器还存活
1073
		_ticker.resetTime();
xiongziliang committed
1074
		//TraceL << "rtcp:" << (iTrackIdx-1)/2 ;
xzl committed
1075 1076 1077 1078
	}
}


xiongziliang committed
1079
inline void RtspSession::startListenPeerUdpData() {
1080
	_bListenPeerUdpData = true;
xzl committed
1081
	weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
1082 1083
	UDPServer::Instance().listenPeer(get_peer_ip().data(), this,
			[weakSelf](int iTrackIdx,const Buffer::Ptr &pBuf,struct sockaddr *pPeerAddr)->bool {
xzl committed
1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100
				auto strongSelf=weakSelf.lock();
				if(!strongSelf) {
					return false;
				}
				struct sockaddr addr=*pPeerAddr;
				strongSelf->async_first([weakSelf,pBuf,addr,iTrackIdx]() {
							auto strongSelf=weakSelf.lock();
							if(!strongSelf) {
								return;
							}
							strongSelf->onRcvPeerUdpData(iTrackIdx,pBuf,addr);
						});
				return true;
			});
}

inline void RtspSession::initSender(const std::shared_ptr<RtspSession>& session) {
1101
	_pSender = session->_sock;
xzl committed
1102
	weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
1103
	session->_onDestory = [weakSelf]() {
xzl committed
1104
		auto strongSelf=weakSelf.lock();
xiongziliang committed
1105 1106 1107 1108
        if(!strongSelf) {
            return;
        }
        //DebugL;
1109
        strongSelf->_pSender->setOnErr([weakSelf](const SockException &err) {
xzl committed
1110 1111 1112 1113 1114 1115 1116
			auto strongSelf=weakSelf.lock();
			if(!strongSelf) {
				return;
			}
			strongSelf->safeShutdown();
		});
	};
xiongziliang committed
1117
	session->shutdown_l(false);
xzl committed
1118 1119 1120 1121 1122 1123 1124
}

#ifdef RTSP_SEND_RTCP
inline void RtspSession::sendRTCP() {
	//DebugL;
	uint8_t aui8Rtcp[60] = {0};
	uint8_t *pui8Rtcp_SR = aui8Rtcp + 4, *pui8Rtcp_SDES = pui8Rtcp_SR + 28;
1125 1126 1127
	for (uint8_t i = 0; i < _uiTrackCnt; i++) {
		auto &track = _aTrackInfo[i];
		auto &counter = _aRtcpCnt[i];
xzl committed
1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180

		aui8Rtcp[0] = '$';
		aui8Rtcp[1] = track.trackId * 2 + 1;
		aui8Rtcp[2] = 56 / 256;
		aui8Rtcp[3] = 56 % 256;

		pui8Rtcp_SR[0] = 0x80;
		pui8Rtcp_SR[1] = 0xC8;
		pui8Rtcp_SR[2] = 0x00;
		pui8Rtcp_SR[3] = 0x06;

		uint32_t ssrc=htonl(track.ssrc);
		memcpy(&pui8Rtcp_SR[4], &ssrc, 4);

		uint64_t msw;
		uint64_t lsw;
		struct timeval tv;
		gettimeofday(&tv, NULL);
		msw = tv.tv_sec + 0x83AA7E80; /* 0x83AA7E80 is the number of seconds from 1900 to 1970 */
		lsw = (uint32_t) ((double) tv.tv_usec * (double) (((uint64_t) 1) << 32) * 1.0e-6);

		msw = htonl(msw);
		memcpy(&pui8Rtcp_SR[8], &msw, 4);

		lsw = htonl(lsw);
		memcpy(&pui8Rtcp_SR[12], &lsw, 4);

		uint32_t rtpStamp = htonl(counter.timeStamp);
		memcpy(&pui8Rtcp_SR[16], &rtpStamp, 4);

		uint32_t pktCnt = htonl(counter.pktCnt);
		memcpy(&pui8Rtcp_SR[20], &pktCnt, 4);

		uint32_t octCount = htonl(counter.octCount);
		memcpy(&pui8Rtcp_SR[24], &octCount, 4);

		pui8Rtcp_SDES[0] = 0x81;
		pui8Rtcp_SDES[1] = 0xCA;
		pui8Rtcp_SDES[2] = 0x00;
		pui8Rtcp_SDES[3] = 0x06;

		memcpy(&pui8Rtcp_SDES[4], &ssrc, 4);

		pui8Rtcp_SDES[8] = 0x01;
		pui8Rtcp_SDES[9] = 0x0f;
		memcpy(&pui8Rtcp_SDES[10], "_ZL_RtspServer_", 15);
		pui8Rtcp_SDES[25] = 0x00;
		send((char *) aui8Rtcp, 60);
	}
}
#endif

}
xiongziliang committed
1181
/* namespace mediakit */
xzl committed
1182