RtspSession.cpp 44.5 KB
Newer Older
xiongziliang committed
1
/*
xiongziliang committed
2
 * MIT License
xzl committed
3
 *
xiongziliang committed
4
 * Copyright (c) 2016-2019 xiongziliang <771730766@qq.com>
xiongziliang committed
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
xzl committed
25
 */
26

xiongzilaing committed
27
#include <atomic>
28
#include <iomanip>
xiongziliang committed
29
#include "Common/config.h"
xiongzilaing committed
30
#include "UDPServer.h"
xzl committed
31
#include "RtspSession.h"
xiongzilaing committed
32
#include "Util/mini.h"
33
#include "Util/MD5.h"
xiongziliang committed
34
#include "Util/base64.h"
xiongzilaing committed
35
#include "Util/onceToken.h"
xzl committed
36 37
#include "Util/TimeTicker.h"
#include "Util/NoticeCenter.h"
xiongzilaing committed
38
#include "Network/sockutil.h"
xzl committed
39

40 41
#define RTSP_SERVER_SEND_RTCP 0

xiongziliang committed
42 43
using namespace std;
using namespace toolkit;
xzl committed
44

xiongziliang committed
45
namespace mediakit {
xzl committed
46

47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
/**
 * rtsp协议有多种方式传输rtp数据包,目前已支持包括以下4种
 * 1: rtp over udp ,这种方式是rtp通过单独的udp端口传输
 * 2: rtp over udp_multicast,这种方式是rtp通过共享udp组播端口传输
 * 3: rtp over tcp,这种方式是通过rtsp信令tcp通道完成传输
 * 4: rtp over http,下面着重讲解:rtp over http
 *
 * rtp over http 是把rtsp协议伪装成http协议以达到穿透防火墙的目的,
 * 此时播放器会发送两次http请求至rtsp服务器,第一次是http get请求,
 * 第二次是http post请求。
 *
 * 这两次请求通过http请求头中的x-sessioncookie键完成绑定
 *
 * 第一次http get请求用于接收rtp、rtcp和rtsp回复,后续该链接不再发送其他请求
 * 第二次http post请求用于发送rtsp请求,rtsp握手结束后可能会断开连接,此时我们还要维持rtp发送
 * 需要指出的是http post请求中的content负载就是base64编码后的rtsp请求包,
 * 播放器会把rtsp请求伪装成http content负载发送至rtsp服务器,然后rtsp服务器又把回复发送给第一次http get请求的tcp链接
 * 这样,对防火墙而言,本次rtsp会话就是两次http请求,防火墙就会放行数据
 *
 * zlmediakit在处理rtsp over http的请求时,会把http poster中的content数据base64解码后转发给http getter处理
 */


//rtsp over http 情况下get请求实例,在请求实例用于接收rtp数据包
static unordered_map<string, weak_ptr<RtspSession> > g_mapGetter;
//对g_mapGetter上锁保护
static recursive_mutex g_mtxGetter;
xiongziliang committed
74

xiongziliang committed
75
RtspSession::RtspSession(const Socket::Ptr &pSock) : TcpSession(pSock) {
xiongziliang committed
76
    DebugP(this);
77 78
    GET_CONFIG(uint32_t,keep_alive_sec,Rtsp::kKeepAliveSecond);
    pSock->setSendTimeOutSecond(keep_alive_sec);
xiongziliang committed
79 80
    //起始接收buffer缓存设置为4K,节省内存
    pSock->setReadBuffer(std::make_shared<BufferRaw>(4 * 1024));
xzl committed
81 82 83
}

RtspSession::~RtspSession() {
84
    DebugP(this);
xzl committed
85 86 87
}

void RtspSession::onError(const SockException& err) {
xiongziliang committed
88
    bool isPlayer = !_pushSrc;
89
    WarnP(this) << (isPlayer ? "RTSP播放器(" : "RTSP推流器(")
xiongziliang committed
90 91 92 93 94
                << _mediaInfo._vhost << "/"
                << _mediaInfo._app << "/"
                << _mediaInfo._streamid
                << ")断开:" << err.what();

xiongziliang committed
95
	if (_rtpType == Rtsp::RTP_MULTICAST) {
xiongziliang committed
96
		//取消UDP端口监听
97
		UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this);
xzl committed
98 99
	}

100 101 102 103
	if (_http_x_sessioncookie.size() != 0) {
		//移除http getter的弱引用记录
		lock_guard<recursive_mutex> lock(g_mtxGetter);
		g_mapGetter.erase(_http_x_sessioncookie);
xzl committed
104
	}
105 106

    //流量统计事件广播
107
    GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold);
108
    if(_ui64TotalBytes > iFlowThreshold * 1024){
109
        NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, _ticker.createdTime()/1000, isPlayer);
110
    }
111

xzl committed
112 113 114
}

void RtspSession::onManager() {
115
    GET_CONFIG(uint32_t,handshake_sec,Rtsp::kHandshakeSecond);
116 117 118
    GET_CONFIG(uint32_t,keep_alive_sec,Rtsp::kKeepAliveSecond);

    if (_ticker.createdTime() > handshake_sec * 1000) {
119
		if (_strSession.size() == 0) {
xiongziliang committed
120
			shutdown(SockException(Err_timeout,"illegal connection"));
xzl committed
121 122 123
			return;
		}
	}
124

125

126
	if ((_rtpType == Rtsp::RTP_UDP || _pushSrc ) && _ticker.elapsedTime() > keep_alive_sec * 1000) {
127
		//如果是推流端或者rtp over udp类型的播放端,那么就做超时检测
xiongziliang committed
128 129
        shutdown(SockException(Err_timeout,"rtp over udp session timeouted"));
        return;
xzl committed
130 131 132
	}
}

xiongziliang committed
133 134 135 136 137 138 139
void RtspSession::onRecv(const Buffer::Ptr &pBuf) {
	_ticker.resetTime();
    _ui64TotalBytes += pBuf->size();
    if (_onRecv) {
		//http poster的请求数据转发给http getter处理
		_onRecv(pBuf);
	} else {
140
//    	TraceP(this) << pBuf->size() << "\r\n" << pBuf->data();
xiongziliang committed
141 142 143
		input(pBuf->data(),pBuf->size());
	}
}
xiongziliang committed
144

145 146 147 148 149 150
//字符串是否以xx结尾
static inline bool end_of(const string &str, const string &substr){
    auto pos = str.rfind(substr);
    return pos != string::npos && pos == str.size() - substr.size();
};

