RtspSession.cpp 44.6 KB
Newer Older
xiongziliang committed
1
/*
xiongziliang committed
2
 * MIT License
xzl committed
3
 *
xiongziliang committed
4
 * Copyright (c) 2016-2019 xiongziliang <771730766@qq.com>
xiongziliang committed
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
xzl committed
25
 */
26

xiongzilaing committed
27
#include <atomic>
28
#include <iomanip>
xiongziliang committed
29
#include "Common/config.h"
xiongzilaing committed
30
#include "UDPServer.h"
xzl committed
31
#include "RtspSession.h"
xiongzilaing committed
32
#include "Util/mini.h"
33
#include "Util/MD5.h"
xiongziliang committed
34
#include "Util/base64.h"
xiongzilaing committed
35
#include "Util/onceToken.h"
xzl committed
36 37
#include "Util/TimeTicker.h"
#include "Util/NoticeCenter.h"
xiongzilaing committed
38
#include "Network/sockutil.h"
xzl committed
39

40 41
#define RTSP_SERVER_SEND_RTCP 0

xiongziliang committed
42 43
using namespace std;
using namespace toolkit;
xzl committed
44

xiongziliang committed
45
namespace mediakit {
xzl committed
46

47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
/**
 * rtsp协议有多种方式传输rtp数据包,目前已支持包括以下4种
 * 1: rtp over udp ,这种方式是rtp通过单独的udp端口传输
 * 2: rtp over udp_multicast,这种方式是rtp通过共享udp组播端口传输
 * 3: rtp over tcp,这种方式是通过rtsp信令tcp通道完成传输
 * 4: rtp over http,下面着重讲解:rtp over http
 *
 * rtp over http 是把rtsp协议伪装成http协议以达到穿透防火墙的目的,
 * 此时播放器会发送两次http请求至rtsp服务器,第一次是http get请求,
 * 第二次是http post请求。
 *
 * 这两次请求通过http请求头中的x-sessioncookie键完成绑定
 *
 * 第一次http get请求用于接收rtp、rtcp和rtsp回复,后续该链接不再发送其他请求
 * 第二次http post请求用于发送rtsp请求,rtsp握手结束后可能会断开连接,此时我们还要维持rtp发送
 * 需要指出的是http post请求中的content负载就是base64编码后的rtsp请求包,
 * 播放器会把rtsp请求伪装成http content负载发送至rtsp服务器,然后rtsp服务器又把回复发送给第一次http get请求的tcp链接
 * 这样,对防火墙而言,本次rtsp会话就是两次http请求,防火墙就会放行数据
 *
 * zlmediakit在处理rtsp over http的请求时,会把http poster中的content数据base64解码后转发给http getter处理
 */


// HTTP GET session instances used by rtsp over http; the GET session is the one that receives the rtp packets.
// Values are weak references; RtspSession::onError erases the entry keyed by its _http_x_sessioncookie.
static unordered_map<string, weak_ptr<RtspSession> > g_mapGetter;
// Lock protecting g_mapGetter
static recursive_mutex g_mtxGetter;
xiongziliang committed
74

xiongziliang committed
75
/**
 * Create a session on top of the given socket.
 * The socket send timeout is taken from the Rtsp::kKeepAliveSecond setting and
 * the receive buffer starts at 4K to save memory.
 * @param pSock socket handed to the TcpSession base class
 */
RtspSession::RtspSession(const Socket::Ptr &pSock) : TcpSession(pSock) {
    DebugP(this);

    GET_CONFIG(uint32_t, keepAliveSeconds, Rtsp::kKeepAliveSecond);
    pSock->setSendTimeOutSecond(keepAliveSeconds);

    // Start with a small 4K receive buffer to save memory.
    auto initialReadBuffer = std::make_shared<BufferRaw>(4 * 1024);
    pSock->setReadBuffer(initialReadBuffer);
}

/**
 * Destructor; only emits a debug trace for this session.
 */
RtspSession::~RtspSession() {
    DebugP(this);
}

void RtspSession::onError(const SockException& err) {
xiongziliang committed
88 89 90 91 92 93 94
    bool isPlayer = !_pushSrc;
    WarnP(this) << (isPlayer ? "播放器(" : "推流器(")
                << _mediaInfo._vhost << "/"
                << _mediaInfo._app << "/"
                << _mediaInfo._streamid
                << ")断开:" << err.what();

xiongziliang committed
95
	if (_rtpType == Rtsp::RTP_MULTICAST) {
xiongziliang committed
96
		//取消UDP端口监听
97
		UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this);
xzl committed
98 99
	}

100 101 102 103
	if (_http_x_sessioncookie.size() != 0) {
		//移除http getter的弱引用记录
		lock_guard<recursive_mutex> lock(g_mtxGetter);
		g_mapGetter.erase(_http_x_sessioncookie);
xzl committed
104
	}
105 106

    //流量统计事件广播
107
    GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold);
108
    if(_ui64TotalBytes > iFlowThreshold * 1024){
109
        NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport,
110 111 112
										   _mediaInfo,
										   _ui64TotalBytes,
										   _ticker.createdTime()/1000,
113
                                           isPlayer,
114
										   *this);
115
    }
116

xzl committed
117 118 119
}

void RtspSession::onManager() {
120
    GET_CONFIG(uint32_t,handshake_sec,Rtsp::kHandshakeSecond);
121 122 123
    GET_CONFIG(uint32_t,keep_alive_sec,Rtsp::kKeepAliveSecond);

    if (_ticker.createdTime() > handshake_sec * 1000) {
124
		if (_strSession.size() == 0) {
xiongziliang committed
125
			shutdown(SockException(Err_timeout,"illegal connection"));
xzl committed
126 127 128
			return;
		}
	}
129

130

131
	if ((_rtpType == Rtsp::RTP_UDP || _pushSrc ) && _ticker.elapsedTime() > keep_alive_sec * 1000) {
132
		//如果是推流端或者rtp over udp类型的播放端,那么就做超时检测
xiongziliang committed
133 134
        shutdown(SockException(Err_timeout,"rtp over udp session timeouted"));
        return;
xzl committed
135 136 137
	}
}

xiongziliang committed
138 139 140 141 142 143 144
/**
 * Called with every chunk of data received on the socket.
 * Resets the activity ticker, accounts the received bytes, and then either
 * forwards the chunk to the _onRecv callback (http poster data handed over to
 * the http getter) or feeds it to the rtsp splitter through input().
 * @param pBuf the received data
 */
void RtspSession::onRecv(const Buffer::Ptr &pBuf) {
    _ticker.resetTime();
    _ui64TotalBytes += pBuf->size();

    if (!_onRecv) {
        // Regular session: parse the data ourselves.
        input(pBuf->data(), pBuf->size());
        return;
    }
    // The http poster's request data is forwarded to the http getter.
    _onRecv(pBuf);
}
xiongziliang committed
149

150 151 152 153 154 155
/**
 * Tell whether a string ends with the given suffix.
 * Only the tail of str is compared, so a mismatch costs at most
 * substr.size() character comparisons.
 * @param str the string to inspect
 * @param substr the expected suffix; an empty suffix always matches
 * @return true when str ends with substr
 */
static inline bool end_of(const std::string &str, const std::string &substr){
    if (substr.size() > str.size()) {
        // A suffix longer than the string can never match.
        return false;
    }
    return str.compare(str.size() - substr.size(), substr.size(), substr) == 0;
}

