Commit 0c5cd624 by xiongziliang

优化rtsp over http,删除冗余代码

parent e52c67b3
...@@ -44,13 +44,35 @@ namespace mediakit { ...@@ -44,13 +44,35 @@ namespace mediakit {
static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE; static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE;
unordered_map<string, weak_ptr<RtspSession> > RtspSession::g_mapGetter; /**
unordered_map<void *, std::shared_ptr<RtspSession> > RtspSession::g_mapPostter; * rtsp协议有多种方式传输rtp数据包,目前已支持包括以下4种
recursive_mutex RtspSession::g_mtxGetter; //对quicktime上锁保护 * 1: rtp over udp ,这种方式是rtp通过单独的udp端口传输
recursive_mutex RtspSession::g_mtxPostter; //对quicktime上锁保护 * 2: rtp over udp_multicast,这种方式是rtp通过共享udp组播端口传输
* 3: rtp over tcp,这种方式是通过rtsp信令tcp通道完成传输
* 4: rtp over http,下面着重讲解:rtp over http
*
* rtp over http 是把rtsp协议伪装成http协议以达到穿透防火墙的目的,
* 此时播放器会发送两次http请求至rtsp服务器,第一次是http get请求,
* 第二次是http post请求。
*
* 这两次请求通过http请求头中的x-sessioncookie键完成绑定
*
* 第一次http get请求用于接收rtp、rtcp和rtsp回复,后续该链接不再发送其他请求
* 第二次http post请求用于发送rtsp请求,rtsp握手结束后可能会断开连接,此时我们还要维持rtp发送
* 需要指出的是http post请求中的content负载就是base64编码后的rtsp请求包,
* 播放器会把rtsp请求伪装成http content负载发送至rtsp服务器,然后rtsp服务器又把回复发送给第一次http get请求的tcp链接
* 这样,对防火墙而言,本次rtsp会话就是两次http请求,防火墙就会放行数据
*
* zlmediakit在处理rtsp over http的请求时,会把http poster中的content数据base64解码后转发给http getter处理
*/
//rtsp over http 情况下get请求实例,在请求实例用于接收rtp数据包
static unordered_map<string, weak_ptr<RtspSession> > g_mapGetter;
//对g_mapGetter上锁保护
static recursive_mutex g_mtxGetter;
RtspSession::RtspSession(const std::shared_ptr<ThreadPool> &pTh, const Socket::Ptr &pSock) : RtspSession::RtspSession(const std::shared_ptr<ThreadPool> &pTh, const Socket::Ptr &pSock) : TcpSession(pTh, pSock) {
TcpSession(pTh, pSock), _pSender(pSock) {
//设置10秒发送缓存 //设置10秒发送缓存
pSock->setSendBufSecond(10); pSock->setSendBufSecond(10);
//设置15秒发送超时时间 //设置15秒发送超时时间
...@@ -60,55 +82,20 @@ RtspSession::RtspSession(const std::shared_ptr<ThreadPool> &pTh, const Socket::P ...@@ -60,55 +82,20 @@ RtspSession::RtspSession(const std::shared_ptr<ThreadPool> &pTh, const Socket::P
} }
RtspSession::~RtspSession() { RtspSession::~RtspSession() {
if (_onDestory) {
_onDestory();
}
DebugL << get_peer_ip(); DebugL << get_peer_ip();
} }
void RtspSession::shutdown(){
shutdown_l(true);
}
void RtspSession::shutdown_l(bool close){
if (_sock) {
_sock->emitErr(SockException(Err_other, "self shutdown"),close);
}
if (_bBase64need && !_sock) {
//quickTime http postter,and self is detached from tcpServer
lock_guard<recursive_mutex> lock(g_mtxPostter);
g_mapPostter.erase(this);
}
if (_pBrdcaster) {
_pBrdcaster->setDetachCB(this, nullptr);
_pBrdcaster.reset();
}
if (_pRtpReader) {
_pRtpReader.reset();
}
}
void RtspSession::onError(const SockException& err) { void RtspSession::onError(const SockException& err) {
TraceL << err.getErrCode() << " " << err.what(); TraceL << err.getErrCode() << " " << err.what();
if (_rtpType == PlayerBase::RTP_MULTICAST) { if (_rtpType == PlayerBase::RTP_MULTICAST) {
//取消UDP端口监听 //取消UDP端口监听
UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this); UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this);
} }
if (!_bBase64need && _strSessionCookie.size() != 0) {
//quickTime http getter
lock_guard<recursive_mutex> lock(g_mtxGetter);
g_mapGetter.erase(_strSessionCookie);
}
if (_bBase64need && err.getErrCode() == Err_eof) { if (_http_x_sessioncookie.size() != 0) {
//quickTime http postter,正在发送rtp; QuickTime只是断开了请求连接,请继续发送rtp //移除http getter的弱引用记录
_sock = nullptr; lock_guard<recursive_mutex> lock(g_mtxGetter);
lock_guard<recursive_mutex> lock(g_mtxPostter); g_mapGetter.erase(_http_x_sessioncookie);
//为了保证脱离TCPServer后还能正常运作,需要保持本对象的强引用
try {
g_mapPostter.emplace(this, dynamic_pointer_cast<RtspSession>(shared_from_this()));
}catch (std::exception &ex){
}
TraceL << "quickTime will not send request any more!";
} }
//流量统计事件广播 //流量统计事件广播
...@@ -120,6 +107,7 @@ void RtspSession::onError(const SockException& err) { ...@@ -120,6 +107,7 @@ void RtspSession::onError(const SockException& err) {
_ticker.createdTime()/1000, _ticker.createdTime()/1000,
*this); *this);
} }
} }
void RtspSession::onManager() { void RtspSession::onManager() {
...@@ -132,7 +120,7 @@ void RtspSession::onManager() { ...@@ -132,7 +120,7 @@ void RtspSession::onManager() {
} }
//组播不检查心跳是否超时 //组播不检查心跳是否超时
if (_rtpType != PlayerBase::RTP_MULTICAST && _ticker.elapsedTime() > 15 * 1000) { if (_rtpType == PlayerBase::RTP_UDP && _ticker.elapsedTime() > 15 * 1000) {
WarnL << "RTSP会话超时:" << get_peer_ip(); WarnL << "RTSP会话超时:" << get_peer_ip();
shutdown(); shutdown();
return; return;
...@@ -189,10 +177,9 @@ int64_t RtspSession::onRecvHeader(const char *header,uint64_t len) { ...@@ -189,10 +177,9 @@ int64_t RtspSession::onRecvHeader(const char *header,uint64_t len) {
void RtspSession::onRecv(const Buffer::Ptr &pBuf) { void RtspSession::onRecv(const Buffer::Ptr &pBuf) {
_ticker.resetTime(); _ticker.resetTime();
_ui64TotalBytes += pBuf->size(); _ui64TotalBytes += pBuf->size();
if (_bBase64need) { if (_onRecv) {
//quicktime 加密后的rtsp请求,需要解密 //http poster的请求数据转发给http getter处理
auto str = decodeBase64(string(pBuf->data(),pBuf->size())); _onRecv(pBuf);
inputRtspOrRtcp(str.data(),str.size());
} else { } else {
inputRtspOrRtcp(pBuf->data(),pBuf->size()); inputRtspOrRtcp(pBuf->data(),pBuf->size());
} }
...@@ -250,6 +237,7 @@ int RtspSession::handleReq_ANNOUNCE() { ...@@ -250,6 +237,7 @@ int RtspSession::handleReq_ANNOUNCE() {
_strUrl = _parser.Url(); _strUrl = _parser.Url();
_pushSrc = std::make_shared<RtspToRtmpMediaSource>(_mediaInfo._vhost,_mediaInfo._app,_mediaInfo._streamid); _pushSrc = std::make_shared<RtspToRtmpMediaSource>(_mediaInfo._vhost,_mediaInfo._app,_mediaInfo._streamid);
_pushSrc->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
_pushSrc->onGetSDP(_strSdp); _pushSrc->onGetSDP(_strSdp);
sendRtspResponse("200 OK"); sendRtspResponse("200 OK");
}; };
...@@ -282,7 +270,8 @@ int RtspSession::handleReq_RECORD(){ ...@@ -282,7 +270,8 @@ int RtspSession::handleReq_RECORD(){
rtp_info.pop_back(); rtp_info.pop_back();
sendRtspResponse("200 OK", {"RTP-Info",rtp_info}); sendRtspResponse("200 OK", {"RTP-Info",rtp_info});
SockUtil::setNoDelay(_pSender->rawFD(),false); SockUtil::setNoDelay(_sock->rawFD(),false);
(*this) << SocketFlags(kSockFlags);
}; };
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this()); weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
...@@ -586,7 +575,7 @@ int RtspSession::handleReq_Setup() { ...@@ -586,7 +575,7 @@ int RtspSession::handleReq_Setup() {
sendRtspResponse("200 OK", sendRtspResponse("200 OK",
{"Transport",StrPrinter << "RTP/AVP/TCP;unicast;" {"Transport",StrPrinter << "RTP/AVP/TCP;unicast;"
<< "interleaved=" << trackRef->_type * 2 << "-" << trackRef->_type * 2 + 1 << ";" << "interleaved=" << trackRef->_type * 2 << "-" << trackRef->_type * 2 + 1 << ";"
<< "ssrc=" << printSSRC(trackRef->_ssrc) << ";mode=play", << "ssrc=" << printSSRC(trackRef->_ssrc),
"x-Transport-Options" , "late-tolerance=1.400000", "x-Transport-Options" , "late-tolerance=1.400000",
"x-Dynamic-Rate" , "1" "x-Dynamic-Rate" , "1"
}); });
...@@ -627,7 +616,7 @@ int RtspSession::handleReq_Setup() { ...@@ -627,7 +616,7 @@ int RtspSession::handleReq_Setup() {
{"Transport",StrPrinter << "RTP/AVP/UDP;unicast;" {"Transport",StrPrinter << "RTP/AVP/UDP;unicast;"
<< "client_port=" << strClientPort << ";" << "client_port=" << strClientPort << ";"
<< "server_port=" << pSockRtp->get_local_port() << "-" << pSockRtcp->get_local_port() << ";" << "server_port=" << pSockRtp->get_local_port() << "-" << pSockRtcp->get_local_port() << ";"
<< "ssrc=" << printSSRC(trackRef->_ssrc) << ";mode=play" << "ssrc=" << printSSRC(trackRef->_ssrc)
}); });
} }
break; break;
...@@ -743,7 +732,8 @@ int RtspSession::handleReq_Play() { ...@@ -743,7 +732,8 @@ int RtspSession::handleReq_Play() {
//提高发送性能 //提高发送性能
(*this) << SocketFlags(kSockFlags); (*this) << SocketFlags(kSockFlags);
SockUtil::setNoDelay(_pSender->rawFD(),false); SockUtil::setNoDelay(_sock->rawFD(),false);
(*this) << SocketFlags(kSockFlags);
if (!_pRtpReader && _rtpType != PlayerBase::RTP_MULTICAST) { if (!_pRtpReader && _rtpType != PlayerBase::RTP_MULTICAST) {
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this()); weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
...@@ -822,7 +812,7 @@ int RtspSession::handleReq_Teardown() { ...@@ -822,7 +812,7 @@ int RtspSession::handleReq_Teardown() {
} }
int RtspSession::handleReq_Get() { int RtspSession::handleReq_Get() {
_strSessionCookie = _parser["x-sessioncookie"]; _http_x_sessioncookie = _parser["x-sessioncookie"];
sendRtspResponse("200 OK", sendRtspResponse("200 OK",
{"Connection","Close", {"Connection","Close",
"Cache-Control","no-store", "Cache-Control","no-store",
...@@ -830,10 +820,9 @@ int RtspSession::handleReq_Get() { ...@@ -830,10 +820,9 @@ int RtspSession::handleReq_Get() {
"Content-Type","application/x-rtsp-tunnelled", "Content-Type","application/x-rtsp-tunnelled",
},"","HTTP/1.0"); },"","HTTP/1.0");
//注册GET //注册http getter,以便http poster绑定
lock_guard<recursive_mutex> lock(g_mtxGetter); lock_guard<recursive_mutex> lock(g_mtxGetter);
g_mapGetter[_strSessionCookie] = dynamic_pointer_cast<RtspSession>(shared_from_this()); g_mapGetter[_http_x_sessioncookie] = dynamic_pointer_cast<RtspSession>(shared_from_this());
//InfoL << _strSessionCookie;
return 0; return 0;
} }
...@@ -841,24 +830,21 @@ int RtspSession::handleReq_Get() { ...@@ -841,24 +830,21 @@ int RtspSession::handleReq_Get() {
int RtspSession::handleReq_Post() { int RtspSession::handleReq_Post() {
lock_guard<recursive_mutex> lock(g_mtxGetter); lock_guard<recursive_mutex> lock(g_mtxGetter);
string sessioncookie = _parser["x-sessioncookie"]; string sessioncookie = _parser["x-sessioncookie"];
//Poster 找到 Getter //Poster 找到 Getter
auto it = g_mapGetter.find(sessioncookie); auto it = g_mapGetter.find(sessioncookie);
if (it == g_mapGetter.end()) { if (it == g_mapGetter.end()) {
//WarnL << sessioncookie; WarnL << "Http Poster未找到Http Getter";
return -1; return -1;
} }
_bBase64need = true;
//Poster 找到Getter的SOCK //Poster 找到Getter的SOCK
auto strongSession = it->second.lock(); auto httpGetterWeak = it->second;
//移除http getter的弱引用记录
g_mapGetter.erase(sessioncookie); g_mapGetter.erase(sessioncookie);
if (!strongSession) {
send_SessionNotFound();
//WarnL;
return -1;
}
initSender(strongSession);
auto nextPacketSize = remainDataSize(); auto nextPacketSize = remainDataSize();
if(nextPacketSize > 0){ if(nextPacketSize > 0){
//防止http poster中的content部分粘包(后续content都是base64编码的rtsp请求包)
_onContent = [this](const char *data,uint64_t len){ _onContent = [this](const char *data,uint64_t len){
BufferRaw::Ptr buffer = std::make_shared<BufferRaw>(); BufferRaw::Ptr buffer = std::make_shared<BufferRaw>();
buffer->assign(data,len); buffer->assign(data,len);
...@@ -871,6 +857,26 @@ int RtspSession::handleReq_Post() { ...@@ -871,6 +857,26 @@ int RtspSession::handleReq_Post() {
},false); },false);
}; };
} }
//http poster收到请求后转发给http getter处理
_onRecv = [this,httpGetterWeak](const Buffer::Ptr &pBuf){
auto httpGetterStrong = httpGetterWeak.lock();
if(!httpGetterStrong){
WarnL << "Http Getter已经释放";
shutdown();
return;
}
//切换到http getter的线程
httpGetterStrong->async([pBuf,httpGetterWeak](){
auto httpGetterStrong = httpGetterWeak.lock();
if(!httpGetterStrong){
return;
}
httpGetterStrong->onRecv(std::make_shared<BufferString>(decodeBase64(string(pBuf->data(),pBuf->size()))));
});
};
return nextPacketSize; return nextPacketSize;
} }
...@@ -1106,26 +1112,6 @@ inline void RtspSession::startListenPeerUdpData(int trackIdx) { ...@@ -1106,26 +1112,6 @@ inline void RtspSession::startListenPeerUdpData(int trackIdx) {
} }
inline void RtspSession::initSender(const std::shared_ptr<RtspSession>& session) {
_pSender = session->_sock;
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
session->_onDestory = [weakSelf]() {
auto strongSelf=weakSelf.lock();
if(!strongSelf) {
return;
}
//DebugL;
strongSelf->_pSender->setOnErr([weakSelf](const SockException &err) {
auto strongSelf=weakSelf.lock();
if(!strongSelf) {
return;
}
strongSelf->safeShutdown();
});
};
session->shutdown_l(false);
}
static string dateStr(){ static string dateStr(){
char buf[64]; char buf[64];
time_t tt = time(NULL); time_t tt = time(NULL);
...@@ -1168,7 +1154,7 @@ bool RtspSession::sendRtspResponse(const string &res_code, ...@@ -1168,7 +1154,7 @@ bool RtspSession::sendRtspResponse(const string &res_code,
int RtspSession::send(const Buffer::Ptr &pkt){ int RtspSession::send(const Buffer::Ptr &pkt){
_ui64TotalBytes += pkt->size(); _ui64TotalBytes += pkt->size();
return _pSender->send(pkt,_flags); return TcpSession::send(pkt);
} }
bool RtspSession::sendRtspResponse(const string &res_code, bool RtspSession::sendRtspResponse(const string &res_code,
...@@ -1214,6 +1200,11 @@ inline int RtspSession::getTrackIndexByControlSuffix(const string &controlSuffix ...@@ -1214,6 +1200,11 @@ inline int RtspSession::getTrackIndexByControlSuffix(const string &controlSuffix
return -1; return -1;
} }
bool RtspSession::close() {
InfoL << "kick out:" << _mediaInfo._vhost << " " << _mediaInfo._app << " " << _mediaInfo._streamid;
safeShutdown();
return true;
}
#ifdef RTSP_SEND_RTCP #ifdef RTSP_SEND_RTCP
inline void RtspSession::sendRTCP() { inline void RtspSession::sendRTCP() {
......
...@@ -66,7 +66,7 @@ private: ...@@ -66,7 +66,7 @@ private:
uint32_t _offset; uint32_t _offset;
}; };
class RtspSession: public TcpSession, public HttpRequestSplitter, public RtpReceiver{ class RtspSession: public TcpSession, public HttpRequestSplitter, public RtpReceiver , public MediaSourceEvent{
public: public:
typedef std::shared_ptr<RtspSession> Ptr; typedef std::shared_ptr<RtspSession> Ptr;
typedef std::function<void(const string &realm)> onGetRealm; typedef std::function<void(const string &realm)> onGetRealm;
...@@ -85,10 +85,10 @@ protected: ...@@ -85,10 +85,10 @@ protected:
void onRecvContent(const char *data,uint64_t len) override; void onRecvContent(const char *data,uint64_t len) override;
//RtpReceiver override //RtpReceiver override
void onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) override; void onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) override;
//MediaSourceEvent override
bool close() override ;
private: private:
void inputRtspOrRtcp(const char *data,uint64_t len); void inputRtspOrRtcp(const char *data,uint64_t len);
void shutdown() override ;
void shutdown_l(bool close);
int handleReq_Options(); //处理options方法 int handleReq_Options(); //处理options方法
int handleReq_Describe(); //处理describe方法 int handleReq_Describe(); //处理describe方法
int handleReq_ANNOUNCE(); //处理options方法 int handleReq_ANNOUNCE(); //处理options方法
...@@ -129,7 +129,6 @@ private: ...@@ -129,7 +129,6 @@ private:
bool sendRtspResponse(const string &res_code,const std::initializer_list<string> &header, const string &sdp = "" , const char *protocol = "RTSP/1.0"); bool sendRtspResponse(const string &res_code,const std::initializer_list<string> &header, const string &sdp = "" , const char *protocol = "RTSP/1.0");
bool sendRtspResponse(const string &res_code,const StrCaseMap &header = StrCaseMap(), const string &sdp = "",const char *protocol = "RTSP/1.0"); bool sendRtspResponse(const string &res_code,const StrCaseMap &header = StrCaseMap(), const string &sdp = "",const char *protocol = "RTSP/1.0");
int send(const Buffer::Ptr &pkt) override; int send(const Buffer::Ptr &pkt) override;
inline void initSender(const std::shared_ptr<RtspSession> &pSession); //处理rtsp over http,quicktime使用的
private: private:
Ticker _ticker; Ticker _ticker;
Parser _parser; //rtsp解析类 Parser _parser; //rtsp解析类
...@@ -160,17 +159,11 @@ private: ...@@ -160,17 +159,11 @@ private:
uint64_t _ui64TotalBytes = 0; uint64_t _ui64TotalBytes = 0;
//RTSP over HTTP //RTSP over HTTP
function<void(void)> _onDestory;
bool _bBase64need = false; //是否需要base64解码
Socket::Ptr _pSender; //回复rtsp时走的tcp通道,供quicktime用
//quicktime 请求rtsp会产生两次tcp连接, //quicktime 请求rtsp会产生两次tcp连接,
//一次发送 get 一次发送post,需要通过sessioncookie关联起来 //一次发送 get 一次发送post,需要通过x-sessioncookie关联起来
string _strSessionCookie; string _http_x_sessioncookie;
static recursive_mutex g_mtxGetter; //对quicktime上锁保护
static recursive_mutex g_mtxPostter; //对quicktime上锁保护
static unordered_map<string, weak_ptr<RtspSession> > g_mapGetter;
static unordered_map<void *, std::shared_ptr<RtspSession> > g_mapPostter;
function<void(const char *data,uint64_t len)> _onContent; function<void(const char *data,uint64_t len)> _onContent;
function<void(const Buffer::Ptr &pBuf)> _onRecv;
std::function<void()> _delayTask; std::function<void()> _delayTask;
uint32_t _iTaskTimeLine = 0; uint32_t _iTaskTimeLine = 0;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论