xiongziliang committed
151 152 153
void RtspSession::onWholeRtspPacket(Parser &parser) {
	string strCmd = parser.Method(); //提取出请求命令字
	_iCseq = atoi(parser["CSeq"].data());
154
	if(_strContentBase.empty() && strCmd != "GET"){
xiongziliang committed
155 156
		_strContentBase = parser.Url();
		_mediaInfo.parse(parser.FullUrl());
157 158
        _mediaInfo._schema = RTSP_SCHEMA;
    }
xzl committed
159

160
	typedef void (RtspSession::*rtsp_request_handler)(const Parser &parser);
xiongziliang committed
161
	static unordered_map<string, rtsp_request_handler> s_cmd_functions;
xiongziliang committed
162
	static onceToken token( []() {
xiongziliang committed
163 164 165 166 167 168 169 170 171 172 173 174
		s_cmd_functions.emplace("OPTIONS",&RtspSession::handleReq_Options);
		s_cmd_functions.emplace("DESCRIBE",&RtspSession::handleReq_Describe);
		s_cmd_functions.emplace("ANNOUNCE",&RtspSession::handleReq_ANNOUNCE);
		s_cmd_functions.emplace("RECORD",&RtspSession::handleReq_RECORD);
		s_cmd_functions.emplace("SETUP",&RtspSession::handleReq_Setup);
		s_cmd_functions.emplace("PLAY",&RtspSession::handleReq_Play);
		s_cmd_functions.emplace("PAUSE",&RtspSession::handleReq_Pause);
		s_cmd_functions.emplace("TEARDOWN",&RtspSession::handleReq_Teardown);
		s_cmd_functions.emplace("GET",&RtspSession::handleReq_Get);
		s_cmd_functions.emplace("POST",&RtspSession::handleReq_Post);
		s_cmd_functions.emplace("SET_PARAMETER",&RtspSession::handleReq_SET_PARAMETER);
		s_cmd_functions.emplace("GET_PARAMETER",&RtspSession::handleReq_SET_PARAMETER);
xiongziliang committed
175 176
	}, []() {});

xiongziliang committed
177 178
	auto it = s_cmd_functions.find(strCmd);
	if (it == s_cmd_functions.end()) {
179
        sendRtspResponse("403 Forbidden");
xiongziliang committed
180
        shutdown(SockException(Err_shutdown,StrPrinter << "403 Forbidden:" << strCmd));
181
        return;
xzl committed
182
	}
183 184 185 186 187 188 189 190 191 192 193 194

    auto &fun = it->second;
    try {
        (this->*fun)(parser);
    }catch (SockException &ex){
        if(ex){
            shutdown(ex);
        }
    }catch (exception &ex){
        shutdown(SockException(Err_shutdown,ex.what()));
    }
    parser.Clear();
xiongziliang committed
195 196
}

xiongziliang committed
197
void RtspSession::onRtpPacket(const char *data, uint64_t len) {
xiongziliang committed
198 199 200
	if(!_pushSrc){
		return;
	}
201

xiongziliang committed
202 203 204 205
	int trackIdx = -1;
	uint8_t interleaved = data[1];
	if(interleaved %2 == 0){
		trackIdx = getTrackIndexByInterleaved(interleaved);
206 207 208 209 210 211
        if (trackIdx != -1) {
            handleOneRtp(trackIdx,_aTrackInfo[trackIdx],(unsigned char *)data + 4, len - 4);
        }
	}else{
        trackIdx = getTrackIndexByInterleaved(interleaved - 1);
        if (trackIdx != -1) {
212
            onRtcpPacket(trackIdx, _aTrackInfo[trackIdx], (unsigned char *) data + 4, len - 4);
213
        }
xiongziliang committed
214 215 216
	}
}

217
void RtspSession::onRtcpPacket(int iTrackidx, SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen){
218 219

}
xiongziliang committed
220 221 222 223
int64_t RtspSession::getContentLength(Parser &parser) {
	if(parser.Method() == "POST"){
		//http post请求的content数据部分是base64编码后的rtsp请求信令包
		return remainDataSize();
xiongziliang committed
224
	}
xiongziliang committed
225
	return RtspSplitter::getContentLength(parser);
xzl committed
226 227
}

xiongziliang committed
228

229
void RtspSession::handleReq_Options(const Parser &parser) {
230
	//支持这些命令
231
	sendRtspResponse("200 OK",{"Public" , "OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, ANNOUNCE, RECORD, SET_PARAMETER, GET_PARAMETER"});
xzl committed
232 233
}

234
void RtspSession::handleReq_ANNOUNCE(const Parser &parser) {
xiongziliang committed
235 236 237 238 239 240 241
	auto src = dynamic_pointer_cast<RtmpMediaSource>(MediaSource::find(RTSP_SCHEMA,
																	   _mediaInfo._vhost,
																	   _mediaInfo._app,
																	   _mediaInfo._streamid,
																	   false));
	if(src){
		sendRtspResponse("406 Not Acceptable", {"Content-Type", "text/plain"}, "Already publishing.");
242 243 244 245 246 247
        string err = StrPrinter << "ANNOUNCE:"
                                << "Already publishing:"
                                << _mediaInfo._vhost << " "
                                << _mediaInfo._app << " "
                                << _mediaInfo._streamid << endl;
		throw SockException(Err_shutdown,err);
248
	}
xiongziliang committed
249

250 251 252
    auto full_url = parser.FullUrl();
    if(end_of(full_url,".sdp")){
        //去除.sdp后缀,防止EasyDarwin推流器强制添加.sdp后缀
253
        full_url = full_url.substr(0,full_url.length() - 4);
254 255 256
        _mediaInfo.parse(full_url);
    }

257 258 259
    SdpParser sdpParser(parser.Content());
    _strSession = makeRandStr(12);
    _aTrackInfo = sdpParser.getAvailableTrack();
xiongziliang committed
260

261
	_pushSrc = std::make_shared<RtspMediaSourceImp>(_mediaInfo._vhost,_mediaInfo._app,_mediaInfo._streamid);
xiongziliang committed
262
	_pushSrc->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
263
    _pushSrc->setSdp(sdpParser.toString());
264 265

	sendRtspResponse("200 OK",{"Content-Base",_strContentBase + "/"});
266 267
}

268
void RtspSession::handleReq_RECORD(const Parser &parser){
xiongziliang committed
269
	if (_aTrackInfo.empty() || parser["Session"] != _strSession) {
270
		send_SessionNotFound();
271
        throw SockException(Err_shutdown,_aTrackInfo.empty() ? "can not find any availabe track when record" : "session not found when record");
272
	}
273
	auto onRes = [this](const string &err,bool enableRtxp,bool enableHls,bool enableMP4){
274 275 276
		bool authSuccess = err.empty();
		if(!authSuccess){
			sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err);
xiongziliang committed
277
			shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << err));
278 279 280
			return;
		}

281 282 283
        //设置转协议
        _pushSrc->setProtocolTranslation(enableRtxp,enableHls,enableMP4);

284 285 286 287
		_StrPrinter rtp_info;
		for(auto &track : _aTrackInfo){
			if (track->_inited == false) {
				//还有track没有setup
xiongziliang committed
288 289
                shutdown(SockException(Err_shutdown,"track not setuped"));
                return;
290
			}
xiongziliang committed
291
			rtp_info << "url=" << _strContentBase << "/" << track->_control_surffix << ",";
292 293 294 295
		}

		rtp_info.pop_back();
		sendRtspResponse("200 OK", {"RTP-Info",rtp_info});
296 297
		if(_rtpType == Rtsp::RTP_TCP){
			//如果是rtsp推流服务器,并且是TCP推流,那么加大TCP接收缓存,这样能提升接收性能
298
			_sock->setReadBuffer(std::make_shared<BufferRaw>(256 * 1024));
299
            setSocketFlags();
300
		}
301 302 303
	};

	weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
