RtspSession.cpp 42.7 KB
Newer Older
1
/*
xiongziliang committed
2
 * MIT License
xzl committed
3
 *
xiongziliang committed
4
 * Copyright (c) 2016-2019 xiongziliang <771730766@qq.com>
xiongziliang committed
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
xzl committed
25
 */
26

xiongzilaing committed
27
#include <atomic>
28
#include <iomanip>
xiongziliang committed
29
#include "Common/config.h"
xiongzilaing committed
30
#include "UDPServer.h"
xzl committed
31
#include "RtspSession.h"
xiongzilaing committed
32
#include "Util/mini.h"
33
#include "Util/MD5.h"
xiongziliang committed
34
#include "Util/base64.h"
xiongzilaing committed
35
#include "Util/onceToken.h"
xzl committed
36 37
#include "Util/TimeTicker.h"
#include "Util/NoticeCenter.h"
xiongzilaing committed
38
#include "Network/sockutil.h"
xzl committed
39

40 41
#define RTSP_SERVER_SEND_RTCP 0

xiongziliang committed
42 43
using namespace std;
using namespace toolkit;
xzl committed
44

xiongziliang committed
45
namespace mediakit {
xzl committed
46

47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
/**
 * rtsp协议有多种方式传输rtp数据包,目前已支持包括以下4种
 * 1: rtp over udp ,这种方式是rtp通过单独的udp端口传输
 * 2: rtp over udp_multicast,这种方式是rtp通过共享udp组播端口传输
 * 3: rtp over tcp,这种方式是通过rtsp信令tcp通道完成传输
 * 4: rtp over http,下面着重讲解:rtp over http
 *
 * rtp over http 是把rtsp协议伪装成http协议以达到穿透防火墙的目的,
 * 此时播放器会发送两次http请求至rtsp服务器,第一次是http get请求,
 * 第二次是http post请求。
 *
 * 这两次请求通过http请求头中的x-sessioncookie键完成绑定
 *
 * 第一次http get请求用于接收rtp、rtcp和rtsp回复,后续该链接不再发送其他请求
 * 第二次http post请求用于发送rtsp请求,rtsp握手结束后可能会断开连接,此时我们还要维持rtp发送
 * 需要指出的是http post请求中的content负载就是base64编码后的rtsp请求包,
 * 播放器会把rtsp请求伪装成http content负载发送至rtsp服务器,然后rtsp服务器又把回复发送给第一次http get请求的tcp链接
 * 这样,对防火墙而言,本次rtsp会话就是两次http请求,防火墙就会放行数据
 *
 * zlmediakit在处理rtsp over http的请求时,会把http poster中的content数据base64解码后转发给http getter处理
 */


//rtsp over http 情况下get请求实例,在请求实例用于接收rtp数据包
static unordered_map<string, weak_ptr<RtspSession> > g_mapGetter;
//对g_mapGetter上锁保护
static recursive_mutex g_mtxGetter;
xiongziliang committed
74 75
static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE;

xiongziliang committed
76
RtspSession::RtspSession(const Socket::Ptr &pSock) : TcpSession(pSock) {
xiongziliang committed
77
    DebugP(this);
78 79
    GET_CONFIG(uint32_t,keep_alive_sec,Rtsp::kKeepAliveSecond);
    pSock->setSendTimeOutSecond(keep_alive_sec);
xiongziliang committed
80 81
    //起始接收buffer缓存设置为4K,节省内存
    pSock->setReadBuffer(std::make_shared<BufferRaw>(4 * 1024));
xzl committed
82 83 84
}

RtspSession::~RtspSession() {
85
    DebugP(this);
xzl committed
86 87 88
}

void RtspSession::onError(const SockException& err) {
xiongziliang committed
89
	WarnP(this) << err.what();
xiongziliang committed
90
	if (_rtpType == Rtsp::RTP_MULTICAST) {
xiongziliang committed
91
		//取消UDP端口监听
92
		UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this);
xzl committed
93 94
	}

95 96 97 98
	if (_http_x_sessioncookie.size() != 0) {
		//移除http getter的弱引用记录
		lock_guard<recursive_mutex> lock(g_mtxGetter);
		g_mapGetter.erase(_http_x_sessioncookie);
xzl committed
99
	}
100 101

    //流量统计事件广播
102
    GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold);
103
    if(_ui64TotalBytes > iFlowThreshold * 1024){
104
        bool isPlayer = !_pushSrc;
105
        NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport,
106 107 108
										   _mediaInfo,
										   _ui64TotalBytes,
										   _ticker.createdTime()/1000,
109
                                           isPlayer,
110
										   *this);
111
    }
112

xzl committed
113 114 115
}

void RtspSession::onManager() {
116 117 118 119
    GET_CONFIG(uint32_t,handshake_sec,Rtsp::kKeepAliveSecond);
    GET_CONFIG(uint32_t,keep_alive_sec,Rtsp::kKeepAliveSecond);

    if (_ticker.createdTime() > handshake_sec * 1000) {
120
		if (_strSession.size() == 0) {
xiongziliang committed
121
			shutdown(SockException(Err_timeout,"illegal connection"));
xzl committed
122 123 124
			return;
		}
	}
125

126

127
	if ((_rtpType == Rtsp::RTP_UDP || _pushSrc ) && _ticker.elapsedTime() > keep_alive_sec * 1000) {
128
		//如果是推流端或者rtp over udp类型的播放端,那么就做超时检测
xiongziliang committed
129 130
        shutdown(SockException(Err_timeout,"rtp over udp session timeouted"));
        return;
xzl committed
131 132 133
	}
}

xiongziliang committed
134 135 136 137 138 139 140
void RtspSession::onRecv(const Buffer::Ptr &pBuf) {
	_ticker.resetTime();
    _ui64TotalBytes += pBuf->size();
    if (_onRecv) {
		//http poster的请求数据转发给http getter处理
		_onRecv(pBuf);
	} else {
141
//    	TraceP(this) << pBuf->size() << "\r\n" << pBuf->data();
xiongziliang committed
142 143 144
		input(pBuf->data(),pBuf->size());
	}
}
xiongziliang committed
145