xiongziliang committed
156 157 158
/**
 * Dispatch one complete rtsp (or http GET/POST) request to its handler.
 * The first non-GET request also fixes the content base and the media info of
 * the session. Unknown methods are answered with "403 Forbidden" and close the
 * session. Exceptions thrown by a handler shut the session down.
 * @param parser the parsed request; cleared after the handler ran
 */
void RtspSession::onWholeRtspPacket(Parser &parser) {
    using RequestHandler = void (RtspSession::*)(const Parser &parser);
    // Method name -> member function handling it, built once on first use.
    static const unordered_map<string, RequestHandler> kRequestHandlers = {
        {"OPTIONS",       &RtspSession::handleReq_Options},
        {"DESCRIBE",      &RtspSession::handleReq_Describe},
        {"ANNOUNCE",      &RtspSession::handleReq_ANNOUNCE},
        {"RECORD",        &RtspSession::handleReq_RECORD},
        {"SETUP",         &RtspSession::handleReq_Setup},
        {"PLAY",          &RtspSession::handleReq_Play},
        {"PAUSE",         &RtspSession::handleReq_Pause},
        {"TEARDOWN",      &RtspSession::handleReq_Teardown},
        {"GET",           &RtspSession::handleReq_Get},
        {"POST",          &RtspSession::handleReq_Post},
        {"SET_PARAMETER", &RtspSession::handleReq_SET_PARAMETER},
        // GET_PARAMETER shares the SET_PARAMETER handler.
        {"GET_PARAMETER", &RtspSession::handleReq_SET_PARAMETER},
    };

    // Extract the request method and the sequence number.
    const string method = parser.Method();
    _iCseq = atoi(parser["CSeq"].data());

    if (method != "GET" && _strContentBase.empty()) {
        _strContentBase = parser.Url();
        _mediaInfo.parse(parser.FullUrl());
        _mediaInfo._schema = RTSP_SCHEMA;
    }

    const auto found = kRequestHandlers.find(method);
    if (found == kRequestHandlers.end()) {
        sendRtspResponse("403 Forbidden");
        shutdown(SockException(Err_shutdown,StrPrinter << "403 Forbidden:" << method));
        return;
    }

    const RequestHandler handler = found->second;
    try {
        (this->*handler)(parser);
    } catch (SockException &ex) {
        // Only shut down when the exception carries an actual error.
        if (ex) {
            shutdown(ex);
        }
    } catch (exception &ex) {
        shutdown(SockException(Err_shutdown,ex.what()));
    }
    parser.Clear();
}

xiongziliang committed
202
void RtspSession::onRtpPacket(const char *data, uint64_t len) {
xiongziliang committed
203 204 205
	if(!_pushSrc){
		return;
	}
206

xiongziliang committed
207 208 209 210
	int trackIdx = -1;
	uint8_t interleaved = data[1];
	if(interleaved %2 == 0){
		trackIdx = getTrackIndexByInterleaved(interleaved);
211 212 213 214 215 216
        if (trackIdx != -1) {
            handleOneRtp(trackIdx,_aTrackInfo[trackIdx],(unsigned char *)data + 4, len - 4);
        }
	}else{
        trackIdx = getTrackIndexByInterleaved(interleaved - 1);
        if (trackIdx != -1) {
217
            onRtcpPacket(trackIdx, _aTrackInfo[trackIdx], (unsigned char *) data + 4, len - 4);
218
        }
xiongziliang committed
219 220 221
	}
}

222
/**
 * Hook invoked for every rtcp packet received from a pusher.
 * Intentionally empty: the packet is currently ignored.
 * @param iTrackidx index of the track the packet belongs to
 * @param track description of that track
 * @param pucData rtcp payload
 * @param uiLen payload length in bytes
 */
void RtspSession::onRtcpPacket(int iTrackidx, SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen){
}
xiongziliang committed
225 226 227 228
/**
 * Content length of the request being parsed.
 * For an http POST the content is the base64 encoded rtsp signalling, so all
 * remaining data counts as content; everything else is delegated to
 * RtspSplitter.
 * @param parser the request whose header was parsed
 * @return number of content bytes that belong to this request
 */
int64_t RtspSession::getContentLength(Parser &parser) {
    const bool isHttpPost = (parser.Method() == "POST");
    if (!isHttpPost) {
        return RtspSplitter::getContentLength(parser);
    }
    // The content part of a http post is the base64 encoded rtsp request.
    return remainDataSize();
}

xiongziliang committed
233

234
void RtspSession::handleReq_Options(const Parser &parser) {
235
	//支持这些命令
236
	sendRtspResponse("200 OK",{"Public" , "OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, ANNOUNCE, RECORD, SET_PARAMETER, GET_PARAMETER"});
xzl committed
237 238
}

239
void RtspSession::handleReq_ANNOUNCE(const Parser &parser) {
xiongziliang committed
240 241 242 243 244 245 246
	auto src = dynamic_pointer_cast<RtmpMediaSource>(MediaSource::find(RTSP_SCHEMA,
																	   _mediaInfo._vhost,
																	   _mediaInfo._app,
																	   _mediaInfo._streamid,
																	   false));
	if(src){
		sendRtspResponse("406 Not Acceptable", {"Content-Type", "text/plain"}, "Already publishing.");
247 248 249 250 251 252
        string err = StrPrinter << "ANNOUNCE:"
                                << "Already publishing:"
                                << _mediaInfo._vhost << " "
                                << _mediaInfo._app << " "
                                << _mediaInfo._streamid << endl;
		throw SockException(Err_shutdown,err);
253
	}
xiongziliang committed
254

255 256 257
    auto full_url = parser.FullUrl();
    if(end_of(full_url,".sdp")){
        //去除.sdp后缀,防止EasyDarwin推流器强制添加.sdp后缀
258
        full_url = full_url.substr(0,full_url.length() - 4);
259 260 261
        _mediaInfo.parse(full_url);
    }

262 263 264
    SdpParser sdpParser(parser.Content());
    _strSession = makeRandStr(12);
    _aTrackInfo = sdpParser.getAvailableTrack();
xiongziliang committed
265

266
	_pushSrc = std::make_shared<RtspMediaSourceImp>(_mediaInfo._vhost,_mediaInfo._app,_mediaInfo._streamid);
xiongziliang committed
267
	_pushSrc->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
268
    _pushSrc->setSdp(sdpParser.toString());
269 270

	sendRtspResponse("200 OK",{"Content-Base",_strContentBase + "/"});
271 272
}

273
void RtspSession::handleReq_RECORD(const Parser &parser){
xiongziliang committed
274
	if (_aTrackInfo.empty() || parser["Session"] != _strSession) {
275
		send_SessionNotFound();
276
        throw SockException(Err_shutdown,_aTrackInfo.empty() ? "can not find any availabe track when record" : "session not found when record");
277
	}
278
	auto onRes = [this](const string &err,bool enableRtxp,bool enableHls,bool enableMP4){
279 280 281
		bool authSuccess = err.empty();
		if(!authSuccess){
			sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err);
xiongziliang committed
282
			shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << err));
283 284 285
			return;
		}

286 287 288
        //设置转协议
        _pushSrc->setProtocolTranslation(enableRtxp,enableHls,enableMP4);

289 290 291 292
		_StrPrinter rtp_info;
		for(auto &track : _aTrackInfo){
			if (track->_inited == false) {
				//还有track没有setup
xiongziliang committed
293 294
                shutdown(SockException(Err_shutdown,"track not setuped"));
                return;
295
			}
xiongziliang committed
296
			rtp_info << "url=" << _strContentBase << "/" << track->_control_surffix << ",";
297 298 299 300
		}

		rtp_info.pop_back();
		sendRtspResponse("200 OK", {"RTP-Info",rtp_info});
