RtspSession.cpp 42 KB
Newer Older
1
/*
xiongziliang committed
2
 * MIT License
xzl committed
3
 *
xiongziliang committed
4
 * Copyright (c) 2016-2019 xiongziliang <771730766@qq.com>
xiongziliang committed
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
xzl committed
25
 */
26

xiongzilaing committed
27
#include <atomic>
28
#include <iomanip>
xiongziliang committed
29
#include "Common/config.h"
xiongzilaing committed
30
#include "UDPServer.h"
xzl committed
31
#include "RtspSession.h"
xiongzilaing committed
32
#include "Util/mini.h"
33
#include "Util/MD5.h"
xiongziliang committed
34
#include "Util/base64.h"
xiongzilaing committed
35
#include "Util/onceToken.h"
xzl committed
36 37
#include "Util/TimeTicker.h"
#include "Util/NoticeCenter.h"
xiongzilaing committed
38
#include "Network/sockutil.h"
xzl committed
39

40 41
#define RTSP_SERVER_SEND_RTCP 0

xiongziliang committed
42 43
using namespace std;
using namespace toolkit;
xzl committed
44

xiongziliang committed
45
namespace mediakit {
xzl committed
46

47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
/**
 * rtsp协议有多种方式传输rtp数据包,目前已支持包括以下4种
 * 1: rtp over udp ,这种方式是rtp通过单独的udp端口传输
 * 2: rtp over udp_multicast,这种方式是rtp通过共享udp组播端口传输
 * 3: rtp over tcp,这种方式是通过rtsp信令tcp通道完成传输
 * 4: rtp over http,下面着重讲解:rtp over http
 *
 * rtp over http 是把rtsp协议伪装成http协议以达到穿透防火墙的目的,
 * 此时播放器会发送两次http请求至rtsp服务器,第一次是http get请求,
 * 第二次是http post请求。
 *
 * 这两次请求通过http请求头中的x-sessioncookie键完成绑定
 *
 * 第一次http get请求用于接收rtp、rtcp和rtsp回复,后续该链接不再发送其他请求
 * 第二次http post请求用于发送rtsp请求,rtsp握手结束后可能会断开连接,此时我们还要维持rtp发送
 * 需要指出的是http post请求中的content负载就是base64编码后的rtsp请求包,
 * 播放器会把rtsp请求伪装成http content负载发送至rtsp服务器,然后rtsp服务器又把回复发送给第一次http get请求的tcp链接
 * 这样,对防火墙而言,本次rtsp会话就是两次http请求,防火墙就会放行数据
 *
 * zlmediakit在处理rtsp over http的请求时,会把http poster中的content数据base64解码后转发给http getter处理
 */


//rtsp over http 情况下get请求实例,在请求实例用于接收rtp数据包
static unordered_map<string, weak_ptr<RtspSession> > g_mapGetter;
//对g_mapGetter上锁保护
static recursive_mutex g_mtxGetter;
xiongziliang committed
74 75
static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE;

xiongziliang committed
76
/**
 * Creates an RTSP session bound to an accepted TCP socket.
 * @param pSock the socket this session reads from and writes to
 */
RtspSession::RtspSession(const Socket::Ptr &pSock) : TcpSession(pSock) {
    DebugP(this);

    // The configured keep-alive interval doubles as the send timeout.
    GET_CONFIG(uint32_t, keep_alive_sec, Rtsp::kKeepAliveSecond);
    pSock->setSendTimeOutSecond(keep_alive_sec);

    // Begin with a small 4 KiB receive buffer to save memory.
    pSock->setReadBuffer(std::make_shared<BufferRaw>(4 * 1024));
}

/** Logs the destruction of the session; no other work is done here. */
RtspSession::~RtspSession() {
    DebugP(this);
}

void RtspSession::onError(const SockException& err) {
xiongziliang committed
89
	WarnP(this) << err.what();
xiongziliang committed
90
	if (_rtpType == Rtsp::RTP_MULTICAST) {
xiongziliang committed
91
		//取消UDP端口监听
92
		UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this);
xzl committed
93 94
	}

95 96 97 98
	if (_http_x_sessioncookie.size() != 0) {
		//移除http getter的弱引用记录
		lock_guard<recursive_mutex> lock(g_mtxGetter);
		g_mapGetter.erase(_http_x_sessioncookie);
xzl committed
99
	}
100 101

    //流量统计事件广播
102
    GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold);
103
    if(_ui64TotalBytes > iFlowThreshold * 1024){
104
        bool isPlayer = !_pushSrc;
105
        NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport,
106 107 108
										   _mediaInfo,
										   _ui64TotalBytes,
										   _ticker.createdTime()/1000,
109
                                           isPlayer,
110
										   *this);
111
    }
112

xzl committed
113 114 115
}

void RtspSession::onManager() {
116 117 118 119
    GET_CONFIG(uint32_t,handshake_sec,Rtsp::kKeepAliveSecond);
    GET_CONFIG(uint32_t,keep_alive_sec,Rtsp::kKeepAliveSecond);

    if (_ticker.createdTime() > handshake_sec * 1000) {
120
		if (_strSession.size() == 0) {
xiongziliang committed
121
			shutdown(SockException(Err_timeout,"illegal connection"));
xzl committed
122 123 124
			return;
		}
	}
125

126

127
	if ((_rtpType == Rtsp::RTP_UDP || _pushSrc ) && _ticker.elapsedTime() > keep_alive_sec * 1000) {
128
		//如果是推流端或者rtp over udp类型的播放端,那么就做超时检测
xiongziliang committed
129 130
        shutdown(SockException(Err_timeout,"rtp over udp session timeouted"));
        return;
xzl committed
131 132 133
	}
}

xiongziliang committed
134 135 136 137 138 139 140
/**
 * Handles bytes received on the session socket.
 * Resets the idle ticker and the byte counter, then either forwards the data
 * to the installed _onRecv hook or feeds it to the RTSP splitter.
 * @param pBuf the received buffer
 */
void RtspSession::onRecv(const Buffer::Ptr &pBuf) {
    _ticker.resetTime();
    _ui64TotalBytes += pBuf->size();

    if (!_onRecv) {
        // Regular RTSP connection: parse the data ourselves.
        input(pBuf->data(), pBuf->size());
        return;
    }
    // HTTP poster: its request data is handed to the HTTP getter.
    _onRecv(pBuf);
}
xiongziliang committed
145

xiongziliang committed
146 147 148
/**
 * Dispatches one complete RTSP (or tunnelled HTTP) request to its handler.
 * Unknown methods are answered with "403 Forbidden" and the session is shut
 * down. Exceptions thrown by a handler are converted into a shutdown.
 * @param parser the parsed request; cleared after a handler has run
 */