xiongziliang committed
146 147 148
void RtspSession::onWholeRtspPacket(Parser &parser) {
	string strCmd = parser.Method(); //提取出请求命令字
	_iCseq = atoi(parser["CSeq"].data());
149
	if(_strContentBase.empty() && strCmd != "GET"){
xiongziliang committed
150 151
		_strContentBase = parser.Url();
		_mediaInfo.parse(parser.FullUrl());
152 153
        _mediaInfo._schema = RTSP_SCHEMA;
    }
xzl committed
154

155
	typedef void (RtspSession::*rtsp_request_handler)(const Parser &parser);
156
	static unordered_map<string, rtsp_request_handler> s_handler_map;
xiongziliang committed
157
	static onceToken token( []() {
158 159 160
		s_handler_map.emplace("OPTIONS",&RtspSession::handleReq_Options);
		s_handler_map.emplace("DESCRIBE",&RtspSession::handleReq_Describe);
		s_handler_map.emplace("ANNOUNCE",&RtspSession::handleReq_ANNOUNCE);
xiongziliang committed
161
		s_handler_map.emplace("RECORD",&RtspSession::handleReq_RECORD);
162 163 164 165 166 167 168 169
		s_handler_map.emplace("SETUP",&RtspSession::handleReq_Setup);
		s_handler_map.emplace("PLAY",&RtspSession::handleReq_Play);
		s_handler_map.emplace("PAUSE",&RtspSession::handleReq_Pause);
		s_handler_map.emplace("TEARDOWN",&RtspSession::handleReq_Teardown);
		s_handler_map.emplace("GET",&RtspSession::handleReq_Get);
		s_handler_map.emplace("POST",&RtspSession::handleReq_Post);
		s_handler_map.emplace("SET_PARAMETER",&RtspSession::handleReq_SET_PARAMETER);
		s_handler_map.emplace("GET_PARAMETER",&RtspSession::handleReq_SET_PARAMETER);
xiongziliang committed
170 171
	}, []() {});

172
	auto it = s_handler_map.find(strCmd);
173 174
	if (it == s_handler_map.end()) {
        sendRtspResponse("403 Forbidden");
xiongziliang committed
175
        shutdown(SockException(Err_shutdown,StrPrinter << "403 Forbidden:" << strCmd));
176
        return;
xzl committed
177
	}
178 179 180 181 182 183 184 185 186 187 188 189

    auto &fun = it->second;
    try {
        (this->*fun)(parser);
    }catch (SockException &ex){
        if(ex){
            shutdown(ex);
        }
    }catch (exception &ex){
        shutdown(SockException(Err_shutdown,ex.what()));
    }
    parser.Clear();
xiongziliang committed
190 191
}

xiongziliang committed
192
void RtspSession::onRtpPacket(const char *data, uint64_t len) {
xiongziliang committed
193 194 195
	if(!_pushSrc){
		return;
	}
196

xiongziliang committed
197 198 199 200
	int trackIdx = -1;
	uint8_t interleaved = data[1];
	if(interleaved %2 == 0){
		trackIdx = getTrackIndexByInterleaved(interleaved);
201 202 203 204 205 206
        if (trackIdx != -1) {
            handleOneRtp(trackIdx,_aTrackInfo[trackIdx],(unsigned char *)data + 4, len - 4);
        }
	}else{
        trackIdx = getTrackIndexByInterleaved(interleaved - 1);
        if (trackIdx != -1) {
207
            onRtcpPacket(trackIdx, _aTrackInfo[trackIdx], (unsigned char *) data + 4, len - 4);
208
        }
xiongziliang committed
209 210 211
	}
}

212
void RtspSession::onRtcpPacket(int iTrackidx, SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen){
213 214

}
xiongziliang committed
215 216 217 218
int64_t RtspSession::getContentLength(Parser &parser) {
	if(parser.Method() == "POST"){
		//http post请求的content数据部分是base64编码后的rtsp请求信令包
		return remainDataSize();
xiongziliang committed
219
	}
xiongziliang committed
220
	return RtspSplitter::getContentLength(parser);
xzl committed
221 222
}

xiongziliang committed
223

224
void RtspSession::handleReq_Options(const Parser &parser) {
225
	//支持这些命令
226
	sendRtspResponse("200 OK",{"Public" , "OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, ANNOUNCE, RECORD, SET_PARAMETER, GET_PARAMETER"});
xzl committed
227 228
}

229
void RtspSession::handleReq_ANNOUNCE(const Parser &parser) {
xiongziliang committed
230 231 232 233 234 235 236
	auto src = dynamic_pointer_cast<RtmpMediaSource>(MediaSource::find(RTSP_SCHEMA,
																	   _mediaInfo._vhost,
																	   _mediaInfo._app,
																	   _mediaInfo._streamid,
																	   false));
	if(src){
		sendRtspResponse("406 Not Acceptable", {"Content-Type", "text/plain"}, "Already publishing.");
237 238 239 240 241 242
        string err = StrPrinter << "ANNOUNCE:"
                                << "Already publishing:"
                                << _mediaInfo._vhost << " "
                                << _mediaInfo._app << " "
                                << _mediaInfo._streamid << endl;
		throw SockException(Err_shutdown,err);
243
	}
xiongziliang committed
244

xiongziliang committed
245
	_strSession = makeRandStr(12);
xiongziliang committed
246
    _strSdp = parser.Content();
xiongziliang committed
247
    _aTrackInfo = SdpParser(_strSdp).getAvailableTrack();
xiongziliang committed
248

xiongziliang committed
249 250 251 252
	_pushSrc = std::make_shared<RtspToRtmpMediaSource>(_mediaInfo._vhost,_mediaInfo._app,_mediaInfo._streamid);
	_pushSrc->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
	_pushSrc->onGetSDP(_strSdp);
	sendRtspResponse("200 OK");
253 254
}

255
void RtspSession::handleReq_RECORD(const Parser &parser){
xiongziliang committed
256
	if (_aTrackInfo.empty() || parser["Session"] != _strSession) {
257
		send_SessionNotFound();
258
        throw SockException(Err_shutdown,_aTrackInfo.empty() ? "can not find any availabe track when record" : "session not found when record");
259 260 261 262 263
	}
	auto onRes = [this](const string &err){
		bool authSuccess = err.empty();
		if(!authSuccess){
			sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err);
xiongziliang committed
264
			shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << err));
265 266 267 268 269 270 271
			return;
		}

		_StrPrinter rtp_info;
		for(auto &track : _aTrackInfo){
			if (track->_inited == false) {
				//还有track没有setup
xiongziliang committed
272 273
                shutdown(SockException(Err_shutdown,"track not setuped"));
                return;
274
			}
xiongziliang committed
275
			rtp_info << "url=" << _strContentBase << "/" << track->_control_surffix << ",";
276 277 278 279
		}

		rtp_info.pop_back();
		sendRtspResponse("200 OK", {"RTP-Info",rtp_info});
280
		SockUtil::setNoDelay(_sock->rawFD(),false);
281 282
		if(_rtpType == Rtsp::RTP_TCP){
			//如果是rtsp推流服务器,并且是TCP推流,那么加大TCP接收缓存,这样能提升接收性能
283
			_sock->setReadBuffer(std::make_shared<BufferRaw>(256 * 1024));
284
		}
285
		(*this) << SocketFlags(kSockFlags);
286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310
	};

	weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
	Broadcast::AuthInvoker invoker = [weakSelf,onRes](const string &err){
		auto strongSelf = weakSelf.lock();
		if(!strongSelf){
			return;
		}
		strongSelf->async([weakSelf,onRes,err](){
			auto strongSelf = weakSelf.lock();
			if(!strongSelf){
				return;
			}
			onRes(err);
		});
	};

	//rtsp推流需要鉴权
	auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish,_mediaInfo,invoker,*this);
	if(!flag){
		//该事件无人监听,默认不鉴权
		onRes("");
	}
}

