Commit 28b8e8e0 by 771730766@qq.com

大幅优化RTSP服务器性能

parent 25551ef3
...@@ -29,6 +29,8 @@ ...@@ -29,6 +29,8 @@
#include "RtpBroadCaster.h" #include "RtpBroadCaster.h"
#include "Util/util.h" #include "Util/util.h"
#include "Network/sockutil.h" #include "Network/sockutil.h"
#include "RtspSession.h"
using namespace std; using namespace std;
namespace ZL { namespace ZL {
...@@ -112,7 +114,8 @@ RtpBroadCaster::RtpBroadCaster(const string &strLocalIp,const string &strApp,con ...@@ -112,7 +114,8 @@ RtpBroadCaster::RtpBroadCaster(const string &strLocalIp,const string &strApp,con
int i = (pkt->interleaved/2)%2; int i = (pkt->interleaved/2)%2;
auto &pSock = m_apUdpSock[i]; auto &pSock = m_apUdpSock[i];
auto &peerAddr = m_aPeerUdpAddr[i]; auto &peerAddr = m_aPeerUdpAddr[i];
pSock->sendTo((char *) pkt->payload + 4, pkt->length - 4,(struct sockaddr *)(&peerAddr)); BufferRtp::Ptr buffer(new BufferRtp(pkt,4));
pSock->send(buffer,SOCKET_DEFAULE_FLAGS,(struct sockaddr *)(&peerAddr));
}); });
m_pReader->setDetachCB([this](){ m_pReader->setDetachCB([this](){
unordered_map<void * , onDetach > m_mapDetach_copy; unordered_map<void * , onDetach > m_mapDetach_copy;
......
...@@ -346,7 +346,7 @@ void RtspPlayer::HandleResSETUP(const Parser& parser, unsigned int uiTrackIndex) ...@@ -346,7 +346,7 @@ void RtspPlayer::HandleResSETUP(const Parser& parser, unsigned int uiTrackIndex)
rtpto.sin_port = ntohs(port); rtpto.sin_port = ntohs(port);
rtpto.sin_family = AF_INET; rtpto.sin_family = AF_INET;
rtpto.sin_addr.s_addr = inet_addr(get_peer_ip().c_str()); rtpto.sin_addr.s_addr = inet_addr(get_peer_ip().c_str());
pUdpSockRef->sendTo("\xce\xfa\xed\xfe", 4, (struct sockaddr *) &rtpto); pUdpSockRef->send("\xce\xfa\xed\xfe", 4,SOCKET_DEFAULE_FLAGS, (struct sockaddr *) &rtpto);
} }
} }
......
...@@ -664,7 +664,7 @@ bool RtspSession::handleReq_Play() { ...@@ -664,7 +664,7 @@ bool RtspSession::handleReq_Play() {
if(!strongSelf) { if(!strongSelf) {
return; return;
} }
strongSelf->sendRtpPacket(*pack); strongSelf->sendRtpPacket(pack);
}); });
}); });
...@@ -848,11 +848,13 @@ inline bool RtspSession::findStream() { ...@@ -848,11 +848,13 @@ inline bool RtspSession::findStream() {
return true; return true;
} }
inline void RtspSession::sendRtpPacket(const RtpPacket& pkt) {
inline void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) {
//InfoL<<(int)pkt.Interleaved; //InfoL<<(int)pkt.Interleaved;
switch (m_rtpType) { switch (m_rtpType) {
case PlayerBase::RTP_TCP: { case PlayerBase::RTP_TCP: {
send((char *) pkt.payload, pkt.length); BufferRtp::Ptr buffer(new BufferRtp(pkt));
send(buffer);
#ifdef RTSP_SEND_RTCP #ifdef RTSP_SEND_RTCP
int iTrackIndex = getTrackIndexByTrackId(pkt.interleaved / 2); int iTrackIndex = getTrackIndexByTrackId(pkt.interleaved / 2);
RtcpCounter &counter = m_aRtcpCnt[iTrackIndex]; RtcpCounter &counter = m_aRtcpCnt[iTrackIndex];
...@@ -869,7 +871,7 @@ inline void RtspSession::sendRtpPacket(const RtpPacket& pkt) { ...@@ -869,7 +871,7 @@ inline void RtspSession::sendRtpPacket(const RtpPacket& pkt) {
} }
break; break;
case PlayerBase::RTP_UDP: { case PlayerBase::RTP_UDP: {
int iTrackIndex = getTrackIndexByTrackId(pkt.interleaved / 2); int iTrackIndex = getTrackIndexByTrackId(pkt->interleaved / 2);
auto pSock = m_apUdpSock[iTrackIndex].lock(); auto pSock = m_apUdpSock[iTrackIndex].lock();
if (!pSock) { if (!pSock) {
shutdown(); shutdown();
...@@ -879,7 +881,8 @@ inline void RtspSession::sendRtpPacket(const RtpPacket& pkt) { ...@@ -879,7 +881,8 @@ inline void RtspSession::sendRtpPacket(const RtpPacket& pkt) {
if (!peerAddr) { if (!peerAddr) {
return; return;
} }
pSock->sendTo((char *) pkt.payload + 4, pkt.length - 4, peerAddr.get()); BufferRtp::Ptr buffer(new BufferRtp(pkt,4));
pSock->send(buffer,SOCKET_DEFAULE_FLAGS, peerAddr.get());
} }
break; break;
default: default:
......
...@@ -49,6 +49,24 @@ namespace ZL { ...@@ -49,6 +49,24 @@ namespace ZL {
namespace Rtsp { namespace Rtsp {
class RtspSession; class RtspSession;
class BufferRtp : public Socket::Buffer{
public:
typedef std::shared_ptr<BufferRtp> Ptr;
BufferRtp(const RtpPacket::Ptr & pkt,uint32_t offset = 0 ):_rtp(pkt),_offset(offset){}
virtual ~BufferRtp(){}
char *data() override {
return (char *)_rtp->payload + _offset;
}
uint32_t size() const override {
return _rtp->length - _offset;
}
private:
RtpPacket::Ptr _rtp;
uint32_t _offset;
};
class RtspSession: public TcpLimitedSession<MAX_TCP_SESSION> { class RtspSession: public TcpLimitedSession<MAX_TCP_SESSION> {
public: public:
typedef std::shared_ptr<RtspSession> Ptr; typedef std::shared_ptr<RtspSession> Ptr;
...@@ -73,6 +91,9 @@ private: ...@@ -73,6 +91,9 @@ private:
int send(const char *pcBuf, int iSize) override { int send(const char *pcBuf, int iSize) override {
return m_pSender->send(pcBuf, iSize); return m_pSender->send(pcBuf, iSize);
} }
int send(const Socket::Buffer::Ptr &pkt){
return m_pSender->send(pkt);
}
void shutdown() override; void shutdown() override;
bool handleReq_Options(); //处理options方法 bool handleReq_Options(); //处理options方法
bool handleReq_Describe(); //处理describe方法 bool handleReq_Describe(); //处理describe方法
...@@ -92,7 +113,7 @@ private: ...@@ -92,7 +113,7 @@ private:
inline bool findStream(); //根据rtsp url查找 MediaSource实例 inline bool findStream(); //根据rtsp url查找 MediaSource实例
inline void initSender(const std::shared_ptr<RtspSession> &pSession); //处理rtsp over http,quicktime使用的 inline void initSender(const std::shared_ptr<RtspSession> &pSession); //处理rtsp over http,quicktime使用的
inline void sendRtpPacket(const RtpPacket &pkt); inline void sendRtpPacket(const RtpPacket::Ptr &pkt);
inline string printSSRC(uint32_t ui32Ssrc) { inline string printSSRC(uint32_t ui32Ssrc) {
char tmp[9] = { 0 }; char tmp[9] = { 0 };
ui32Ssrc = htonl(ui32Ssrc); ui32Ssrc = htonl(ui32Ssrc);
......
...@@ -162,7 +162,7 @@ int main(int argc,char *argv[]){ ...@@ -162,7 +162,7 @@ int main(int argc,char *argv[]){
//输入用户名和密码登录(user:test,pwd:123456),输入help命令查看帮助 //输入用户名和密码登录(user:test,pwd:123456),输入help命令查看帮助
TcpServer<ShellSession>::Ptr shellSrv(new TcpServer<ShellSession>()); TcpServer<ShellSession>::Ptr shellSrv(new TcpServer<ShellSession>());
ShellSession::addUser("test", "123456"); ShellSession::addUser("test", "123456");
shellSrv->start(8023); shellSrv->start(mINI::Instance()[Config::Shell::kPort]);
//开启rtsp/rtmp/http服务器 //开启rtsp/rtmp/http服务器
TcpServer<RtspSession>::Ptr rtspSrv(new TcpServer<RtspSession>()); TcpServer<RtspSession>::Ptr rtspSrv(new TcpServer<RtspSession>());
TcpServer<RtmpSession>::Ptr rtmpSrv(new TcpServer<RtmpSession>()); TcpServer<RtmpSession>::Ptr rtmpSrv(new TcpServer<RtmpSession>());
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论