void RtspSession::onWholeRtspPacket(Parser &parser) {
    using rtsp_request_handler = void (RtspSession::*)(const Parser &parser);
    static const unordered_map<string, rtsp_request_handler> s_handler_map = {
        {"OPTIONS",       &RtspSession::handleReq_Options},
        {"DESCRIBE",      &RtspSession::handleReq_Describe},
        {"ANNOUNCE",      &RtspSession::handleReq_ANNOUNCE},
        {"RECORD",        &RtspSession::handleReq_RECORD},
        {"SETUP",         &RtspSession::handleReq_Setup},
        {"PLAY",          &RtspSession::handleReq_Play},
        {"PAUSE",         &RtspSession::handleReq_Pause},
        {"TEARDOWN",      &RtspSession::handleReq_Teardown},
        {"GET",           &RtspSession::handleReq_Get},
        {"POST",          &RtspSession::handleReq_Post},
        {"SET_PARAMETER", &RtspSession::handleReq_SET_PARAMETER},
        // GET_PARAMETER shares the SET_PARAMETER handler.
        {"GET_PARAMETER", &RtspSession::handleReq_SET_PARAMETER},
    };

    // Request method and sequence number.
    string strCmd = parser.Method();
    _iCseq = atoi(parser["CSeq"].data());

    // The first non-GET request fixes the content base and the media info.
    if (_strContentBase.empty() && strCmd != "GET") {
        _strContentBase = parser.Url();
        _mediaInfo.parse(parser.FullUrl());
        _mediaInfo._schema = RTSP_SCHEMA;
    }

    auto handlerIt = s_handler_map.find(strCmd);
    if (handlerIt == s_handler_map.end()) {
        sendRtspResponse("403 Forbidden");
        shutdown(SockException(Err_shutdown,StrPrinter << "403 Forbidden:" << strCmd));
        return;
    }

    auto &handler = handlerIt->second;
    try {
        (this->*handler)(parser);
    } catch (SockException &ex) {
        // Only a SockException that evaluates to true triggers a shutdown.
        if (ex) {
            shutdown(ex);
        }
    } catch (exception &ex) {
        shutdown(SockException(Err_shutdown, ex.what()));
    }
    parser.Clear();
}

xiongziliang committed
192
void RtspSession::onRtpPacket(const char *data, uint64_t len) {
xiongziliang committed
193 194 195
	if(!_pushSrc){
		return;
	}
196

xiongziliang committed
197 198 199 200
	int trackIdx = -1;
	uint8_t interleaved = data[1];
	if(interleaved %2 == 0){
		trackIdx = getTrackIndexByInterleaved(interleaved);
201 202 203 204 205 206
        if (trackIdx != -1) {
            handleOneRtp(trackIdx,_aTrackInfo[trackIdx],(unsigned char *)data + 4, len - 4);
        }
	}else{
        trackIdx = getTrackIndexByInterleaved(interleaved - 1);
        if (trackIdx != -1) {
207
            onRtcpPacket(trackIdx, _aTrackInfo[trackIdx], (unsigned char *) data + 4, len - 4);
208
        }
xiongziliang committed
209 210 211
	}
}

212
/**
 * Hook for received RTCP packets. Currently a no-op: the packet is ignored.
 * @param iTrackidx index of the track the packet belongs to
 * @param track     the track description
 * @param pucData   RTCP payload
 * @param uiLen     payload length in bytes
 */
void RtspSession::onRtcpPacket(int iTrackidx, SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen){
    // Intentionally empty.
}
xiongziliang committed
215 216 217 218
/**
 * Returns the content length to use for the request being parsed.
 * For an HTTP POST (whose content is the base64-encoded RTSP signalling) the
 * value of remainDataSize() is used; every other method defers to
 * RtspSplitter::getContentLength().
 * @param parser the request whose header has been parsed
 */
int64_t RtspSession::getContentLength(Parser &parser) {
    if (parser.Method() != "POST") {
        return RtspSplitter::getContentLength(parser);
    }
    return remainDataSize();
}

xiongziliang committed
223

224
void RtspSession::handleReq_Options(const Parser &parser) {
225
	//支持这些命令
226
	sendRtspResponse("200 OK",{"Public" , "OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, ANNOUNCE, RECORD, SET_PARAMETER, GET_PARAMETER"});
xzl committed
227 228
}

229
void RtspSession::handleReq_ANNOUNCE(const Parser &parser) {
xiongziliang committed
230 231 232 233 234 235 236
	auto src = dynamic_pointer_cast<RtmpMediaSource>(MediaSource::find(RTSP_SCHEMA,
																	   _mediaInfo._vhost,
																	   _mediaInfo._app,
																	   _mediaInfo._streamid,
																	   false));
	if(src){
		sendRtspResponse("406 Not Acceptable", {"Content-Type", "text/plain"}, "Already publishing.");
237 238 239 240 241 242
        string err = StrPrinter << "ANNOUNCE:"
                                << "Already publishing:"
                                << _mediaInfo._vhost << " "
                                << _mediaInfo._app << " "
                                << _mediaInfo._streamid << endl;
		throw SockException(Err_shutdown,err);
243
	}
xiongziliang committed
244

xiongziliang committed
245
	_strSession = makeRandStr(12);
xiongziliang committed
246 247
    _strSdp = parser.Content();
    _aTrackInfo = SdpAttr(_strSdp).getAvailableTrack();
xiongziliang committed
248

xiongziliang committed
249 250 251 252
	_pushSrc = std::make_shared<RtspToRtmpMediaSource>(_mediaInfo._vhost,_mediaInfo._app,_mediaInfo._streamid);
	_pushSrc->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
	_pushSrc->onGetSDP(_strSdp);
	sendRtspResponse("200 OK");
253 254
}

255
void RtspSession::handleReq_RECORD(const Parser &parser){
xiongziliang committed
256
	if (_aTrackInfo.empty() || parser["Session"] != _strSession) {
257
		send_SessionNotFound();
258
        throw SockException(Err_shutdown,_aTrackInfo.empty() ? "can not find any availabe track when record" : "session not found when record");
259 260 261 262 263
	}
	auto onRes = [this](const string &err){
		bool authSuccess = err.empty();
		if(!authSuccess){
			sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err);
xiongziliang committed
264
			shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << err));
265 266 267 268 269 270 271
			return;
		}

		_StrPrinter rtp_info;
		for(auto &track : _aTrackInfo){
			if (track->_inited == false) {
				//还有track没有setup
xiongziliang committed
272 273
                shutdown(SockException(Err_shutdown,"track not setuped"));
                return;
274
			}
xiongziliang committed
275
			rtp_info << "url=" << _strContentBase << "/" << track->_control_surffix << ",";
276 277 278 279
		}

		rtp_info.pop_back();
		sendRtspResponse("200 OK", {"RTP-Info",rtp_info});
280
		SockUtil::setNoDelay(_sock->rawFD(),false);
281 282
		if(_rtpType == Rtsp::RTP_TCP){
			//如果是rtsp推流服务器,并且是TCP推流,那么加大TCP接收缓存,这样能提升接收性能
283
			_sock->setReadBuffer(std::make_shared<BufferRaw>(256 * 1024));
284
		}
285
		(*this) << SocketFlags(kSockFlags);
286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310
	};

	weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
	Broadcast::AuthInvoker invoker = [weakSelf,onRes](const string &err){
		auto strongSelf = weakSelf.lock();
		if(!strongSelf){
			return;
		}
		strongSelf->async([weakSelf,onRes,err](){
			auto strongSelf = weakSelf.lock();
			if(!strongSelf){
				return;
			}
			onRes(err);
		});
	};

	//rtsp推流需要鉴权
	auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish,_mediaInfo,invoker,*this);
	if(!flag){
		//该事件无人监听,默认不鉴权
		onRes("");
	}
}