304
	Broadcast::PublishAuthInvoker invoker = [weakSelf,onRes](const string &err,bool enableRtxp,bool enableHls,bool enableMP4){
305 306 307 308
		auto strongSelf = weakSelf.lock();
		if(!strongSelf){
			return;
		}
309
		strongSelf->async([weakSelf,onRes,err,enableRtxp,enableHls,enableMP4](){
310 311 312 313
			auto strongSelf = weakSelf.lock();
			if(!strongSelf){
				return;
			}
314
			onRes(err,enableRtxp,enableHls,enableMP4);
315 316 317 318 319 320 321
		});
	};

	//rtsp推流需要鉴权
	auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish,_mediaInfo,invoker,*this);
	if(!flag){
		//该事件无人监听,默认不鉴权
322 323 324 325 326
        GET_CONFIG(bool,toRtxp,General::kPublishToRtxp);
        GET_CONFIG(bool,toHls,General::kPublishToHls);
        GET_CONFIG(bool,toMP4,General::kPublishToMP4);
        onRes("",toRtxp,toHls,toMP4);
    }
327 328
}

329
void RtspSession::handleReq_Describe(const Parser &parser) {
330 331
    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
    //该请求中的认证信息
332
    auto authorization = parser["Authorization"];
333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367
    onGetRealm invoker = [weakSelf,authorization](const string &realm){
        auto strongSelf = weakSelf.lock();
        if(!strongSelf){
            //本对象已经销毁
            return;
        }
        //切换到自己的线程然后执行
        strongSelf->async([weakSelf,realm,authorization](){
            auto strongSelf = weakSelf.lock();
            if(!strongSelf){
                //本对象已经销毁
                return;
            }
            if(realm.empty()){
                //无需认证,回复sdp
                strongSelf->onAuthSuccess();
                return;
            }
            //该流需要认证
            strongSelf->onAuthUser(realm,authorization);
        });

    };

    //广播是否需要认证事件
    if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnGetRtspRealm,
                                           _mediaInfo,
                                           invoker,
                                           *this)){
        //无人监听此事件,说明无需认证
        invoker("");
    }
}
void RtspSession::onAuthSuccess() {
    TraceP(this);
368
    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
xiongziliang committed
369
    MediaSource::findAsync(_mediaInfo,weakSelf.lock(),[weakSelf](const MediaSource::Ptr &src){
370 371 372 373 374 375 376
        auto strongSelf = weakSelf.lock();
        if(!strongSelf){
            return;
        }
        auto rtsp_src = dynamic_pointer_cast<RtspMediaSource>(src);
        if (!rtsp_src) {
            //未找到相应的MediaSource
xiongziliang committed
377
            string err = StrPrinter << "no such stream:" <<  strongSelf->_mediaInfo._vhost << " " <<  strongSelf->_mediaInfo._app << " " << strongSelf->_mediaInfo._streamid;
378
            strongSelf->send_StreamNotFound();
xiongziliang committed
379
            strongSelf->shutdown(SockException(Err_shutdown,err));
380 381
            return;
        }
3503207480@qq.com committed
382
        //找到了相应的rtsp流
383
        strongSelf->_aTrackInfo = SdpParser(rtsp_src->getSdp()).getAvailableTrack();
384 385
        if (strongSelf->_aTrackInfo.empty()) {
            //该流无效
3503207480@qq.com committed
386
			DebugL << "无trackInfo,该流无效";
387
            strongSelf->send_StreamNotFound();
xiongziliang committed
388
            strongSelf->shutdown(SockException(Err_shutdown,"can not find any available track in sdp"));
389 390 391 392 393 394 395 396 397
            return;
        }
        strongSelf->_strSession = makeRandStr(12);
        strongSelf->_pMediaSrc = rtsp_src;
        for(auto &track : strongSelf->_aTrackInfo){
            track->_ssrc = rtsp_src->getSsrc(track->_type);
            track->_seq = rtsp_src->getSeqence(track->_type);
            track->_time_stamp = rtsp_src->getTimeStamp(track->_type);
        }
xiongziliang committed
398

399 400 401 402
        strongSelf->sendRtspResponse("200 OK",
                                     {"Content-Base",strongSelf->_strContentBase + "/",
                                      "x-Accept-Retransmit","our-retransmit",
                                      "x-Accept-Dynamic-Rate","1"
403
                                     },rtsp_src->getSdp());
404
    });
xzl committed
405
}
406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421
void RtspSession::onAuthFailed(const string &realm,const string &why,bool close) {
    GET_CONFIG(bool,authBasic,Rtsp::kAuthBasic);
    if (!authBasic) {
        //我们需要客户端优先以md5方式认证
        _strNonce = makeRandStr(32);
        sendRtspResponse("401 Unauthorized",
                         {"WWW-Authenticate",
                          StrPrinter << "Digest realm=\"" << realm << "\",nonce=\"" << _strNonce << "\"" });
    }else {
        //当然我们也支持base64认证,但是我们不建议这样做
        sendRtspResponse("401 Unauthorized",
                         {"WWW-Authenticate",
                          StrPrinter << "Basic realm=\"" << realm << "\"" });
    }
    if(close){
        shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << why));
422 423 424
    }
}

425
void RtspSession::onAuthBasic(const string &realm,const string &strBase64){
426 427 428
    //base64认证
    char user_pwd_buf[512];
    av_base64_decode((uint8_t *)user_pwd_buf,strBase64.data(),strBase64.size());
429
    auto user_pwd_vec = split(user_pwd_buf,":");
430 431
    if(user_pwd_vec.size() < 2){
        //认证信息格式不合法,回复401 Unauthorized
432
        onAuthFailed(realm,"can not find user and passwd when basic64 auth");
433 434 435 436
        return;
    }
    auto user = user_pwd_vec[0];
    auto pwd = user_pwd_vec[1];
437
    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
438
    onAuth invoker = [pwd,realm,weakSelf](bool encrypted,const string &good_pwd){
439 440 441 442
        auto strongSelf = weakSelf.lock();
        if(!strongSelf){
            //本对象已经销毁
            return;
443
        }
444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459
        //切换到自己的线程执行
        strongSelf->async([weakSelf,good_pwd,pwd,realm](){
            auto strongSelf = weakSelf.lock();
            if(!strongSelf){
                //本对象已经销毁
                return;
            }
            //base64忽略encrypted参数,上层必须传入明文密码
            if(pwd == good_pwd){
                //提供的密码且匹配正确
                strongSelf->onAuthSuccess();
                return;
            }
            //密码错误
            strongSelf->onAuthFailed(realm,StrPrinter << "password mismatch when base64 auth:" << pwd << " != " << good_pwd);
        });
460
    };
461

462
    //此时必须提供明文密码
463
    if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,_mediaInfo,realm,user, true,invoker,*this)){
464
        //表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之
465
        WarnP(this) << "请监听kBroadcastOnRtspAuth事件!";
466 467 468 469 470
        //但是我们还是忽略认证以便完成播放
        //我们输入的密码是明文
        invoker(false,pwd);
    }
}
471