301 302
		if(_rtpType == Rtsp::RTP_TCP){
			//如果是rtsp推流服务器,并且是TCP推流,那么加大TCP接收缓存,这样能提升接收性能
303
			_sock->setReadBuffer(std::make_shared<BufferRaw>(256 * 1024));
304
            setSocketFlags();
305
		}
306 307 308
	};

	weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
309
	Broadcast::PublishAuthInvoker invoker = [weakSelf,onRes](const string &err,bool enableRtxp,bool enableHls,bool enableMP4){
310 311 312 313
		auto strongSelf = weakSelf.lock();
		if(!strongSelf){
			return;
		}
314
		strongSelf->async([weakSelf,onRes,err,enableRtxp,enableHls,enableMP4](){
315 316 317 318
			auto strongSelf = weakSelf.lock();
			if(!strongSelf){
				return;
			}
319
			onRes(err,enableRtxp,enableHls,enableMP4);
320 321 322 323 324 325 326
		});
	};

	//rtsp推流需要鉴权
	auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish,_mediaInfo,invoker,*this);
	if(!flag){
		//该事件无人监听,默认不鉴权
327 328 329 330 331
        GET_CONFIG(bool,toRtxp,General::kPublishToRtxp);
        GET_CONFIG(bool,toHls,General::kPublishToHls);
        GET_CONFIG(bool,toMP4,General::kPublishToMP4);
        onRes("",toRtxp,toHls,toMP4);
    }
332 333
}

334
void RtspSession::handleReq_Describe(const Parser &parser) {
335 336
    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
    //该请求中的认证信息
337
    auto authorization = parser["Authorization"];
338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372
    onGetRealm invoker = [weakSelf,authorization](const string &realm){
        auto strongSelf = weakSelf.lock();
        if(!strongSelf){
            //本对象已经销毁
            return;
        }
        //切换到自己的线程然后执行
        strongSelf->async([weakSelf,realm,authorization](){
            auto strongSelf = weakSelf.lock();
            if(!strongSelf){
                //本对象已经销毁
                return;
            }
            if(realm.empty()){
                //无需认证,回复sdp
                strongSelf->onAuthSuccess();
                return;
            }
            //该流需要认证
            strongSelf->onAuthUser(realm,authorization);
        });

    };

    //广播是否需要认证事件
    if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnGetRtspRealm,
                                           _mediaInfo,
                                           invoker,
                                           *this)){
        //无人监听此事件,说明无需认证
        invoker("");
    }
}
void RtspSession::onAuthSuccess() {
    TraceP(this);
373
    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
xiongziliang committed
374
    MediaSource::findAsync(_mediaInfo,weakSelf.lock(),[weakSelf](const MediaSource::Ptr &src){
375 376 377 378 379 380 381
        auto strongSelf = weakSelf.lock();
        if(!strongSelf){
            return;
        }
        auto rtsp_src = dynamic_pointer_cast<RtspMediaSource>(src);
        if (!rtsp_src) {
            //未找到相应的MediaSource
xiongziliang committed
382
            string err = StrPrinter << "no such stream:" <<  strongSelf->_mediaInfo._vhost << " " <<  strongSelf->_mediaInfo._app << " " << strongSelf->_mediaInfo._streamid;
383
            strongSelf->send_StreamNotFound();
xiongziliang committed
384
            strongSelf->shutdown(SockException(Err_shutdown,err));
385 386
            return;
        }
3503207480@qq.com committed
387
        //找到了相应的rtsp流
388
        strongSelf->_aTrackInfo = SdpParser(rtsp_src->getSdp()).getAvailableTrack();
389 390
        if (strongSelf->_aTrackInfo.empty()) {
            //该流无效
3503207480@qq.com committed
391
			DebugL << "无trackInfo,该流无效";
392
            strongSelf->send_StreamNotFound();
xiongziliang committed
393
            strongSelf->shutdown(SockException(Err_shutdown,"can not find any available track in sdp"));
394 395 396 397 398 399 400 401 402
            return;
        }
        strongSelf->_strSession = makeRandStr(12);
        strongSelf->_pMediaSrc = rtsp_src;
        for(auto &track : strongSelf->_aTrackInfo){
            track->_ssrc = rtsp_src->getSsrc(track->_type);
            track->_seq = rtsp_src->getSeqence(track->_type);
            track->_time_stamp = rtsp_src->getTimeStamp(track->_type);
        }
xiongziliang committed
403

404 405 406 407
        strongSelf->sendRtspResponse("200 OK",
                                     {"Content-Base",strongSelf->_strContentBase + "/",
                                      "x-Accept-Retransmit","our-retransmit",
                                      "x-Accept-Dynamic-Rate","1"
408
                                     },rtsp_src->getSdp());
409
    });
xzl committed
410
}
411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426
void RtspSession::onAuthFailed(const string &realm,const string &why,bool close) {
    GET_CONFIG(bool,authBasic,Rtsp::kAuthBasic);
    if (!authBasic) {
        //我们需要客户端优先以md5方式认证
        _strNonce = makeRandStr(32);
        sendRtspResponse("401 Unauthorized",
                         {"WWW-Authenticate",
                          StrPrinter << "Digest realm=\"" << realm << "\",nonce=\"" << _strNonce << "\"" });
    }else {
        //当然我们也支持base64认证,但是我们不建议这样做
        sendRtspResponse("401 Unauthorized",
                         {"WWW-Authenticate",
                          StrPrinter << "Basic realm=\"" << realm << "\"" });
    }
    if(close){
        shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << why));
427 428 429
    }
}

430
void RtspSession::onAuthBasic(const string &realm,const string &strBase64){
431 432 433
    //base64认证
    char user_pwd_buf[512];
    av_base64_decode((uint8_t *)user_pwd_buf,strBase64.data(),strBase64.size());
434
    auto user_pwd_vec = split(user_pwd_buf,":");
435 436
    if(user_pwd_vec.size() < 2){
        //认证信息格式不合法,回复401 Unauthorized
437
        onAuthFailed(realm,"can not find user and passwd when basic64 auth");
438 439 440 441
        return;
    }
    auto user = user_pwd_vec[0];
    auto pwd = user_pwd_vec[1];
442
    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
443
    onAuth invoker = [pwd,realm,weakSelf](bool encrypted,const string &good_pwd){
444 445 446 447
        auto strongSelf = weakSelf.lock();
        if(!strongSelf){
            //本对象已经销毁
            return;
448
        }
449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464
        //切换到自己的线程执行
        strongSelf->async([weakSelf,good_pwd,pwd,realm](){
            auto strongSelf = weakSelf.lock();
            if(!strongSelf){
                //本对象已经销毁
                return;
            }
            //base64忽略encrypted参数,上层必须传入明文密码
            if(pwd == good_pwd){
                //提供的密码且匹配正确
                strongSelf->onAuthSuccess();
                return;
            }
            //密码错误
            strongSelf->onAuthFailed(realm,StrPrinter << "password mismatch when base64 auth:" << pwd << " != " << good_pwd);
        });
465
    };
466

467
    //此时必须提供明文密码
468
    if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,_mediaInfo,realm,user, true,invoker,*this)){
469
        //表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之
470
        WarnP(this) << "请监听kBroadcastOnRtspAuth事件!";
471 472 473 474 475
        //但是我们还是忽略认证以便完成播放
        //我们输入的密码是明文
        invoker(false,pwd);
    }
}
476

