Commit 9247cb95 by xiongziliang

支持客户端自定义设置EventPoller对象,提高线程安全性

parent 6e2002e4
......@@ -32,13 +32,17 @@ using namespace toolkit;
namespace mediakit {
MediaPlayer::MediaPlayer() {
MediaPlayer::MediaPlayer(const EventPoller::Ptr &poller) {
_poller = poller;
if(!_poller){
_poller = EventPollerPool::Instance().getPoller();
}
}
MediaPlayer::~MediaPlayer() {
}
void MediaPlayer::play(const string &strUrl) {
_parser = PlayerBase::createPlayer(strUrl);
_parser = PlayerBase::createPlayer(_poller,strUrl);
_parser->setOnShutdown(_shutdownCB);
_parser->setOnPlayResult(_playResultCB);
_parser->setMediaSouce(_pMediaSrc);
......@@ -47,11 +51,7 @@ void MediaPlayer::play(const string &strUrl) {
}
EventPoller::Ptr MediaPlayer::getPoller(){
auto parser = dynamic_pointer_cast<SocketHelper>(_parser);
if(!parser){
return nullptr;
}
return parser->getPoller();
return _poller;
}
void MediaPlayer::pause(bool bPause) {
......
......@@ -41,13 +41,14 @@ class MediaPlayer : public PlayerImp<PlayerBase,PlayerBase> {
public:
typedef std::shared_ptr<MediaPlayer> Ptr;
MediaPlayer();
MediaPlayer(const EventPoller::Ptr &poller = nullptr);
virtual ~MediaPlayer();
void play(const string &strUrl) override;
void pause(bool bPause) override;
void teardown() override;
EventPoller::Ptr getPoller();
private:
EventPoller::Ptr _poller;
};
} /* namespace mediakit */
......
......@@ -33,7 +33,7 @@ using namespace toolkit;
namespace mediakit {
PlayerBase::Ptr PlayerBase::createPlayer(const string &strUrl) {
PlayerBase::Ptr PlayerBase::createPlayer(const EventPoller::Ptr &poller,const string &strUrl) {
static auto releasePlayer = [](PlayerBase *ptr){
onceToken token(nullptr,[&](){
delete ptr;
......@@ -42,12 +42,12 @@ PlayerBase::Ptr PlayerBase::createPlayer(const string &strUrl) {
};
string prefix = FindField(strUrl.data(), NULL, "://");
if (strcasecmp("rtsp",prefix.data()) == 0) {
return PlayerBase::Ptr(new RtspPlayerImp(),releasePlayer);
return PlayerBase::Ptr(new RtspPlayerImp(poller),releasePlayer);
}
if (strcasecmp("rtmp",prefix.data()) == 0) {
return PlayerBase::Ptr(new RtmpPlayerImp(),releasePlayer);
return PlayerBase::Ptr(new RtmpPlayerImp(poller),releasePlayer);
}
return PlayerBase::Ptr(new RtspPlayerImp(),releasePlayer);
return PlayerBase::Ptr(new RtspPlayerImp(poller),releasePlayer);
}
PlayerBase::PlayerBase() {
......
......@@ -86,7 +86,7 @@ public:
class PlayerBase : public DemuxerBase, public mINI{
public:
typedef std::shared_ptr<PlayerBase> Ptr;
static Ptr createPlayer(const string &strUrl);
static Ptr createPlayer(const EventPoller::Ptr &poller,const string &strUrl);
PlayerBase();
virtual ~PlayerBase(){}
......@@ -154,7 +154,10 @@ class PlayerImp : public Parent
{
public:
typedef std::shared_ptr<PlayerImp> Ptr;
PlayerImp(){}
template<typename ...ArgsType>
PlayerImp(ArgsType &&...args):Parent(std::forward<ArgsType>(args)...){}
virtual ~PlayerImp(){}
void setOnShutdown(const function<void(const SockException &)> &cb) override {
if (_parser) {
......
......@@ -66,7 +66,8 @@ PlayerProxy::PlayerProxy(const string &strVhost,
const string &strSrc,
bool bEnableHls,
bool bEnableMp4,
int iRetryCount){
int iRetryCount,
const EventPoller::Ptr &poller) : MediaPlayer(poller){
_strVhost = strVhost;
_strApp = strApp;
_strSrc = strSrc;
......@@ -127,21 +128,18 @@ void PlayerProxy::rePlay(const string &strUrl,int iFailedCnt){
WarnL << "重试播放[" << iFailedCnt << "]:" << strUrl;
strongPlayer->MediaPlayer::play(strUrl);
return false;
}, nullptr);
}, getPoller());
}
bool PlayerProxy::close() {
//通知其停止推流
weak_ptr<PlayerProxy> weakSlef = dynamic_pointer_cast<PlayerProxy>(shared_from_this());
auto poller = getPoller();
if(poller) {
poller->async_first([weakSlef]() {
auto stronSelf = weakSlef.lock();
if (stronSelf) {
stronSelf->_mediaMuxer.reset();
stronSelf->teardown();
}
});
}
//通知其停止推流
weak_ptr<PlayerProxy> weakSlef = dynamic_pointer_cast<PlayerProxy>(shared_from_this());
getPoller()->async_first([weakSlef]() {
auto stronSelf = weakSlef.lock();
if (stronSelf) {
stronSelf->_mediaMuxer.reset();
stronSelf->teardown();
}
});
return true;
}
......
......@@ -51,7 +51,8 @@ public:
const string &strSrc,
bool bEnableHls = true,
bool bEnableMp4 = false,
int iRetryCount = -1);
int iRetryCount = -1,
const EventPoller::Ptr &poller = nullptr);
virtual ~PlayerProxy();
......
......@@ -32,21 +32,27 @@ using namespace toolkit;
namespace mediakit {
MediaPusher::MediaPusher(const MediaSource::Ptr &src) {
MediaPusher::MediaPusher(const MediaSource::Ptr &src,
const EventPoller::Ptr &poller) {
_src = src;
_poller = poller;
if(!_poller){
_poller = EventPollerPool::Instance().getPoller();
}
}
MediaPusher::MediaPusher(const string &schema,
const string &strVhost,
const string &strApp,
const string &strStream) {
_src = MediaSource::find(schema,strVhost,strApp,strStream);
const string &strStream,
const EventPoller::Ptr &poller) :
MediaPusher(MediaSource::find(schema,strVhost,strApp,strStream),poller){
}
MediaPusher::~MediaPusher() {
}
void MediaPusher::publish(const string &strUrl) {
_parser = PusherBase::createPusher(_src.lock(),strUrl);
_parser = PusherBase::createPusher(_poller,_src.lock(),strUrl);
_parser->setOnShutdown(_shutdownCB);
_parser->setOnPublished(_publishCB);
_parser->mINI::operator=(*this);
......@@ -54,11 +60,7 @@ void MediaPusher::publish(const string &strUrl) {
}
EventPoller::Ptr MediaPusher::getPoller(){
auto parser = dynamic_pointer_cast<SocketHelper>(_parser);
if(!parser){
return nullptr;
}
return parser->getPoller();
return _poller;
}
......
......@@ -42,15 +42,18 @@ public:
MediaPusher(const string &schema,
const string &strVhost,
const string &strApp,
const string &strStream);
const string &strStream,
const EventPoller::Ptr &poller = nullptr);
MediaPusher(const MediaSource::Ptr &src);
MediaPusher(const MediaSource::Ptr &src,
const EventPoller::Ptr &poller = nullptr);
virtual ~MediaPusher();
void publish(const string &strUrl) override;
EventPoller::Ptr getPoller();
private:
std::weak_ptr<MediaSource> _src;
EventPoller::Ptr _poller;
};
} /* namespace mediakit */
......
......@@ -35,7 +35,8 @@ using namespace mediakit::Client;
namespace mediakit {
PusherBase::Ptr PusherBase::createPusher(const MediaSource::Ptr &src,
PusherBase::Ptr PusherBase::createPusher(const EventPoller::Ptr &poller,
const MediaSource::Ptr &src,
const string & strUrl) {
static auto releasePusher = [](PusherBase *ptr){
onceToken token(nullptr,[&](){
......@@ -45,12 +46,12 @@ PusherBase::Ptr PusherBase::createPusher(const MediaSource::Ptr &src,
};
string prefix = FindField(strUrl.data(), NULL, "://");
if (strcasecmp("rtsp",prefix.data()) == 0) {
return PusherBase::Ptr(new RtspPusher(dynamic_pointer_cast<RtspMediaSource>(src)),releasePusher);
return PusherBase::Ptr(new RtspPusher(poller,dynamic_pointer_cast<RtspMediaSource>(src)),releasePusher);
}
if (strcasecmp("rtmp",prefix.data()) == 0) {
return PusherBase::Ptr(new RtmpPusher(dynamic_pointer_cast<RtmpMediaSource>(src)),releasePusher);
return PusherBase::Ptr(new RtmpPusher(poller,dynamic_pointer_cast<RtmpMediaSource>(src)),releasePusher);
}
return PusherBase::Ptr(new RtspPusher(dynamic_pointer_cast<RtspMediaSource>(src)),releasePusher);
return PusherBase::Ptr(new RtspPusher(poller,dynamic_pointer_cast<RtspMediaSource>(src)),releasePusher);
}
PusherBase::PusherBase() {
......
......@@ -44,7 +44,8 @@ public:
typedef std::shared_ptr<PusherBase> Ptr;
typedef std::function<void(const SockException &ex)> Event;
static Ptr createPusher(const MediaSource::Ptr &src,
static Ptr createPusher(const EventPoller::Ptr &poller,
const MediaSource::Ptr &src,
const string &strUrl);
PusherBase();
......@@ -78,7 +79,10 @@ template<typename Parent,typename Parser>
class PusherImp : public Parent {
public:
typedef std::shared_ptr<PusherImp> Ptr;
PusherImp(){}
template<typename ...ArgsType>
PusherImp(ArgsType &&...args):Parent(std::forward<ArgsType>(args)...){}
virtual ~PusherImp(){}
/**
......
......@@ -36,7 +36,7 @@ using namespace mediakit::Client;
namespace mediakit {
unordered_map<string, RtmpPlayer::rtmpCMDHandle> RtmpPlayer::g_mapCmd;
RtmpPlayer::RtmpPlayer() {
RtmpPlayer::RtmpPlayer(const EventPoller::Ptr &poller) : TcpClient(poller) {
static onceToken token([]() {
g_mapCmd.emplace("_error",&RtmpPlayer::onCmd_result);
g_mapCmd.emplace("_result",&RtmpPlayer::onCmd_result);
......
......@@ -48,7 +48,7 @@ namespace mediakit {
class RtmpPlayer:public PlayerBase, public TcpClient, public RtmpProtocol{
public:
typedef std::shared_ptr<RtmpPlayer> Ptr;
RtmpPlayer();
RtmpPlayer(const EventPoller::Ptr &poller);
virtual ~RtmpPlayer();
void play(const string &strUrl) override;
......
......@@ -43,7 +43,7 @@ namespace mediakit {
class RtmpPlayerImp: public PlayerImp<RtmpPlayer,RtmpDemuxer> {
public:
typedef std::shared_ptr<RtmpPlayerImp> Ptr;
RtmpPlayerImp(){};
RtmpPlayerImp(const EventPoller::Ptr &poller) : PlayerImp<RtmpPlayer,RtmpDemuxer>(poller){};
virtual ~RtmpPlayerImp(){
DebugL<<endl;
};
......
......@@ -36,7 +36,7 @@ namespace mediakit {
static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE;
RtmpPusher::RtmpPusher(const RtmpMediaSource::Ptr &src){
RtmpPusher::RtmpPusher(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr &src) : TcpClient(poller){
_pMediaSrc=src;
}
......
......@@ -37,7 +37,7 @@ namespace mediakit {
class RtmpPusher: public RtmpProtocol , public TcpClient , public PusherBase{
public:
typedef std::shared_ptr<RtmpPusher> Ptr;
RtmpPusher(const RtmpMediaSource::Ptr &src);
RtmpPusher(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr &src);
virtual ~RtmpPusher();
void publish(const string &strUrl) override ;
......
......@@ -43,7 +43,7 @@ using namespace mediakit::Client;
namespace mediakit {
RtspPlayer::RtspPlayer(void){
RtspPlayer::RtspPlayer(const EventPoller::Ptr &poller) : TcpClient(poller){
RtpReceiver::setPoolSize(64);
}
RtspPlayer::~RtspPlayer(void) {
......
......@@ -52,7 +52,7 @@ class RtspPlayer: public PlayerBase,public TcpClient, public RtspSplitter, publi
public:
typedef std::shared_ptr<RtspPlayer> Ptr;
RtspPlayer();
RtspPlayer(const EventPoller::Ptr &poller) ;
virtual ~RtspPlayer(void);
void play(const string &strUrl) override;
void pause(bool bPause) override;
......
......@@ -44,7 +44,7 @@ namespace mediakit {
class RtspPlayerImp: public PlayerImp<RtspPlayer,RtspDemuxer> {
public:
typedef std::shared_ptr<RtspPlayerImp> Ptr;
RtspPlayerImp(){};
RtspPlayerImp(const EventPoller::Ptr &poller) : PlayerImp<RtspPlayer,RtspDemuxer>(poller){}
virtual ~RtspPlayerImp(){
DebugL<<endl;
};
......
......@@ -13,7 +13,7 @@ namespace mediakit {
static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE;
RtspPusher::RtspPusher(const RtspMediaSource::Ptr &src) {
RtspPusher::RtspPusher(const EventPoller::Ptr &poller,const RtspMediaSource::Ptr &src) : TcpClient(poller){
_pMediaSrc = src;
}
......
......@@ -25,7 +25,7 @@ namespace mediakit {
class RtspPusher : public TcpClient, public RtspSplitter, public PusherBase {
public:
typedef std::shared_ptr<RtspPusher> Ptr;
RtspPusher(const RtspMediaSource::Ptr &src);
RtspPusher(const EventPoller::Ptr &poller,const RtspMediaSource::Ptr &src);
virtual ~RtspPusher();
void publish(const string &strUrl) override;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论