311
void RtspSession::handleReq_Describe(const Parser &parser) {
312 313
    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
    //该请求中的认证信息
314
    auto authorization = parser["Authorization"];
315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349
    onGetRealm invoker = [weakSelf,authorization](const string &realm){
        auto strongSelf = weakSelf.lock();
        if(!strongSelf){
            //本对象已经销毁
            return;
        }
        //切换到自己的线程然后执行
        strongSelf->async([weakSelf,realm,authorization](){
            auto strongSelf = weakSelf.lock();
            if(!strongSelf){
                //本对象已经销毁
                return;
            }
            if(realm.empty()){
                //无需认证,回复sdp
                strongSelf->onAuthSuccess();
                return;
            }
            //该流需要认证
            strongSelf->onAuthUser(realm,authorization);
        });

    };

    //广播是否需要认证事件
    if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnGetRtspRealm,
                                           _mediaInfo,
                                           invoker,
                                           *this)){
        //无人监听此事件,说明无需认证
        invoker("");
    }
}
void RtspSession::onAuthSuccess() {
    TraceP(this);
350
    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
351
    MediaSource::findAsync(_mediaInfo,weakSelf.lock(), true,[weakSelf](const MediaSource::Ptr &src){
352 353 354 355 356 357 358
        auto strongSelf = weakSelf.lock();
        if(!strongSelf){
            return;
        }
        auto rtsp_src = dynamic_pointer_cast<RtspMediaSource>(src);
        if (!rtsp_src) {
            //未找到相应的MediaSource
xiongziliang committed
359
            string err = StrPrinter << "no such stream:" <<  strongSelf->_mediaInfo._vhost << " " <<  strongSelf->_mediaInfo._app << " " << strongSelf->_mediaInfo._streamid;
360
            strongSelf->send_StreamNotFound();
xiongziliang committed
361
            strongSelf->shutdown(SockException(Err_shutdown,err));
362 363 364 365
            return;
        }
        //找到了响应的rtsp流
        strongSelf->_strSdp = rtsp_src->getSdp();
xiongziliang committed
366 367
        SdpParser sdpParser(strongSelf->_strSdp);
        strongSelf->_aTrackInfo = sdpParser.getAvailableTrack();
368 369 370
        if (strongSelf->_aTrackInfo.empty()) {
            //该流无效
            strongSelf->send_StreamNotFound();
xiongziliang committed
371
            strongSelf->shutdown(SockException(Err_shutdown,"can not find any availabe track in sdp"));
372 373 374 375 376 377 378 379 380
            return;
        }
        strongSelf->_strSession = makeRandStr(12);
        strongSelf->_pMediaSrc = rtsp_src;
        for(auto &track : strongSelf->_aTrackInfo){
            track->_ssrc = rtsp_src->getSsrc(track->_type);
            track->_seq = rtsp_src->getSeqence(track->_type);
            track->_time_stamp = rtsp_src->getTimeStamp(track->_type);
        }
xiongziliang committed
381

382 383 384 385 386
        strongSelf->sendRtspResponse("200 OK",
                                     {"Content-Base",strongSelf->_strContentBase + "/",
                                      "x-Accept-Retransmit","our-retransmit",
                                      "x-Accept-Dynamic-Rate","1"
                                     },strongSelf->_strSdp);
387
    });
xzl committed
388
}
389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404
void RtspSession::onAuthFailed(const string &realm,const string &why,bool close) {
    GET_CONFIG(bool,authBasic,Rtsp::kAuthBasic);
    if (!authBasic) {
        //我们需要客户端优先以md5方式认证
        _strNonce = makeRandStr(32);
        sendRtspResponse("401 Unauthorized",
                         {"WWW-Authenticate",
                          StrPrinter << "Digest realm=\"" << realm << "\",nonce=\"" << _strNonce << "\"" });
    }else {
        //当然我们也支持base64认证,但是我们不建议这样做
        sendRtspResponse("401 Unauthorized",
                         {"WWW-Authenticate",
                          StrPrinter << "Basic realm=\"" << realm << "\"" });
    }
    if(close){
        shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << why));
405 406 407
    }
}

408
void RtspSession::onAuthBasic(const string &realm,const string &strBase64){
409 410 411
    //base64认证
    char user_pwd_buf[512];
    av_base64_decode((uint8_t *)user_pwd_buf,strBase64.data(),strBase64.size());
412
    auto user_pwd_vec = split(user_pwd_buf,":");
413 414
    if(user_pwd_vec.size() < 2){
        //认证信息格式不合法,回复401 Unauthorized
415
        onAuthFailed(realm,"can not find user and passwd when basic64 auth");
416 417 418 419
        return;
    }
    auto user = user_pwd_vec[0];
    auto pwd = user_pwd_vec[1];
420
    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
421
    onAuth invoker = [pwd,realm,weakSelf](bool encrypted,const string &good_pwd){
422 423 424 425
        auto strongSelf = weakSelf.lock();
        if(!strongSelf){
            //本对象已经销毁
            return;
426
        }
427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442
        //切换到自己的线程执行
        strongSelf->async([weakSelf,good_pwd,pwd,realm](){
            auto strongSelf = weakSelf.lock();
            if(!strongSelf){
                //本对象已经销毁
                return;
            }
            //base64忽略encrypted参数,上层必须传入明文密码
            if(pwd == good_pwd){
                //提供的密码且匹配正确
                strongSelf->onAuthSuccess();
                return;
            }
            //密码错误
            strongSelf->onAuthFailed(realm,StrPrinter << "password mismatch when base64 auth:" << pwd << " != " << good_pwd);
        });
443
    };
444

445
    //此时必须提供明文密码
446
    if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,_mediaInfo,realm,user, true,invoker,*this)){
447
        //表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之
448
        WarnP(this) << "请监听kBroadcastOnRtspAuth事件!";
449 450 451 452 453
        //但是我们还是忽略认证以便完成播放
        //我们输入的密码是明文
        invoker(false,pwd);
    }
}
454