477 478
void RtspSession::onAuthDigest(const string &realm,const string &strMd5){
	DebugP(this) << strMd5;
479 480 481
    auto mapTmp = Parser::parseArgs(strMd5,",","=");
    decltype(mapTmp) map;
    for(auto &pr : mapTmp){
482
        map[trim(string(pr.first)," \"")] = trim(pr.second," \"");
483 484 485
    }
    //check realm
    if(realm != map["realm"]){
486
        onAuthFailed(realm,StrPrinter << "realm not mached:" << realm << " != " << map["realm"]);
487 488 489 490
        return ;
    }
    //check nonce
    auto nonce = map["nonce"];
491 492
    if(_strNonce != nonce){
        onAuthFailed(realm,StrPrinter << "nonce not mached:" << nonce << " != " << _strNonce);
493 494 495 496 497 498 499
        return ;
    }
    //check username and uri
    auto username = map["username"];
    auto uri = map["uri"];
    auto response = map["response"];
    if(username.empty() || uri.empty() || response.empty()){
500
        onAuthFailed(realm,StrPrinter << "username/uri/response empty:" << username << "," << uri << "," << response);
501 502 503
        return ;
    }

504
    auto realInvoker = [this,realm,nonce,uri,username,response](bool ignoreAuth,bool encrypted,const string &good_pwd){
505 506
        if(ignoreAuth){
            //忽略认证
507 508
            TraceP(this) << "auth ignored";
            onAuthSuccess();
509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525
            return;
        }
        /*
        response计算方法如下:
        RTSP客户端应该使用username + password并计算response如下:
        (1)当password为MD5编码,则
            response = md5( password:nonce:md5(public_method:url)  );
        (2)当password为ANSI字符串,则
            response= md5( md5(username:realm:password):nonce:md5(public_method:url) );
         */
        auto encrypted_pwd = good_pwd;
        if(!encrypted){
            //提供的是明文密码
            encrypted_pwd = MD5(username+ ":" + realm + ":" + good_pwd).hexdigest();
        }

        auto good_response = MD5( encrypted_pwd + ":" + nonce + ":" + MD5(string("DESCRIBE") + ":" + uri).hexdigest()).hexdigest();
xiongziliang committed
526
        if(strcasecmp(good_response.data(),response.data()) == 0){
527
            //认证成功!md5不区分大小写
528
            onAuthSuccess();
529 530
        }else{
            //认证失败!
531
            onAuthFailed(realm, StrPrinter << "password mismatch when md5 auth:" << good_response << " != " << response );
532 533
        }
    };
534 535 536 537 538 539 540 541 542 543 544 545 546 547 548

    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
    onAuth invoker = [realInvoker,weakSelf](bool encrypted,const string &good_pwd){
        auto strongSelf = weakSelf.lock();
        if(!strongSelf){
            return;
        }
        //切换到自己的线程确保realInvoker执行时,this指针有效
        strongSelf->async([realInvoker,weakSelf,encrypted,good_pwd](){
            auto strongSelf = weakSelf.lock();
            if(!strongSelf){
                return;
            }
            realInvoker(false,encrypted,good_pwd);
        });
549 550 551
    };

    //此时可以提供明文或md5加密的密码
552
    if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,_mediaInfo,realm,username, false,invoker,*this)){
553
        //表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之
554
        WarnP(this) << "请监听kBroadcastOnRtspAuth事件!";
555 556 557 558 559
        //但是我们还是忽略认证以便完成播放
        realInvoker(true,true,"");
    }
}

560 561 562 563 564
void RtspSession::onAuthUser(const string &realm,const string &authorization){
    if(authorization.empty()){
        onAuthFailed(realm,"", false);
        return;
    }
565 566 567 568 569
    //请求中包含认证信息
    auto authType = FindField(authorization.data(),NULL," ");
	auto authStr = FindField(authorization.data()," ",NULL);
    if(authType.empty() || authStr.empty()){
        //认证信息格式不合法,回复401 Unauthorized
570
        onAuthFailed(realm,"can not find auth type or auth string");
571 572 573 574
        return;
    }
    if(authType == "Basic"){
        //base64认证,需要明文密码
575
        onAuthBasic(realm,authStr);
576 577
    }else if(authType == "Digest"){
        //md5认证
578
        onAuthDigest(realm,authStr);
579 580
    }else{
        //其他认证方式?不支持!
581
        onAuthFailed(realm,StrPrinter << "unsupported auth type:" << authType);
582 583
    }
}
xzl committed
584
inline void RtspSession::send_StreamNotFound() {
585
	sendRtspResponse("404 Stream Not Found",{"Connection","Close"});
xzl committed
586 587
}
inline void RtspSession::send_UnsupportedTransport() {
588
	sendRtspResponse("461 Unsupported Transport",{"Connection","Close"});
xzl committed
589 590 591
}

inline void RtspSession::send_SessionNotFound() {
592
	sendRtspResponse("454 Session Not Found",{"Connection","Close"});
xzl committed
593
}
594 595