472 473
void RtspSession::onAuthDigest(const string &realm,const string &strMd5){
	DebugP(this) << strMd5;
474 475 476
    auto mapTmp = Parser::parseArgs(strMd5,",","=");
    decltype(mapTmp) map;
    for(auto &pr : mapTmp){
477
        map[trim(string(pr.first)," \"")] = trim(pr.second," \"");
478 479 480
    }
    //check realm
    if(realm != map["realm"]){
481
        onAuthFailed(realm,StrPrinter << "realm not mached:" << realm << " != " << map["realm"]);
482 483 484 485
        return ;
    }
    //check nonce
    auto nonce = map["nonce"];
486 487
    if(_strNonce != nonce){
        onAuthFailed(realm,StrPrinter << "nonce not mached:" << nonce << " != " << _strNonce);
488 489 490 491 492 493 494
        return ;
    }
    //check username and uri
    auto username = map["username"];
    auto uri = map["uri"];
    auto response = map["response"];
    if(username.empty() || uri.empty() || response.empty()){
495
        onAuthFailed(realm,StrPrinter << "username/uri/response empty:" << username << "," << uri << "," << response);
496 497 498
        return ;
    }

499
    auto realInvoker = [this,realm,nonce,uri,username,response](bool ignoreAuth,bool encrypted,const string &good_pwd){
500 501
        if(ignoreAuth){
            //忽略认证
502 503
            TraceP(this) << "auth ignored";
            onAuthSuccess();
504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520
            return;
        }
        /*
        response计算方法如下:
        RTSP客户端应该使用username + password并计算response如下:
        (1)当password为MD5编码,则
            response = md5( password:nonce:md5(public_method:url)  );
        (2)当password为ANSI字符串,则
            response= md5( md5(username:realm:password):nonce:md5(public_method:url) );
         */
        auto encrypted_pwd = good_pwd;
        if(!encrypted){
            //提供的是明文密码
            encrypted_pwd = MD5(username+ ":" + realm + ":" + good_pwd).hexdigest();
        }

        auto good_response = MD5( encrypted_pwd + ":" + nonce + ":" + MD5(string("DESCRIBE") + ":" + uri).hexdigest()).hexdigest();
xiongziliang committed
521
        if(strcasecmp(good_response.data(),response.data()) == 0){
522
            //认证成功!md5不区分大小写
523
            onAuthSuccess();
524 525
        }else{
            //认证失败!
526
            onAuthFailed(realm, StrPrinter << "password mismatch when md5 auth:" << good_response << " != " << response );
527 528
        }
    };
529 530 531 532 533 534 535 536 537 538 539 540 541 542 543

    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
    onAuth invoker = [realInvoker,weakSelf](bool encrypted,const string &good_pwd){
        auto strongSelf = weakSelf.lock();
        if(!strongSelf){
            return;
        }
        //切换到自己的线程确保realInvoker执行时,this指针有效
        strongSelf->async([realInvoker,weakSelf,encrypted,good_pwd](){
            auto strongSelf = weakSelf.lock();
            if(!strongSelf){
                return;
            }
            realInvoker(false,encrypted,good_pwd);
        });
544 545 546
    };

    //此时可以提供明文或md5加密的密码
547
    if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,_mediaInfo,realm,username, false,invoker,*this)){
548
        //表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之
549
        WarnP(this) << "请监听kBroadcastOnRtspAuth事件!";
550 551 552 553 554
        //但是我们还是忽略认证以便完成播放
        realInvoker(true,true,"");
    }
}

555 556 557 558 559
void RtspSession::onAuthUser(const string &realm,const string &authorization){
    if(authorization.empty()){
        onAuthFailed(realm,"", false);
        return;
    }
560 561 562 563 564
    //请求中包含认证信息
    auto authType = FindField(authorization.data(),NULL," ");
	auto authStr = FindField(authorization.data()," ",NULL);
    if(authType.empty() || authStr.empty()){
        //认证信息格式不合法,回复401 Unauthorized
565
        onAuthFailed(realm,"can not find auth type or auth string");
566 567 568 569
        return;
    }
    if(authType == "Basic"){
        //base64认证,需要明文密码
570
        onAuthBasic(realm,authStr);
571 572
    }else if(authType == "Digest"){
        //md5认证
573
        onAuthDigest(realm,authStr);
574 575
    }else{
        //其他认证方式?不支持!
576
        onAuthFailed(realm,StrPrinter << "unsupported auth type:" << authType);
577 578
    }
}
xzl committed
579
inline void RtspSession::send_StreamNotFound() {
580
	sendRtspResponse("404 Stream Not Found",{"Connection","Close"});
xzl committed
581 582
}
inline void RtspSession::send_UnsupportedTransport() {
583
	sendRtspResponse("461 Unsupported Transport",{"Connection","Close"});
xzl committed
584 585 586
}

inline void RtspSession::send_SessionNotFound() {
587
	sendRtspResponse("454 Session Not Found",{"Connection","Close"});
xzl committed
588
}
589 590