455 456
void RtspSession::onAuthDigest(const string &realm,const string &strMd5){
	DebugP(this) << strMd5;
457 458 459
    auto mapTmp = Parser::parseArgs(strMd5,",","=");
    decltype(mapTmp) map;
    for(auto &pr : mapTmp){
460
        map[trim(string(pr.first)," \"")] = trim(pr.second," \"");
461 462 463
    }
    //check realm
    if(realm != map["realm"]){
464
        onAuthFailed(realm,StrPrinter << "realm not mached:" << realm << " != " << map["realm"]);
465 466 467 468
        return ;
    }
    //check nonce
    auto nonce = map["nonce"];
469 470
    if(_strNonce != nonce){
        onAuthFailed(realm,StrPrinter << "nonce not mached:" << nonce << " != " << _strNonce);
471 472 473 474 475 476 477
        return ;
    }
    //check username and uri
    auto username = map["username"];
    auto uri = map["uri"];
    auto response = map["response"];
    if(username.empty() || uri.empty() || response.empty()){
478
        onAuthFailed(realm,StrPrinter << "username/uri/response empty:" << username << "," << uri << "," << response);
479 480 481
        return ;
    }

482
    auto realInvoker = [this,realm,nonce,uri,username,response](bool ignoreAuth,bool encrypted,const string &good_pwd){
483 484
        if(ignoreAuth){
            //忽略认证
485 486
            TraceP(this) << "auth ignored";
            onAuthSuccess();
487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503
            return;
        }
        /*
        response计算方法如下:
        RTSP客户端应该使用username + password并计算response如下:
        (1)当password为MD5编码,则
            response = md5( password:nonce:md5(public_method:url)  );
        (2)当password为ANSI字符串,则
            response= md5( md5(username:realm:password):nonce:md5(public_method:url) );
         */
        auto encrypted_pwd = good_pwd;
        if(!encrypted){
            //提供的是明文密码
            encrypted_pwd = MD5(username+ ":" + realm + ":" + good_pwd).hexdigest();
        }

        auto good_response = MD5( encrypted_pwd + ":" + nonce + ":" + MD5(string("DESCRIBE") + ":" + uri).hexdigest()).hexdigest();
xiongziliang committed
504
        if(strcasecmp(good_response.data(),response.data()) == 0){
505
            //认证成功!md5不区分大小写
506
            onAuthSuccess();
507 508
        }else{
            //认证失败!
509
            onAuthFailed(realm, StrPrinter << "password mismatch when md5 auth:" << good_response << " != " << response );
510 511
        }
    };
512 513 514 515 516 517 518 519 520 521 522 523 524 525 526

    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
    onAuth invoker = [realInvoker,weakSelf](bool encrypted,const string &good_pwd){
        auto strongSelf = weakSelf.lock();
        if(!strongSelf){
            return;
        }
        //切换到自己的线程确保realInvoker执行时,this指针有效
        strongSelf->async([realInvoker,weakSelf,encrypted,good_pwd](){
            auto strongSelf = weakSelf.lock();
            if(!strongSelf){
                return;
            }
            realInvoker(false,encrypted,good_pwd);
        });
527 528 529
    };

    //此时可以提供明文或md5加密的密码
530
    if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,_mediaInfo,realm,username, false,invoker,*this)){
531
        //表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之
532
        WarnP(this) << "请监听kBroadcastOnRtspAuth事件!";
533 534 535 536 537
        //但是我们还是忽略认证以便完成播放
        realInvoker(true,true,"");
    }
}

538 539 540 541 542
void RtspSession::onAuthUser(const string &realm,const string &authorization){
    if(authorization.empty()){
        onAuthFailed(realm,"", false);
        return;
    }
543 544 545 546 547
    //请求中包含认证信息
    auto authType = FindField(authorization.data(),NULL," ");
	auto authStr = FindField(authorization.data()," ",NULL);
    if(authType.empty() || authStr.empty()){
        //认证信息格式不合法,回复401 Unauthorized
548
        onAuthFailed(realm,"can not find auth type or auth string");
549 550 551 552
        return;
    }
    if(authType == "Basic"){
        //base64认证,需要明文密码
553
        onAuthBasic(realm,authStr);
554 555
    }else if(authType == "Digest"){
        //md5认证
556
        onAuthDigest(realm,authStr);
557 558
    }else{
        //其他认证方式?不支持!
559
        onAuthFailed(realm,StrPrinter << "unsupported auth type:" << authType);
560 561
    }
}
xzl committed
562
inline void RtspSession::send_StreamNotFound() {
563
	sendRtspResponse("404 Stream Not Found",{"Connection","Close"});
xzl committed
564 565
}
inline void RtspSession::send_UnsupportedTransport() {
566
	sendRtspResponse("461 Unsupported Transport",{"Connection","Close"});
xzl committed
567 568 569
}

inline void RtspSession::send_SessionNotFound() {
570
	sendRtspResponse("454 Session Not Found",{"Connection","Close"});
xzl committed
571
}
572 573