void RtspSession::handleReq_Setup(const Parser &parser) {
xzl committed
596
//处理setup命令,该函数可能进入多次
597
    auto controlSuffix = split(parser.FullUrl(),"/").back();// parser.FullUrl().substr(_strContentBase.size());
xiongziliang committed
598 599 600
    if(controlSuffix.front() == '/'){
		controlSuffix = controlSuffix.substr(1);
    }
601
	int trackIdx = getTrackIndexByControlSuffix(controlSuffix);
xzl committed
602 603
	if (trackIdx == -1) {
		//未找到相应track
604 605
        throw SockException(Err_shutdown, StrPrinter << "can not find any track by control suffix:" << controlSuffix);
    }
xiongziliang committed
606
	SdpTrack::Ptr &trackRef = _aTrackInfo[trackIdx];
607
	if (trackRef->_inited) {
xzl committed
608
		//已经初始化过该Track
609
        throw SockException(Err_shutdown, "can not setup one track twice");
xzl committed
610
	}
611
	trackRef->_inited = true; //现在初始化
xzl committed
612

xiongziliang committed
613
	if(_rtpType == Rtsp::RTP_Invalid){
614
		auto &strTransport = parser["Transport"];
xzl committed
615
		if(strTransport.find("TCP") != string::npos){
xiongziliang committed
616
			_rtpType = Rtsp::RTP_TCP;
xzl committed
617
		}else if(strTransport.find("multicast") != string::npos){
xiongziliang committed
618
			_rtpType = Rtsp::RTP_MULTICAST;
xzl committed
619
		}else{
xiongziliang committed
620
			_rtpType = Rtsp::RTP_UDP;
xzl committed
621 622 623
		}
	}

624
    //允许接收rtp、rtcp包
xiongziliang committed
625
	RtspSplitter::enableRecvRtp(_rtpType == Rtsp::RTP_TCP);
xiongziliang committed
626

627
	switch (_rtpType) {
xiongziliang committed
628
	case Rtsp::RTP_TCP: {
629 630 631 632 633 634 635 636 637 638 639 640 641
        if(_pushSrc){
            //rtsp推流时,interleaved由推流者决定
            auto key_values =  Parser::parseArgs(parser["Transport"],";","=");
            int interleaved_rtp = -1 , interleaved_rtcp = -1;
            if(2 == sscanf(key_values["interleaved"].data(),"%d-%d",&interleaved_rtp,&interleaved_rtcp)){
                trackRef->_interleaved = interleaved_rtp;
            }else{
                throw SockException(Err_shutdown, "can not find interleaved when setup of rtp over tcp");
            }
        }else{
            //rtsp播放时,由于数据共享分发,所以interleaved必须由服务器决定
            trackRef->_interleaved = 2 * trackRef->_type;
        }
642 643
		sendRtspResponse("200 OK",
						 {"Transport",StrPrinter << "RTP/AVP/TCP;unicast;"
644
												 << "interleaved=" << (int)trackRef->_interleaved << "-" << (int)trackRef->_interleaved + 1 << ";"
645
												 << "ssrc=" << printSSRC(trackRef->_ssrc),
646 647 648
						  "x-Transport-Options" , "late-tolerance=1.400000",
						  "x-Dynamic-Rate" , "1"
						 });
xzl committed
649 650
	}
		break;
xiongziliang committed
651
	case Rtsp::RTP_UDP: {
xiongziliang committed
652
		//我们用trackIdx区分rtp和rtcp包
653 654
		auto pSockRtp = std::make_shared<Socket>(_sock->getPoller());
		if (!pSockRtp->bindUdpSock(0,get_local_ip().data())) {
xiongziliang committed
655 656
			//分配端口失败
			send_NotAcceptable();
657 658
            throw SockException(Err_shutdown, "open rtp socket failed");
        }
659 660
		auto pSockRtcp = std::make_shared<Socket>(_sock->getPoller());
		if (!pSockRtcp->bindUdpSock(pSockRtp->get_local_port() + 1,get_local_ip().data())) {
xzl committed
661 662
			//分配端口失败
			send_NotAcceptable();
663 664
            throw SockException(Err_shutdown, "open rtcp socket failed");
        }
665 666
		_apRtpSock[trackIdx] = pSockRtp;
		_apRtcpSock[trackIdx] = pSockRtcp;
xiongziliang committed
667
		//设置客户端内网端口信息
xiongziliang committed
668
		string strClientPort = FindField(parser["Transport"].data(), "client_port=", NULL);
669 670 671 672 673
		uint16_t ui16RtpPort = atoi( FindField(strClientPort.data(), NULL, "-").data());
        uint16_t ui16RtcpPort = atoi( FindField(strClientPort.data(), "-" , NULL).data());

        struct sockaddr_in peerAddr;
        //设置rtp发送目标地址
xzl committed
674
		peerAddr.sin_family = AF_INET;
675
		peerAddr.sin_port = htons(ui16RtpPort);
676
		peerAddr.sin_addr.s_addr = inet_addr(get_peer_ip().data());
xzl committed
677
		bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero);
678
		pSockRtp->setSendPeerAddr((struct sockaddr *)(&peerAddr));
679 680 681 682 683 684 685 686

		//设置rtcp发送目标地址
        peerAddr.sin_family = AF_INET;
        peerAddr.sin_port = htons(ui16RtcpPort);
        peerAddr.sin_addr.s_addr = inet_addr(get_peer_ip().data());
        bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero);
        pSockRtcp->setSendPeerAddr((struct sockaddr *)(&peerAddr));

xiongziliang committed
687
		//尝试获取客户端nat映射地址
688
		startListenPeerUdpData(trackIdx);
689
		//InfoP(this) << "分配端口:" << srv_port;
690 691 692 693 694

		sendRtspResponse("200 OK",
						 {"Transport",StrPrinter << "RTP/AVP/UDP;unicast;"
												 << "client_port=" << strClientPort << ";"
												 << "server_port=" << pSockRtp->get_local_port() << "-" << pSockRtcp->get_local_port() << ";"
695
												 << "ssrc=" << printSSRC(trackRef->_ssrc)
696
						 });
xzl committed
697 698
	}
		break;
xiongziliang committed
699
	case Rtsp::RTP_MULTICAST: {
xiongziliang committed
700 701 702
		if(!_multicaster){
			_multicaster = RtpMultiCaster::get(getPoller(),get_local_ip(),_mediaInfo._vhost, _mediaInfo._app, _mediaInfo._streamid);
			if (!_multicaster) {
xzl committed
703
				send_NotAcceptable();
704 705
                throw SockException(Err_shutdown, "can not get a available udp multicast socket");
            }
xzl committed
706
			weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
xiongziliang committed
707
			_multicaster->setDetachCB(this, [weakSelf]() {
xzl committed
708 709 710 711
				auto strongSelf = weakSelf.lock();
				if(!strongSelf) {
					return;
				}
xiongziliang committed
712
				strongSelf->safeShutdown(SockException(Err_shutdown,"ring buffer detached"));
xzl committed
713 714
			});
		}
xiongziliang committed
715
		int iSrvPort = _multicaster->getPort(trackRef->_type);
xiongziliang committed
716
		//我们用trackIdx区分rtp和rtcp包
717
		//由于组播udp端口是共享的,而rtcp端口为组播udp端口+1,所以rtcp端口需要改成共享端口
718
		auto pSockRtcp = UDPServer::Instance().getSock(getPoller(),get_local_ip().data(),2*trackIdx + 1,iSrvPort + 1);
xiongziliang committed
719 720 721
		if (!pSockRtcp) {
			//分配端口失败
			send_NotAcceptable();
722
            throw SockException(Err_shutdown, "open shared rtcp socket failed");
xiongziliang committed
723
		}
724
		startListenPeerUdpData(trackIdx);
725
        GET_CONFIG(uint32_t,udpTTL,MultiCast::kUdpTTL);
726 727 728

		sendRtspResponse("200 OK",
						 {"Transport",StrPrinter << "RTP/AVP;multicast;"
xiongziliang committed
729
												 << "destination=" << _multicaster->getIP() << ";"
730 731 732 733 734
												 << "source=" << get_local_ip() << ";"
												 << "port=" << iSrvPort << "-" << pSockRtcp->get_local_port() << ";"
												 << "ttl=" << udpTTL << ";"
												 << "ssrc=" << printSSRC(trackRef->_ssrc)
						 });
xzl committed
735 736 737 738 739 740 741
	}
		break;
	default:
		break;
	}
}

742
void RtspSession::handleReq_Play(const Parser &parser) {
xiongziliang committed
743
	if (_aTrackInfo.empty() || parser["Session"] != _strSession) {
xzl committed
744
		send_SessionNotFound();
745 746
        throw SockException(Err_shutdown,_aTrackInfo.empty() ? "can not find any availabe track when play" : "session not found when play");
    }
xiongziliang committed
747
	auto strRange = parser["Range"];
748
    auto onRes = [this,strRange](const string &err){
749
        bool authSuccess = err.empty();
xiongziliang committed
750
        if(!authSuccess){
751
            //第一次play是播放,否则是恢复播放。只对播放鉴权
752
			sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err);
xiongziliang committed
753
            shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << err));
754 755
            return;
        }
xzl committed
756

757
        auto pMediaSrc = _pMediaSrc.lock();
758 759
        if(!pMediaSrc){
        	send_StreamNotFound();
xiongziliang committed
760
        	shutdown(SockException(Err_shutdown,"rtsp stream released"));
761
			return;
762
        }
763

764 765 766
        bool useBuf = true;
		_enableSendRtp = false;

767
		if (strRange.size() && !_bFirstPlay) {
xiongziliang committed
768
            //这个是seek操作
769 770 771 772 773
			auto strStart = FindField(strRange.data(), "npt=", "-");
			if (strStart == "now") {
				strStart = "0";
			}
			auto iStartTime = 1000 * atof(strStart.data());
774
			InfoP(this) << "rtsp seekTo(ms):" << iStartTime;
775
			useBuf = !pMediaSrc->seekTo(iStartTime);
776
		}else if(pMediaSrc->readerCount() == 0){
777 778 779 780
			//第一个消费者
			pMediaSrc->seekTo(0);
		}
		_bFirstPlay = false;
781

782
		_StrPrinter rtp_info;