void RtspSession::handleReq_Setup(const Parser &parser) {
xzl committed
591
//处理setup命令,该函数可能进入多次
592
    auto controlSuffix = split(parser.FullUrl(),"/").back();// parser.FullUrl().substr(_strContentBase.size());
xiongziliang committed
593 594 595
    if(controlSuffix.front() == '/'){
		controlSuffix = controlSuffix.substr(1);
    }
596
	int trackIdx = getTrackIndexByControlSuffix(controlSuffix);
xzl committed
597 598
	if (trackIdx == -1) {
		//未找到相应track
599 600
        throw SockException(Err_shutdown, StrPrinter << "can not find any track by control suffix:" << controlSuffix);
    }
xiongziliang committed
601
	SdpTrack::Ptr &trackRef = _aTrackInfo[trackIdx];
602
	if (trackRef->_inited) {
xzl committed
603
		//已经初始化过该Track
604
        throw SockException(Err_shutdown, "can not setup one track twice");
xzl committed
605
	}
606
	trackRef->_inited = true; //现在初始化
xzl committed
607

xiongziliang committed
608
	if(_rtpType == Rtsp::RTP_Invalid){
609
		auto &strTransport = parser["Transport"];
xzl committed
610
		if(strTransport.find("TCP") != string::npos){
xiongziliang committed
611
			_rtpType = Rtsp::RTP_TCP;
xzl committed
612
		}else if(strTransport.find("multicast") != string::npos){
xiongziliang committed
613
			_rtpType = Rtsp::RTP_MULTICAST;
xzl committed
614
		}else{
xiongziliang committed
615
			_rtpType = Rtsp::RTP_UDP;
xzl committed
616 617 618
		}
	}

619
    //允许接收rtp、rtcp包
xiongziliang committed
620
	RtspSplitter::enableRecvRtp(_rtpType == Rtsp::RTP_TCP);
xiongziliang committed
621

622
	switch (_rtpType) {
xiongziliang committed
623
	case Rtsp::RTP_TCP: {
624 625 626 627 628 629 630 631 632 633 634 635 636
        if(_pushSrc){
            //rtsp推流时,interleaved由推流者决定
            auto key_values =  Parser::parseArgs(parser["Transport"],";","=");
            int interleaved_rtp = -1 , interleaved_rtcp = -1;
            if(2 == sscanf(key_values["interleaved"].data(),"%d-%d",&interleaved_rtp,&interleaved_rtcp)){
                trackRef->_interleaved = interleaved_rtp;
            }else{
                throw SockException(Err_shutdown, "can not find interleaved when setup of rtp over tcp");
            }
        }else{
            //rtsp播放时,由于数据共享分发,所以interleaved必须由服务器决定
            trackRef->_interleaved = 2 * trackRef->_type;
        }
637 638
		sendRtspResponse("200 OK",
						 {"Transport",StrPrinter << "RTP/AVP/TCP;unicast;"
639
												 << "interleaved=" << (int)trackRef->_interleaved << "-" << (int)trackRef->_interleaved + 1 << ";"
640
												 << "ssrc=" << printSSRC(trackRef->_ssrc),
641 642 643
						  "x-Transport-Options" , "late-tolerance=1.400000",
						  "x-Dynamic-Rate" , "1"
						 });
xzl committed
644 645
	}
		break;
xiongziliang committed
646
	case Rtsp::RTP_UDP: {
xiongziliang committed
647
		//我们用trackIdx区分rtp和rtcp包
648 649
		auto pSockRtp = std::make_shared<Socket>(_sock->getPoller());
		if (!pSockRtp->bindUdpSock(0,get_local_ip().data())) {
xiongziliang committed
650 651
			//分配端口失败
			send_NotAcceptable();
652 653
            throw SockException(Err_shutdown, "open rtp socket failed");
        }
654 655
		auto pSockRtcp = std::make_shared<Socket>(_sock->getPoller());
		if (!pSockRtcp->bindUdpSock(pSockRtp->get_local_port() + 1,get_local_ip().data())) {
xzl committed
656 657
			//分配端口失败
			send_NotAcceptable();
658 659
            throw SockException(Err_shutdown, "open rtcp socket failed");
        }
660 661
		_apRtpSock[trackIdx] = pSockRtp;
		_apRtcpSock[trackIdx] = pSockRtcp;
xiongziliang committed
662
		//设置客户端内网端口信息
xiongziliang committed
663
		string strClientPort = FindField(parser["Transport"].data(), "client_port=", NULL);
664 665 666 667 668
		uint16_t ui16RtpPort = atoi( FindField(strClientPort.data(), NULL, "-").data());
        uint16_t ui16RtcpPort = atoi( FindField(strClientPort.data(), "-" , NULL).data());

        struct sockaddr_in peerAddr;
        //设置rtp发送目标地址
xzl committed
669
		peerAddr.sin_family = AF_INET;
670
		peerAddr.sin_port = htons(ui16RtpPort);
671
		peerAddr.sin_addr.s_addr = inet_addr(get_peer_ip().data());
xzl committed
672
		bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero);
673
		pSockRtp->setSendPeerAddr((struct sockaddr *)(&peerAddr));
674 675 676 677 678 679 680 681

		//设置rtcp发送目标地址
        peerAddr.sin_family = AF_INET;
        peerAddr.sin_port = htons(ui16RtcpPort);
        peerAddr.sin_addr.s_addr = inet_addr(get_peer_ip().data());
        bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero);
        pSockRtcp->setSendPeerAddr((struct sockaddr *)(&peerAddr));

xiongziliang committed
682
		//尝试获取客户端nat映射地址
683
		startListenPeerUdpData(trackIdx);
684
		//InfoP(this) << "分配端口:" << srv_port;
685 686 687 688 689

		sendRtspResponse("200 OK",
						 {"Transport",StrPrinter << "RTP/AVP/UDP;unicast;"
												 << "client_port=" << strClientPort << ";"
												 << "server_port=" << pSockRtp->get_local_port() << "-" << pSockRtcp->get_local_port() << ";"
690
												 << "ssrc=" << printSSRC(trackRef->_ssrc)
691
						 });
xzl committed
692 693
	}
		break;
xiongziliang committed
694
	case Rtsp::RTP_MULTICAST: {
xiongziliang committed
695 696 697
		if(!_multicaster){
			_multicaster = RtpMultiCaster::get(getPoller(),get_local_ip(),_mediaInfo._vhost, _mediaInfo._app, _mediaInfo._streamid);
			if (!_multicaster) {
xzl committed
698
				send_NotAcceptable();
699 700
                throw SockException(Err_shutdown, "can not get a available udp multicast socket");
            }
xzl committed
701
			weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
xiongziliang committed
702
			_multicaster->setDetachCB(this, [weakSelf]() {
xzl committed
703 704 705 706
				auto strongSelf = weakSelf.lock();
				if(!strongSelf) {
					return;
				}
xiongziliang committed
707
				strongSelf->safeShutdown(SockException(Err_shutdown,"ring buffer detached"));
xzl committed
708 709
			});
		}
xiongziliang committed
710
		int iSrvPort = _multicaster->getPort(trackRef->_type);
xiongziliang committed
711
		//我们用trackIdx区分rtp和rtcp包
712
		//由于组播udp端口是共享的,而rtcp端口为组播udp端口+1,所以rtcp端口需要改成共享端口
713
		auto pSockRtcp = UDPServer::Instance().getSock(getPoller(),get_local_ip().data(),2*trackIdx + 1,iSrvPort + 1);
xiongziliang committed
714 715 716
		if (!pSockRtcp) {
			//分配端口失败
			send_NotAcceptable();
717
            throw SockException(Err_shutdown, "open shared rtcp socket failed");
xiongziliang committed
718
		}
719
		startListenPeerUdpData(trackIdx);
720
        GET_CONFIG(uint32_t,udpTTL,MultiCast::kUdpTTL);
721 722 723

		sendRtspResponse("200 OK",
						 {"Transport",StrPrinter << "RTP/AVP;multicast;"
xiongziliang committed
724
												 << "destination=" << _multicaster->getIP() << ";"
725 726 727 728 729
												 << "source=" << get_local_ip() << ";"
												 << "port=" << iSrvPort << "-" << pSockRtcp->get_local_port() << ";"
												 << "ttl=" << udpTTL << ";"
												 << "ssrc=" << printSSRC(trackRef->_ssrc)
						 });
xzl committed
730 731 732 733 734 735 736
	}
		break;
	default:
		break;
	}
}

737
void RtspSession::handleReq_Play(const Parser &parser) {
xiongziliang committed
738
	if (_aTrackInfo.empty() || parser["Session"] != _strSession) {
xzl committed
739
		send_SessionNotFound();
740 741
        throw SockException(Err_shutdown,_aTrackInfo.empty() ? "can not find any availabe track when play" : "session not found when play");
    }
xiongziliang committed
742
	auto strRange = parser["Range"];
743
    auto onRes = [this,strRange](const string &err){
744
        bool authSuccess = err.empty();
xiongziliang committed
745
        if(!authSuccess){
746
            //第一次play是播放,否则是恢复播放。只对播放鉴权
747
			sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err);
xiongziliang committed
748
            shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << err));
749 750
            return;
        }
xzl committed
751

752
        auto pMediaSrc = _pMediaSrc.lock();
753 754
        if(!pMediaSrc){
        	send_StreamNotFound();
xiongziliang committed
755
        	shutdown(SockException(Err_shutdown,"rtsp stream released"));
756
			return;
757
        }
758

759 760 761
        bool useBuf = true;
		_enableSendRtp = false;