311
void RtspSession::handleReq_Describe(const Parser &parser) {
312 313
    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
    //该请求中的认证信息
314
    auto authorization = parser["Authorization"];
315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349
    onGetRealm invoker = [weakSelf,authorization](const string &realm){
        auto strongSelf = weakSelf.lock();
        if(!strongSelf){
            //本对象已经销毁
            return;
        }
        //切换到自己的线程然后执行
        strongSelf->async([weakSelf,realm,authorization](){
            auto strongSelf = weakSelf.lock();
            if(!strongSelf){
                //本对象已经销毁
                return;
            }
            if(realm.empty()){
                //无需认证,回复sdp
                strongSelf->onAuthSuccess();
                return;
            }
            //该流需要认证
            strongSelf->onAuthUser(realm,authorization);
        });

    };

    //广播是否需要认证事件
    if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnGetRtspRealm,
                                           _mediaInfo,
                                           invoker,
                                           *this)){
        //无人监听此事件,说明无需认证
        invoker("");
    }
}
void RtspSession::onAuthSuccess() {
    TraceP(this);
350
    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
351
    MediaSource::findAsync(_mediaInfo,weakSelf.lock(), true,[weakSelf](const MediaSource::Ptr &src){
352 353 354 355 356 357 358
        auto strongSelf = weakSelf.lock();
        if(!strongSelf){
            return;
        }
        auto rtsp_src = dynamic_pointer_cast<RtspMediaSource>(src);
        if (!rtsp_src) {
            //未找到相应的MediaSource
xiongziliang committed
359
            string err = StrPrinter << "no such stream:" <<  strongSelf->_mediaInfo._vhost << " " <<  strongSelf->_mediaInfo._app << " " << strongSelf->_mediaInfo._streamid;
360
            strongSelf->send_StreamNotFound();
xiongziliang committed
361
            strongSelf->shutdown(SockException(Err_shutdown,err));
362 363 364 365 366 367 368 369 370
            return;
        }
        //找到了响应的rtsp流
        strongSelf->_strSdp = rtsp_src->getSdp();
        SdpAttr sdpAttr(strongSelf->_strSdp);
        strongSelf->_aTrackInfo = sdpAttr.getAvailableTrack();
        if (strongSelf->_aTrackInfo.empty()) {
            //该流无效
            strongSelf->send_StreamNotFound();
xiongziliang committed
371
            strongSelf->shutdown(SockException(Err_shutdown,"can not find any availabe track in sdp"));
372 373 374 375 376 377 378 379 380
            return;
        }
        strongSelf->_strSession = makeRandStr(12);
        strongSelf->_pMediaSrc = rtsp_src;
        for(auto &track : strongSelf->_aTrackInfo){
            track->_ssrc = rtsp_src->getSsrc(track->_type);
            track->_seq = rtsp_src->getSeqence(track->_type);
            track->_time_stamp = rtsp_src->getTimeStamp(track->_type);
        }
xiongziliang committed
381

382 383 384 385 386
        strongSelf->sendRtspResponse("200 OK",
                                     {"Content-Base",strongSelf->_strContentBase + "/",
                                      "x-Accept-Retransmit","our-retransmit",
                                      "x-Accept-Dynamic-Rate","1"
                                     },strongSelf->_strSdp);
387
    });
xzl committed
388
}
389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404
/**
 * Replies "401 Unauthorized" with a WWW-Authenticate challenge.
 * The challenge is Basic when Rtsp::kAuthBasic is enabled, otherwise Digest
 * with a freshly generated nonce.
 * @param realm the authentication realm to advertise
 * @param why   reason text used in the shutdown message
 * @param close when true the session is shut down after the reply
 */
void RtspSession::onAuthFailed(const string &realm,const string &why,bool close) {
    GET_CONFIG(bool, authBasic, Rtsp::kAuthBasic);
    if (authBasic) {
        // Basic authentication is supported but not recommended.
        sendRtspResponse("401 Unauthorized",
                         {"WWW-Authenticate",
                          StrPrinter << "Basic realm=\"" << realm << "\"" });
    } else {
        // Preferred: digest (MD5) authentication with a fresh nonce.
        _strNonce = makeRandStr(32);
        sendRtspResponse("401 Unauthorized",
                         {"WWW-Authenticate",
                          StrPrinter << "Digest realm=\"" << realm << "\",nonce=\"" << _strNonce << "\"" });
    }

    if (close) {
        shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << why));
    }
}

408
void RtspSession::onAuthBasic(const string &realm,const string &strBase64){
409 410 411
    //base64认证
    char user_pwd_buf[512];
    av_base64_decode((uint8_t *)user_pwd_buf,strBase64.data(),strBase64.size());
412
    auto user_pwd_vec = split(user_pwd_buf,":");
413 414
    if(user_pwd_vec.size() < 2){
        //认证信息格式不合法,回复401 Unauthorized
415
        onAuthFailed(realm,"can not find user and passwd when basic64 auth");
416 417 418 419
        return;
    }
    auto user = user_pwd_vec[0];
    auto pwd = user_pwd_vec[1];
420
    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
421
    onAuth invoker = [pwd,realm,weakSelf](bool encrypted,const string &good_pwd){
422 423 424 425
        auto strongSelf = weakSelf.lock();
        if(!strongSelf){
            //本对象已经销毁
            return;
426
        }
427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442
        //切换到自己的线程执行
        strongSelf->async([weakSelf,good_pwd,pwd,realm](){
            auto strongSelf = weakSelf.lock();
            if(!strongSelf){
                //本对象已经销毁
                return;
            }
            //base64忽略encrypted参数,上层必须传入明文密码
            if(pwd == good_pwd){
                //提供的密码且匹配正确
                strongSelf->onAuthSuccess();
                return;
            }
            //密码错误
            strongSelf->onAuthFailed(realm,StrPrinter << "password mismatch when base64 auth:" << pwd << " != " << good_pwd);
        });
443
    };
444

445
    //此时必须提供明文密码
446
    if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,_mediaInfo,realm,user, true,invoker,*this)){
447
        //表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之
448
        WarnP(this) << "请监听kBroadcastOnRtspAuth事件!";
449 450 451 452 453
        //但是我们还是忽略认证以便完成播放
        //我们输入的密码是明文
        invoker(false,pwd);
    }
}
454