xiongziliang committed
783
		for(auto &track : _aTrackInfo){
784
			if (track->_inited == false) {
xiongziliang committed
785
				//还有track没有setup
xiongziliang committed
786 787
                shutdown(SockException(Err_shutdown,"track not setuped"));
                return;
xiongziliang committed
788
			}
789 790 791 792
			track->_ssrc = pMediaSrc->getSsrc(track->_type);
			track->_seq = pMediaSrc->getSeqence(track->_type);
			track->_time_stamp = pMediaSrc->getTimeStamp(track->_type);

xiongziliang committed
793
			rtp_info << "url=" << _strContentBase << "/" << track->_control_surffix << ";"
794 795
					 << "seq=" << track->_seq << ";"
					 << "rtptime=" << (int)(track->_time_stamp * (track->_samplerate / 1000)) << ",";
xiongziliang committed
796 797
		}

798 799 800 801 802 803
		rtp_info.pop_back();

		sendRtspResponse("200 OK",
						 {"Range", StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) <<  pMediaSrc->getTimeStamp(TrackInvalid) / 1000.0,
						  "RTP-Info",rtp_info
						 });
804

805
		_enableSendRtp = true;
806
        setSocketFlags();
xiongziliang committed
807

xiongziliang committed
808
		if (!_pRtpReader && _rtpType != Rtsp::RTP_MULTICAST) {
809
			weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
810
			_pRtpReader = pMediaSrc->getRing()->attach(getPoller(),useBuf);
811 812 813 814 815
			_pRtpReader->setDetachCB([weakSelf]() {
				auto strongSelf = weakSelf.lock();
				if(!strongSelf) {
					return;
				}
xiongziliang committed
816 817
                strongSelf->shutdown(SockException(Err_shutdown,"rtsp ring buffer detached"));
            });
818 819 820 821 822
			_pRtpReader->setReadCB([weakSelf](const RtpPacket::Ptr &pack) {
				auto strongSelf = weakSelf.lock();
				if(!strongSelf) {
					return;
				}
823 824 825
				if(strongSelf->_enableSendRtp) {
					strongSelf->sendRtpPacket(pack);
				}
826 827
			});
		}
828
    };
xzl committed
829

830
    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
831
    Broadcast::AuthInvoker invoker = [weakSelf,onRes](const string &err){
832 833 834
        auto strongSelf = weakSelf.lock();
        if(!strongSelf){
            return;
xzl committed
835
        }
836
        strongSelf->async([weakSelf,onRes,err](){
837 838 839 840
            auto strongSelf = weakSelf.lock();
            if(!strongSelf){
                return;
            }
841
            onRes(err);
842 843
        });
    };
xiongziliang committed
844 845 846 847 848 849 850 851 852
    if(_bFirstPlay){
        //第一次收到play命令,需要鉴权
        auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,_mediaInfo,invoker,*this);
        if(!flag){
            //该事件无人监听,默认不鉴权
            onRes("");
        }
    }else{
        //后面是seek或恢复命令,不需要鉴权
853
        onRes("");
854
    }
xzl committed
855 856
}

857
void RtspSession::handleReq_Pause(const Parser &parser) {
xiongziliang committed
858
	if (parser["Session"] != _strSession) {
xzl committed
859
		send_SessionNotFound();
860 861
        throw SockException(Err_shutdown,"session not found when pause");
    }
862 863

	sendRtspResponse("200 OK");
864
	_enableSendRtp = false;
xzl committed
865 866
}

867
void RtspSession::handleReq_Teardown(const Parser &parser) {
868
	sendRtspResponse("200 OK");
869
    throw SockException(Err_shutdown,"rtsp player send teardown request");
xzl committed
870 871
}

872
void RtspSession::handleReq_Get(const Parser &parser) {
xiongziliang committed
873
	_http_x_sessioncookie = parser["x-sessioncookie"];
874
	sendRtspResponse("200 OK",
875
					 {"Cache-Control","no-store",
876 877 878 879
					  "Pragma","no-store",
					  "Content-Type","application/x-rtsp-tunnelled",
					 },"","HTTP/1.0");

880
	//注册http getter,以便http poster绑定
xzl committed
881
	lock_guard<recursive_mutex> lock(g_mtxGetter);
882
	g_mapGetter[_http_x_sessioncookie] = dynamic_pointer_cast<RtspSession>(shared_from_this());
xzl committed
883 884
}

885
void RtspSession::handleReq_Post(const Parser &parser) {
xzl committed
886
	lock_guard<recursive_mutex> lock(g_mtxGetter);
xiongziliang committed
887
	string sessioncookie = parser["x-sessioncookie"];
888
	//Poster 找到 Getter
xzl committed
889 890
	auto it = g_mapGetter.find(sessioncookie);
	if (it == g_mapGetter.end()) {
891 892
        throw SockException(Err_shutdown,"can not find http getter by x-sessioncookie");
    }
893 894 895 896

	//Poster 找到Getter的SOCK
	auto httpGetterWeak = it->second;
	//移除http getter的弱引用记录
xzl committed
897
	g_mapGetter.erase(sessioncookie);
898 899 900 901 902

	//http poster收到请求后转发给http getter处理
	_onRecv = [this,httpGetterWeak](const Buffer::Ptr &pBuf){
		auto httpGetterStrong = httpGetterWeak.lock();
		if(!httpGetterStrong){
xiongziliang committed
903
			shutdown(SockException(Err_shutdown,"http getter released"));
904 905 906 907 908 909 910 911 912 913 914 915 916
			return;
		}

		//切换到http getter的线程
		httpGetterStrong->async([pBuf,httpGetterWeak](){
			auto httpGetterStrong = httpGetterWeak.lock();
			if(!httpGetterStrong){
				return;
			}
			httpGetterStrong->onRecv(std::make_shared<BufferString>(decodeBase64(string(pBuf->data(),pBuf->size()))));
		});
	};

xiongziliang committed
917 918 919 920
	if(!parser.Content().empty()){
		//http poster后面的粘包
		_onRecv(std::make_shared<BufferString>(parser.Content()));
	}
921 922 923 924 925 926

    sendRtspResponse("200 OK",
                     {"Cache-Control","no-store",
                      "Pragma","no-store",
                      "Content-Type","application/x-rtsp-tunnelled",
                     },"","HTTP/1.0");
xzl committed
927 928
}

929
void RtspSession::handleReq_SET_PARAMETER(const Parser &parser) {
930
	//TraceP(this) <<endl;
931
	sendRtspResponse("200 OK");
xzl committed
932 933 934
}

inline void RtspSession::send_NotAcceptable() {
935
	sendRtspResponse("406 Not Acceptable",{"Connection","Close"});
xzl committed
936
}
937

938

