Commit 492d083f by xiongziliang

大幅优化性能

parent 237a7d71
Subproject commit e3d2a2eb5d7b816b28f308eac8c53d147db80533 Subproject commit 9bfa63bdca291123621e7805908195b999f15aec
...@@ -53,8 +53,6 @@ static TcpServer::Ptr s_pHttpSrv; ...@@ -53,8 +53,6 @@ static TcpServer::Ptr s_pHttpSrv;
API_EXPORT void API_CALL onAppStart(){ API_EXPORT void API_CALL onAppStart(){
static onceToken s_token([](){ static onceToken s_token([](){
Logger::Instance().add(std::make_shared<ConsoleChannel>("stdout", LTrace)); Logger::Instance().add(std::make_shared<ConsoleChannel>("stdout", LTrace));
EventPoller::Instance().runLoop(false);
cleaner::Instance().push_back([](){ cleaner::Instance().push_back([](){
s_pRtspSrv.reset(); s_pRtspSrv.reset();
s_pRtmpSrv.reset(); s_pRtmpSrv.reset();
......
...@@ -192,13 +192,16 @@ void PlayerProxy::initMedia() { ...@@ -192,13 +192,16 @@ void PlayerProxy::initMedia() {
bool PlayerProxy::shutDown() { bool PlayerProxy::shutDown() {
//通知其停止推流 //通知其停止推流
weak_ptr<PlayerProxy> weakSlef = dynamic_pointer_cast<PlayerProxy>(shared_from_this()); weak_ptr<PlayerProxy> weakSlef = dynamic_pointer_cast<PlayerProxy>(shared_from_this());
ASYNC_TRACE([weakSlef](){ auto executor = getExecutor();
if(executor) {
executor->async_first([weakSlef]() {
auto stronSelf = weakSlef.lock(); auto stronSelf = weakSlef.lock();
if(stronSelf){ if (stronSelf) {
stronSelf->m_pChn.reset(); stronSelf->m_pChn.reset();
stronSelf->teardown(); stronSelf->teardown();
} }
}); });
}
return true; return true;
} }
......
...@@ -56,6 +56,13 @@ void MediaPlayer::play(const char* strUrl) { ...@@ -56,6 +56,13 @@ void MediaPlayer::play(const char* strUrl) {
m_parser->play(strUrl); m_parser->play(strUrl);
} }
TaskExecutor::Ptr MediaPlayer::getExecutor(){
auto parser = dynamic_pointer_cast<SocketHelper>(m_parser);
if(!parser){
return nullptr;
}
return parser->getExecutor();
}
void MediaPlayer::pause(bool bPause) { void MediaPlayer::pause(bool bPause) {
if (m_parser) { if (m_parser) {
......
...@@ -33,10 +33,12 @@ ...@@ -33,10 +33,12 @@
#include "PlayerBase.h" #include "PlayerBase.h"
#include "Rtsp/RtspPlayer.h" #include "Rtsp/RtspPlayer.h"
#include "Rtmp/RtmpPlayer.h" #include "Rtmp/RtmpPlayer.h"
#include "Thread/TaskExecutor.h"
using namespace std; using namespace std;
using namespace ZL::Rtsp; using namespace ZL::Rtsp;
using namespace ZL::Rtmp; using namespace ZL::Rtmp;
using namespace ZL::Thread;
namespace ZL { namespace ZL {
namespace Player { namespace Player {
...@@ -50,6 +52,7 @@ public: ...@@ -50,6 +52,7 @@ public:
void play(const char* strUrl) override; void play(const char* strUrl) override;
void pause(bool bPause) override; void pause(bool bPause) override;
void teardown() override; void teardown() override;
TaskExecutor::Ptr getExecutor();
private: private:
string m_strPrefix; string m_strPrefix;
......
...@@ -46,18 +46,13 @@ const char PlayerBase::kRtspPwdIsMD5[] = "rtsp_pwd_md5"; ...@@ -46,18 +46,13 @@ const char PlayerBase::kRtspPwdIsMD5[] = "rtsp_pwd_md5";
PlayerBase::Ptr PlayerBase::createPlayer(const char* strUrl) { PlayerBase::Ptr PlayerBase::createPlayer(const char* strUrl) {
string prefix = FindField(strUrl, NULL, "://"); string prefix = FindField(strUrl, NULL, "://");
auto onDestory = [](PlayerBase *ptr){
ASYNC_TRACE([ptr](){
delete ptr;
});
};
if (strcasecmp("rtsp",prefix.data()) == 0) { if (strcasecmp("rtsp",prefix.data()) == 0) {
return PlayerBase::Ptr(new RtspPlayerImp(),onDestory); return PlayerBase::Ptr(new RtspPlayerImp());
} }
if (strcasecmp("rtmp",prefix.data()) == 0) { if (strcasecmp("rtmp",prefix.data()) == 0) {
return PlayerBase::Ptr(new RtmpPlayerImp(),onDestory); return PlayerBase::Ptr(new RtmpPlayerImp());
} }
return PlayerBase::Ptr(new RtspPlayerImp(),onDestory); return PlayerBase::Ptr(new RtspPlayerImp());
} }
} /* namespace Player */ } /* namespace Player */
......
...@@ -119,7 +119,7 @@ void RtmpPlayer::onConnect(const SockException &err){ ...@@ -119,7 +119,7 @@ void RtmpPlayer::onConnect(const SockException &err){
strongSelf->_onPlayResult(SockException(Err_timeout,"play rtmp timeout")); strongSelf->_onPlayResult(SockException(Err_timeout,"play rtmp timeout"));
strongSelf->teardown(); strongSelf->teardown();
return false; return false;
})); },getExecutor()));
startClientSession([weakSelf](){ startClientSession([weakSelf](){
auto strongSelf=weakSelf.lock(); auto strongSelf=weakSelf.lock();
if(!strongSelf) { if(!strongSelf) {
...@@ -233,7 +233,7 @@ inline void RtmpPlayer::send_pause(bool bPause) { ...@@ -233,7 +233,7 @@ inline void RtmpPlayer::send_pause(bool bPause) {
uint32_t timeStamp = ::time(NULL); uint32_t timeStamp = ::time(NULL);
strongSelf->sendUserControl(CONTROL_PING_REQUEST, timeStamp); strongSelf->sendUserControl(CONTROL_PING_REQUEST, timeStamp);
return true; return true;
})); },getExecutor()));
} }
} }
......
...@@ -93,7 +93,7 @@ private: ...@@ -93,7 +93,7 @@ private:
return false; return false;
} }
return true; return true;
})); },getExecutor()));
} }
onPlayResult(ex); onPlayResult(ex);
} }
......
...@@ -121,7 +121,7 @@ void RtmpPusher::onConnect(const SockException &err){ ...@@ -121,7 +121,7 @@ void RtmpPusher::onConnect(const SockException &err){
strongSelf->onPublishResult(SockException(Err_timeout,"publish rtmp timeout")); strongSelf->onPublishResult(SockException(Err_timeout,"publish rtmp timeout"));
strongSelf->teardown(); strongSelf->teardown();
return false; return false;
})); },getExecutor()));
startClientSession([weakSelf](){ startClientSession([weakSelf](){
auto strongSelf=weakSelf.lock(); auto strongSelf=weakSelf.lock();
if(!strongSelf) { if(!strongSelf) {
......
...@@ -174,7 +174,7 @@ void RtspPlayer::onConnect(const SockException &err){ ...@@ -174,7 +174,7 @@ void RtspPlayer::onConnect(const SockException &err){
strongSelf->onPlayResult_l(SockException(Err_timeout,"play rtsp timeout")); strongSelf->onPlayResult_l(SockException(Err_timeout,"play rtsp timeout"));
strongSelf->teardown(); strongSelf->teardown();
return false; return false;
})); },getExecutor()));
} }
void RtspPlayer::onRecv(const Buffer::Ptr& pBuf) { void RtspPlayer::onRecv(const Buffer::Ptr& pBuf) {
...@@ -414,7 +414,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex) ...@@ -414,7 +414,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex)
return false; return false;
} }
return strongSelf->sendOptions(); return strongSelf->sendOptions();
})); },getExecutor()));
pause(false); pause(false);
} }
...@@ -808,7 +808,7 @@ void RtspPlayer::onPlayResult_l(const SockException &ex) { ...@@ -808,7 +808,7 @@ void RtspPlayer::onPlayResult_l(const SockException &ex) {
return false; return false;
} }
return true; return true;
})); },getExecutor()));
} }
onPlayResult(ex); onPlayResult(ex);
} }
......
...@@ -72,8 +72,11 @@ RtspSession::~RtspSession() { ...@@ -72,8 +72,11 @@ RtspSession::~RtspSession() {
} }
void RtspSession::shutdown(){ void RtspSession::shutdown(){
shutdown_l(true);
}
void RtspSession::shutdown_l(bool close){
if (_sock) { if (_sock) {
_sock->emitErr(SockException(Err_other, "self shutdown")); _sock->emitErr(SockException(Err_other, "self shutdown"),close);
} }
if (m_bBase64need && !_sock) { if (m_bBase64need && !_sock) {
//quickTime http postter,and self is detached from tcpServer //quickTime http postter,and self is detached from tcpServer
...@@ -107,7 +110,10 @@ void RtspSession::onError(const SockException& err) { ...@@ -107,7 +110,10 @@ void RtspSession::onError(const SockException& err) {
_sock = nullptr; _sock = nullptr;
lock_guard<recursive_mutex> lock(g_mtxPostter); lock_guard<recursive_mutex> lock(g_mtxPostter);
//为了保证脱离TCPServer后还能正常运作,需要保持本对象的强引用 //为了保证脱离TCPServer后还能正常运作,需要保持本对象的强引用
try {
g_mapPostter.emplace(this, dynamic_pointer_cast<RtspSession>(shared_from_this())); g_mapPostter.emplace(this, dynamic_pointer_cast<RtspSession>(shared_from_this()));
}catch (std::exception &ex){
}
TraceL << "quickTime will not send request any more!"; TraceL << "quickTime will not send request any more!";
} }
...@@ -143,10 +149,18 @@ void RtspSession::onRecv(const Buffer::Ptr &pBuf) { ...@@ -143,10 +149,18 @@ void RtspSession::onRecv(const Buffer::Ptr &pBuf) {
//quicktime 加密后的rtsp请求,需要解密 //quicktime 加密后的rtsp请求,需要解密
av_base64_decode((uint8_t *) m_pcBuf, pBuf->data(), sizeof(tmp)); av_base64_decode((uint8_t *) m_pcBuf, pBuf->data(), sizeof(tmp));
m_parser.Parse(m_pcBuf); //rtsp请求解析 m_parser.Parse(m_pcBuf); //rtsp请求解析
//TraceL << buf;
} else { } else {
//TraceL << request;
m_parser.Parse(pBuf->data()); //rtsp请求解析 m_parser.Parse(pBuf->data()); //rtsp请求解析
if(!m_parser.Content().empty()){
weak_ptr<TcpSession> weakSelf = shared_from_this();
BufferString::Ptr nextRecv(new BufferString(m_parser.Content()));
async([weakSelf,nextRecv](){
auto strongSelf = weakSelf.lock();
if(strongSelf){
strongSelf->onRecv(nextRecv);
}
}, false);
}
} }
string strCmd = m_parser.Method(); //提取出请求命令字 string strCmd = m_parser.Method(); //提取出请求命令字
...@@ -811,6 +825,7 @@ bool RtspSession::handleReq_Get() { ...@@ -811,6 +825,7 @@ bool RtspSession::handleReq_Get() {
//注册GET //注册GET
lock_guard<recursive_mutex> lock(g_mtxGetter); lock_guard<recursive_mutex> lock(g_mtxGetter);
g_mapGetter[m_strSessionCookie] = dynamic_pointer_cast<RtspSession>(shared_from_this()); g_mapGetter[m_strSessionCookie] = dynamic_pointer_cast<RtspSession>(shared_from_this());
//InfoL << m_strSessionCookie;
send(m_pcBuf, n); send(m_pcBuf, n);
return true; return true;
...@@ -822,6 +837,7 @@ bool RtspSession::handleReq_Post() { ...@@ -822,6 +837,7 @@ bool RtspSession::handleReq_Post() {
//Poster 找到 Getter //Poster 找到 Getter
auto it = g_mapGetter.find(sessioncookie); auto it = g_mapGetter.find(sessioncookie);
if (it == g_mapGetter.end()) { if (it == g_mapGetter.end()) {
//WarnL << sessioncookie;
return false; return false;
} }
m_bBase64need = true; m_bBase64need = true;
...@@ -830,6 +846,7 @@ bool RtspSession::handleReq_Post() { ...@@ -830,6 +846,7 @@ bool RtspSession::handleReq_Post() {
g_mapGetter.erase(sessioncookie); g_mapGetter.erase(sessioncookie);
if (!strongSession) { if (!strongSession) {
send_SessionNotFound(); send_SessionNotFound();
//WarnL;
return false; return false;
} }
initSender(strongSession); initSender(strongSession);
...@@ -988,6 +1005,7 @@ inline void RtspSession::initSender(const std::shared_ptr<RtspSession>& session) ...@@ -988,6 +1005,7 @@ inline void RtspSession::initSender(const std::shared_ptr<RtspSession>& session)
if(!strongSelf) { if(!strongSelf) {
return; return;
} }
//DebugL;
strongSelf->m_pSender->setOnErr([weakSelf](const SockException &err) { strongSelf->m_pSender->setOnErr([weakSelf](const SockException &err) {
auto strongSelf=weakSelf.lock(); auto strongSelf=weakSelf.lock();
if(!strongSelf) { if(!strongSelf) {
...@@ -996,7 +1014,7 @@ inline void RtspSession::initSender(const std::shared_ptr<RtspSession>& session) ...@@ -996,7 +1014,7 @@ inline void RtspSession::initSender(const std::shared_ptr<RtspSession>& session)
strongSelf->safeShutdown(); strongSelf->safeShutdown();
}); });
}; };
session->shutdown(); session->shutdown_l(false);
} }
#ifdef RTSP_SEND_RTCP #ifdef RTSP_SEND_RTCP
......
...@@ -97,7 +97,8 @@ private: ...@@ -97,7 +97,8 @@ private:
m_ui64TotalBytes += pkt->size(); m_ui64TotalBytes += pkt->size();
return m_pSender->send(pkt,SOCKET_DEFAULE_FLAGS | FLAG_MORE); return m_pSender->send(pkt,SOCKET_DEFAULE_FLAGS | FLAG_MORE);
} }
void shutdown() override; void shutdown() override ;
void shutdown_l(bool close);
bool handleReq_Options(); //处理options方法 bool handleReq_Options(); //处理options方法
bool handleReq_Describe(); //处理describe方法 bool handleReq_Describe(); //处理describe方法
bool handleReq_Setup(); //处理setup方法 bool handleReq_Setup(); //处理setup方法
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论