void RtspSession::handleReq_Setup(const Parser &parser) {
xzl committed
574
//处理setup命令,该函数可能进入多次
575
    auto controlSuffix = split(parser.Url(),"/").back();// parser.FullUrl().substr(_strContentBase.size());
xiongziliang committed
576 577 578
    if(controlSuffix.front() == '/'){
		controlSuffix = controlSuffix.substr(1);
    }
579
	int trackIdx = getTrackIndexByControlSuffix(controlSuffix);
xzl committed
580 581
	if (trackIdx == -1) {
		//未找到相应track
582 583
        throw SockException(Err_shutdown, StrPrinter << "can not find any track by control suffix:" << controlSuffix);
    }
xiongziliang committed
584
	SdpTrack::Ptr &trackRef = _aTrackInfo[trackIdx];
585
	if (trackRef->_inited) {
xzl committed
586
		//已经初始化过该Track
587
        throw SockException(Err_shutdown, "can not setup one track twice");
xzl committed
588
	}
589
	trackRef->_inited = true; //现在初始化
xzl committed
590

xiongziliang committed
591
	if(_rtpType == Rtsp::RTP_Invalid){
592
		auto &strTransport = parser["Transport"];
xzl committed
593
		if(strTransport.find("TCP") != string::npos){
xiongziliang committed
594
			_rtpType = Rtsp::RTP_TCP;
xzl committed
595
		}else if(strTransport.find("multicast") != string::npos){
xiongziliang committed
596
			_rtpType = Rtsp::RTP_MULTICAST;
xzl committed
597
		}else{
xiongziliang committed
598
			_rtpType = Rtsp::RTP_UDP;
xzl committed
599 600 601
		}
	}

602
    //允许接收rtp、rtcp包
xiongziliang committed
603
	RtspSplitter::enableRecvRtp(_rtpType == Rtsp::RTP_TCP);
xiongziliang committed
604

605
	switch (_rtpType) {
xiongziliang committed
606
	case Rtsp::RTP_TCP: {
607 608 609 610 611 612 613 614 615 616 617 618 619
        if(_pushSrc){
            //rtsp推流时,interleaved由推流者决定
            auto key_values =  Parser::parseArgs(parser["Transport"],";","=");
            int interleaved_rtp = -1 , interleaved_rtcp = -1;
            if(2 == sscanf(key_values["interleaved"].data(),"%d-%d",&interleaved_rtp,&interleaved_rtcp)){
                trackRef->_interleaved = interleaved_rtp;
            }else{
                throw SockException(Err_shutdown, "can not find interleaved when setup of rtp over tcp");
            }
        }else{
            //rtsp播放时,由于数据共享分发,所以interleaved必须由服务器决定
            trackRef->_interleaved = 2 * trackRef->_type;
        }
620 621
		sendRtspResponse("200 OK",
						 {"Transport",StrPrinter << "RTP/AVP/TCP;unicast;"
622
												 << "interleaved=" << (int)trackRef->_interleaved << "-" << (int)trackRef->_interleaved + 1 << ";"
623
												 << "ssrc=" << printSSRC(trackRef->_ssrc),
624 625 626
						  "x-Transport-Options" , "late-tolerance=1.400000",
						  "x-Dynamic-Rate" , "1"
						 });
xzl committed
627 628
	}
		break;
xiongziliang committed
629
	case Rtsp::RTP_UDP: {
xiongziliang committed
630
		//我们用trackIdx区分rtp和rtcp包
631 632
		auto pSockRtp = std::make_shared<Socket>(_sock->getPoller());
		if (!pSockRtp->bindUdpSock(0,get_local_ip().data())) {
xiongziliang committed
633 634
			//分配端口失败
			send_NotAcceptable();
635 636
            throw SockException(Err_shutdown, "open rtp socket failed");
        }
637 638
		auto pSockRtcp = std::make_shared<Socket>(_sock->getPoller());
		if (!pSockRtcp->bindUdpSock(pSockRtp->get_local_port() + 1,get_local_ip().data())) {
xzl committed
639 640
			//分配端口失败
			send_NotAcceptable();
641 642
            throw SockException(Err_shutdown, "open rtcp socket failed");
        }
643 644
		_apRtpSock[trackIdx] = pSockRtp;
		_apRtcpSock[trackIdx] = pSockRtcp;
xiongziliang committed
645
		//设置客户端内网端口信息
xiongziliang committed
646
		string strClientPort = FindField(parser["Transport"].data(), "client_port=", NULL);
647 648 649 650 651
		uint16_t ui16RtpPort = atoi( FindField(strClientPort.data(), NULL, "-").data());
        uint16_t ui16RtcpPort = atoi( FindField(strClientPort.data(), "-" , NULL).data());

        struct sockaddr_in peerAddr;
        //设置rtp发送目标地址
xzl committed
652
		peerAddr.sin_family = AF_INET;
653
		peerAddr.sin_port = htons(ui16RtpPort);
654
		peerAddr.sin_addr.s_addr = inet_addr(get_peer_ip().data());
xzl committed
655
		bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero);
656
		pSockRtp->setSendPeerAddr((struct sockaddr *)(&peerAddr));
657 658 659 660 661 662 663 664

		//设置rtcp发送目标地址
        peerAddr.sin_family = AF_INET;
        peerAddr.sin_port = htons(ui16RtcpPort);
        peerAddr.sin_addr.s_addr = inet_addr(get_peer_ip().data());
        bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero);
        pSockRtcp->setSendPeerAddr((struct sockaddr *)(&peerAddr));

xiongziliang committed
665
		//尝试获取客户端nat映射地址
666
		startListenPeerUdpData(trackIdx);
667
		//InfoP(this) << "分配端口:" << srv_port;
668 669 670 671 672

		sendRtspResponse("200 OK",
						 {"Transport",StrPrinter << "RTP/AVP/UDP;unicast;"
												 << "client_port=" << strClientPort << ";"
												 << "server_port=" << pSockRtp->get_local_port() << "-" << pSockRtcp->get_local_port() << ";"
673
												 << "ssrc=" << printSSRC(trackRef->_ssrc)
674
						 });
xzl committed
675 676
	}
		break;
xiongziliang committed
677
	case Rtsp::RTP_MULTICAST: {
678
		if(!_pBrdcaster){
679
			_pBrdcaster = RtpBroadCaster::get(getPoller(),get_local_ip(),_mediaInfo._vhost, _mediaInfo._app, _mediaInfo._streamid);
680
			if (!_pBrdcaster) {
xzl committed
681
				send_NotAcceptable();
682 683
                throw SockException(Err_shutdown, "can not get a available udp multicast socket");
            }
xzl committed
684
			weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
685
			_pBrdcaster->setDetachCB(this, [weakSelf]() {
xzl committed
686 687 688 689
				auto strongSelf = weakSelf.lock();
				if(!strongSelf) {
					return;
				}
xiongziliang committed
690
				strongSelf->safeShutdown(SockException(Err_shutdown,"ring buffer detached"));
xzl committed
691 692
			});
		}
693
		int iSrvPort = _pBrdcaster->getPort(trackRef->_type);
xiongziliang committed
694
		//我们用trackIdx区分rtp和rtcp包
695
		//由于组播udp端口是共享的,而rtcp端口为组播udp端口+1,所以rtcp端口需要改成共享端口
696
		auto pSockRtcp = UDPServer::Instance().getSock(getPoller(),get_local_ip().data(),2*trackIdx + 1,iSrvPort + 1);
xiongziliang committed
697 698 699
		if (!pSockRtcp) {
			//分配端口失败
			send_NotAcceptable();
700
            throw SockException(Err_shutdown, "open shared rtcp socket failed");
xiongziliang committed
701
		}
702
		startListenPeerUdpData(trackIdx);
703
        GET_CONFIG(uint32_t,udpTTL,MultiCast::kUdpTTL);
704 705 706 707 708 709 710 711 712

		sendRtspResponse("200 OK",
						 {"Transport",StrPrinter << "RTP/AVP;multicast;"
												 << "destination=" << _pBrdcaster->getIP() << ";"
												 << "source=" << get_local_ip() << ";"
												 << "port=" << iSrvPort << "-" << pSockRtcp->get_local_port() << ";"
												 << "ttl=" << udpTTL << ";"
												 << "ssrc=" << printSSRC(trackRef->_ssrc)
						 });
xzl committed
713 714 715 716 717 718 719
	}
		break;
	default:
		break;
	}
}

720
void RtspSession::handleReq_Play(const Parser &parser) {
xiongziliang committed
721
	if (_aTrackInfo.empty() || parser["Session"] != _strSession) {
xzl committed
722
		send_SessionNotFound();
723 724
        throw SockException(Err_shutdown,_aTrackInfo.empty() ? "can not find any availabe track when play" : "session not found when play");
    }
xiongziliang committed
725
	auto strRange = parser["Range"];
726
    auto onRes = [this,strRange](const string &err){
727
        bool authSuccess = err.empty();
xiongziliang committed
728
        if(!authSuccess){
729
            //第一次play是播放,否则是恢复播放。只对播放鉴权
730
			sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err);
xiongziliang committed
731
            shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << err));
732 733
            return;
        }
xzl committed
734

735
        auto pMediaSrc = _pMediaSrc.lock();
736 737
        if(!pMediaSrc){
        	send_StreamNotFound();
xiongziliang committed
738
        	shutdown(SockException(Err_shutdown,"rtsp stream released"));
739
			return;
740
        }
741

742 743 744
        bool useBuf = true;
		_enableSendRtp = false;