455 456
void RtspSession::onAuthDigest(const string &realm,const string &strMd5){
	DebugP(this) << strMd5;
457 458 459
    auto mapTmp = Parser::parseArgs(strMd5,",","=");
    decltype(mapTmp) map;
    for(auto &pr : mapTmp){
460
        map[trim(string(pr.first)," \"")] = trim(pr.second," \"");
461 462 463
    }
    //check realm
    if(realm != map["realm"]){
464
        onAuthFailed(realm,StrPrinter << "realm not mached:" << realm << " != " << map["realm"]);
465 466 467 468
        return ;
    }
    //check nonce
    auto nonce = map["nonce"];
469 470
    if(_strNonce != nonce){
        onAuthFailed(realm,StrPrinter << "nonce not mached:" << nonce << " != " << _strNonce);
471 472 473 474 475 476 477
        return ;
    }
    //check username and uri
    auto username = map["username"];
    auto uri = map["uri"];
    auto response = map["response"];
    if(username.empty() || uri.empty() || response.empty()){
478
        onAuthFailed(realm,StrPrinter << "username/uri/response empty:" << username << "," << uri << "," << response);
479 480 481
        return ;
    }

482
    auto realInvoker = [this,realm,nonce,uri,username,response](bool ignoreAuth,bool encrypted,const string &good_pwd){
483 484
        if(ignoreAuth){
            //忽略认证
485 486
            TraceP(this) << "auth ignored";
            onAuthSuccess();
487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503
            return;
        }
        /*
        response计算方法如下:
        RTSP客户端应该使用username + password并计算response如下:
        (1)当password为MD5编码,则
            response = md5( password:nonce:md5(public_method:url)  );
        (2)当password为ANSI字符串,则
            response= md5( md5(username:realm:password):nonce:md5(public_method:url) );
         */
        auto encrypted_pwd = good_pwd;
        if(!encrypted){
            //提供的是明文密码
            encrypted_pwd = MD5(username+ ":" + realm + ":" + good_pwd).hexdigest();
        }

        auto good_response = MD5( encrypted_pwd + ":" + nonce + ":" + MD5(string("DESCRIBE") + ":" + uri).hexdigest()).hexdigest();
xiongziliang committed
504
        if(strcasecmp(good_response.data(),response.data()) == 0){
505
            //认证成功!md5不区分大小写
506
            onAuthSuccess();
507 508
        }else{
            //认证失败!
509
            onAuthFailed(realm, StrPrinter << "password mismatch when md5 auth:" << good_response << " != " << response );
510 511
        }
    };
512 513 514 515 516 517 518 519 520 521 522 523 524 525 526

    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
    onAuth invoker = [realInvoker,weakSelf](bool encrypted,const string &good_pwd){
        auto strongSelf = weakSelf.lock();
        if(!strongSelf){
            return;
        }
        //切换到自己的线程确保realInvoker执行时,this指针有效
        strongSelf->async([realInvoker,weakSelf,encrypted,good_pwd](){
            auto strongSelf = weakSelf.lock();
            if(!strongSelf){
                return;
            }
            realInvoker(false,encrypted,good_pwd);
        });
527 528 529
    };

    //此时可以提供明文或md5加密的密码
530
    if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,_mediaInfo,realm,username, false,invoker,*this)){
531
        //表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之
532
        WarnP(this) << "请监听kBroadcastOnRtspAuth事件!";
533 534 535 536 537
        //但是我们还是忽略认证以便完成播放
        realInvoker(true,true,"");
    }
}

538 539 540 541 542
void RtspSession::onAuthUser(const string &realm,const string &authorization){
    if(authorization.empty()){
        onAuthFailed(realm,"", false);
        return;
    }
543 544 545 546 547
    //请求中包含认证信息
    auto authType = FindField(authorization.data(),NULL," ");
	auto authStr = FindField(authorization.data()," ",NULL);
    if(authType.empty() || authStr.empty()){
        //认证信息格式不合法,回复401 Unauthorized
548
        onAuthFailed(realm,"can not find auth type or auth string");
549 550 551 552
        return;
    }
    if(authType == "Basic"){
        //base64认证,需要明文密码
553
        onAuthBasic(realm,authStr);
554 555
    }else if(authType == "Digest"){
        //md5认证
556
        onAuthDigest(realm,authStr);
557 558
    }else{
        //其他认证方式?不支持!
559
        onAuthFailed(realm,StrPrinter << "unsupported auth type:" << authType);
560 561
    }
}
xzl committed
562
inline void RtspSession::send_StreamNotFound() {
563
	sendRtspResponse("404 Stream Not Found",{"Connection","Close"});
xzl committed
564 565
}
inline void RtspSession::send_UnsupportedTransport() {
566
	sendRtspResponse("461 Unsupported Transport",{"Connection","Close"});
xzl committed
567 568 569
}

inline void RtspSession::send_SessionNotFound() {
570
	sendRtspResponse("454 Session Not Found",{"Connection","Close"});
xzl committed
571
}
572 573

void RtspSession::handleReq_Setup(const Parser &parser) {
xzl committed
574
//处理setup命令,该函数可能进入多次
575
    auto controlSuffix = split(parser.Url(),"/").back();// parser.FullUrl().substr(_strContentBase.size());
xiongziliang committed
576 577 578
    if(controlSuffix.front() == '/'){
		controlSuffix = controlSuffix.substr(1);
    }
579
	int trackIdx = getTrackIndexByControlSuffix(controlSuffix);
xzl committed
580 581
	if (trackIdx == -1) {
		//未找到相应track
582 583
        throw SockException(Err_shutdown, StrPrinter << "can not find any track by control suffix:" << controlSuffix);
    }
xiongziliang committed
584
	SdpTrack::Ptr &trackRef = _aTrackInfo[trackIdx];
585
	if (trackRef->_inited) {
xzl committed
586
		//已经初始化过该Track
587
        throw SockException(Err_shutdown, "can not setup one track twice");
xzl committed
588
	}
589
	trackRef->_inited = true; //现在初始化
590
	trackRef->_interleaved = trackRef->_type * 2;
xzl committed
591

xiongziliang committed
592
	if(_rtpType == Rtsp::RTP_Invalid){
xiongziliang committed
593
		auto strTransport = parser["Transport"];
xzl committed
594
		if(strTransport.find("TCP") != string::npos){
xiongziliang committed
595
			_rtpType = Rtsp::RTP_TCP;
xzl committed
596
		}else if(strTransport.find("multicast") != string::npos){
xiongziliang committed
597
			_rtpType = Rtsp::RTP_MULTICAST;
xzl committed
598
		}else{
xiongziliang committed
599
			_rtpType = Rtsp::RTP_UDP;
xzl committed
600 601 602
		}
	}

xiongziliang committed
603
	//允许接收rtp、rtcp包
xiongziliang committed
604
	RtspSplitter::enableRecvRtp(_rtpType == Rtsp::RTP_TCP);
xiongziliang committed
605

606
	switch (_rtpType) {
xiongziliang committed
607
	case Rtsp::RTP_TCP: {
608 609 610
		sendRtspResponse("200 OK",
						 {"Transport",StrPrinter << "RTP/AVP/TCP;unicast;"
												 << "interleaved=" << trackRef->_type * 2 << "-" << trackRef->_type * 2 + 1 << ";"
611
												 << "ssrc=" << printSSRC(trackRef->_ssrc),
612 613 614
						  "x-Transport-Options" , "late-tolerance=1.400000",
						  "x-Dynamic-Rate" , "1"
						 });
xzl committed
615 616
	}
		break;
xiongziliang committed
617
	case Rtsp::RTP_UDP: {
xiongziliang committed
618
		//我们用trackIdx区分rtp和rtcp包
619 620
		auto pSockRtp = std::make_shared<Socket>(_sock->getPoller());
		if (!pSockRtp->bindUdpSock(0,get_local_ip().data())) {
xiongziliang committed
621 622
			//分配端口失败
			send_NotAcceptable();
623 624
            throw SockException(Err_shutdown, "open rtp socket failed");
        }
625 626
		auto pSockRtcp = std::make_shared<Socket>(_sock->getPoller());
		if (!pSockRtcp->bindUdpSock(pSockRtp->get_local_port() + 1,get_local_ip().data())) {
xzl committed
627 628
			//分配端口失败
			send_NotAcceptable();
629 630
            throw SockException(Err_shutdown, "open rtcp socket failed");
        }
631 632
		_apRtpSock[trackIdx] = pSockRtp;
		_apRtcpSock[trackIdx] = pSockRtcp;
xiongziliang committed
633
		//设置客户端内网端口信息
xiongziliang committed
634
		string strClientPort = FindField(parser["Transport"].data(), "client_port=", NULL);
635 636 637 638 639
		uint16_t ui16RtpPort = atoi( FindField(strClientPort.data(), NULL, "-").data());
        uint16_t ui16RtcpPort = atoi( FindField(strClientPort.data(), "-" , NULL).data());

        struct sockaddr_in peerAddr;
        //设置rtp发送目标地址
xzl committed
640
		peerAddr.sin_family = AF_INET;
641
		peerAddr.sin_port = htons(ui16RtpPort);
642
		peerAddr.sin_addr.s_addr = inet_addr(get_peer_ip().data());
xzl committed
643
		bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero);
644
		pSockRtp->setSendPeerAddr((struct sockaddr *)(&peerAddr));
645 646 647 648 649 650 651 652

		//设置rtcp发送目标地址
        peerAddr.sin_family = AF_INET;
        peerAddr.sin_port = htons(ui16RtcpPort);
        peerAddr.sin_addr.s_addr = inet_addr(get_peer_ip().data());
        bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero);
        pSockRtcp->setSendPeerAddr((struct sockaddr *)(&peerAddr));

xiongziliang committed
653
		//尝试获取客户端nat映射地址
654
		startListenPeerUdpData(trackIdx);
655
		//InfoP(this) << "分配端口:" << srv_port;
656 657 658 659 660

		sendRtspResponse("200 OK",
						 {"Transport",StrPrinter << "RTP/AVP/UDP;unicast;"
												 << "client_port=" << strClientPort << ";"
												 << "server_port=" << pSockRtp->get_local_port() << "-" << pSockRtcp->get_local_port() << ";"
661
												 << "ssrc=" << printSSRC(trackRef->_ssrc)
662
						 });
xzl committed
663 664
	}
		break;
