Commit b169f94c by xiongziliang

大幅提高rtsp服务器性能

parent 1352e159
ZLToolKit @ 681be205
Subproject commit 58a74f8c5ab802a0dd9fdcdcc0fe4c5a3d841964 Subproject commit 681be205ef164db08effd83f925bb750eb1fe149
...@@ -109,13 +109,18 @@ RtpMultiCaster::RtpMultiCaster(const EventPoller::Ptr &poller,const string &strL ...@@ -109,13 +109,18 @@ RtpMultiCaster::RtpMultiCaster(const EventPoller::Ptr &poller,const string &strL
_apUdpSock[i]->setSendPeerAddr((struct sockaddr *)&peerAddr); _apUdpSock[i]->setSendPeerAddr((struct sockaddr *)&peerAddr);
} }
_pReader = src->getRing()->attach(poller); _pReader = src->getRing()->attach(poller);
_pReader->setReadCB([this](const RtpPacket::Ptr &pkt){ _pReader->setReadCB([this](const RtspMediaSource::RingDataType &pkt){
int i = (int)(pkt->type); int i = 0;
auto &pSock = _apUdpSock[i]; int size = pkt->size();
auto &peerAddr = _aPeerUdpAddr[i]; pkt->for_each([&](const RtpPacket::Ptr &rtp) {
BufferRtp::Ptr buffer(new BufferRtp(pkt,4)); int i = (int) (rtp->type);
pSock->send(buffer); auto &pSock = _apUdpSock[i];
auto &peerAddr = _aPeerUdpAddr[i];
BufferRtp::Ptr buffer(new BufferRtp(rtp, 4));
pSock->send(buffer, nullptr, 0, ++i == size);
});
}); });
_pReader->setDetachCB([this](){ _pReader->setDetachCB([this](){
unordered_map<void * , onDetach > _mapDetach_copy; unordered_map<void * , onDetach > _mapDetach_copy;
{ {
......
...@@ -27,10 +27,95 @@ ...@@ -27,10 +27,95 @@
#include "Thread/ThreadPool.h" #include "Thread/ThreadPool.h"
using namespace std; using namespace std;
using namespace toolkit; using namespace toolkit;
#define RTP_GOP_SIZE 512
namespace mediakit {
#define RTP_GOP_SIZE 2048 class RtpVideoCache {
public:
namespace mediakit { RtpVideoCache() {
_cache = std::make_shared<List<RtpPacket::Ptr> >();
}
virtual ~RtpVideoCache() = default;
void inputVideoRtp(const RtpPacket::Ptr &rtp, bool key_pos) {
if (_last_rtp_stamp != rtp->timeStamp) {
//时间戳发生变化了
flushAll();
} else if (_cache->size() > RTP_GOP_SIZE) {
//这个逻辑用于避免时间戳异常的流导致的内存暴增问题
flushAll();
}
//追加数据到最后
_cache->emplace_back(rtp);
_last_rtp_stamp = rtp->timeStamp;
if (key_pos) {
_key_pos = key_pos;
}
}
virtual void onFlushVideoRtp(std::shared_ptr<List<RtpPacket::Ptr> > &, bool key_pos) = 0;
private:
void flushAll() {
if (_cache->empty()) {
return;
}
onFlushVideoRtp(_cache, _key_pos);
_cache = std::make_shared<List<RtpPacket::Ptr> >();
_key_pos = false;
}
private:
std::shared_ptr<List<RtpPacket::Ptr> > _cache;
uint32_t _last_rtp_stamp = 0;
bool _key_pos = false;
};
class RtpAudioCache {
public:
RtpAudioCache() {
_cache = std::make_shared<List<RtpPacket::Ptr> >();
}
virtual ~RtpAudioCache() = default;
void inputAudioRtp(const RtpPacket::Ptr &rtp) {
if (rtp->timeStamp > _last_rtp_stamp + 100) {
//累积了100ms的音频数据
flushAll();
} else if (_cache->size() > 10) {
//或者audio rtp缓存超过10个
flushAll();
}
//追加数据到最后
_cache->emplace_back(rtp);
_last_rtp_stamp = rtp->timeStamp;
}
virtual void onFlushAudioRtp(std::shared_ptr<List<RtpPacket::Ptr> > &) = 0;
private:
void flushAll() {
if (_cache->empty()) {
return;
}
onFlushAudioRtp(_cache);
_cache = std::make_shared<List<RtpPacket::Ptr> >();
}
private:
std::shared_ptr<List<RtpPacket::Ptr> > _cache;
uint32_t _last_rtp_stamp = 0;
};
/** /**
* rtsp媒体源的数据抽象 * rtsp媒体源的数据抽象
...@@ -38,11 +123,12 @@ namespace mediakit { ...@@ -38,11 +123,12 @@ namespace mediakit {
* 只要生成了这两要素,那么要实现rtsp推流、rtsp服务器就很简单了 * 只要生成了这两要素,那么要实现rtsp推流、rtsp服务器就很简单了
* rtsp推拉流协议中,先传递sdp,然后再协商传输方式(tcp/udp/组播),最后一直传递rtp * rtsp推拉流协议中,先传递sdp,然后再协商传输方式(tcp/udp/组播),最后一直传递rtp
*/ */
class RtspMediaSource : public MediaSource, public RingDelegate<RtpPacket::Ptr> { class RtspMediaSource : public MediaSource, public RingDelegate<RtpPacket::Ptr>, public RtpVideoCache, public RtpAudioCache {
public: public:
typedef ResourcePool<RtpPacket> PoolType; typedef ResourcePool<RtpPacket> PoolType;
typedef std::shared_ptr<RtspMediaSource> Ptr; typedef std::shared_ptr<RtspMediaSource> Ptr;
typedef RingBuffer<RtpPacket::Ptr> RingType; typedef std::shared_ptr<List<RtpPacket::Ptr> > RingDataType;
typedef RingBuffer<RingDataType> RingType;
/** /**
* 构造函数 * 构造函数
...@@ -173,10 +259,34 @@ public: ...@@ -173,10 +259,34 @@ public:
regist(); regist();
} }
} }
//不存在视频,为了减少缓存延时,那么关闭GOP缓存
_ring->write(rtp, _have_video ? keyPos : true); if(rtp->type == TrackVideo){
RtpVideoCache::inputVideoRtp(rtp, keyPos);
}else{
RtpAudioCache::inputAudioRtp(rtp);
}
} }
private: private:
/**
* 批量flush时间戳相同的视频rtp包时触发该函数
* @param rtp_list 时间戳相同的rtp包列表
* @param key_pos 是否包含关键帧
*/
void onFlushVideoRtp(std::shared_ptr<List<RtpPacket::Ptr> > &rtp_list, bool key_pos) override {
_ring->write(rtp_list, key_pos);
}
/**
* 批量flush一定数量的音频rtp包时触发该函数
* @param rtp_list rtp包列表
*/
void onFlushAudioRtp(std::shared_ptr<List<RtpPacket::Ptr> > &rtp_list) override{
//只有音频的话,就不存在gop缓存的意义
_ring->write(rtp_list, !_have_video);
}
/** /**
* 每次增减消费者都会触发该函数 * 每次增减消费者都会触发该函数
*/ */
......
...@@ -301,23 +301,36 @@ void RtspPusher::sendOptions() { ...@@ -301,23 +301,36 @@ void RtspPusher::sendOptions() {
sendRtspRequest("OPTIONS",_strContentBase); sendRtspRequest("OPTIONS",_strContentBase);
} }
inline void RtspPusher::sendRtpPacket(const RtpPacket::Ptr & pkt) { inline void RtspPusher::sendRtpPacket(const RtspMediaSource::RingDataType &pkt) {
//InfoL<<(int)pkt.Interleaved; //InfoL<<(int)pkt.Interleaved;
switch (_eType) { switch (_eType) {
case Rtsp::RTP_TCP: { case Rtsp::RTP_TCP: {
BufferRtp::Ptr buffer(new BufferRtp(pkt)); int i = 0;
send(buffer); int size = pkt->size();
setSendFlushFlag(false);
pkt->for_each([&](const RtpPacket::Ptr &rtp) {
if (++i == size) {
setSendFlushFlag(true);
}
BufferRtp::Ptr buffer(new BufferRtp(rtp));
send(buffer);
});
} }
break; break;
case Rtsp::RTP_UDP: { case Rtsp::RTP_UDP: {
int iTrackIndex = getTrackIndexByTrackType(pkt->type); int i = 0;
auto &pSock = _apUdpSock[iTrackIndex]; int size = pkt->size();
if (!pSock) { pkt->for_each([&](const RtpPacket::Ptr &rtp) {
shutdown(SockException(Err_shutdown,"udp sock not opened yet")); int iTrackIndex = getTrackIndexByTrackType(rtp->type);
return; auto &pSock = _apUdpSock[iTrackIndex];
} if (!pSock) {
BufferRtp::Ptr buffer(new BufferRtp(pkt,4)); shutdown(SockException(Err_shutdown,"udp sock not opened yet"));
pSock->send(buffer); return;
}
BufferRtp::Ptr buffer(new BufferRtp(rtp,4));
pSock->send(buffer, nullptr, 0, ++i == size);
});
} }
break; break;
default: default:
...@@ -337,7 +350,6 @@ inline int RtspPusher::getTrackIndexByTrackType(TrackType type) { ...@@ -337,7 +350,6 @@ inline int RtspPusher::getTrackIndexByTrackType(TrackType type) {
return -1; return -1;
} }
void RtspPusher::sendRecord() { void RtspPusher::sendRecord() {
_onHandshake = [this](const Parser& parser){ _onHandshake = [this](const Parser& parser){
auto src = _pMediaSrc.lock(); auto src = _pMediaSrc.lock();
...@@ -347,7 +359,7 @@ void RtspPusher::sendRecord() { ...@@ -347,7 +359,7 @@ void RtspPusher::sendRecord() {
_pRtspReader = src->getRing()->attach(getPoller()); _pRtspReader = src->getRing()->attach(getPoller());
weak_ptr<RtspPusher> weakSelf = dynamic_pointer_cast<RtspPusher>(shared_from_this()); weak_ptr<RtspPusher> weakSelf = dynamic_pointer_cast<RtspPusher>(shared_from_this());
_pRtspReader->setReadCB([weakSelf](const RtpPacket::Ptr &pkt){ _pRtspReader->setReadCB([weakSelf](const RtspMediaSource::RingDataType &pkt){
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(!strongSelf) { if(!strongSelf) {
return; return;
......
...@@ -67,7 +67,7 @@ private: ...@@ -67,7 +67,7 @@ private:
inline int getTrackIndexByTrackType(TrackType type); inline int getTrackIndexByTrackType(TrackType type);
void sendRtpPacket(const RtpPacket::Ptr & pkt) ; void sendRtpPacket(const RtspMediaSource::RingDataType & pkt) ;
void sendRtspRequest(const string &cmd, const string &url ,const StrCaseMap &header = StrCaseMap(),const string &sdp = "" ); void sendRtspRequest(const string &cmd, const string &url ,const StrCaseMap &header = StrCaseMap(),const string &sdp = "" );
void sendRtspRequest(const string &cmd, const string &url ,const std::initializer_list<string> &header,const string &sdp = ""); void sendRtspRequest(const string &cmd, const string &url ,const std::initializer_list<string> &header,const string &sdp = "");
......
...@@ -796,7 +796,7 @@ void RtspSession::handleReq_Play(const Parser &parser) { ...@@ -796,7 +796,7 @@ void RtspSession::handleReq_Play(const Parser &parser) {
} }
strongSelf->shutdown(SockException(Err_shutdown,"rtsp ring buffer detached")); strongSelf->shutdown(SockException(Err_shutdown,"rtsp ring buffer detached"));
}); });
_pRtpReader->setReadCB([weakSelf](const RtpPacket::Ptr &pack) { _pRtpReader->setReadCB([weakSelf](const RtspMediaSource::RingDataType &pack) {
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(!strongSelf) { if(!strongSelf) {
return; return;
...@@ -1123,23 +1123,36 @@ int RtspSession::totalReaderCount(MediaSource &sender) { ...@@ -1123,23 +1123,36 @@ int RtspSession::totalReaderCount(MediaSource &sender) {
return _pushSrc ? _pushSrc->totalReaderCount() : sender.readerCount(); return _pushSrc ? _pushSrc->totalReaderCount() : sender.readerCount();
} }
void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) { void RtspSession::sendRtpPacket(const RtspMediaSource::RingDataType &pkt) {
//InfoP(this) <<(int)pkt.Interleaved; //InfoP(this) <<(int)pkt.Interleaved;
switch (_rtpType) { switch (_rtpType) {
case Rtsp::RTP_TCP: { case Rtsp::RTP_TCP: {
send(pkt); int i = 0;
int size = pkt->size();
setSendFlushFlag(false);
pkt->for_each([&](const RtpPacket::Ptr &rtp) {
if (++i == size) {
setSendFlushFlag(true);
}
send(rtp);
});
} }
break; break;
case Rtsp::RTP_UDP: { case Rtsp::RTP_UDP: {
int iTrackIndex = getTrackIndexByTrackType(pkt->type); int i = 0;
auto &pSock = _apRtpSock[iTrackIndex]; int size = pkt->size();
if (!pSock) { pkt->for_each([&](const RtpPacket::Ptr &rtp) {
shutdown(SockException(Err_shutdown,"udp sock not opened yet")); int iTrackIndex = getTrackIndexByTrackType(rtp->type);
return; auto &pSock = _apRtpSock[iTrackIndex];
} if (!pSock) {
BufferRtp::Ptr buffer(new BufferRtp(pkt,4)); shutdown(SockException(Err_shutdown, "udp sock not opened yet"));
_ui64TotalBytes += buffer->size(); return;
pSock->send(buffer); }
BufferRtp::Ptr buffer(new BufferRtp(rtp, 4));
_ui64TotalBytes += buffer->size();
pSock->send(buffer, nullptr, 0, ++i == size);
});
} }
break; break;
default: default:
......
...@@ -162,7 +162,7 @@ private: ...@@ -162,7 +162,7 @@ private:
void onAuthDigest(const string &realm,const string &strMd5); void onAuthDigest(const string &realm,const string &strMd5);
//发送rtp给客户端 //发送rtp给客户端
void sendRtpPacket(const RtpPacket::Ptr &pkt); void sendRtpPacket(const RtspMediaSource::RingDataType &pkt);
//回复客户端 //回复客户端
bool sendRtspResponse(const string &res_code,const std::initializer_list<string> &header, const string &sdp = "" , const char *protocol = "RTSP/1.0"); bool sendRtspResponse(const string &res_code,const std::initializer_list<string> &header, const string &sdp = "" , const char *protocol = "RTSP/1.0");
bool sendRtspResponse(const string &res_code,const StrCaseMap &header = StrCaseMap(), const string &sdp = "",const char *protocol = "RTSP/1.0"); bool sendRtspResponse(const string &res_code,const StrCaseMap &header = StrCaseMap(), const string &sdp = "",const char *protocol = "RTSP/1.0");
...@@ -186,7 +186,7 @@ private: ...@@ -186,7 +186,7 @@ private:
//rtsp播放器绑定的直播源 //rtsp播放器绑定的直播源
std::weak_ptr<RtspMediaSource> _pMediaSrc; std::weak_ptr<RtspMediaSource> _pMediaSrc;
//直播源读取器 //直播源读取器
RingBuffer<RtpPacket::Ptr>::RingReader::Ptr _pRtpReader; RtspMediaSource::RingType::RingReader::Ptr _pRtpReader;
//推流或拉流客户端采用的rtp传输方式 //推流或拉流客户端采用的rtp传输方式
Rtsp::eRtpType _rtpType = Rtsp::RTP_Invalid; Rtsp::eRtpType _rtpType = Rtsp::RTP_Invalid;
//sdp里面有效的track,包含音频或视频 //sdp里面有效的track,包含音频或视频
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论