Commit cc7844a9 by xiongziliang

适配ZLToolKit代码,简化程序退出流程代码

parent 315cd180
Subproject commit d3a1bd9d8a3162d0237954034d85f1313008cab2 Subproject commit 98d824bdc6604b4e761657922c913ea46bd90223
...@@ -26,18 +26,14 @@ ...@@ -26,18 +26,14 @@
#include "UDPServer.h" #include "UDPServer.h"
#include "Util/TimeTicker.h" #include "Util/TimeTicker.h"
#include "Util/onceToken.h"
using namespace toolkit; using namespace toolkit;
namespace mediakit { namespace mediakit {
UDPServer &UDPServer::Instance() { INSTANCE_IMP(UDPServer);
static UDPServer *instance(new UDPServer());
return *instance;
}
void UDPServer::Destory() {
delete &UDPServer::Instance();
}
UDPServer::UDPServer() { UDPServer::UDPServer() {
} }
......
...@@ -27,8 +27,9 @@ ...@@ -27,8 +27,9 @@
#ifndef RTSP_UDPSERVER_H_ #ifndef RTSP_UDPSERVER_H_
#define RTSP_UDPSERVER_H_ #define RTSP_UDPSERVER_H_
#include <mutex>
#include <stdint.h> #include <stdint.h>
#include <mutex>
#include <memory>
#include <unordered_map> #include <unordered_map>
#include <unordered_set> #include <unordered_set>
#include "Util/util.h" #include "Util/util.h"
...@@ -40,18 +41,23 @@ using namespace toolkit; ...@@ -40,18 +41,23 @@ using namespace toolkit;
namespace mediakit { namespace mediakit {
class UDPServer { class UDPServer : public std::enable_shared_from_this<UDPServer> {
public: public:
typedef function< bool(int, const Buffer::Ptr &, struct sockaddr *)> onRecvData; typedef function< bool(int, const Buffer::Ptr &, struct sockaddr *)> onRecvData;
UDPServer(); ~UDPServer();
virtual ~UDPServer();
static UDPServer &Instance(); static UDPServer &Instance();
static void Destory();
/**
* 废弃的接口,无实际操作
* @deprecated
*/
static void Destory(){};
Socket::Ptr getSock(const char *strLocalIp, int iTrackIndex,uint16_t iLocalPort = 0); Socket::Ptr getSock(const char *strLocalIp, int iTrackIndex,uint16_t iLocalPort = 0);
void listenPeer(const char *strPeerIp, void *pSelf, const onRecvData &cb); void listenPeer(const char *strPeerIp, void *pSelf, const onRecvData &cb);
void stopListenPeer(const char *strPeerIp, void *pSelf); void stopListenPeer(const char *strPeerIp, void *pSelf);
private: private:
UDPServer();
void onRcvData(int iTrackId, const Buffer::Ptr &pBuf,struct sockaddr *pPeerAddr); void onRcvData(int iTrackId, const Buffer::Ptr &pBuf,struct sockaddr *pPeerAddr);
void onErr(const string &strKey,const SockException &err); void onErr(const string &strKey,const SockException &err);
unordered_map<string, Socket::Ptr> _mapUpdSock; unordered_map<string, Socket::Ptr> _mapUpdSock;
......
...@@ -142,7 +142,7 @@ bool H264RtpDecoder::decodeRtp(const RtpPacket::Ptr &rtppack) { ...@@ -142,7 +142,7 @@ bool H264RtpDecoder::decodeRtp(const RtpPacket::Ptr &rtppack) {
return false; return false;
} }
WarnL << "不支持的rtp类型:" << nal.type << " " << rtppack->sequence; WarnL << "不支持的rtp类型:" << (int)nal.type << " " << rtppack->sequence;
return false; return false;
// 29 FU-B 单NAL单元B模式 // 29 FU-B 单NAL单元B模式
// 24 STAP-A 单一时间的组合包 // 24 STAP-A 单一时间的组合包
......
...@@ -41,9 +41,10 @@ using namespace mediakit; ...@@ -41,9 +41,10 @@ using namespace mediakit;
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
//设置退出信号处理函数 //设置退出信号处理函数
signal(SIGINT, [](int) { EventPoller::Instance().shutdown(); }); signal(SIGINT, [](int) { EventPollerPool::Instance().shutdown(); });
//设置日志 //设置日志
Logger::Instance().add(std::make_shared<ConsoleChannel>("stdout", LTrace)); Logger::Instance().add(std::make_shared<ConsoleChannel>());
Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>()); Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>());
if (argc != 5) { if (argc != 5) {
...@@ -55,42 +56,33 @@ int main(int argc, char *argv[]) { ...@@ -55,42 +56,33 @@ int main(int argc, char *argv[]) {
return 0; return 0;
} }
{ list<MediaPlayer::Ptr> playerList;
list<MediaPlayer::Ptr> playerList; auto playerCnt = atoi(argv[1]);//启动的播放器个数
auto playerCnt = atoi(argv[1]);//启动的播放器个数 atomic_int alivePlayerCnt(0);
atomic_int alivePlayerCnt(0); //每隔若干毫秒启动一个播放器(如果一次性全部启动,服务器和客户端可能都承受不了)
//每隔若干毫秒启动一个播放器(如果一次性全部启动,服务器和客户端可能都承受不了) AsyncTaskThread::Instance().DoTaskDelay(0, atoi(argv[2]), [&]() {
AsyncTaskThread::Instance().DoTaskDelay(0, atoi(argv[2]), [&]() { MediaPlayer::Ptr player(new MediaPlayer());
MediaPlayer::Ptr player(new MediaPlayer()); player->setOnPlayResult([&](const SockException &ex) {
player->setOnPlayResult([&](const SockException &ex) { if (!ex) {
if (!ex) { ++alivePlayerCnt;
++alivePlayerCnt; }
}
});
player->setOnShutdown([&](const SockException &ex) {
--alivePlayerCnt;
});
(*player)[RtspPlayer::kRtpType] = atoi(argv[4]);
player->play(argv[3]);
playerList.push_back(player);
return playerCnt--;
}); });
player->setOnShutdown([&](const SockException &ex) {
AsyncTaskThread::Instance().DoTaskDelay(0, 1000, [&]() { --alivePlayerCnt;
InfoL << "存活播放器个数:" << alivePlayerCnt.load();
return true;
}); });
EventPoller::Instance().runLoop(); (*player)[RtspPlayer::kRtpType] = atoi(argv[4]);
AsyncTaskThread::Instance().CancelTask(0); player->play(argv[3]);
} playerList.push_back(player);
return playerCnt--;
});
static onceToken token(nullptr, []() { AsyncTaskThread::Instance().DoTaskDelay(0, 1000, [&]() {
WorkThreadPool::Instance(); InfoL << "存活播放器个数:" << alivePlayerCnt.load();
UDPServer::Destory(); return true;
EventPoller::Destory();
AsyncTaskThread::Destory();
Logger::Destory();
}); });
EventPollerPool::Instance().wait();
AsyncTaskThread::Instance().CancelTask(0);
return 0; return 0;
} }
...@@ -102,10 +102,11 @@ static onceToken s_token([](){ ...@@ -102,10 +102,11 @@ static onceToken s_token([](){
int main(int argc,char *argv[]){ int main(int argc,char *argv[]){
//设置退出信号处理函数 //设置退出信号处理函数
signal(SIGINT, [](int){EventPoller::Instance().shutdown();}); signal(SIGINT, [](int){EventPollerPool::Instance().shutdown();});
//设置日志 //设置日志
Logger::Instance().add(std::make_shared<ConsoleChannel>("stdout", LTrace)); Logger::Instance().add(std::make_shared<ConsoleChannel>());
Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>()); Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>());
//加载配置文件,如果配置文件不存在就创建一个 //加载配置文件,如果配置文件不存在就创建一个
loadIniConfig(); loadIniConfig();
...@@ -132,14 +133,7 @@ int main(int argc,char *argv[]){ ...@@ -132,14 +133,7 @@ int main(int argc,char *argv[]){
InfoL << "你可以在浏览器输入:http://127.0.0.1/api/my_api?key0=val0&key1=参数1" << endl; InfoL << "你可以在浏览器输入:http://127.0.0.1/api/my_api?key0=val0&key1=参数1" << endl;
EventPoller::Instance().runLoop(); EventPollerPool::Instance().wait();
static onceToken s_token(nullptr,[]() {
//TcpServer用到了WorkThreadPool
WorkThreadPool::Destory();
EventPoller::Destory();
Logger::Destory();
});
return 0; return 0;
} }
...@@ -43,7 +43,8 @@ int main(int argc, char *argv[]) { ...@@ -43,7 +43,8 @@ int main(int argc, char *argv[]) {
//设置退出信号处理函数 //设置退出信号处理函数
signal(SIGINT, [](int) { SDLDisplayerHelper::Instance().shutdown(); }); signal(SIGINT, [](int) { SDLDisplayerHelper::Instance().shutdown(); });
//设置日志 //设置日志
Logger::Instance().add(std::make_shared<ConsoleChannel>("stdout", LTrace)); Logger::Instance().add(std::make_shared<ConsoleChannel>());
Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>());
if (argc != 3) { if (argc != 3) {
ErrorL << "\r\n测试方法:./test_player rtxp_url rtp_type\r\n" ErrorL << "\r\n测试方法:./test_player rtxp_url rtp_type\r\n"
...@@ -54,51 +55,46 @@ int main(int argc, char *argv[]) { ...@@ -54,51 +55,46 @@ int main(int argc, char *argv[]) {
} }
{ MediaPlayer::Ptr player(new MediaPlayer());
MediaPlayer::Ptr player(new MediaPlayer()); weak_ptr<MediaPlayer> weakPlayer = player;
weak_ptr<MediaPlayer> weakPlayer = player; player->setOnPlayResult([weakPlayer](const SockException &ex) {
player->setOnPlayResult([weakPlayer](const SockException &ex) { InfoL << "OnPlayResult:" << ex.what();
InfoL << "OnPlayResult:" << ex.what(); auto strongPlayer = weakPlayer.lock();
auto strongPlayer = weakPlayer.lock(); if (ex || !strongPlayer) {
if (ex || !strongPlayer) { return;
return; }
}
auto viedoTrack = strongPlayer->getTrack(TrackVideo); auto viedoTrack = strongPlayer->getTrack(TrackVideo);
if (!viedoTrack || viedoTrack->getCodecId() != CodecH264) { if (!viedoTrack || viedoTrack->getCodecId() != CodecH264) {
WarnL << "没有视频或者视频不是264编码!"; WarnL << "没有视频或者视频不是264编码!";
return; return;
} }
SDLDisplayerHelper::Instance().doTask([viedoTrack]() { SDLDisplayerHelper::Instance().doTask([viedoTrack]() {
std::shared_ptr<H264Decoder> decoder(new H264Decoder); std::shared_ptr<H264Decoder> decoder(new H264Decoder);
std::shared_ptr<YuvDisplayer> displayer(new YuvDisplayer); std::shared_ptr<YuvDisplayer> displayer(new YuvDisplayer);
viedoTrack->addDelegate(std::make_shared<FrameWriterInterfaceHelper>([decoder, displayer](const Frame::Ptr &frame) { viedoTrack->addDelegate(std::make_shared<FrameWriterInterfaceHelper>([decoder, displayer](const Frame::Ptr &frame) {
SDLDisplayerHelper::Instance().doTask([decoder, displayer, frame]() { SDLDisplayerHelper::Instance().doTask([decoder, displayer, frame]() {
AVFrame *pFrame = nullptr; AVFrame *pFrame = nullptr;
bool flag = decoder->inputVideo((unsigned char *) frame->data(), frame->size(), bool flag = decoder->inputVideo((unsigned char *) frame->data(), frame->size(),
frame->stamp(), &pFrame); frame->stamp(), &pFrame);
if (flag) { if (flag) {
displayer->displayYUV(pFrame); displayer->displayYUV(pFrame);
} }
return true; return true;
}); });
})); }));
return true; return true;
});
}); });
});
player->setOnShutdown([](const SockException &ex) { player->setOnShutdown([](const SockException &ex) {
ErrorL << "OnShutdown:" << ex.what(); ErrorL << "OnShutdown:" << ex.what();
}); });
(*player)[RtspPlayer::kRtpType] = atoi(argv[2]); (*player)[RtspPlayer::kRtpType] = atoi(argv[2]);
player->play(argv[1]); player->play(argv[1]);
SDLDisplayerHelper::Instance().runLoop();
} SDLDisplayerHelper::Instance().runLoop();
UDPServer::Destory();
EventPoller::Destory();
AsyncTaskThread::Destory();
Logger::Destory();
return 0; return 0;
} }
...@@ -83,37 +83,26 @@ void rePushDelay(const string &app, const string &stream, const string &url) { ...@@ -83,37 +83,26 @@ void rePushDelay(const string &app, const string &stream, const string &url) {
//这里才是真正执行main函数,你可以把函数名(domain)改成main,然后就可以输入自定义url了 //这里才是真正执行main函数,你可以把函数名(domain)改成main,然后就可以输入自定义url了
int domain(const string &playUrl, const string &pushUrl) { int domain(const string &playUrl, const string &pushUrl) {
//设置退出信号处理函数 //设置退出信号处理函数
signal(SIGINT, [](int) { EventPoller::Instance().shutdown(); }); signal(SIGINT, [](int) { EventPollerPool::Instance().shutdown(); });
//设置日志 //设置日志
Logger::Instance().add(std::make_shared<ConsoleChannel>("stdout", LTrace)); Logger::Instance().add(std::make_shared<ConsoleChannel>());
Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>()); Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>());
{ //拉一个流,生成一个RtmpMediaSource,源的名称是"app/stream"
//拉一个流,生成一个RtmpMediaSource,源的名称是"app/stream" //你也可以以其他方式生成RtmpMediaSource,比如说MP4文件(请查看test_rtmpPusherMp4.cpp代码)
//你也可以以其他方式生成RtmpMediaSource,比如说MP4文件(请查看test_rtmpPusherMp4.cpp代码) PlayerProxy::Ptr player(new PlayerProxy(DEFAULT_VHOST, "app", "stream"));
PlayerProxy::Ptr player(new PlayerProxy(DEFAULT_VHOST, "app", "stream")); player->play(playUrl.data());
player->play(playUrl.data());
//监听RtmpMediaSource注册事件,在PlayerProxy播放成功后触发
//监听RtmpMediaSource注册事件,在PlayerProxy播放成功后触发 NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastMediaChanged,
NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastMediaChanged, [pushUrl](BroadcastMediaChangedArgs) {
[pushUrl](BroadcastMediaChangedArgs) { //媒体源"app/stream"已经注册,这时方可新建一个RtmpPusher对象并绑定该媒体源
//媒体源"app/stream"已经注册,这时方可新建一个RtmpPusher对象并绑定该媒体源 if(bRegist && schema == RTMP_SCHEMA){
if(bRegist && schema == RTMP_SCHEMA){ createPusher(app, stream, pushUrl);
createPusher(app, stream, pushUrl); }
} });
}); //事件轮询
EventPollerPool::Instance().wait();
//事件轮询
EventPoller::Instance().runLoop();
pusher.reset();
}
//删除事件监听
NoticeCenter::Instance().delListener(nullptr);
//清理程序
EventPoller::Destory();
AsyncTaskThread::Destory();
Logger::Destory();
return 0; return 0;
} }
......
...@@ -92,9 +92,9 @@ void rePushDelay(const string &app,const string &stream,const string &url){ ...@@ -92,9 +92,9 @@ void rePushDelay(const string &app,const string &stream,const string &url){
//这里才是真正执行main函数,你可以把函数名(domain)改成main,然后就可以输入自定义url了 //这里才是真正执行main函数,你可以把函数名(domain)改成main,然后就可以输入自定义url了
int domain(const string & filePath,const string & pushUrl){ int domain(const string & filePath,const string & pushUrl){
//设置退出信号处理函数 //设置退出信号处理函数
signal(SIGINT, [](int){EventPoller::Instance().shutdown();}); signal(SIGINT, [](int){EventPollerPool::Instance().shutdown();});
//设置日志 //设置日志
Logger::Instance().add(std::make_shared<ConsoleChannel>("stdout", LTrace)); Logger::Instance().add(std::make_shared<ConsoleChannel>());
Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>()); Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>());
//录像应用名称默认为record //录像应用名称默认为record
...@@ -104,16 +104,7 @@ int domain(const string & filePath,const string & pushUrl){ ...@@ -104,16 +104,7 @@ int domain(const string & filePath,const string & pushUrl){
createPusher(appName,filePath,pushUrl); createPusher(appName,filePath,pushUrl);
//开始事件轮询 //开始事件轮询
EventPoller::Instance().runLoop(); EventPollerPool::Instance().wait();
//删除事件监听
NoticeCenter::Instance().delListener(nullptr);
//销毁推流器
pusher.reset();
//程序清理
EventPoller::Destory();
AsyncTaskThread::Destory();
Logger::Destory();
return 0; return 0;
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论