xiongziliang committed
665
	case Rtsp::RTP_MULTICAST: {
666
		if(!_pBrdcaster){
667
			_pBrdcaster = RtpBroadCaster::get(getPoller(),get_local_ip(),_mediaInfo._vhost, _mediaInfo._app, _mediaInfo._streamid);
668
			if (!_pBrdcaster) {
xzl committed
669
				send_NotAcceptable();
670 671
                throw SockException(Err_shutdown, "can not get a available udp multicast socket");
            }
xzl committed
672
			weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
673
			_pBrdcaster->setDetachCB(this, [weakSelf]() {
xzl committed
674 675 676 677
				auto strongSelf = weakSelf.lock();
				if(!strongSelf) {
					return;
				}
xiongziliang committed
678
				strongSelf->safeShutdown(SockException(Err_shutdown,"ring buffer detached"));
xzl committed
679 680
			});
		}
681
		int iSrvPort = _pBrdcaster->getPort(trackRef->_type);
xiongziliang committed
682
		//我们用trackIdx区分rtp和rtcp包
683
		//由于组播udp端口是共享的,而rtcp端口为组播udp端口+1,所以rtcp端口需要改成共享端口
684
		auto pSockRtcp = UDPServer::Instance().getSock(getPoller(),get_local_ip().data(),2*trackIdx + 1,iSrvPort + 1);
xiongziliang committed
685 686 687
		if (!pSockRtcp) {
			//分配端口失败
			send_NotAcceptable();
688
            throw SockException(Err_shutdown, "open shared rtcp socket failed");
xiongziliang committed
689
		}
690
		startListenPeerUdpData(trackIdx);
691
        GET_CONFIG(uint32_t,udpTTL,MultiCast::kUdpTTL);
692 693 694 695 696 697 698 699 700

		sendRtspResponse("200 OK",
						 {"Transport",StrPrinter << "RTP/AVP;multicast;"
												 << "destination=" << _pBrdcaster->getIP() << ";"
												 << "source=" << get_local_ip() << ";"
												 << "port=" << iSrvPort << "-" << pSockRtcp->get_local_port() << ";"
												 << "ttl=" << udpTTL << ";"
												 << "ssrc=" << printSSRC(trackRef->_ssrc)
						 });
xzl committed
701 702 703 704 705 706 707
	}
		break;
	default:
		break;
	}
}

708
void RtspSession::handleReq_Play(const Parser &parser) {
xiongziliang committed
709
	if (_aTrackInfo.empty() || parser["Session"] != _strSession) {
xzl committed
710
		send_SessionNotFound();
711 712
        throw SockException(Err_shutdown,_aTrackInfo.empty() ? "can not find any availabe track when play" : "session not found when play");
    }
xiongziliang committed
713
	auto strRange = parser["Range"];
714
    auto onRes = [this,strRange](const string &err){
715
        bool authSuccess = err.empty();
xiongziliang committed
716
        if(!authSuccess){
717
            //第一次play是播放,否则是恢复播放。只对播放鉴权
718
			sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err);
xiongziliang committed
719
            shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << err));
720 721
            return;
        }
xzl committed
722

723
        auto pMediaSrc = _pMediaSrc.lock();
724 725
        if(!pMediaSrc){
        	send_StreamNotFound();
xiongziliang committed
726
        	shutdown(SockException(Err_shutdown,"rtsp stream released"));
727
			return;
728
        }
729

730 731 732
        bool useBuf = true;
		_enableSendRtp = false;

733
		if (strRange.size() && !_bFirstPlay) {
xiongziliang committed
734
            //这个是seek操作
735 736 737 738 739
			auto strStart = FindField(strRange.data(), "npt=", "-");
			if (strStart == "now") {
				strStart = "0";
			}
			auto iStartTime = 1000 * atof(strStart.data());
740
			InfoP(this) << "rtsp seekTo(ms):" << iStartTime;
741
			useBuf = !pMediaSrc->seekTo(iStartTime);
742
		}else if(pMediaSrc->readerCount() == 0){
743 744 745 746
			//第一个消费者
			pMediaSrc->seekTo(0);
		}
		_bFirstPlay = false;
747

748
		_StrPrinter rtp_info;
xiongziliang committed
749
		for(auto &track : _aTrackInfo){
750
			if (track->_inited == false) {
xiongziliang committed
751
				//还有track没有setup
xiongziliang committed
752 753
                shutdown(SockException(Err_shutdown,"track not setuped"));
                return;
xiongziliang committed
754
			}
755 756 757 758
			track->_ssrc = pMediaSrc->getSsrc(track->_type);
			track->_seq = pMediaSrc->getSeqence(track->_type);
			track->_time_stamp = pMediaSrc->getTimeStamp(track->_type);

xiongziliang committed
759
			rtp_info << "url=" << _strContentBase << "/" << track->_control_surffix << ";"
760 761
					 << "seq=" << track->_seq << ";"
					 << "rtptime=" << (int)(track->_time_stamp * (track->_samplerate / 1000)) << ",";
xiongziliang committed
762 763
		}

