Commit b78c14a2 by xiongziliang

通过减少线程切换提高服务器性能

parent 6238f5aa
Subproject commit 15d6ebbf2e09ccde3dc0e467df84401dc57b475d Subproject commit 843a323834e9b6c164dcd620b196e3742c39f016
...@@ -232,7 +232,7 @@ inline bool HttpSession::checkLiveFlvStream(){ ...@@ -232,7 +232,7 @@ inline bool HttpSession::checkLiveFlvStream(){
(*this) << SocketFlags(kSockFlags); (*this) << SocketFlags(kSockFlags);
try{ try{
start(mediaSrc); start(getPoller(),mediaSrc);
}catch (std::exception &ex){ }catch (std::exception &ex){
//该rtmp源不存在 //该rtmp源不存在
shutdown(); shutdown();
...@@ -683,36 +683,21 @@ inline void HttpSession::sendNotFound(bool bClose) { ...@@ -683,36 +683,21 @@ inline void HttpSession::sendNotFound(bool bClose) {
void HttpSession::onWrite(const Buffer::Ptr &buffer) { void HttpSession::onWrite(const Buffer::Ptr &buffer) {
weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this()); _ticker.resetTime();
async([weakSelf,buffer](){ _ui64TotalBytes += buffer->size();
auto strongSelf = weakSelf.lock(); send(buffer);
if(!strongSelf) {
return;
}
strongSelf->_ticker.resetTime();
strongSelf->_ui64TotalBytes += buffer->size();
strongSelf->send(buffer);
});
} }
void HttpSession::onWrite(const char *data, int len) { void HttpSession::onWrite(const char *data, int len) {
BufferRaw::Ptr buffer(new BufferRaw); BufferRaw::Ptr buffer(new BufferRaw);
buffer->assign(data,len); buffer->assign(data,len);
_ticker.resetTime();
weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this()); _ui64TotalBytes += buffer->size();
async([weakSelf,buffer](){ send(buffer);
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
}
strongSelf->_ticker.resetTime();
strongSelf->_ui64TotalBytes += buffer->size();
strongSelf->send(buffer);
});
} }
void HttpSession::onDetach() { void HttpSession::onDetach() {
safeShutdown(); shutdown();
} }
std::shared_ptr<FlvMuxer> HttpSession::getSharedPtr(){ std::shared_ptr<FlvMuxer> HttpSession::getSharedPtr(){
......
...@@ -203,7 +203,7 @@ inline void RtmpPusher::send_metaData(){ ...@@ -203,7 +203,7 @@ inline void RtmpPusher::send_metaData(){
sendRtmp(pkt->typeId, _ui32StreamId, pkt->strBuf, pkt->timeStamp, pkt->chunkId ); sendRtmp(pkt->typeId, _ui32StreamId, pkt->strBuf, pkt->timeStamp, pkt->chunkId );
}); });
_pRtmpReader = src->getRing()->attach(); _pRtmpReader = src->getRing()->attach(getPoller());
weak_ptr<RtmpPusher> weakSelf = dynamic_pointer_cast<RtmpPusher>(shared_from_this()); weak_ptr<RtmpPusher> weakSelf = dynamic_pointer_cast<RtmpPusher>(shared_from_this());
_pRtmpReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt){ _pRtmpReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt){
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
......
...@@ -336,7 +336,7 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr ...@@ -336,7 +336,7 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr
onSendMedia(pkt); onSendMedia(pkt);
}); });
_pRingReader = src->getRing()->attach(); _pRingReader = src->getRing()->attach(getPoller());
weak_ptr<RtmpSession> weakSelf = dynamic_pointer_cast<RtmpSession>(shared_from_this()); weak_ptr<RtmpSession> weakSelf = dynamic_pointer_cast<RtmpSession>(shared_from_this());
SockUtil::setNoDelay(_sock->rawFD(), false); SockUtil::setNoDelay(_sock->rawFD(), false);
_pRingReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt) { _pRingReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt) {
...@@ -344,20 +344,14 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr ...@@ -344,20 +344,14 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr
if (!strongSelf) { if (!strongSelf) {
return; return;
} }
strongSelf->async([weakSelf, pkt]() { strongSelf->onSendMedia(pkt);
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
return;
}
strongSelf->onSendMedia(pkt);
});
}); });
_pRingReader->setDetachCB([weakSelf]() { _pRingReader->setDetachCB([weakSelf]() {
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if (!strongSelf) { if (!strongSelf) {
return; return;
} }
strongSelf->safeShutdown(); strongSelf->shutdown();
}); });
_pPlayerSrc = src; _pPlayerSrc = src;
if (src->getRing()->readerCount() == 1) { if (src->getRing()->readerCount() == 1) {
...@@ -446,13 +440,7 @@ void RtmpSession::onCmd_pause(AMFDecoder &dec) { ...@@ -446,13 +440,7 @@ void RtmpSession::onCmd_pause(AMFDecoder &dec) {
if(!strongSelf) { if(!strongSelf) {
return; return;
} }
strongSelf->async([weakSelf,pkt]() { strongSelf->onSendMedia(pkt);
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
}
strongSelf->onSendMedia(pkt);
});
}); });
} }
} }
......
...@@ -38,7 +38,7 @@ FlvMuxer::FlvMuxer() { ...@@ -38,7 +38,7 @@ FlvMuxer::FlvMuxer() {
FlvMuxer::~FlvMuxer() { FlvMuxer::~FlvMuxer() {
} }
void FlvMuxer::start(const RtmpMediaSource::Ptr &media) { void FlvMuxer::start(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr &media) {
if(!media){ if(!media){
throw std::runtime_error("RtmpMediaSource 无效"); throw std::runtime_error("RtmpMediaSource 无效");
} }
...@@ -46,7 +46,7 @@ void FlvMuxer::start(const RtmpMediaSource::Ptr &media) { ...@@ -46,7 +46,7 @@ void FlvMuxer::start(const RtmpMediaSource::Ptr &media) {
onWriteFlvHeader(media); onWriteFlvHeader(media);
std::weak_ptr<FlvMuxer> weakSelf = getSharedPtr(); std::weak_ptr<FlvMuxer> weakSelf = getSharedPtr();
_ring_reader = media->getRing()->attach(); _ring_reader = media->getRing()->attach(poller);
_ring_reader->setDetachCB([weakSelf](){ _ring_reader->setDetachCB([weakSelf](){
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(!strongSelf){ if(!strongSelf){
...@@ -189,11 +189,11 @@ void FlvMuxer::stop() { ...@@ -189,11 +189,11 @@ void FlvMuxer::stop() {
} }
///////////////////////////////////////////////////////FlvRecorder///////////////////////////////////////////////////// ///////////////////////////////////////////////////////FlvRecorder/////////////////////////////////////////////////////
void FlvRecorder::startRecord(const string &vhost, const string &app, const string &stream,const string &file_path) { void FlvRecorder::startRecord(const EventPoller::Ptr &poller,const string &vhost, const string &app, const string &stream,const string &file_path) {
startRecord(dynamic_pointer_cast<RtmpMediaSource>(MediaSource::find(RTMP_SCHEMA,vhost,app,stream,false)),file_path); startRecord(poller,dynamic_pointer_cast<RtmpMediaSource>(MediaSource::find(RTMP_SCHEMA,vhost,app,stream,false)),file_path);
} }
void FlvRecorder::startRecord(const RtmpMediaSource::Ptr &media, const string &file_path) { void FlvRecorder::startRecord(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr &media, const string &file_path) {
stop(); stop();
lock_guard<recursive_mutex> lck(_file_mtx); lock_guard<recursive_mutex> lck(_file_mtx);
//开辟文件写缓存 //开辟文件写缓存
...@@ -215,7 +215,7 @@ void FlvRecorder::startRecord(const RtmpMediaSource::Ptr &media, const string &f ...@@ -215,7 +215,7 @@ void FlvRecorder::startRecord(const RtmpMediaSource::Ptr &media, const string &f
//设置文件写缓存 //设置文件写缓存
setvbuf( _file.get(), fileBuf.get(),_IOFBF, FILE_BUF_SIZE); setvbuf( _file.get(), fileBuf.get(),_IOFBF, FILE_BUF_SIZE);
start(media); start(poller,media);
} }
void FlvRecorder::onWrite(const Buffer::Ptr &data) { void FlvRecorder::onWrite(const Buffer::Ptr &data) {
......
...@@ -41,7 +41,7 @@ public: ...@@ -41,7 +41,7 @@ public:
virtual ~FlvMuxer(); virtual ~FlvMuxer();
void stop(); void stop();
protected: protected:
void start(const RtmpMediaSource::Ptr &media); void start(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr &media);
virtual void onWrite(const Buffer::Ptr &data) = 0; virtual void onWrite(const Buffer::Ptr &data) = 0;
virtual void onWrite(const char *data,int len) = 0; virtual void onWrite(const char *data,int len) = 0;
virtual void onDetach() = 0; virtual void onDetach() = 0;
...@@ -55,6 +55,7 @@ private: ...@@ -55,6 +55,7 @@ private:
RtmpMediaSource::RingType::RingReader::Ptr _ring_reader; RtmpMediaSource::RingType::RingReader::Ptr _ring_reader;
uint32_t _aui32FirstStamp[2] = {0}; uint32_t _aui32FirstStamp[2] = {0};
uint32_t _previousTagSize = 0; uint32_t _previousTagSize = 0;
}; };
class FlvRecorder : public FlvMuxer , public std::enable_shared_from_this<FlvRecorder>{ class FlvRecorder : public FlvMuxer , public std::enable_shared_from_this<FlvRecorder>{
...@@ -62,8 +63,8 @@ public: ...@@ -62,8 +63,8 @@ public:
typedef std::shared_ptr<FlvRecorder> Ptr; typedef std::shared_ptr<FlvRecorder> Ptr;
FlvRecorder(); FlvRecorder();
virtual ~FlvRecorder(); virtual ~FlvRecorder();
void startRecord(const string &vhost,const string &app,const string &stream,const string &file_path); void startRecord(const EventPoller::Ptr &poller,const string &vhost,const string &app,const string &stream,const string &file_path);
void startRecord(const RtmpMediaSource::Ptr &media,const string &file_path); void startRecord(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr &media,const string &file_path);
private: private:
virtual void onWrite(const Buffer::Ptr &data) override ; virtual void onWrite(const Buffer::Ptr &data) override ;
virtual void onWrite(const char *data,int len) override; virtual void onWrite(const char *data,int len) override;
......
...@@ -124,7 +124,7 @@ RtpBroadCaster::RtpBroadCaster(const string &strLocalIp,const string &strVhost,c ...@@ -124,7 +124,7 @@ RtpBroadCaster::RtpBroadCaster(const string &strLocalIp,const string &strVhost,c
bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero); bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero);
_apUdpSock[i]->setSendPeerAddr((struct sockaddr *)&peerAddr); _apUdpSock[i]->setSendPeerAddr((struct sockaddr *)&peerAddr);
} }
_pReader = src->getRing()->attach(); _pReader = src->getRing()->attach(nullptr);
_pReader->setReadCB([this](const RtpPacket::Ptr &pkt){ _pReader->setReadCB([this](const RtpPacket::Ptr &pkt){
int i = (int)(pkt->type); int i = (int)(pkt->type);
auto &pSock = _apUdpSock[i]; auto &pSock = _apUdpSock[i];
......
...@@ -74,7 +74,7 @@ static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE; ...@@ -74,7 +74,7 @@ static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE;
RtspSession::RtspSession(const Socket::Ptr &pSock) : TcpSession(pSock) { RtspSession::RtspSession(const Socket::Ptr &pSock) : TcpSession(pSock) {
//设置15秒发送超时时间 //设置15秒发送超时时间
pSock->setSendTimeOutSecond(15); pSock->setSendTimeOutSecond(45);
DebugL << get_peer_ip(); DebugL << get_peer_ip();
} }
...@@ -628,7 +628,7 @@ bool RtspSession::handleReq_Setup(const Parser &parser) { ...@@ -628,7 +628,7 @@ bool RtspSession::handleReq_Setup(const Parser &parser) {
if(!strongSelf) { if(!strongSelf) {
return; return;
} }
strongSelf->safeShutdown(); strongSelf->shutdown();
}); });
} }
int iSrvPort = _pBrdcaster->getPort(trackRef->_type); int iSrvPort = _pBrdcaster->getPort(trackRef->_type);
...@@ -731,28 +731,22 @@ bool RtspSession::handleReq_Play(const Parser &parser) { ...@@ -731,28 +731,22 @@ bool RtspSession::handleReq_Play(const Parser &parser) {
if (!_pRtpReader && _rtpType != PlayerBase::RTP_MULTICAST) { if (!_pRtpReader && _rtpType != PlayerBase::RTP_MULTICAST) {
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this()); weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
_pRtpReader = pMediaSrc->getRing()->attach(useBuf); _pRtpReader = pMediaSrc->getRing()->attach(getPoller(),useBuf);
_pRtpReader->setDetachCB([weakSelf]() { _pRtpReader->setDetachCB([weakSelf]() {
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(!strongSelf) { if(!strongSelf) {
return; return;
} }
strongSelf->safeShutdown(); strongSelf->shutdown();
}); });
_pRtpReader->setReadCB([weakSelf](const RtpPacket::Ptr &pack) { _pRtpReader->setReadCB([weakSelf](const RtpPacket::Ptr &pack) {
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(!strongSelf) { if(!strongSelf) {
return; return;
} }
strongSelf->async([weakSelf,pack](){ if(strongSelf->_enableSendRtp) {
auto strongSelf = weakSelf.lock(); strongSelf->sendRtpPacket(pack);
if(!strongSelf) { }
return;
}
if(strongSelf->_enableSendRtp) {
strongSelf->sendRtpPacket(pack);
}
});
}); });
} }
}; };
......
...@@ -192,7 +192,7 @@ static onceToken s_token([](){ ...@@ -192,7 +192,7 @@ static onceToken s_token([](){
auto path = http_root + "/" + vhost + "/" + app + "/" + stream + "_" + to_string(time(NULL)) + ".flv"; auto path = http_root + "/" + vhost + "/" + app + "/" + stream + "_" + to_string(time(NULL)) + ".flv";
FlvRecorder::Ptr recorder(new FlvRecorder); FlvRecorder::Ptr recorder(new FlvRecorder);
try{ try{
recorder->startRecord(dynamic_pointer_cast<RtmpMediaSource>(sender.shared_from_this()),path); recorder->startRecord(nullptr,dynamic_pointer_cast<RtmpMediaSource>(sender.shared_from_this()),path);
s_mapFlvRecorder[vhost + "/" + app + "/" + stream] = recorder; s_mapFlvRecorder[vhost + "/" + app + "/" + stream] = recorder;
}catch(std::exception &ex){ }catch(std::exception &ex){
WarnL << ex.what(); WarnL << ex.what();
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论