762
		if (strRange.size() && !_bFirstPlay) {
xiongziliang committed
763
            //这个是seek操作
764 765 766 767 768
			auto strStart = FindField(strRange.data(), "npt=", "-");
			if (strStart == "now") {
				strStart = "0";
			}
			auto iStartTime = 1000 * atof(strStart.data());
769
			InfoP(this) << "rtsp seekTo(ms):" << iStartTime;
770
			useBuf = !pMediaSrc->seekTo(iStartTime);
771
		}else if(pMediaSrc->totalReaderCount() == 0){
772 773 774 775
			//第一个消费者
			pMediaSrc->seekTo(0);
		}
		_bFirstPlay = false;
776

777
		_StrPrinter rtp_info;
xiongziliang committed
778
		for(auto &track : _aTrackInfo){
779
			if (track->_inited == false) {
xiongziliang committed
780
				//还有track没有setup
xiongziliang committed
781 782
                shutdown(SockException(Err_shutdown,"track not setuped"));
                return;
xiongziliang committed
783
			}
784 785 786 787
			track->_ssrc = pMediaSrc->getSsrc(track->_type);
			track->_seq = pMediaSrc->getSeqence(track->_type);
			track->_time_stamp = pMediaSrc->getTimeStamp(track->_type);

xiongziliang committed
788
			rtp_info << "url=" << _strContentBase << "/" << track->_control_surffix << ";"
789 790
					 << "seq=" << track->_seq << ";"
					 << "rtptime=" << (int)(track->_time_stamp * (track->_samplerate / 1000)) << ",";
xiongziliang committed
791 792
		}

793 794 795 796 797 798
		rtp_info.pop_back();

		sendRtspResponse("200 OK",
						 {"Range", StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) <<  pMediaSrc->getTimeStamp(TrackInvalid) / 1000.0,
						  "RTP-Info",rtp_info
						 });
799

800
		_enableSendRtp = true;
801
        setSocketFlags();
xiongziliang committed
802

xiongziliang committed
803
		if (!_pRtpReader && _rtpType != Rtsp::RTP_MULTICAST) {
804
			weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
805
			_pRtpReader = pMediaSrc->getRing()->attach(getPoller(),useBuf);
806 807 808 809 810
			_pRtpReader->setDetachCB([weakSelf]() {
				auto strongSelf = weakSelf.lock();
				if(!strongSelf) {
					return;
				}
xiongziliang committed
811 812
                strongSelf->shutdown(SockException(Err_shutdown,"rtsp ring buffer detached"));
            });
813 814 815 816 817
			_pRtpReader->setReadCB([weakSelf](const RtpPacket::Ptr &pack) {
				auto strongSelf = weakSelf.lock();
				if(!strongSelf) {
					return;
				}
818 819 820
				if(strongSelf->_enableSendRtp) {
					strongSelf->sendRtpPacket(pack);
				}
821 822
			});
		}
823
    };
xzl committed
824

825
    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
826
    Broadcast::AuthInvoker invoker = [weakSelf,onRes](const string &err){
827 828 829
        auto strongSelf = weakSelf.lock();
        if(!strongSelf){
            return;
xzl committed
830
        }
831
        strongSelf->async([weakSelf,onRes,err](){
832 833 834 835
            auto strongSelf = weakSelf.lock();
            if(!strongSelf){
                return;
            }
836
            onRes(err);
837 838
        });
    };
xiongziliang committed
839 840 841 842 843 844 845 846 847
    if(_bFirstPlay){
        //第一次收到play命令,需要鉴权
        auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,_mediaInfo,invoker,*this);
        if(!flag){
            //该事件无人监听,默认不鉴权
            onRes("");
        }
    }else{
        //后面是seek或恢复命令,不需要鉴权
848
        onRes("");
849
    }
xzl committed
850 851
}

852
void RtspSession::handleReq_Pause(const Parser &parser) {
xiongziliang committed
853
	if (parser["Session"] != _strSession) {
xzl committed
854
		send_SessionNotFound();
855 856
        throw SockException(Err_shutdown,"session not found when pause");
    }
857 858

	sendRtspResponse("200 OK");
859
	_enableSendRtp = false;
xzl committed
860 861
}

862
void RtspSession::handleReq_Teardown(const Parser &parser) {
863
	sendRtspResponse("200 OK");
864
    throw SockException(Err_shutdown,"rtsp player send teardown request");
xzl committed
865 866
}

867
void RtspSession::handleReq_Get(const Parser &parser) {
xiongziliang committed
868
	_http_x_sessioncookie = parser["x-sessioncookie"];
869
	sendRtspResponse("200 OK",
870
					 {"Cache-Control","no-store",
871 872 873 874
					  "Pragma","no-store",
					  "Content-Type","application/x-rtsp-tunnelled",
					 },"","HTTP/1.0");

875
	//注册http getter,以便http poster绑定
xzl committed
876
	lock_guard<recursive_mutex> lock(g_mtxGetter);
877
	g_mapGetter[_http_x_sessioncookie] = dynamic_pointer_cast<RtspSession>(shared_from_this());
xzl committed
878 879
}

880
void RtspSession::handleReq_Post(const Parser &parser) {
xzl committed
881
	lock_guard<recursive_mutex> lock(g_mtxGetter);
xiongziliang committed
882
	string sessioncookie = parser["x-sessioncookie"];
883
	//Poster 找到 Getter
xzl committed
884 885
	auto it = g_mapGetter.find(sessioncookie);
	if (it == g_mapGetter.end()) {
886 887
        throw SockException(Err_shutdown,"can not find http getter by x-sessioncookie");
    }
888 889 890 891

	//Poster 找到Getter的SOCK
	auto httpGetterWeak = it->second;
	//移除http getter的弱引用记录
xzl committed
892
	g_mapGetter.erase(sessioncookie);
893 894 895 896 897

	//http poster收到请求后转发给http getter处理
	_onRecv = [this,httpGetterWeak](const Buffer::Ptr &pBuf){
		auto httpGetterStrong = httpGetterWeak.lock();
		if(!httpGetterStrong){
xiongziliang committed
898
			shutdown(SockException(Err_shutdown,"http getter released"));
899 900 901 902 903 904 905 906 907 908 909 910 911
			return;
		}

		//切换到http getter的线程
		httpGetterStrong->async([pBuf,httpGetterWeak](){
			auto httpGetterStrong = httpGetterWeak.lock();
			if(!httpGetterStrong){
				return;
			}
			httpGetterStrong->onRecv(std::make_shared<BufferString>(decodeBase64(string(pBuf->data(),pBuf->size()))));
		});
	};

xiongziliang committed
912 913 914 915
	if(!parser.Content().empty()){
		//http poster后面的粘包
		_onRecv(std::make_shared<BufferString>(parser.Content()));
	}
916 917 918 919 920 921

    sendRtspResponse("200 OK",
                     {"Cache-Control","no-store",
                      "Pragma","no-store",
                      "Content-Type","application/x-rtsp-tunnelled",
                     },"","HTTP/1.0");
xzl committed
922 923
}

924
void RtspSession::handleReq_SET_PARAMETER(const Parser &parser) {
925
	//TraceP(this) <<endl;
926
	sendRtspResponse("200 OK");
xzl committed
927 928 929
}

inline void RtspSession::send_NotAcceptable() {
930
	sendRtspResponse("406 Not Acceptable",{"Connection","Close"});
xzl committed
931
}
932

933