745
		if (strRange.size() && !_bFirstPlay) {
xiongziliang committed
746
            //这个是seek操作
747 748 749 750 751
			auto strStart = FindField(strRange.data(), "npt=", "-");
			if (strStart == "now") {
				strStart = "0";
			}
			auto iStartTime = 1000 * atof(strStart.data());
752
			InfoP(this) << "rtsp seekTo(ms):" << iStartTime;
753
			useBuf = !pMediaSrc->seekTo(iStartTime);
754
		}else if(pMediaSrc->readerCount() == 0){
755 756 757 758
			//第一个消费者
			pMediaSrc->seekTo(0);
		}
		_bFirstPlay = false;
759

760
		_StrPrinter rtp_info;
xiongziliang committed
761
		for(auto &track : _aTrackInfo){
762
			if (track->_inited == false) {
xiongziliang committed
763
				//还有track没有setup
xiongziliang committed
764 765
                shutdown(SockException(Err_shutdown,"track not setuped"));
                return;
xiongziliang committed
766
			}
767 768 769 770
			track->_ssrc = pMediaSrc->getSsrc(track->_type);
			track->_seq = pMediaSrc->getSeqence(track->_type);
			track->_time_stamp = pMediaSrc->getTimeStamp(track->_type);

xiongziliang committed
771
			rtp_info << "url=" << _strContentBase << "/" << track->_control_surffix << ";"
772 773
					 << "seq=" << track->_seq << ";"
					 << "rtptime=" << (int)(track->_time_stamp * (track->_samplerate / 1000)) << ",";
xiongziliang committed
774 775
		}

776 777 778 779 780 781
		rtp_info.pop_back();

		sendRtspResponse("200 OK",
						 {"Range", StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) <<  pMediaSrc->getTimeStamp(TrackInvalid) / 1000.0,
						  "RTP-Info",rtp_info
						 });
782

783 784
		_enableSendRtp = true;

785
		//提高发送性能
786 787
		SockUtil::setNoDelay(_sock->rawFD(),false);
		(*this) << SocketFlags(kSockFlags);
xiongziliang committed
788

xiongziliang committed
789
		if (!_pRtpReader && _rtpType != Rtsp::RTP_MULTICAST) {
790
			weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
791
			_pRtpReader = pMediaSrc->getRing()->attach(getPoller(),useBuf);
792 793 794 795 796
			_pRtpReader->setDetachCB([weakSelf]() {
				auto strongSelf = weakSelf.lock();
				if(!strongSelf) {
					return;
				}
xiongziliang committed
797 798
                strongSelf->shutdown(SockException(Err_shutdown,"rtsp ring buffer detached"));
            });
799 800 801 802 803
			_pRtpReader->setReadCB([weakSelf](const RtpPacket::Ptr &pack) {
				auto strongSelf = weakSelf.lock();
				if(!strongSelf) {
					return;
				}
804 805 806
				if(strongSelf->_enableSendRtp) {
					strongSelf->sendRtpPacket(pack);
				}
807 808
			});
		}
809
    };
xzl committed
810

811
    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
812
    Broadcast::AuthInvoker invoker = [weakSelf,onRes](const string &err){
813 814 815
        auto strongSelf = weakSelf.lock();
        if(!strongSelf){
            return;
xzl committed
816
        }
817
        strongSelf->async([weakSelf,onRes,err](){
818 819 820 821
            auto strongSelf = weakSelf.lock();
            if(!strongSelf){
                return;
            }
822
            onRes(err);
823 824
        });
    };
xiongziliang committed
825 826 827 828 829 830 831 832 833
    if(_bFirstPlay){
        //第一次收到play命令,需要鉴权
        auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,_mediaInfo,invoker,*this);
        if(!flag){
            //该事件无人监听,默认不鉴权
            onRes("");
        }
    }else{
        //后面是seek或恢复命令,不需要鉴权
834
        onRes("");
835
    }
xzl committed
836 837
}

838
void RtspSession::handleReq_Pause(const Parser &parser) {
xiongziliang committed
839
	if (parser["Session"] != _strSession) {
xzl committed
840
		send_SessionNotFound();
841 842
        throw SockException(Err_shutdown,"session not found when pause");
    }
843 844

	sendRtspResponse("200 OK");
845
	_enableSendRtp = false;
xzl committed
846 847
}

848
void RtspSession::handleReq_Teardown(const Parser &parser) {
849
	sendRtspResponse("200 OK");
850
    throw SockException(Err_shutdown,"rtsp player send teardown request");
xzl committed
851 852
}

853
void RtspSession::handleReq_Get(const Parser &parser) {
xiongziliang committed
854
	_http_x_sessioncookie = parser["x-sessioncookie"];
855
	sendRtspResponse("200 OK",
856
					 {"Cache-Control","no-store",
857 858 859 860
					  "Pragma","no-store",
					  "Content-Type","application/x-rtsp-tunnelled",
					 },"","HTTP/1.0");

861
	//注册http getter,以便http poster绑定
xzl committed
862
	lock_guard<recursive_mutex> lock(g_mtxGetter);
863
	g_mapGetter[_http_x_sessioncookie] = dynamic_pointer_cast<RtspSession>(shared_from_this());
xzl committed
864 865
}

866
void RtspSession::handleReq_Post(const Parser &parser) {
xzl committed
867
	lock_guard<recursive_mutex> lock(g_mtxGetter);
xiongziliang committed
868
	string sessioncookie = parser["x-sessioncookie"];
869
	//Poster 找到 Getter
xzl committed
870 871
	auto it = g_mapGetter.find(sessioncookie);
	if (it == g_mapGetter.end()) {
872 873
        throw SockException(Err_shutdown,"can not find http getter by x-sessioncookie");
    }
874 875 876 877

	//Poster 找到Getter的SOCK
	auto httpGetterWeak = it->second;
	//移除http getter的弱引用记录
xzl committed
878
	g_mapGetter.erase(sessioncookie);
879 880 881 882 883

	//http poster收到请求后转发给http getter处理
	_onRecv = [this,httpGetterWeak](const Buffer::Ptr &pBuf){
		auto httpGetterStrong = httpGetterWeak.lock();
		if(!httpGetterStrong){
xiongziliang committed
884
			shutdown(SockException(Err_shutdown,"http getter released"));
885 886 887 888 889 890 891 892 893 894 895 896 897
			return;
		}

		//切换到http getter的线程
		httpGetterStrong->async([pBuf,httpGetterWeak](){
			auto httpGetterStrong = httpGetterWeak.lock();
			if(!httpGetterStrong){
				return;
			}
			httpGetterStrong->onRecv(std::make_shared<BufferString>(decodeBase64(string(pBuf->data(),pBuf->size()))));
		});
	};

xiongziliang committed
898 899 900 901
	if(!parser.Content().empty()){
		//http poster后面的粘包
		_onRecv(std::make_shared<BufferString>(parser.Content()));
	}
902 903 904 905 906 907

    sendRtspResponse("200 OK",
                     {"Cache-Control","no-store",
                      "Pragma","no-store",
                      "Content-Type","application/x-rtsp-tunnelled",
                     },"","HTTP/1.0");
xzl committed
908 909
}

