Commit 2749631a by xiongziliang

完成按需拉流事件广播

parent 2b9245b3
...@@ -62,6 +62,7 @@ const char kBroadcastRtmpPublish[] = "kBroadcastRtmpPublish"; ...@@ -62,6 +62,7 @@ const char kBroadcastRtmpPublish[] = "kBroadcastRtmpPublish";
const char kBroadcastFlowReport[] = "kBroadcastFlowReport"; const char kBroadcastFlowReport[] = "kBroadcastFlowReport";
const char kBroadcastReloadConfig[] = "kBroadcastReloadConfig"; const char kBroadcastReloadConfig[] = "kBroadcastReloadConfig";
const char kBroadcastShellLogin[] = "kBroadcastShellLogin"; const char kBroadcastShellLogin[] = "kBroadcastShellLogin";
const char kBroadcastNotFoundStream[] = "kBroadcastNotFoundStream";
const char kFlowThreshold[] = "broadcast.flowThreshold"; const char kFlowThreshold[] = "broadcast.flowThreshold";
......
...@@ -110,6 +110,10 @@ extern const char kBroadcastShellLogin[]; ...@@ -110,6 +110,10 @@ extern const char kBroadcastShellLogin[];
extern const char kBroadcastFlowReport[]; extern const char kBroadcastFlowReport[];
#define BroadcastFlowReportArgs const MediaInfo &args,const uint64_t &totalBytes,const uint64_t &totalDuration,TcpSession &sender #define BroadcastFlowReportArgs const MediaInfo &args,const uint64_t &totalBytes,const uint64_t &totalDuration,TcpSession &sender
//未找到流后会广播该事件,请在监听该事件后去拉流或其他方式产生流,这样就能按需拉流了
extern const char kBroadcastNotFoundStream[];
#define BroadcastNotFoundStreamArgs const MediaInfo &args,TcpSession &sender
//流量汇报事件流量阈值,单位KB,默认1MB //流量汇报事件流量阈值,单位KB,默认1MB
extern const char kFlowThreshold[]; extern const char kFlowThreshold[];
......
...@@ -209,77 +209,73 @@ void RtmpSession::onCmd_deleteStream(AMFDecoder &dec) { ...@@ -209,77 +209,73 @@ void RtmpSession::onCmd_deleteStream(AMFDecoder &dec) {
throw std::runtime_error(StrPrinter << "Stop publishing." << endl); throw std::runtime_error(StrPrinter << "Stop publishing." << endl);
} }
void RtmpSession::doPlayResponse(const string &err,bool tryDelay,const std::shared_ptr<onceToken> &pToken) { void RtmpSession::findStream(const function<void(const RtmpMediaSource::Ptr &src)> &cb,bool retry) {
//获取流对象
auto src = dynamic_pointer_cast<RtmpMediaSource>(MediaSource::find(RTMP_SCHEMA, auto src = dynamic_pointer_cast<RtmpMediaSource>(MediaSource::find(RTMP_SCHEMA,
_mediaInfo._vhost, _mediaInfo._vhost,
_mediaInfo._app, _mediaInfo._app,
_mediaInfo._streamid, _mediaInfo._streamid,
true)); true));
//是否鉴权成功 if(src || !retry){
bool authSuccess = err.empty(); cb(src);
if(authSuccess && !src && tryDelay ){ return;
//校验成功,但是流不存在而导致的不能播放 }
//所以我们注册rtmp注册事件,等rtmp推流端推流成功后再告知播放器开始播放
auto task_id = this;
auto media_info = _mediaInfo;
weak_ptr<RtmpSession> weakSelf = dynamic_pointer_cast<RtmpSession>(shared_from_this());
NoticeCenter::Instance().addListener(task_id,Broadcast::kBroadcastMediaChanged,
[task_id,weakSelf,media_info,pToken](BroadcastMediaChangedArgs){
if(bRegist &&
schema == media_info._schema &&
vhost == media_info._vhost &&
app == media_info._app &&
stream == media_info._streamid){
//播发器请求的rtmp流终于注册上了
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
}
//切换到自己的线程再回复
//如果触发 kBroadcastMediaChanged 事件的线程与本RtmpSession绑定的线程相同,
//那么strongSelf->async操作可能是同步操作,
//通过指定参数may_sync为false确保 NoticeCenter::delListener操作延后执行,
//以便防止遍历事件监听对象map时做删除操作
strongSelf->async([task_id,weakSelf,pToken,media_info](){
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
}
DebugL << "收到rtmp注册事件,回复播放器:" << media_info._schema << "/" << media_info._vhost << "/" << media_info._app << "/" << media_info._streamid;
//回复播放器
strongSelf->doPlayResponse("",false,pToken);
//取消延时任务,防止多次回复
strongSelf->cancelDelyaTask();
//取消事件监听
//在事件触发时不能在当前线程移除事件监听,否则会导致遍历map时做删除操作导致程序崩溃
NoticeCenter::Instance().delListener(task_id,Broadcast::kBroadcastMediaChanged);
}, false);
}
});
//5秒后执行延时任务 //广播未找到流
doDelay(5,[task_id,weakSelf,pToken](){ NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastNotFoundStream,_mediaInfo,*this);
//取消监听该事件,该延时任务可以在本对象析构时或到达指定延时后调用
//所以该对象在销毁前一定会被取消事件监听
NoticeCenter::Instance().delListener(task_id,Broadcast::kBroadcastMediaChanged);
weak_ptr<RtmpSession> weakSelf = dynamic_pointer_cast<RtmpSession>(shared_from_this());
auto task_id = this;
auto media_info = _mediaInfo;
auto onRegist = [task_id, weakSelf, media_info, cb](BroadcastMediaChangedArgs) {
if(bRegist &&
schema == media_info._schema &&
vhost == media_info._vhost &&
app == media_info._app &&
stream == media_info._streamid){
//播发器请求的rtmp流终于注册上了
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(!strongSelf) { if(!strongSelf) {
return; return;
} }
//5秒后,我们不管流有没有注册上都回复播放器 //切换到自己的线程再回复
strongSelf->doPlayResponse("",false,pToken); //如果触发 kBroadcastMediaChanged 事件的线程与本RtmpSession绑定的线程相同,
}); //那么strongSelf->async操作可能是同步操作,
return; //通过指定参数may_sync为false确保 NoticeCenter::delListener操作延后执行,
} //以便防止遍历事件监听对象map时做删除操作
strongSelf->async([task_id,weakSelf,media_info,cb](){
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
}
DebugL << "收到rtmp注册事件,回复播放器:" << media_info._schema << "/" << media_info._vhost << "/" << media_info._app << "/" << media_info._streamid;
//再找一遍媒体源,一般能找到
strongSelf->findStream(cb,false);
///////回复流程/////// //取消延时任务,防止多次回复
strongSelf->cancelDelyaTask();
//是否播放成功 //取消事件监听
//在事件触发时不能在当前线程移除事件监听,否则会导致遍历map时做删除操作导致程序崩溃
NoticeCenter::Instance().delListener(task_id,Broadcast::kBroadcastMediaChanged);
}, false);
}
};
NoticeCenter::Instance().addListener(task_id, Broadcast::kBroadcastMediaChanged, onRegist);
//5秒后执行失败回调
doDelay(5, [cb,task_id]() {
//取消监听该事件,该延时任务可以在本对象析构时或到达指定延时后调用
//所以该对象在销毁前一定会被取消事件监听
NoticeCenter::Instance().delListener(task_id,Broadcast::kBroadcastMediaChanged);
cb(nullptr);
});
}
void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr &src){
bool authSuccess = err.empty();
bool ok = (src.operator bool() && authSuccess); bool ok = (src.operator bool() && authSuccess);
if (ok) { if (ok) {
//stream begin //stream begin
...@@ -294,8 +290,7 @@ void RtmpSession::doPlayResponse(const string &err,bool tryDelay,const std::shar ...@@ -294,8 +290,7 @@ void RtmpSession::doPlayResponse(const string &err,bool tryDelay,const std::shar
status.set("clientid", "0"); status.set("clientid", "0");
sendReply("onStatus", nullptr, status); sendReply("onStatus", nullptr, status);
if (!ok) { if (!ok) {
WarnL << "onPlayed:" WarnL << (authSuccess ? "No such stream:" : err.data()) << " "
<< (authSuccess ? "No such stream:" : err.data()) << " "
<< _mediaInfo._vhost << " " << _mediaInfo._vhost << " "
<< _mediaInfo._app << " " << _mediaInfo._app << " "
<< _mediaInfo._streamid << _mediaInfo._streamid
...@@ -377,6 +372,25 @@ void RtmpSession::doPlayResponse(const string &err,bool tryDelay,const std::shar ...@@ -377,6 +372,25 @@ void RtmpSession::doPlayResponse(const string &err,bool tryDelay,const std::shar
SockUtil::setNoDelay(_sock->rawFD(),false); SockUtil::setNoDelay(_sock->rawFD(),false);
} }
void RtmpSession::doPlayResponse(const string &err,const std::function<void(bool)> &cb){
if(!err.empty()){
//鉴权失败,直接返回播放失败
sendPlayResponse(err, nullptr);
cb(false);
return;
}
weak_ptr<RtmpSession> weakSelf = dynamic_pointer_cast<RtmpSession>(shared_from_this());
//鉴权成功,查找媒体源并回复
findStream([weakSelf,cb](const RtmpMediaSource::Ptr &src){
auto strongSelf = weakSelf.lock();
if(strongSelf){
strongSelf->sendPlayResponse("", src);
}
cb(src.operator bool());
});
}
void RtmpSession::doPlay(AMFDecoder &dec){ void RtmpSession::doPlay(AMFDecoder &dec){
std::shared_ptr<Ticker> pTicker(new Ticker); std::shared_ptr<Ticker> pTicker(new Ticker);
std::shared_ptr<onceToken> pToken(new onceToken(nullptr,[pTicker](){ std::shared_ptr<onceToken> pToken(new onceToken(nullptr,[pTicker](){
...@@ -393,13 +407,13 @@ void RtmpSession::doPlay(AMFDecoder &dec){ ...@@ -393,13 +407,13 @@ void RtmpSession::doPlay(AMFDecoder &dec){
if(!strongSelf){ if(!strongSelf){
return; return;
} }
strongSelf->doPlayResponse(err,true,pToken); strongSelf->doPlayResponse(err,[pToken](bool){});
}); });
}; };
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,_mediaInfo,invoker,*this); auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,_mediaInfo,invoker,*this);
if(!flag){ if(!flag){
//该事件无人监听,默认不鉴权 //该事件无人监听,默认不鉴权
doPlayResponse("",true,pToken); doPlayResponse("",[pToken](bool){});
} }
} }
void RtmpSession::onCmd_play2(AMFDecoder &dec) { void RtmpSession::onCmd_play2(AMFDecoder &dec) {
......
...@@ -60,7 +60,9 @@ private: ...@@ -60,7 +60,9 @@ private:
void onCmd_play(AMFDecoder &dec); void onCmd_play(AMFDecoder &dec);
void onCmd_play2(AMFDecoder &dec); void onCmd_play2(AMFDecoder &dec);
void doPlay(AMFDecoder &dec); void doPlay(AMFDecoder &dec);
void doPlayResponse(const string &err,bool tryDelay,const std::shared_ptr<onceToken> &token); void doPlayResponse(const string &err,const std::function<void(bool)> &cb);
void sendPlayResponse(const string &err,const RtmpMediaSource::Ptr &src);
void onCmd_seek(AMFDecoder &dec); void onCmd_seek(AMFDecoder &dec);
void onCmd_pause(AMFDecoder &dec); void onCmd_pause(AMFDecoder &dec);
void setMetaData(AMFDecoder &dec); void setMetaData(AMFDecoder &dec);
...@@ -87,6 +89,7 @@ private: ...@@ -87,6 +89,7 @@ private:
void doDelay(int delaySec,const std::function<void()> &fun); void doDelay(int delaySec,const std::function<void()> &fun);
void cancelDelyaTask(); void cancelDelyaTask();
void findStream(const function<void(const RtmpMediaSource::Ptr &src)> &cb ,bool retry = true);
private: private:
std::string _strTcUrl; std::string _strTcUrl;
MediaInfo _mediaInfo; MediaInfo _mediaInfo;
......
...@@ -247,6 +247,7 @@ bool RtspSession::handleReq_Describe() { ...@@ -247,6 +247,7 @@ bool RtspSession::handleReq_Describe() {
if(!success){ if(!success){
//未找到相应的MediaSource //未找到相应的MediaSource
WarnL << "No such stream:" << strongSelf->_mediaInfo._vhost << " " << strongSelf->_mediaInfo._app << " " << strongSelf->_mediaInfo._streamid;
strongSelf->send_StreamNotFound(); strongSelf->send_StreamNotFound();
strongSelf->shutdown(); strongSelf->shutdown();
return; return;
...@@ -933,6 +934,9 @@ void RtspSession::findStream(const function<void(bool)> &cb) { ...@@ -933,6 +934,9 @@ void RtspSession::findStream(const function<void(bool)> &cb) {
return; return;
} }
//广播未找到流
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastNotFoundStream,_mediaInfo,*this);
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this()); weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
auto task_id = this; auto task_id = this;
auto media_info = _mediaInfo; auto media_info = _mediaInfo;
...@@ -943,7 +947,7 @@ void RtspSession::findStream(const function<void(bool)> &cb) { ...@@ -943,7 +947,7 @@ void RtspSession::findStream(const function<void(bool)> &cb) {
vhost == media_info._vhost && vhost == media_info._vhost &&
app == media_info._app && app == media_info._app &&
stream == media_info._streamid) { stream == media_info._streamid) {
//播发器请求的rtmp流终于注册上了 //播发器请求的rtsp流终于注册上了
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if (!strongSelf) { if (!strongSelf) {
return; return;
...@@ -983,7 +987,6 @@ inline bool RtspSession::findStream() { ...@@ -983,7 +987,6 @@ inline bool RtspSession::findStream() {
RtspMediaSource::Ptr pMediaSrc = RtspMediaSource::Ptr pMediaSrc =
dynamic_pointer_cast<RtspMediaSource>( MediaSource::find(RTSP_SCHEMA,_mediaInfo._vhost, _mediaInfo._app,_mediaInfo._streamid) ); dynamic_pointer_cast<RtspMediaSource>( MediaSource::find(RTSP_SCHEMA,_mediaInfo._vhost, _mediaInfo._app,_mediaInfo._streamid) );
if (!pMediaSrc) { if (!pMediaSrc) {
WarnL << "No such stream:" << _mediaInfo._vhost << " " << _mediaInfo._app << " " << _mediaInfo._streamid;
return false; return false;
} }
_strSdp = pMediaSrc->getSdp(); _strSdp = pMediaSrc->getSdp();
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论