xiongziliang committed
934
void RtspSession::onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) {
935 936 937
    GET_CONFIG(bool,modify_stamp,Rtsp::kModifyStamp);
    if(modify_stamp){
        int64_t dts_out;
938
        _stamp[trackidx].revise(rtppt->timeStamp, rtppt->timeStamp, dts_out, dts_out, true);
939 940
        rtppt->timeStamp = dts_out;
    }
xiongziliang committed
941
	_pushSrc->onWrite(rtppt, false);
xiongziliang committed
942
}
943
inline void RtspSession::onRcvPeerUdpData(int intervaled, const Buffer::Ptr &pBuf, const struct sockaddr& addr) {
944 945 946
	//这是rtcp心跳包,说明播放器还存活
	_ticker.resetTime();

947
	if(intervaled % 2 == 0){
xiongziliang committed
948
		if(_pushSrc){
949
		    //这是rtsp推流上来的rtp包
950
			handleOneRtp(intervaled / 2,_aTrackInfo[intervaled / 2],( unsigned char *)pBuf->data(),pBuf->size());
951 952
		}else if(!_udpSockConnected.count(intervaled)){
            //这是rtsp播放器的rtp打洞包
953 954
            _udpSockConnected.emplace(intervaled);
            _apRtpSock[intervaled / 2]->setSendPeerAddr(&addr);
xzl committed
955
		}
956 957
	}else{
	    //rtcp包
958 959
        if(!_udpSockConnected.count(intervaled)){
            _udpSockConnected.emplace(intervaled);
960 961
            _apRtcpSock[(intervaled - 1) / 2]->setSendPeerAddr(&addr);
        }
962
        onRtcpPacket((intervaled - 1) / 2, _aTrackInfo[(intervaled - 1) / 2], (unsigned char *) pBuf->data(),pBuf->size());
963
    }
xzl committed
964 965 966
}


967
inline void RtspSession::startListenPeerUdpData(int trackIdx) {
xzl committed
968
	weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
969 970
    auto srcIP = inet_addr(get_peer_ip().data());
	auto onUdpData = [weakSelf,srcIP](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr,int intervaled){
971 972 973 974
		auto strongSelf=weakSelf.lock();
		if(!strongSelf) {
			return false;
		}
975 976 977 978 979 980 981

        if (((struct sockaddr_in *) pPeerAddr)->sin_addr.s_addr != srcIP) {
            WarnP(strongSelf.get()) << ((intervaled % 2 == 0) ? "收到其他地址的rtp数据:" : "收到其他地址的rtcp数据:")
                                    << inet_ntoa(((struct sockaddr_in *) pPeerAddr)->sin_addr);
            return true;
        }

982
		struct sockaddr addr=*pPeerAddr;
983
		strongSelf->async([weakSelf,pBuf,addr,intervaled]() {
984 985 986 987
			auto strongSelf=weakSelf.lock();
			if(!strongSelf) {
				return;
			}
988
			strongSelf->onRcvPeerUdpData(intervaled,pBuf,addr);
989 990 991 992 993
		});
		return true;
	};

	switch (_rtpType){
xiongziliang committed
994
		case Rtsp::RTP_MULTICAST:{
995 996
			//组播使用的共享rtcp端口
			UDPServer::Instance().listenPeer(get_peer_ip().data(), this, [onUdpData](
997 998
					int intervaled, const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr) {
				return onUdpData(pBuf,pPeerAddr,intervaled);
xzl committed
999
			});
1000 1001
		}
			break;
xiongziliang committed
1002
		case Rtsp::RTP_UDP:{
1003
			auto setEvent = [&](Socket::Ptr &sock,int intervaled){
1004
				if(!sock){
1005
					WarnP(this) << "udp端口为空:" << intervaled;
1006 1007
					return;
				}
1008
				sock->setOnRead([onUdpData,intervaled](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr , int addr_len){
1009
					onUdpData(pBuf,pPeerAddr,intervaled);
1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020
				});
			};
			setEvent(_apRtpSock[trackIdx], 2*trackIdx );
			setEvent(_apRtcpSock[trackIdx], 2*trackIdx + 1 );
		}
			break;

		default:
			break;
	}

xzl committed
1021 1022
}

1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058
static string dateStr(){
	char buf[64];
	time_t tt = time(NULL);
	strftime(buf, sizeof buf, "%a, %b %d %Y %H:%M:%S GMT", gmtime(&tt));
	return buf;
}

bool RtspSession::sendRtspResponse(const string &res_code,
								   const StrCaseMap &header_const,
								   const string &sdp,
								   const char *protocol){
	auto header = header_const;
	header.emplace("CSeq",StrPrinter << _iCseq);
	if(!_strSession.empty()){
		header.emplace("Session",_strSession);
	}

	header.emplace("Server",SERVER_NAME "(build in " __DATE__ " " __TIME__ ")");
	header.emplace("Date",dateStr());

	if(!sdp.empty()){
		header.emplace("Content-Length",StrPrinter << sdp.size());
		header.emplace("Content-Type","application/sdp");
	}

	_StrPrinter printer;
	printer << protocol << " " << res_code << "\r\n";
	for (auto &pr : header){
		printer << pr.first << ": " << pr.second << "\r\n";
	}

	printer << "\r\n";

	if(!sdp.empty()){
		printer << sdp;
	}
1059
//	DebugP(this) << printer;
1060 1061 1062 1063
	return send(std::make_shared<BufferString>(printer)) > 0 ;
}

int RtspSession::send(const Buffer::Ptr &pkt){
xiongziliang committed
1064
//	if(!_enableSendRtp){
1065
//		DebugP(this) << pkt->data();
xiongziliang committed
1066
//	}
1067
	_ui64TotalBytes += pkt->size();
1068
	return TcpSession::send(pkt);
1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102
}

bool RtspSession::sendRtspResponse(const string &res_code,
								   const std::initializer_list<string> &header,
								   const string &sdp,
								   const char *protocol) {
	string key;
	StrCaseMap header_map;
	int i = 0;
	for(auto &val : header){
		if(++i % 2 == 0){
			header_map.emplace(key,val);
		}else{
			key = val;
		}
	}
	return sendRtspResponse(res_code,header_map,sdp,protocol);
}

inline string RtspSession::printSSRC(uint32_t ui32Ssrc) {
	char tmp[9] = { 0 };
	ui32Ssrc = htonl(ui32Ssrc);
	uint8_t *pSsrc = (uint8_t *) &ui32Ssrc;
	for (int i = 0; i < 4; i++) {
		sprintf(tmp + 2 * i, "%02X", pSsrc[i]);
	}
	return tmp;
}
inline int RtspSession::getTrackIndexByTrackType(TrackType type) {
	for (unsigned int i = 0; i < _aTrackInfo.size(); i++) {
		if (type == _aTrackInfo[i]->_type) {
			return i;
		}
	}
xiongziliang committed
1103 1104 1105
    if(_aTrackInfo.size() == 1){
        return 0;
    }
1106 1107 1108 1109 1110 1111 1112 1113
	return -1;
}
inline int RtspSession::getTrackIndexByControlSuffix(const string &controlSuffix) {
	for (unsigned int i = 0; i < _aTrackInfo.size(); i++) {
		if (controlSuffix == _aTrackInfo[i]->_control_surffix) {
			return i;
		}
	}
xiongziliang committed
1114 1115 1116
	if(_aTrackInfo.size() == 1){
        return 0;
	}
1117 1118 1119
	return -1;
}