764 765 766 767 768 769
		rtp_info.pop_back();

		sendRtspResponse("200 OK",
						 {"Range", StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) <<  pMediaSrc->getTimeStamp(TrackInvalid) / 1000.0,
						  "RTP-Info",rtp_info
						 });
770

771 772
		_enableSendRtp = true;

773
		//提高发送性能
774 775
		SockUtil::setNoDelay(_sock->rawFD(),false);
		(*this) << SocketFlags(kSockFlags);
xiongziliang committed
776

xiongziliang committed
777
		if (!_pRtpReader && _rtpType != Rtsp::RTP_MULTICAST) {
778
			weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
779
			_pRtpReader = pMediaSrc->getRing()->attach(getPoller(),useBuf);
780 781 782 783 784
			_pRtpReader->setDetachCB([weakSelf]() {
				auto strongSelf = weakSelf.lock();
				if(!strongSelf) {
					return;
				}
xiongziliang committed
785 786
                strongSelf->shutdown(SockException(Err_shutdown,"rtsp ring buffer detached"));
            });
787 788 789 790 791
			_pRtpReader->setReadCB([weakSelf](const RtpPacket::Ptr &pack) {
				auto strongSelf = weakSelf.lock();
				if(!strongSelf) {
					return;
				}
792 793 794
				if(strongSelf->_enableSendRtp) {
					strongSelf->sendRtpPacket(pack);
				}
795 796
			});
		}
797
    };
xzl committed
798

799
    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
800
    Broadcast::AuthInvoker invoker = [weakSelf,onRes](const string &err){
801 802 803
        auto strongSelf = weakSelf.lock();
        if(!strongSelf){
            return;
xzl committed
804
        }
805
        strongSelf->async([weakSelf,onRes,err](){
806 807 808 809
            auto strongSelf = weakSelf.lock();
            if(!strongSelf){
                return;
            }
810
            onRes(err);
811 812
        });
    };
xiongziliang committed
813 814 815 816 817 818 819 820 821
    if(_bFirstPlay){
        //第一次收到play命令,需要鉴权
        auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,_mediaInfo,invoker,*this);
        if(!flag){
            //该事件无人监听,默认不鉴权
            onRes("");
        }
    }else{
        //后面是seek或恢复命令,不需要鉴权
822
        onRes("");
823
    }
xzl committed
824 825
}

826
void RtspSession::handleReq_Pause(const Parser &parser) {
xiongziliang committed
827
	if (parser["Session"] != _strSession) {
xzl committed
828
		send_SessionNotFound();
829 830
        throw SockException(Err_shutdown,"session not found when pause");
    }
831 832

	sendRtspResponse("200 OK");
833
	_enableSendRtp = false;
xzl committed
834 835
}

836
void RtspSession::handleReq_Teardown(const Parser &parser) {
837
	sendRtspResponse("200 OK");
838
    throw SockException(Err_shutdown,"rtsp player send teardown request");
xzl committed
839 840
}

841
void RtspSession::handleReq_Get(const Parser &parser) {
xiongziliang committed
842
	_http_x_sessioncookie = parser["x-sessioncookie"];
843
	sendRtspResponse("200 OK",
844
					 {"Cache-Control","no-store",
845 846 847 848
					  "Pragma","no-store",
					  "Content-Type","application/x-rtsp-tunnelled",
					 },"","HTTP/1.0");

849
	//注册http getter,以便http poster绑定
xzl committed
850
	lock_guard<recursive_mutex> lock(g_mtxGetter);
851
	g_mapGetter[_http_x_sessioncookie] = dynamic_pointer_cast<RtspSession>(shared_from_this());
xzl committed
852 853
}

854
void RtspSession::handleReq_Post(const Parser &parser) {
xzl committed
855
	lock_guard<recursive_mutex> lock(g_mtxGetter);
xiongziliang committed
856
	string sessioncookie = parser["x-sessioncookie"];
857
	//Poster 找到 Getter
xzl committed
858 859
	auto it = g_mapGetter.find(sessioncookie);
	if (it == g_mapGetter.end()) {
860 861
        throw SockException(Err_shutdown,"can not find http getter by x-sessioncookie");
    }
862 863 864 865

	//Poster 找到Getter的SOCK
	auto httpGetterWeak = it->second;
	//移除http getter的弱引用记录
xzl committed
866
	g_mapGetter.erase(sessioncookie);
867 868 869 870 871

	//http poster收到请求后转发给http getter处理
	_onRecv = [this,httpGetterWeak](const Buffer::Ptr &pBuf){
		auto httpGetterStrong = httpGetterWeak.lock();
		if(!httpGetterStrong){
xiongziliang committed
872
			shutdown(SockException(Err_shutdown,"http getter released"));
873 874 875 876 877 878 879 880 881 882 883 884 885
			return;
		}

		//切换到http getter的线程
		httpGetterStrong->async([pBuf,httpGetterWeak](){
			auto httpGetterStrong = httpGetterWeak.lock();
			if(!httpGetterStrong){
				return;
			}
			httpGetterStrong->onRecv(std::make_shared<BufferString>(decodeBase64(string(pBuf->data(),pBuf->size()))));
		});
	};

xiongziliang committed
886 887 888 889
	if(!parser.Content().empty()){
		//http poster后面的粘包
		_onRecv(std::make_shared<BufferString>(parser.Content()));
	}
890 891 892 893 894 895

    sendRtspResponse("200 OK",
                     {"Cache-Control","no-store",
                      "Pragma","no-store",
                      "Content-Type","application/x-rtsp-tunnelled",
                     },"","HTTP/1.0");
xzl committed
896 897
}

898
void RtspSession::handleReq_SET_PARAMETER(const Parser &parser) {
899
	//TraceP(this) <<endl;
900
	sendRtspResponse("200 OK");
xzl committed
901 902 903
}

inline void RtspSession::send_NotAcceptable() {
904
	sendRtspResponse("406 Not Acceptable",{"Connection","Close"});
xzl committed
905
}
906

907