xiongziliang committed
939
// Receive one RTP packet of track `trackidx` and write it into the push source.
// When Rtsp::kModifyStamp is enabled the packet timestamp is first replaced by
// the value produced by the per-track stamp reviser.
void RtspSession::onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) {
    GET_CONFIG(bool,modify_stamp,Rtsp::kModifyStamp);
    if(modify_stamp){
        int64_t dts_out;
        _stamp[trackidx].revise(rtppt->timeStamp, rtppt->timeStamp, dts_out, dts_out, true);
        rtppt->timeStamp = dts_out;
    }
    _pushSrc->onWrite(rtppt, false);
}
948
// Process one UDP datagram received from the peer.
//
// `intervaled` encodes the channel: even values (2 * trackIdx) are RTP, odd
// values (2 * trackIdx + 1) are RTCP. The first datagram seen on a channel also
// fixes the peer address the matching socket sends to.
inline void RtspSession::onRcvPeerUdpData(int intervaled, const Buffer::Ptr &pBuf, const struct sockaddr& addr) {
    // Any datagram from the peer acts as a heartbeat: the player is still alive.
    _ticker.resetTime();

    if(intervaled % 2 == 0){
        if(_pushSrc){
            // RTP packet uploaded by an RTSP pusher.
            handleOneRtp(intervaled / 2,_aTrackInfo[intervaled / 2],( unsigned char *)pBuf->data(),pBuf->size());
        }else if(!_udpSockConnected.count(intervaled)){
            // RTP hole-punching packet sent by an RTSP player.
            _udpSockConnected.emplace(intervaled);
            _apRtpSock[intervaled / 2]->setSendPeerAddr(&addr);
        }
    }else{
        // RTCP packet.
        if(!_udpSockConnected.count(intervaled)){
            _udpSockConnected.emplace(intervaled);
            _apRtcpSock[(intervaled - 1) / 2]->setSendPeerAddr(&addr);
        }
        onRtcpPacket((intervaled - 1) / 2, _aTrackInfo[(intervaled - 1) / 2], (unsigned char *) pBuf->data(),pBuf->size());
    }
}


972
inline void RtspSession::startListenPeerUdpData(int trackIdx) {
xzl committed
973
	weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
974 975
    auto srcIP = inet_addr(get_peer_ip().data());
	auto onUdpData = [weakSelf,srcIP](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr,int intervaled){
976 977 978 979
		auto strongSelf=weakSelf.lock();
		if(!strongSelf) {
			return false;
		}
980 981 982 983 984 985 986

        if (((struct sockaddr_in *) pPeerAddr)->sin_addr.s_addr != srcIP) {
            WarnP(strongSelf.get()) << ((intervaled % 2 == 0) ? "收到其他地址的rtp数据:" : "收到其他地址的rtcp数据:")
                                    << inet_ntoa(((struct sockaddr_in *) pPeerAddr)->sin_addr);
            return true;
        }

987
		struct sockaddr addr=*pPeerAddr;
988
		strongSelf->async([weakSelf,pBuf,addr,intervaled]() {
989 990 991 992
			auto strongSelf=weakSelf.lock();
			if(!strongSelf) {
				return;
			}
993
			strongSelf->onRcvPeerUdpData(intervaled,pBuf,addr);
994 995 996 997 998
		});
		return true;
	};

	switch (_rtpType){
xiongziliang committed
999
		case Rtsp::RTP_MULTICAST:{
1000 1001
			//组播使用的共享rtcp端口
			UDPServer::Instance().listenPeer(get_peer_ip().data(), this, [onUdpData](
1002 1003
					int intervaled, const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr) {
				return onUdpData(pBuf,pPeerAddr,intervaled);
xzl committed
1004
			});
1005 1006
		}
			break;
xiongziliang committed
1007
		case Rtsp::RTP_UDP:{
1008
			auto setEvent = [&](Socket::Ptr &sock,int intervaled){
1009
				if(!sock){
1010
					WarnP(this) << "udp端口为空:" << intervaled;
1011 1012
					return;
				}
1013
				sock->setOnRead([onUdpData,intervaled](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr , int addr_len){
1014
					onUdpData(pBuf,pPeerAddr,intervaled);
1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025
				});
			};
			setEvent(_apRtpSock[trackIdx], 2*trackIdx );
			setEvent(_apRtcpSock[trackIdx], 2*trackIdx + 1 );
		}
			break;

		default:
			break;
	}

xzl committed
1026 1027
}

1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063
// Format the current UTC time for the "Date" response header,
// e.g. "Tue, Jan 15 2019 08:12:31 GMT".
static string dateStr(){
    const time_t now = time(NULL);
    char stamp[64];
    strftime(stamp, sizeof(stamp), "%a, %b %d %Y %H:%M:%S GMT", gmtime(&now));
    return string(stamp);
}

bool RtspSession::sendRtspResponse(const string &res_code,
								   const StrCaseMap &header_const,
								   const string &sdp,
								   const char *protocol){
	auto header = header_const;
	header.emplace("CSeq",StrPrinter << _iCseq);
	if(!_strSession.empty()){
		header.emplace("Session",_strSession);
	}

	header.emplace("Server",SERVER_NAME "(build in " __DATE__ " " __TIME__ ")");
	header.emplace("Date",dateStr());

	if(!sdp.empty()){
		header.emplace("Content-Length",StrPrinter << sdp.size());
		header.emplace("Content-Type","application/sdp");
	}

	_StrPrinter printer;
	printer << protocol << " " << res_code << "\r\n";
	for (auto &pr : header){
		printer << pr.first << ": " << pr.second << "\r\n";
	}

	printer << "\r\n";

	if(!sdp.empty()){
		printer << sdp;
	}
1064
//	DebugP(this) << printer;
1065 1066 1067 1068
	return send(std::make_shared<BufferString>(printer)) > 0 ;
}

// Send a buffer over the TCP session, adding its size to the running byte counter.
// Returns whatever TcpSession::send() returns.
int RtspSession::send(const Buffer::Ptr &pkt){
//	if(!_enableSendRtp){
//		DebugP(this) << pkt->data();
//	}
    _ui64TotalBytes += pkt->size();
    return TcpSession::send(pkt);
}

bool RtspSession::sendRtspResponse(const string &res_code,
								   const std::initializer_list<string> &header,
								   const string &sdp,
								   const char *protocol) {
	string key;
	StrCaseMap header_map;
	int i = 0;
	for(auto &val : header){
		if(++i % 2 == 0){
			header_map.emplace(key,val);
		}else{
			key = val;
		}
	}
	return sendRtspResponse(res_code,header_map,sdp,protocol);
}

// Render an SSRC as 8 upper-case hexadecimal digits, most significant byte first.
inline string RtspSession::printSSRC(uint32_t ui32Ssrc) {
    static const char kHexDigits[] = "0123456789ABCDEF";
    string text(8, '0');
    for (int nibble = 0; nibble < 8; ++nibble) {
        // Walk from the highest 4 bits down to the lowest.
        text[nibble] = kHexDigits[(ui32Ssrc >> (28 - 4 * nibble)) & 0x0F];
    }
    return text;
}
// Return the index of the first track whose type equals `type`.
// When nothing matches but exactly one track exists, that track (index 0) is
// returned; otherwise -1.
inline int RtspSession::getTrackIndexByTrackType(TrackType type) {
    for (unsigned int i = 0; i < _aTrackInfo.size(); i++) {
        if (type == _aTrackInfo[i]->_type) {
            return i;
        }
    }
    if(_aTrackInfo.size() == 1){
        return 0;
    }
    return -1;
}
// Return the index of the first track whose control suffix equals `controlSuffix`.
// When nothing matches but exactly one track exists, that track (index 0) is
// returned; otherwise -1.
inline int RtspSession::getTrackIndexByControlSuffix(const string &controlSuffix) {
    for (unsigned int i = 0; i < _aTrackInfo.size(); i++) {
        if (controlSuffix == _aTrackInfo[i]->_control_surffix) {
            return i;
        }
    }
    if(_aTrackInfo.size() == 1){
        return 0;
    }
    return -1;
}

xiongziliang committed
1125 1126 1127 1128 1129 1130
// Return the index of the first track whose interleaved channel id equals
// `interleaved`. When nothing matches but exactly one track exists, that track
// (index 0) is returned; otherwise -1.
inline int RtspSession::getTrackIndexByInterleaved(int interleaved){
    for (unsigned int i = 0; i < _aTrackInfo.size(); i++) {
        if (_aTrackInfo[i]->_interleaved == interleaved) {
            return i;
        }
    }
    if(_aTrackInfo.size() == 1){
        return 0;
    }
    return -1;
}