xiongziliang committed
1120 1121 1122 1123 1124 1125
inline int RtspSession::getTrackIndexByInterleaved(int interleaved){
	for (unsigned int i = 0; i < _aTrackInfo.size(); i++) {
		if (_aTrackInfo[i]->_interleaved == interleaved) {
			return i;
		}
	}
xiongziliang committed
1126 1127 1128
    if(_aTrackInfo.size() == 1){
        return 0;
    }
xiongziliang committed
1129 1130 1131
	return -1;
}

xiongziliang committed
1132
bool RtspSession::close(MediaSource &sender,bool force) {
1133
    //此回调在其他线程触发
1134
    if(!_pushSrc || (!force && _pushSrc->totalReaderCount())){
1135 1136
        return false;
    }
xiongziliang committed
1137 1138
	string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
	safeShutdown(SockException(Err_shutdown,err));
1139 1140
	return true;
}
1141

1142

1143 1144
void RtspSession::onNoneReader(MediaSource &sender){
    //此回调在其他线程触发
1145
    if(!_pushSrc || _pushSrc->totalReaderCount()){
1146 1147 1148 1149 1150
        return;
    }
    MediaSourceEvent::onNoneReader(sender);
}

1151 1152 1153
int RtspSession::totalReaderCount(MediaSource &sender) {
    return _pushSrc ? _pushSrc->totalReaderCount() : sender.readerCount();
}
1154

xiongziliang committed
1155
void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) {
1156
    //InfoP(this) <<(int)pkt.Interleaved;
1157 1158
    switch (_rtpType) {
        case Rtsp::RTP_TCP: {
1159
            send(pkt);
1160 1161 1162 1163 1164 1165
        }
            break;
        case Rtsp::RTP_UDP: {
            int iTrackIndex = getTrackIndexByTrackType(pkt->type);
            auto &pSock = _apRtpSock[iTrackIndex];
            if (!pSock) {
xiongziliang committed
1166
                shutdown(SockException(Err_shutdown,"udp sock not opened yet"));
1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177
                return;
            }
            BufferRtp::Ptr buffer(new BufferRtp(pkt,4));
            _ui64TotalBytes += buffer->size();
            pSock->send(buffer);
        }
            break;
        default:
            break;
    }

1178
#if RTSP_SERVER_SEND_RTCP
1179
    int iTrackIndex = getTrackIndexByTrackType(pkt->type);
1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193
    if(iTrackIndex == -1){
        return;
    }
    RtcpCounter &counter = _aRtcpCnt[iTrackIndex];
    counter.pktCnt += 1;
    counter.octCount += (pkt->length - pkt->offset);
    auto &ticker = _aRtcpTicker[iTrackIndex];
    if (ticker.elapsedTime() > 5 * 1000) {
        //send rtcp every 5 second
        ticker.resetTime();
        //直接保存网络字节序
        memcpy(&counter.timeStamp, pkt->payload + 8 , 4);
        sendSenderReport(_rtpType == Rtsp::RTP_TCP,iTrackIndex);
    }
1194
#endif
1195 1196
}

xiongziliang committed
1197
void RtspSession::sendSenderReport(bool overTcp,int iTrackIndex) {
xiongziliang committed
1198 1199
    static const char s_cname[] = "ZLMediaKitRtsp";
    uint8_t aui8Rtcp[4 + 28 + 10 + sizeof(s_cname) + 1] = {0};
1200 1201 1202 1203 1204 1205
    uint8_t *pui8Rtcp_SR = aui8Rtcp + 4, *pui8Rtcp_SDES = pui8Rtcp_SR + 28;
    auto &track = _aTrackInfo[iTrackIndex];
    auto &counter = _aRtcpCnt[iTrackIndex];

    aui8Rtcp[0] = '$';
    aui8Rtcp[1] = track->_interleaved + 1;
xiongziliang committed
1206 1207
    aui8Rtcp[2] = (sizeof(aui8Rtcp) - 4) >> 8;
    aui8Rtcp[3] = (sizeof(aui8Rtcp) - 4) & 0xFF;
1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246

    pui8Rtcp_SR[0] = 0x80;
    pui8Rtcp_SR[1] = 0xC8;
    pui8Rtcp_SR[2] = 0x00;
    pui8Rtcp_SR[3] = 0x06;

    uint32_t ssrc=htonl(track->_ssrc);
    memcpy(&pui8Rtcp_SR[4], &ssrc, 4);

    uint64_t msw;
    uint64_t lsw;
    struct timeval tv;
    gettimeofday(&tv, NULL);
    msw = tv.tv_sec + 0x83AA7E80; /* 0x83AA7E80 is the number of seconds from 1900 to 1970 */
    lsw = (uint32_t) ((double) tv.tv_usec * (double) (((uint64_t) 1) << 32) * 1.0e-6);

    msw = htonl(msw);
    memcpy(&pui8Rtcp_SR[8], &msw, 4);

    lsw = htonl(lsw);
    memcpy(&pui8Rtcp_SR[12], &lsw, 4);
    //直接使用网络字节序
    memcpy(&pui8Rtcp_SR[16], &counter.timeStamp, 4);

    uint32_t pktCnt = htonl(counter.pktCnt);
    memcpy(&pui8Rtcp_SR[20], &pktCnt, 4);

    uint32_t octCount = htonl(counter.octCount);
    memcpy(&pui8Rtcp_SR[24], &octCount, 4);

    pui8Rtcp_SDES[0] = 0x81;
    pui8Rtcp_SDES[1] = 0xCA;
    pui8Rtcp_SDES[2] = 0x00;
    pui8Rtcp_SDES[3] = 0x06;

    memcpy(&pui8Rtcp_SDES[4], &ssrc, 4);

    pui8Rtcp_SDES[8] = 0x01;
    pui8Rtcp_SDES[9] = 0x0f;
xiongziliang committed
1247 1248
    memcpy(&pui8Rtcp_SDES[10], s_cname, sizeof(s_cname));
    pui8Rtcp_SDES[10 + sizeof(s_cname)] = 0x00;
1249 1250 1251 1252 1253 1254

    if(overTcp){
        send(obtainBuffer((char *) aui8Rtcp, sizeof(aui8Rtcp)));
    }else {
        _apRtcpSock[iTrackIndex]->send((char *) aui8Rtcp + 4, sizeof(aui8Rtcp) - 4);
    }
xzl committed
1255 1256
}

1257 1258 1259 1260 1261 1262 1263 1264 1265 1266
void RtspSession::setSocketFlags(){
    GET_CONFIG(bool,ultraLowDelay,General::kUltraLowDelay);
    if(!ultraLowDelay) {
        //推流模式下,关闭TCP_NODELAY会增加推流端的延时,但是服务器性能将提高
        SockUtil::setNoDelay(_sock->rawFD(), false);
        //播放模式下,开启MSG_MORE会增加延时,但是能提高发送性能
        (*this) << SocketFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
    }
}

xzl committed
1267
}
xiongziliang committed
1268
/* namespace mediakit */
xzl committed
1269