xiongziliang committed
908
// Called with each RTP packet after sorting; hands it to the pushed media
// source. trackidx is not used here.
void RtspSession::onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) {
    _pushSrc->onWrite(rtppt, false);
}
911
// Handles one UDP datagram received from the peer.
//   intervaled : even => RTP socket of track intervaled/2,
//                odd  => RTCP socket of track (intervaled-1)/2
//   pBuf       : datagram payload
//   addr       : address the datagram was sent from
// Any datagram counts as a keep-alive. For players, the first datagram seen on
// a socket is treated as a hole-punch packet and rebinds that socket's send
// address to the real source address.
inline void RtspSession::onRcvPeerUdpData(int intervaled, const Buffer::Ptr &pBuf, const struct sockaddr& addr) {
    // Peer traffic proves the client is still alive.
    _ticker.resetTime();

    if (intervaled % 2 == 0) {
        const int trackIdx = intervaled / 2;
        if (_pushSrc) {
            // RTP pushed by a publishing client.
            handleOneRtp(trackIdx, _aTrackInfo[trackIdx], (unsigned char *) pBuf->data(), pBuf->size());
        } else if (!_udpSockConnected.count(intervaled)) {
            // First RTP hole-punch packet from a player: remember it and bind
            // the peer address once. (The previous test was inverted, so this
            // branch could only run when the id was already recorded.)
            _udpSockConnected.emplace(intervaled);
            auto &rtpSock = _apRtpSock[trackIdx];
            if (rtpSock) {
                rtpSock->setSendPeerAddr(&addr);
            }
        }
        return;
    }

    // RTCP packet.
    const int trackIdx = (intervaled - 1) / 2;
    if (!_udpSockConnected.count(intervaled)) {
        _udpSockConnected.emplace(intervaled);
        // Guard against a missing per-track RTCP socket (multicast setup uses a
        // shared RTCP socket rather than _apRtcpSock).
        auto &rtcpSock = _apRtcpSock[trackIdx];
        if (rtcpSock) {
            rtcpSock->setSendPeerAddr(&addr);
        }
    }
    onRtcpPacket(trackIdx, _aTrackInfo[trackIdx], (unsigned char *) pBuf->data(), pBuf->size());
}


934
inline void RtspSession::startListenPeerUdpData(int trackIdx) {
xzl committed
935
	weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
936 937
    auto srcIP = inet_addr(get_peer_ip().data());
	auto onUdpData = [weakSelf,srcIP](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr,int intervaled){
938 939 940 941
		auto strongSelf=weakSelf.lock();
		if(!strongSelf) {
			return false;
		}
942 943 944 945 946 947 948

        if (((struct sockaddr_in *) pPeerAddr)->sin_addr.s_addr != srcIP) {
            WarnP(strongSelf.get()) << ((intervaled % 2 == 0) ? "收到其他地址的rtp数据:" : "收到其他地址的rtcp数据:")
                                    << inet_ntoa(((struct sockaddr_in *) pPeerAddr)->sin_addr);
            return true;
        }

949
		struct sockaddr addr=*pPeerAddr;
950
		strongSelf->async([weakSelf,pBuf,addr,intervaled]() {
951 952 953 954
			auto strongSelf=weakSelf.lock();
			if(!strongSelf) {
				return;
			}
955
			strongSelf->onRcvPeerUdpData(intervaled,pBuf,addr);
956 957 958 959 960
		});
		return true;
	};

	switch (_rtpType){
xiongziliang committed
961
		case Rtsp::RTP_MULTICAST:{
962 963
			//组播使用的共享rtcp端口
			UDPServer::Instance().listenPeer(get_peer_ip().data(), this, [onUdpData](
964 965
					int intervaled, const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr) {
				return onUdpData(pBuf,pPeerAddr,intervaled);
xzl committed
966
			});
967 968
		}
			break;
xiongziliang committed
969
		case Rtsp::RTP_UDP:{
970
			auto setEvent = [&](Socket::Ptr &sock,int intervaled){
971
				if(!sock){
972
					WarnP(this) << "udp端口为空:" << intervaled;
973 974
					return;
				}
975
				sock->setOnRead([onUdpData,intervaled](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr , int addr_len){
976
					onUdpData(pBuf,pPeerAddr,intervaled);
977 978 979 980 981 982 983 984 985 986 987
				});
			};
			setEvent(_apRtpSock[trackIdx], 2*trackIdx );
			setEvent(_apRtcpSock[trackIdx], 2*trackIdx + 1 );
		}
			break;

		default:
			break;
	}

xzl committed
988 989
}

990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025
// Returns the current UTC time formatted for the "Date" response header,
// e.g. "Tue, Jan 23 1997 15:35:06 GMT".
// Uses the re-entrant gmtime variant because gmtime() hands back a shared
// static buffer and responses may be built on several threads at once.
// Returns an empty string if the time cannot be converted or formatted.
static std::string dateStr(){
    char buf[64] = {0};
    time_t tt = time(nullptr);
    struct tm utc;
#if defined(_WIN32)
    if (gmtime_s(&utc, &tt) != 0) {
        return std::string();
    }
#else
    if (gmtime_r(&tt, &utc) == nullptr) {
        return std::string();
    }
#endif
    if (strftime(buf, sizeof buf, "%a, %b %d %Y %H:%M:%S GMT", &utc) == 0) {
        return std::string();
    }
    return buf;
}

bool RtspSession::sendRtspResponse(const string &res_code,
								   const StrCaseMap &header_const,
								   const string &sdp,
								   const char *protocol){
	auto header = header_const;
	header.emplace("CSeq",StrPrinter << _iCseq);
	if(!_strSession.empty()){
		header.emplace("Session",_strSession);
	}

	header.emplace("Server",SERVER_NAME "(build in " __DATE__ " " __TIME__ ")");
	header.emplace("Date",dateStr());

	if(!sdp.empty()){
		header.emplace("Content-Length",StrPrinter << sdp.size());
		header.emplace("Content-Type","application/sdp");
	}

	_StrPrinter printer;
	printer << protocol << " " << res_code << "\r\n";
	for (auto &pr : header){
		printer << pr.first << ": " << pr.second << "\r\n";
	}

	printer << "\r\n";

	if(!sdp.empty()){
		printer << sdp;
	}
1026
//	DebugP(this) << printer;
1027 1028 1029 1030
	return send(std::make_shared<BufferString>(printer)) > 0 ;
}

// Sends a buffer over the TCP session and adds its size to the session's
// byte counter. Returns whatever TcpSession::send() returns.
int RtspSession::send(const Buffer::Ptr &pkt){
    const auto bytes = pkt->size();
    _ui64TotalBytes += bytes;
    return TcpSession::send(pkt);
}

bool RtspSession::sendRtspResponse(const string &res_code,
								   const std::initializer_list<string> &header,
								   const string &sdp,
								   const char *protocol) {
	string key;
	StrCaseMap header_map;
	int i = 0;
	for(auto &val : header){
		if(++i % 2 == 0){
			header_map.emplace(key,val);
		}else{
			key = val;
		}
	}
	return sendRtspResponse(res_code,header_map,sdp,protocol);
}

// Formats an SSRC as 8 upper-case hexadecimal digits, most significant
// nibble first (e.g. 0x1A2B3C4D -> "1A2B3C4D").
inline string RtspSession::printSSRC(uint32_t ui32Ssrc) {
    static const char kHexDigits[] = "0123456789ABCDEF";
    string hex(8, '0');
    for (int nibble = 0; nibble < 8; ++nibble) {
        const int shift = 28 - 4 * nibble;
        hex[nibble] = kHexDigits[(ui32Ssrc >> shift) & 0x0F];
    }
    return hex;
}
// Returns the index of the first track whose type equals `type`, or -1.
inline int RtspSession::getTrackIndexByTrackType(TrackType type) {
    int index = 0;
    for (auto &track : _aTrackInfo) {
        if (track->_type == type) {
            return index;
        }
        ++index;
    }
    return -1;
}
// Returns the index of the track whose control suffix equals `controlSuffix`.
// When nothing matches but exactly one track exists, that track (index 0) is
// returned; otherwise -1.
inline int RtspSession::getTrackIndexByControlSuffix(const string &controlSuffix) {
    int index = 0;
    for (auto &track : _aTrackInfo) {
        if (track->_control_surffix == controlSuffix) {
            return index;
        }
        ++index;
    }
    // Single-track fallback when the suffix did not match.
    return _aTrackInfo.size() == 1 ? 0 : -1;
}

