Commit dd2192fd by xiongziliang

rtp over udp改成独占式端口,提高性能

parent f411ddc2
......@@ -58,7 +58,7 @@ const char kBroadcastHttpRequest[] = "kBroadcastHttpRequest";
const char kBroadcastOnGetRtspRealm[] = "kBroadcastOnGetRtspRealm";
const char kBroadcastOnRtspAuth[] = "kBroadcastOnRtspAuth";
const char kBroadcastMediaPlayed[] = "kBroadcastMediaPlayed";
const char kBroadcastRtmpPublish[] = "kBroadcastRtmpPublish";
const char kBroadcastMediaPublish[] = "kBroadcastMediaPublish";
const char kBroadcastFlowReport[] = "kBroadcastFlowReport";
const char kBroadcastReloadConfig[] = "kBroadcastReloadConfig";
const char kBroadcastShellLogin[] = "kBroadcastShellLogin";
......
......@@ -92,9 +92,13 @@ extern const char kBroadcastOnRtspAuth[];
//如果errMessage为空则代表鉴权成功
typedef std::function<void(const string &errMessage)> AuthInvoker;
//收到rtmp推流事件广播,通过该事件控制推流鉴权
extern const char kBroadcastRtmpPublish[];
#define BroadcastRtmpPublishArgs const MediaInfo &args,const Broadcast::AuthInvoker &invoker,TcpSession &sender
//收到rtsp/rtmp推流事件广播,通过该事件控制推流鉴权
extern const char kBroadcastMediaPublish[];
#define BroadcastMediaPublishArgs const MediaInfo &args,const Broadcast::AuthInvoker &invoker,TcpSession &sender
//兼容旧代码的宏
#define BroadcastRtmpPublishArgs BroadcastMediaPublishArgs
#define kBroadcastRtmpPublish kBroadcastMediaPublish
//播放rtsp/rtmp/http-flv事件广播,通过该事件控制播放鉴权
extern const char kBroadcastMediaPlayed[];
......
......@@ -190,7 +190,7 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
onRes(err);
});
};
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastRtmpPublish,
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish,
_mediaInfo,
invoker,
*this);
......
......@@ -89,10 +89,9 @@ void RtspSession::shutdown_l(bool close){
void RtspSession::onError(const SockException& err) {
TraceL << err.getErrCode() << " " << err.what();
if (_bListenPeerUdpData) {
if (_rtpType == PlayerBase::RTP_MULTICAST) {
//取消UDP端口监听
UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this);
_bListenPeerUdpData = false;
}
if (!_bBase64need && _strSessionCookie.size() != 0) {
//quickTime http getter
......@@ -131,7 +130,9 @@ void RtspSession::onManager() {
return;
}
}
if (_rtpType != PlayerBase::RTP_TCP && _ticker.elapsedTime() > 15 * 1000) {
//组播不检查心跳是否超时
if (_rtpType != PlayerBase::RTP_MULTICAST && _ticker.elapsedTime() > 15 * 1000) {
WarnL << "RTSP会话超时:" << get_peer_ip();
shutdown();
return;
......@@ -157,6 +158,7 @@ int64_t RtspSession::onRecvHeader(const char *header,uint64_t len) {
s_handler_map.emplace("OPTIONS",&RtspSession::handleReq_Options);
s_handler_map.emplace("DESCRIBE",&RtspSession::handleReq_Describe);
s_handler_map.emplace("ANNOUNCE",&RtspSession::handleReq_ANNOUNCE);
s_handler_map.emplace("RECORD",&RtspSession::handleReq_RECORD);
s_handler_map.emplace("SETUP",&RtspSession::handleReq_Setup);
s_handler_map.emplace("PLAY",&RtspSession::handleReq_Play);
s_handler_map.emplace("PAUSE",&RtspSession::handleReq_Pause);
......@@ -207,7 +209,7 @@ void RtspSession::inputRtspOrRtcp(const char *data,uint64_t len) {
int RtspSession::handleReq_Options() {
//支持这些命令
sendRtspResponse("200 OK",{"Public" , "OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, ANNOUNCE, SET_PARAMETER, GET_PARAMETER"});
sendRtspResponse("200 OK",{"Public" , "OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, ANNOUNCE, RECORD, SET_PARAMETER, GET_PARAMETER"});
return 0;
}
......@@ -218,14 +220,71 @@ void RtspSession::onRecvContent(const char *data, uint64_t len) {
_onContent = nullptr;
}
}
int RtspSession::handleReq_ANNOUNCE() {
sendRtspResponse("200 OK");
_onContent = [this](const char *data, uint64_t len){
_strSdp.assign(data,len);
SdpAttr attr(_strSdp);
_aTrackInfo = attr.getAvailableTrack();
};
return atoi(_parser["Content-Length"].data());
}
int RtspSession::handleReq_RECORD(){
if (_aTrackInfo.empty() || _parser["Session"] != _strSession) {
send_SessionNotFound();
return -1;
}
auto onRes = [this](const string &err){
bool authSuccess = err.empty();
if(!authSuccess){
//第一次play是播放,否则是恢复播放。只对播放鉴权
sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err);
shutdown();
return;
}
_StrPrinter rtp_info;
for(auto &track : _aTrackInfo){
if (track->_inited == false) {
//还有track没有setup
shutdown();
return;
}
rtp_info << "url=" << _strUrl << "/" << track->_control_surffix << ",";
}
rtp_info.pop_back();
sendRtspResponse("200 OK", {"RTP-Info",rtp_info});
SockUtil::setNoDelay(_pSender->rawFD(),false);
};
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
Broadcast::AuthInvoker invoker = [weakSelf,onRes](const string &err){
auto strongSelf = weakSelf.lock();
if(!strongSelf){
return;
}
strongSelf->async([weakSelf,onRes,err](){
auto strongSelf = weakSelf.lock();
if(!strongSelf){
return;
}
onRes(err);
});
};
//rtsp推流需要鉴权
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish,_mediaInfo,invoker,*this);
if(!flag){
//该事件无人监听,默认不鉴权
onRes("");
}
return 0;
}
int RtspSession::handleReq_Describe() {
{
//解析url获取媒体名称
......@@ -510,21 +569,22 @@ int RtspSession::handleReq_Setup() {
break;
case PlayerBase::RTP_UDP: {
//我们用trackIdx区分rtp和rtcp包
auto pSockRtp = UDPServer::Instance().getSock(get_local_ip().data(),2*trackIdx);
if (!pSockRtp) {
auto pSockRtp = std::make_shared<Socket>(_sock->getPoller());
if (!pSockRtp->bindUdpSock(0,get_local_ip().data())) {
//分配端口失败
WarnL << "分配rtp端口失败";
send_NotAcceptable();
return -1;
}
auto pSockRtcp = UDPServer::Instance().getSock(get_local_ip().data(),2*trackIdx + 1 ,pSockRtp->get_local_port() + 1);
if (!pSockRtcp) {
auto pSockRtcp = std::make_shared<Socket>(_sock->getPoller());
if (!pSockRtcp->bindUdpSock(pSockRtp->get_local_port() + 1,get_local_ip().data())) {
//分配端口失败
WarnL << "分配rtcp端口失败";
send_NotAcceptable();
return -1;
}
_apUdpSock[trackIdx] = pSockRtp;
_apRtpSock[trackIdx] = pSockRtp;
_apRtcpSock[trackIdx] = pSockRtcp;
//设置客户端内网端口信息
string strClientPort = FindField(_parser["Transport"].data(), "client_port=", NULL);
uint16_t ui16PeerPort = atoi( FindField(strClientPort.data(), NULL, "-").data());
......@@ -533,9 +593,9 @@ int RtspSession::handleReq_Setup() {
peerAddr.sin_port = htons(ui16PeerPort);
peerAddr.sin_addr.s_addr = inet_addr(get_peer_ip().data());
bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero);
_apPeerUdpAddr[trackIdx].reset((struct sockaddr *) (new struct sockaddr_in(peerAddr)));
_apPeerRtpPortAddr[trackIdx].reset((struct sockaddr *) (new struct sockaddr_in(peerAddr)));
//尝试获取客户端nat映射地址
startListenPeerUdpData();
startListenPeerUdpData(trackIdx);
//InfoL << "分配端口:" << srv_port;
sendRtspResponse("200 OK",
......@@ -564,6 +624,7 @@ int RtspSession::handleReq_Setup() {
}
int iSrvPort = _pBrdcaster->getPort(trackRef->_type);
//我们用trackIdx区分rtp和rtcp包
//由于组播udp端口是共享的,而rtcp端口为组播udp端口+1,所以rtcp端口需要改成共享端口
auto pSockRtcp = UDPServer::Instance().getSock(get_local_ip().data(),2*trackIdx + 1,iSrvPort + 1);
if (!pSockRtcp) {
//分配端口失败
......@@ -571,7 +632,7 @@ int RtspSession::handleReq_Setup() {
send_NotAcceptable();
return -1;
}
startListenPeerUdpData();
startListenPeerUdpData(trackIdx);
GET_CONFIG_AND_REGISTER(uint32_t,udpTTL,MultiCast::kUdpTTL);
sendRtspResponse("200 OK",
......@@ -914,12 +975,12 @@ inline void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) {
break;
case PlayerBase::RTP_UDP: {
int iTrackIndex = getTrackIndexByTrackType(pkt->type);
auto pSock = _apUdpSock[iTrackIndex].lock();
auto &pSock = _apRtpSock[iTrackIndex];
if (!pSock) {
shutdown();
return;
}
auto peerAddr = _apPeerUdpAddr[iTrackIndex];
auto &peerAddr = _apPeerRtpPortAddr[iTrackIndex];
if (!peerAddr) {
return;
}
......@@ -934,7 +995,11 @@ inline void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) {
}
inline void RtspSession::onRcvPeerUdpData(int iTrackIdx, const Buffer::Ptr &pBuf, const struct sockaddr& addr) {
//这是rtcp心跳包,说明播放器还存活
_ticker.resetTime();
if(iTrackIdx % 2 == 0){
// DebugL << "rtp数据包:" << iTrackIdx / 2;
//这是rtp探测包
if(!_bGotAllPeerUdp){
//还没有获取完整的rtp探测包
......@@ -944,7 +1009,7 @@ inline void RtspSession::onRcvPeerUdpData(int iTrackIdx, const Buffer::Ptr &pBuf
return;
}
//设置真实的客户端nat映射端口号
_apPeerUdpAddr[iTrackIdx / 2].reset(new struct sockaddr(addr));
_apPeerRtpPortAddr[iTrackIdx / 2].reset(new struct sockaddr(addr));
_abGotPeerUdp[iTrackIdx / 2] = true;
_bGotAllPeerUdp = true;//先假设获取到完整的rtp探测包
for (unsigned int i = 0; i < _aTrackInfo.size(); i++) {
......@@ -956,32 +1021,58 @@ inline void RtspSession::onRcvPeerUdpData(int iTrackIdx, const Buffer::Ptr &pBuf
}
}
}else{
//这是rtcp心跳包,说明播放器还存活
_ticker.resetTime();
//TraceL << "rtcp:" << (iTrackIdx-1)/2 ;
// TraceL << "rtcp数据包" << (iTrackIdx-1)/2 ;
}
}
inline void RtspSession::startListenPeerUdpData() {
_bListenPeerUdpData = true;
inline void RtspSession::startListenPeerUdpData(int trackIdx) {
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
UDPServer::Instance().listenPeer(get_peer_ip().data(), this,
[weakSelf](int iTrackIdx,const Buffer::Ptr &pBuf,struct sockaddr *pPeerAddr)->bool {
auto strongSelf=weakSelf.lock();
if(!strongSelf) {
return false;
}
struct sockaddr addr=*pPeerAddr;
strongSelf->async_first([weakSelf,pBuf,addr,iTrackIdx]() {
auto strongSelf=weakSelf.lock();
if(!strongSelf) {
return;
}
strongSelf->onRcvPeerUdpData(iTrackIdx,pBuf,addr);
});
return true;
auto onUdpData = [weakSelf](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr,int iTrackIdx){
auto strongSelf=weakSelf.lock();
if(!strongSelf) {
return false;
}
struct sockaddr addr=*pPeerAddr;
strongSelf->async_first([weakSelf,pBuf,addr,iTrackIdx]() {
auto strongSelf=weakSelf.lock();
if(!strongSelf) {
return;
}
strongSelf->onRcvPeerUdpData(iTrackIdx,pBuf,addr);
});
return true;
};
switch (_rtpType){
case PlayerBase::RTP_MULTICAST:{
//组播使用的共享rtcp端口
UDPServer::Instance().listenPeer(get_peer_ip().data(), this, [onUdpData](
int iTrackIdx, const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr) {
return onUdpData(pBuf,pPeerAddr,iTrackIdx);
});
}
break;
case PlayerBase::RTP_UDP:{
auto setEvent = [&](Socket::Ptr &sock,int iTrackIdx){
if(!sock){
WarnL << "udp端口为空:" << iTrackIdx;
return;
}
sock->setOnRead([onUdpData,iTrackIdx](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr){
onUdpData(pBuf,pPeerAddr,iTrackIdx);
});
};
setEvent(_apRtpSock[trackIdx], 2*trackIdx );
setEvent(_apRtcpSock[trackIdx], 2*trackIdx + 1 );
}
break;
default:
break;
}
}
inline void RtspSession::initSender(const std::shared_ptr<RtspSession>& session) {
......
......@@ -88,6 +88,7 @@ private:
int handleReq_Options(); //处理options方法
int handleReq_Describe(); //处理describe方法
int handleReq_ANNOUNCE(); //处理options方法
int handleReq_RECORD(); //处理options方法
int handleReq_Setup(); //处理setup方法
int handleReq_Play(); //处理play方法
int handleReq_Pause(); //处理pause方法
......@@ -108,7 +109,7 @@ private:
inline int getTrackIndexByControlSuffix(const string &controlSuffix);
inline void onRcvPeerUdpData(int iTrackIdx, const Buffer::Ptr &pBuf, const struct sockaddr &addr);
inline void startListenPeerUdpData();
inline void startListenPeerUdpData(int iTrackIdx);
//认证相关
static void onAuthSuccess(const weak_ptr<RtspSession> &weakSelf);
......@@ -142,9 +143,9 @@ private:
//RTP over udp
bool _bGotAllPeerUdp = false;
bool _abGotPeerUdp[2] = { false, false }; //获取客户端udp端口计数
weak_ptr<Socket> _apUdpSock[2]; //发送RTP的UDP端口,trackid idx 为数组下标
std::shared_ptr<struct sockaddr> _apPeerUdpAddr[2]; //播放器接收RTP的地址,trackid idx 为数组下标
bool _bListenPeerUdpData = false;
Socket::Ptr _apRtpSock[2]; //RTP端口,trackid idx 为数组下标
Socket::Ptr _apRtcpSock[2];//RTCP端口,trackid idx 为数组下标
std::shared_ptr<struct sockaddr> _apPeerRtpPortAddr[2]; //播放器接收RTP的地址,trackid idx 为数组下标
//RTP over udp_multicast
RtpBroadCaster::Ptr _pBrdcaster;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论