910
void RtspSession::handleReq_SET_PARAMETER(const Parser &parser) {
911
	//TraceP(this) <<endl;
912
	sendRtspResponse("200 OK");
xzl committed
913 914 915
}

inline void RtspSession::send_NotAcceptable() {
916
	sendRtspResponse("406 Not Acceptable",{"Connection","Close"});
xzl committed
917
}
918

919

xiongziliang committed
920
void RtspSession::onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) {
xiongziliang committed
921
	_pushSrc->onWrite(rtppt, false);
xiongziliang committed
922
}
923
inline void RtspSession::onRcvPeerUdpData(int intervaled, const Buffer::Ptr &pBuf, const struct sockaddr& addr) {
924 925 926
	//这是rtcp心跳包,说明播放器还存活
	_ticker.resetTime();

927
	if(intervaled % 2 == 0){
xiongziliang committed
928
		if(_pushSrc){
929
		    //这是rtsp推流上来的rtp包
930
			handleOneRtp(intervaled / 2,_aTrackInfo[intervaled / 2],( unsigned char *)pBuf->data(),pBuf->size());
931 932
		}else if(!_udpSockConnected.count(intervaled)){
            //这是rtsp播放器的rtp打洞包
933 934
            _udpSockConnected.emplace(intervaled);
            _apRtpSock[intervaled / 2]->setSendPeerAddr(&addr);
xzl committed
935
		}
936 937
	}else{
	    //rtcp包
938 939
        if(!_udpSockConnected.count(intervaled)){
            _udpSockConnected.emplace(intervaled);
940 941
            _apRtcpSock[(intervaled - 1) / 2]->setSendPeerAddr(&addr);
        }
942
        onRtcpPacket((intervaled - 1) / 2, _aTrackInfo[(intervaled - 1) / 2], (unsigned char *) pBuf->data(),pBuf->size());
943
    }
xzl committed
944 945 946
}


947
inline void RtspSession::startListenPeerUdpData(int trackIdx) {
xzl committed
948
	weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
949 950
    auto srcIP = inet_addr(get_peer_ip().data());
	auto onUdpData = [weakSelf,srcIP](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr,int intervaled){
951 952 953 954
		auto strongSelf=weakSelf.lock();
		if(!strongSelf) {
			return false;
		}
955 956 957 958 959 960 961

        if (((struct sockaddr_in *) pPeerAddr)->sin_addr.s_addr != srcIP) {
            WarnP(strongSelf.get()) << ((intervaled % 2 == 0) ? "收到其他地址的rtp数据:" : "收到其他地址的rtcp数据:")
                                    << inet_ntoa(((struct sockaddr_in *) pPeerAddr)->sin_addr);
            return true;
        }

962
		struct sockaddr addr=*pPeerAddr;
963
		strongSelf->async([weakSelf,pBuf,addr,intervaled]() {
964 965 966 967
			auto strongSelf=weakSelf.lock();
			if(!strongSelf) {
				return;
			}
968
			strongSelf->onRcvPeerUdpData(intervaled,pBuf,addr);
969 970 971 972 973
		});
		return true;
	};

	switch (_rtpType){
xiongziliang committed
974
		case Rtsp::RTP_MULTICAST:{
975 976
			//组播使用的共享rtcp端口
			UDPServer::Instance().listenPeer(get_peer_ip().data(), this, [onUdpData](
977 978
					int intervaled, const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr) {
				return onUdpData(pBuf,pPeerAddr,intervaled);
xzl committed
979
			});
980 981
		}
			break;
xiongziliang committed
982
		case Rtsp::RTP_UDP:{
983
			auto setEvent = [&](Socket::Ptr &sock,int intervaled){
984
				if(!sock){
985
					WarnP(this) << "udp端口为空:" << intervaled;
986 987
					return;
				}
988
				sock->setOnRead([onUdpData,intervaled](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr , int addr_len){
989
					onUdpData(pBuf,pPeerAddr,intervaled);
990 991 992 993 994 995 996 997 998 999 1000
				});
			};
			setEvent(_apRtpSock[trackIdx], 2*trackIdx );
			setEvent(_apRtcpSock[trackIdx], 2*trackIdx + 1 );
		}
			break;

		default:
			break;
	}

xzl committed
1001 1002
}

1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038
static string dateStr(){
	char buf[64];
	time_t tt = time(NULL);
	strftime(buf, sizeof buf, "%a, %b %d %Y %H:%M:%S GMT", gmtime(&tt));
	return buf;
}

bool RtspSession::sendRtspResponse(const string &res_code,
								   const StrCaseMap &header_const,
								   const string &sdp,
								   const char *protocol){
	auto header = header_const;
	header.emplace("CSeq",StrPrinter << _iCseq);
	if(!_strSession.empty()){
		header.emplace("Session",_strSession);
	}

	header.emplace("Server",SERVER_NAME "(build in " __DATE__ " " __TIME__ ")");
	header.emplace("Date",dateStr());

	if(!sdp.empty()){
		header.emplace("Content-Length",StrPrinter << sdp.size());
		header.emplace("Content-Type","application/sdp");
	}

	_StrPrinter printer;
	printer << protocol << " " << res_code << "\r\n";
	for (auto &pr : header){
		printer << pr.first << ": " << pr.second << "\r\n";
	}

	printer << "\r\n";

	if(!sdp.empty()){
		printer << sdp;
	}
1039
//	DebugP(this) << printer;
1040 1041 1042 1043
	return send(std::make_shared<BufferString>(printer)) > 0 ;
}

int RtspSession::send(const Buffer::Ptr &pkt){
xiongziliang committed
1044
//	if(!_enableSendRtp){
1045
//		DebugP(this) << pkt->data();
xiongziliang committed
1046
//	}
1047
	_ui64TotalBytes += pkt->size();
1048
	return TcpSession::send(pkt);
1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090
}

bool RtspSession::sendRtspResponse(const string &res_code,
								   const std::initializer_list<string> &header,
								   const string &sdp,
								   const char *protocol) {
	string key;
	StrCaseMap header_map;
	int i = 0;
	for(auto &val : header){
		if(++i % 2 == 0){
			header_map.emplace(key,val);
		}else{
			key = val;
		}
	}
	return sendRtspResponse(res_code,header_map,sdp,protocol);
}