xiongziliang committed
1137
// Request that this pushing session be closed.
// Returns false (and does nothing) when this session is not pushing, or when
// `force` is false and the pushed source still has readers; otherwise schedules
// a shutdown via safeShutdown() and returns true.
bool RtspSession::close(MediaSource &sender,bool force) {
    // This callback is triggered from another thread.
    if(!_pushSrc || (!force && _pushSrc->totalReaderCount())){
        return false;
    }
    string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
    safeShutdown(SockException(Err_shutdown,err));
    return true;
}
1146

1147

1148 1149
// Forward the "no reader" notification to the base implementation, but only
// when this session is pushing and the pushed source has no readers at all.
void RtspSession::onNoneReader(MediaSource &sender){
    // This callback is triggered from another thread.
    if(!_pushSrc || _pushSrc->totalReaderCount()){
        return;
    }
    MediaSourceEvent::onNoneReader(sender);
}

1156 1157 1158
// Reader count reported for this session: the pushed source's total reader
// count when this session is pushing, otherwise the sender's own reader count.
int RtspSession::totalReaderCount(MediaSource &sender) {
    if (_pushSrc) {
        return _pushSrc->totalReaderCount();
    }
    return sender.readerCount();
}
1159

xiongziliang committed
1160
// Send one RTP packet to the player using the negotiated transport.
//
// RTP_TCP: the packet is written to the RTSP connection as is.
// RTP_UDP: the packet is sent through the track's RTP socket, skipping the
//          first 4 bytes of the buffer (BufferRtp offset 4).
// Other transports are ignored here. When RTSP_SERVER_SEND_RTCP is enabled a
// sender report is additionally emitted about every 5 seconds per track.
void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) {
    //InfoP(this) <<(int)pkt.Interleaved;
    switch (_rtpType) {
        case Rtsp::RTP_TCP: {
            send(pkt);
        }
            break;
        case Rtsp::RTP_UDP: {
            int iTrackIndex = getTrackIndexByTrackType(pkt->type);
            if (iTrackIndex == -1) {
                // No track matches this packet type: indexing _apRtpSock with -1
                // would be out of bounds, so drop the packet instead.
                return;
            }
            auto &pSock = _apRtpSock[iTrackIndex];
            if (!pSock) {
                shutdown(SockException(Err_shutdown,"udp sock not opened yet"));
                return;
            }
            BufferRtp::Ptr buffer(new BufferRtp(pkt,4));
            _ui64TotalBytes += buffer->size();
            pSock->send(buffer);
        }
            break;
        default:
            break;
    }

#if RTSP_SERVER_SEND_RTCP
    int iTrackIndex = getTrackIndexByTrackType(pkt->type);
    if(iTrackIndex == -1){
        return;
    }
    RtcpCounter &counter = _aRtcpCnt[iTrackIndex];
    counter.pktCnt += 1;
    counter.octCount += (pkt->length - pkt->offset);
    auto &ticker = _aRtcpTicker[iTrackIndex];
    if (ticker.elapsedTime() > 5 * 1000) {
        //send rtcp every 5 second
        ticker.resetTime();
        // Keep the RTP timestamp in network byte order as it appears in the packet.
        memcpy(&counter.timeStamp, pkt->payload + 8 , 4);
        sendSenderReport(_rtpType == Rtsp::RTP_TCP,iTrackIndex);
    }
#endif
}

xiongziliang committed
1202
void RtspSession::sendSenderReport(bool overTcp,int iTrackIndex) {
xiongziliang committed
1203 1204
    static const char s_cname[] = "ZLMediaKitRtsp";
    uint8_t aui8Rtcp[4 + 28 + 10 + sizeof(s_cname) + 1] = {0};
1205 1206 1207 1208 1209 1210
    uint8_t *pui8Rtcp_SR = aui8Rtcp + 4, *pui8Rtcp_SDES = pui8Rtcp_SR + 28;
    auto &track = _aTrackInfo[iTrackIndex];
    auto &counter = _aRtcpCnt[iTrackIndex];

    aui8Rtcp[0] = '$';
    aui8Rtcp[1] = track->_interleaved + 1;
xiongziliang committed
1211 1212
    aui8Rtcp[2] = (sizeof(aui8Rtcp) - 4) >> 8;
    aui8Rtcp[3] = (sizeof(aui8Rtcp) - 4) & 0xFF;
1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251

    pui8Rtcp_SR[0] = 0x80;
    pui8Rtcp_SR[1] = 0xC8;
    pui8Rtcp_SR[2] = 0x00;
    pui8Rtcp_SR[3] = 0x06;

    uint32_t ssrc=htonl(track->_ssrc);
    memcpy(&pui8Rtcp_SR[4], &ssrc, 4);

    uint64_t msw;
    uint64_t lsw;
    struct timeval tv;
    gettimeofday(&tv, NULL);
    msw = tv.tv_sec + 0x83AA7E80; /* 0x83AA7E80 is the number of seconds from 1900 to 1970 */
    lsw = (uint32_t) ((double) tv.tv_usec * (double) (((uint64_t) 1) << 32) * 1.0e-6);

    msw = htonl(msw);
    memcpy(&pui8Rtcp_SR[8], &msw, 4);

    lsw = htonl(lsw);
    memcpy(&pui8Rtcp_SR[12], &lsw, 4);
    //直接使用网络字节序
    memcpy(&pui8Rtcp_SR[16], &counter.timeStamp, 4);

    uint32_t pktCnt = htonl(counter.pktCnt);
    memcpy(&pui8Rtcp_SR[20], &pktCnt, 4);

    uint32_t octCount = htonl(counter.octCount);
    memcpy(&pui8Rtcp_SR[24], &octCount, 4);

    pui8Rtcp_SDES[0] = 0x81;
    pui8Rtcp_SDES[1] = 0xCA;
    pui8Rtcp_SDES[2] = 0x00;
    pui8Rtcp_SDES[3] = 0x06;

    memcpy(&pui8Rtcp_SDES[4], &ssrc, 4);

    pui8Rtcp_SDES[8] = 0x01;
    pui8Rtcp_SDES[9] = 0x0f;
xiongziliang committed
1252 1253
    memcpy(&pui8Rtcp_SDES[10], s_cname, sizeof(s_cname));
    pui8Rtcp_SDES[10 + sizeof(s_cname)] = 0x00;
1254 1255 1256 1257 1258 1259

    if(overTcp){
        send(obtainBuffer((char *) aui8Rtcp, sizeof(aui8Rtcp)));
    }else {
        _apRtcpSock[iTrackIndex]->send((char *) aui8Rtcp + 4, sizeof(aui8Rtcp) - 4);
    }
xzl committed
1260 1261
}

1262 1263 1264 1265 1266 1267 1268 1269 1270 1271
// Tune the socket for throughput unless General::kUltraLowDelay is enabled.
void RtspSession::setSocketFlags(){
    GET_CONFIG(bool,ultraLowDelay,General::kUltraLowDelay);
    if (ultraLowDelay) {
        // Ultra-low-delay mode: keep the socket settings untouched.
        return;
    }
    // In push mode, turning TCP_NODELAY off adds latency on the pusher side but improves server performance.
    SockUtil::setNoDelay(_sock->rawFD(), false);
    // In play mode, enabling MSG_MORE adds latency but improves send performance.
    (*this) << SocketFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
}

xzl committed
1272
}
xiongziliang committed
1273
/* namespace mediakit */
xzl committed
1274