xiongziliang committed
1084 1085 1086 1087 1088 1089 1090 1091 1092
// Returns the index of the first track whose interleaved channel equals
// `interleaved`, or -1 when there is none.
inline int RtspSession::getTrackIndexByInterleaved(int interleaved){
    int index = 0;
    for (auto &track : _aTrackInfo) {
        if (track->_interleaved == interleaved) {
            return index;
        }
        ++index;
    }
    return -1;
}

xiongziliang committed
1093
// MediaSource close request (this callback fires on another thread).
// Only a pushing session can be closed, and unless `force` is set it is closed
// only while nobody is reading from it. Returns true when a shutdown was
// scheduled.
bool RtspSession::close(MediaSource &sender,bool force) {
    if (!_pushSrc) {
        return false;
    }
    if (!force && _pushSrc->readerCount() != 0) {
        return false;
    }

    string reason = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
    safeShutdown(SockException(Err_shutdown, reason));
    return true;
}
1102

1103

1104 1105 1106 1107 1108 1109 1110 1111 1112
// "No reader" notification from the media source (fires on another thread).
// Forwarded to the default handler only when this session is pushing and the
// pushed source currently has no readers.
void RtspSession::onNoneReader(MediaSource &sender){
    const bool pushingWithoutReader = _pushSrc && _pushSrc->readerCount() == 0;
    if (pushingWithoutReader) {
        MediaSourceEvent::onNoneReader(sender);
    }
}


1113
// Sends one RTP packet to the player using the negotiated transport.
//   RTP_TCP : the packet is written to the RTSP TCP connection as is.
//   RTP_UDP : the packet is sent through the RTP socket of the matching track,
//             wrapped in a BufferRtp constructed with offset 4.
// Other transports are ignored here.
inline void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) {
    switch (_rtpType) {
        case Rtsp::RTP_TCP: {
            send(pkt);
        }
            break;
        case Rtsp::RTP_UDP: {
            int iTrackIndex = getTrackIndexByTrackType(pkt->type);
            if (iTrackIndex < 0) {
                // No track of this type is known, so the packet cannot be
                // routed. Drop it instead of indexing the socket array with -1.
                return;
            }
            auto &pSock = _apRtpSock[iTrackIndex];
            if (!pSock) {
                shutdown(SockException(Err_shutdown,"udp sock not opened yet"));
                return;
            }
            BufferRtp::Ptr buffer(new BufferRtp(pkt,4));
            _ui64TotalBytes += buffer->size();
            pSock->send(buffer);
        }
            break;
        default:
            break;
    }

#if RTSP_SERVER_SEND_RTCP
    int iTrackIndex = getTrackIndexByInterleaved(pkt->interleaved);
    if(iTrackIndex == -1){
        return;
    }
    RtcpCounter &counter = _aRtcpCnt[iTrackIndex];
    counter.pktCnt += 1;
    counter.octCount += (pkt->length - pkt->offset);
    auto &ticker = _aRtcpTicker[iTrackIndex];
    if (ticker.elapsedTime() > 5 * 1000) {
        //send rtcp every 5 second
        ticker.resetTime();
        // Keep the RTP timestamp in network byte order as found in the packet.
        memcpy(&counter.timeStamp, pkt->payload + 8 , 4);
        sendSenderReport(_rtpType == Rtsp::RTP_TCP,iTrackIndex);
    }
#endif
}

inline void RtspSession::sendSenderReport(bool overTcp,int iTrackIndex) {
xiongziliang committed
1156 1157
    static const char s_cname[] = "ZLMediaKitRtsp";
    uint8_t aui8Rtcp[4 + 28 + 10 + sizeof(s_cname) + 1] = {0};
1158 1159 1160 1161 1162 1163
    uint8_t *pui8Rtcp_SR = aui8Rtcp + 4, *pui8Rtcp_SDES = pui8Rtcp_SR + 28;
    auto &track = _aTrackInfo[iTrackIndex];
    auto &counter = _aRtcpCnt[iTrackIndex];

    aui8Rtcp[0] = '$';
    aui8Rtcp[1] = track->_interleaved + 1;
xiongziliang committed
1164 1165
    aui8Rtcp[2] = (sizeof(aui8Rtcp) - 4) >> 8;
    aui8Rtcp[3] = (sizeof(aui8Rtcp) - 4) & 0xFF;
1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204

    pui8Rtcp_SR[0] = 0x80;
    pui8Rtcp_SR[1] = 0xC8;
    pui8Rtcp_SR[2] = 0x00;
    pui8Rtcp_SR[3] = 0x06;

    uint32_t ssrc=htonl(track->_ssrc);
    memcpy(&pui8Rtcp_SR[4], &ssrc, 4);

    uint64_t msw;
    uint64_t lsw;
    struct timeval tv;
    gettimeofday(&tv, NULL);
    msw = tv.tv_sec + 0x83AA7E80; /* 0x83AA7E80 is the number of seconds from 1900 to 1970 */
    lsw = (uint32_t) ((double) tv.tv_usec * (double) (((uint64_t) 1) << 32) * 1.0e-6);

    msw = htonl(msw);
    memcpy(&pui8Rtcp_SR[8], &msw, 4);

    lsw = htonl(lsw);
    memcpy(&pui8Rtcp_SR[12], &lsw, 4);
    //直接使用网络字节序
    memcpy(&pui8Rtcp_SR[16], &counter.timeStamp, 4);

    uint32_t pktCnt = htonl(counter.pktCnt);
    memcpy(&pui8Rtcp_SR[20], &pktCnt, 4);

    uint32_t octCount = htonl(counter.octCount);
    memcpy(&pui8Rtcp_SR[24], &octCount, 4);

    pui8Rtcp_SDES[0] = 0x81;
    pui8Rtcp_SDES[1] = 0xCA;
    pui8Rtcp_SDES[2] = 0x00;
    pui8Rtcp_SDES[3] = 0x06;

    memcpy(&pui8Rtcp_SDES[4], &ssrc, 4);

    pui8Rtcp_SDES[8] = 0x01;
    pui8Rtcp_SDES[9] = 0x0f;
xiongziliang committed
1205 1206
    memcpy(&pui8Rtcp_SDES[10], s_cname, sizeof(s_cname));
    pui8Rtcp_SDES[10 + sizeof(s_cname)] = 0x00;
1207 1208 1209 1210 1211 1212

    if(overTcp){
        send(obtainBuffer((char *) aui8Rtcp, sizeof(aui8Rtcp)));
    }else {
        _apRtcpSock[iTrackIndex]->send((char *) aui8Rtcp + 4, sizeof(aui8Rtcp) - 4);
    }
xzl committed
1213 1214 1215
}

}
xiongziliang committed
1216
/* namespace mediakit */
xzl committed
1217