inline string RtspSession::printSSRC(uint32_t ui32Ssrc) {
	char tmp[9] = { 0 };
	ui32Ssrc = htonl(ui32Ssrc);
	uint8_t *pSsrc = (uint8_t *) &ui32Ssrc;
	for (int i = 0; i < 4; i++) {
		sprintf(tmp + 2 * i, "%02X", pSsrc[i]);
	}
	return tmp;
}
inline int RtspSession::getTrackIndexByTrackType(TrackType type) {
	for (unsigned int i = 0; i < _aTrackInfo.size(); i++) {
		if (type == _aTrackInfo[i]->_type) {
			return i;
		}
	}
	return -1;
}
inline int RtspSession::getTrackIndexByControlSuffix(const string &controlSuffix) {
	for (unsigned int i = 0; i < _aTrackInfo.size(); i++) {
		if (controlSuffix == _aTrackInfo[i]->_control_surffix) {
			return i;
		}
	}
xiongziliang committed
1091 1092 1093
	if(_aTrackInfo.size() == 1){
        return 0;
	}
1094 1095 1096
	return -1;
}

xiongziliang committed
1097 1098 1099 1100 1101 1102 1103 1104 1105
inline int RtspSession::getTrackIndexByInterleaved(int interleaved){
	for (unsigned int i = 0; i < _aTrackInfo.size(); i++) {
		if (_aTrackInfo[i]->_interleaved == interleaved) {
			return i;
		}
	}
	return -1;
}

xiongziliang committed
1106
bool RtspSession::close(MediaSource &sender,bool force) {
1107 1108
    //此回调在其他线程触发
    if(!_pushSrc || (!force && _pushSrc->readerCount() != 0)){
1109 1110
        return false;
    }
xiongziliang committed
1111 1112
	string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
	safeShutdown(SockException(Err_shutdown,err));
1113 1114
	return true;
}
1115

1116

1117 1118 1119 1120 1121 1122 1123 1124 1125
void RtspSession::onNoneReader(MediaSource &sender){
    //此回调在其他线程触发
    if(!_pushSrc || _pushSrc->readerCount() != 0){
        return;
    }
    MediaSourceEvent::onNoneReader(sender);
}


xiongziliang committed
1126
void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) {
1127
    //InfoP(this) <<(int)pkt.Interleaved;
1128 1129
    switch (_rtpType) {
        case Rtsp::RTP_TCP: {
1130
            send(pkt);
1131 1132 1133 1134 1135 1136
        }
            break;
        case Rtsp::RTP_UDP: {
            int iTrackIndex = getTrackIndexByTrackType(pkt->type);
            auto &pSock = _apRtpSock[iTrackIndex];
            if (!pSock) {
xiongziliang committed
1137
                shutdown(SockException(Err_shutdown,"udp sock not opened yet"));
1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148
                return;
            }
            BufferRtp::Ptr buffer(new BufferRtp(pkt,4));
            _ui64TotalBytes += buffer->size();
            pSock->send(buffer);
        }
            break;
        default:
            break;
    }

1149
#if RTSP_SERVER_SEND_RTCP
1150
    int iTrackIndex = getTrackIndexByTrackType(pkt->type);
1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164
    if(iTrackIndex == -1){
        return;
    }
    RtcpCounter &counter = _aRtcpCnt[iTrackIndex];
    counter.pktCnt += 1;
    counter.octCount += (pkt->length - pkt->offset);
    auto &ticker = _aRtcpTicker[iTrackIndex];
    if (ticker.elapsedTime() > 5 * 1000) {
        //send rtcp every 5 second
        ticker.resetTime();
        //直接保存网络字节序
        memcpy(&counter.timeStamp, pkt->payload + 8 , 4);
        sendSenderReport(_rtpType == Rtsp::RTP_TCP,iTrackIndex);
    }
1165
#endif
1166 1167
}

xiongziliang committed
1168
void RtspSession::sendSenderReport(bool overTcp,int iTrackIndex) {
xiongziliang committed
1169 1170
    static const char s_cname[] = "ZLMediaKitRtsp";
    uint8_t aui8Rtcp[4 + 28 + 10 + sizeof(s_cname) + 1] = {0};
1171 1172 1173 1174 1175 1176
    uint8_t *pui8Rtcp_SR = aui8Rtcp + 4, *pui8Rtcp_SDES = pui8Rtcp_SR + 28;
    auto &track = _aTrackInfo[iTrackIndex];
    auto &counter = _aRtcpCnt[iTrackIndex];

    aui8Rtcp[0] = '$';
    aui8Rtcp[1] = track->_interleaved + 1;
xiongziliang committed
1177 1178
    aui8Rtcp[2] = (sizeof(aui8Rtcp) - 4) >> 8;
    aui8Rtcp[3] = (sizeof(aui8Rtcp) - 4) & 0xFF;
1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217

    pui8Rtcp_SR[0] = 0x80;
    pui8Rtcp_SR[1] = 0xC8;
    pui8Rtcp_SR[2] = 0x00;
    pui8Rtcp_SR[3] = 0x06;

    uint32_t ssrc=htonl(track->_ssrc);
    memcpy(&pui8Rtcp_SR[4], &ssrc, 4);

    uint64_t msw;
    uint64_t lsw;
    struct timeval tv;
    gettimeofday(&tv, NULL);
    msw = tv.tv_sec + 0x83AA7E80; /* 0x83AA7E80 is the number of seconds from 1900 to 1970 */
    lsw = (uint32_t) ((double) tv.tv_usec * (double) (((uint64_t) 1) << 32) * 1.0e-6);

    msw = htonl(msw);
    memcpy(&pui8Rtcp_SR[8], &msw, 4);

    lsw = htonl(lsw);
    memcpy(&pui8Rtcp_SR[12], &lsw, 4);
    //直接使用网络字节序
    memcpy(&pui8Rtcp_SR[16], &counter.timeStamp, 4);

    uint32_t pktCnt = htonl(counter.pktCnt);
    memcpy(&pui8Rtcp_SR[20], &pktCnt, 4);

    uint32_t octCount = htonl(counter.octCount);
    memcpy(&pui8Rtcp_SR[24], &octCount, 4);

    pui8Rtcp_SDES[0] = 0x81;
    pui8Rtcp_SDES[1] = 0xCA;
    pui8Rtcp_SDES[2] = 0x00;
    pui8Rtcp_SDES[3] = 0x06;

    memcpy(&pui8Rtcp_SDES[4], &ssrc, 4);

    pui8Rtcp_SDES[8] = 0x01;
    pui8Rtcp_SDES[9] = 0x0f;
xiongziliang committed
1218 1219
    memcpy(&pui8Rtcp_SDES[10], s_cname, sizeof(s_cname));
    pui8Rtcp_SDES[10 + sizeof(s_cname)] = 0x00;
1220 1221 1222 1223 1224 1225

    if(overTcp){
        send(obtainBuffer((char *) aui8Rtcp, sizeof(aui8Rtcp)));
    }else {
        _apRtcpSock[iTrackIndex]->send((char *) aui8Rtcp + 4, sizeof(aui8Rtcp) - 4);
    }
xzl committed
1226 1227 1228
}

}
xiongziliang committed
1229
/* namespace mediakit */
xzl committed
1230