Commit ace20071 by xiongziliang

修复无人播放事件触发延时不准确的bug

parent 1168174c
...@@ -44,3 +44,4 @@ ...@@ -44,3 +44,4 @@
/3rdpart/media-server/.idea/ /3rdpart/media-server/.idea/
/build/ /build/
/3rdpart/media-server/.idea/ /3rdpart/media-server/.idea/
/ios/
...@@ -71,15 +71,6 @@ protected: ...@@ -71,15 +71,6 @@ protected:
return true; return true;
} }
// 通知无人观看
void onNoneReader(MediaSource &sender) override{
if(_channel->totalReaderCount()){
//统计有误,还有人在看
return;
}
MediaSourceEvent::onNoneReader(sender);
}
// 观看总人数 // 观看总人数
int totalReaderCount(MediaSource &sender) override{ int totalReaderCount(MediaSource &sender) override{
return _channel->totalReaderCount(); return _channel->totalReaderCount();
......
...@@ -236,21 +236,12 @@ bool FFmpegSource::close(MediaSource &sender, bool force) { ...@@ -236,21 +236,12 @@ bool FFmpegSource::close(MediaSource &sender, bool force) {
return true; return true;
} }
void FFmpegSource::onNoneReader(MediaSource &sender) {
auto listener = _listener.lock();
if(listener){
listener->onNoneReader(sender);
}else{
MediaSourceEvent::onNoneReader(sender);
}
}
int FFmpegSource::totalReaderCount(MediaSource &sender) { int FFmpegSource::totalReaderCount(MediaSource &sender) {
auto listener = _listener.lock(); auto listener = _listener.lock();
if(listener){ if(listener){
return listener->totalReaderCount(sender); return listener->totalReaderCount(sender);
} }
return 0; return sender.readerCount();
} }
void FFmpegSource::onGetMediaSource(const MediaSource::Ptr &src) { void FFmpegSource::onGetMediaSource(const MediaSource::Ptr &src) {
......
...@@ -59,7 +59,6 @@ private: ...@@ -59,7 +59,6 @@ private:
//MediaSourceEvent override //MediaSourceEvent override
bool close(MediaSource &sender,bool force) override; bool close(MediaSource &sender,bool force) override;
void onNoneReader(MediaSource &sender) override ;
int totalReaderCount(MediaSource &sender) override; int totalReaderCount(MediaSource &sender) override;
private: private:
Process _process; Process _process;
......
...@@ -117,7 +117,9 @@ void MediaSource::onNoneReader(){ ...@@ -117,7 +117,9 @@ void MediaSource::onNoneReader(){
if(!listener){ if(!listener){
return; return;
} }
if (listener->totalReaderCount(*this) == 0) {
listener->onNoneReader(*this); listener->onNoneReader(*this);
}
} }
void MediaSource::for_each_media(const function<void(const MediaSource::Ptr &src)> &cb) { void MediaSource::for_each_media(const function<void(const MediaSource::Ptr &src)> &cb) {
...@@ -382,16 +384,31 @@ void MediaInfo::parse(const string &url){ ...@@ -382,16 +384,31 @@ void MediaInfo::parse(const string &url){
void MediaSourceEvent::onNoneReader(MediaSource &sender){ void MediaSourceEvent::onNoneReader(MediaSource &sender){
//没有任何读取器消费该源,表明该源可以关闭了 //没有任何读取器消费该源,表明该源可以关闭了
WarnL << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId(); GET_CONFIG(int, stream_none_reader_delay, General::kStreamNoneReaderDelayMS);
weak_ptr<MediaSource> weakPtr = sender.shared_from_this();
//异步广播该事件,防止同步调用sender.close()导致在接收rtp或rtmp包时清空包缓存等操作 weak_ptr<MediaSource> weakSender = sender.shared_from_this();
EventPollerPool::Instance().getPoller()->async([weakPtr](){ _async_close_timer = std::make_shared<Timer>(stream_none_reader_delay / 1000.0, [weakSender]() {
auto strongPtr = weakPtr.lock(); auto strongSender = weakSender.lock();
if(strongPtr){ if (!strongSender) {
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastStreamNoneReader,*strongPtr); //对象已经销毁
return false;
} }
},false);
if (strongSender->totalReaderCount() != 0) {
//还有人消费
return false;
}
WarnL << "onNoneReader:"
<< strongSender->getSchema() << "/"
<< strongSender->getVhost() << "/"
<< strongSender->getApp() << "/"
<< strongSender->getId();
//触发消息广播
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastStreamNoneReader, *strongSender);
return false;
}, nullptr);
} }
......
...@@ -52,24 +52,21 @@ namespace mediakit { ...@@ -52,24 +52,21 @@ namespace mediakit {
class MediaSource; class MediaSource;
class MediaSourceEvent{ class MediaSourceEvent{
public: public:
friend class MediaSource;
MediaSourceEvent(){}; MediaSourceEvent(){};
virtual ~MediaSourceEvent(){}; virtual ~MediaSourceEvent(){};
// 通知拖动进度条 // 通知拖动进度条
virtual bool seekTo(MediaSource &sender,uint32_t ui32Stamp){ virtual bool seekTo(MediaSource &sender,uint32_t ui32Stamp){ return false; }
return false;
}
// 通知其停止推流 // 通知其停止推流
virtual bool close(MediaSource &sender,bool force) { virtual bool close(MediaSource &sender,bool force) { return false;}
return false;
}
// 通知无人观看
virtual void onNoneReader(MediaSource &sender);
// 观看总人数 // 观看总人数
virtual int totalReaderCount(MediaSource &sender) = 0; virtual int totalReaderCount(MediaSource &sender) = 0;
private:
// 通知无人观看
void onNoneReader(MediaSource &sender);
private:
Timer::Ptr _async_close_timer;
}; };
/** /**
......
...@@ -196,13 +196,6 @@ bool PlayerProxy::close(MediaSource &sender,bool force) { ...@@ -196,13 +196,6 @@ bool PlayerProxy::close(MediaSource &sender,bool force) {
return true; return true;
} }
void PlayerProxy::onNoneReader(MediaSource &sender) {
if(!_mediaMuxer || totalReaderCount()){
return;
}
MediaSourceEvent::onNoneReader(sender);
}
int PlayerProxy::totalReaderCount(){ int PlayerProxy::totalReaderCount(){
return (_mediaMuxer ? _mediaMuxer->totalReaderCount() : 0) + (_pMediaSrc ? _pMediaSrc->readerCount() : 0); return (_mediaMuxer ? _mediaMuxer->totalReaderCount() : 0) + (_pMediaSrc ? _pMediaSrc->readerCount() : 0);
} }
......
...@@ -83,7 +83,6 @@ public: ...@@ -83,7 +83,6 @@ public:
private: private:
//MediaSourceEvent override //MediaSourceEvent override
bool close(MediaSource &sender,bool force) override; bool close(MediaSource &sender,bool force) override;
void onNoneReader(MediaSource &sender) override;
int totalReaderCount(MediaSource &sender) override; int totalReaderCount(MediaSource &sender) override;
void rePlay(const string &strUrl,int iFailedCnt); void rePlay(const string &strUrl,int iFailedCnt);
void onPlaySuccess(); void onPlaySuccess();
......
...@@ -79,7 +79,7 @@ private: ...@@ -79,7 +79,7 @@ private:
return; return;
} }
if (--_readerCount == 0 && totalReaderCount() == 0) { if (--_readerCount == 0) {
onNoneReader(); onNoneReader();
} }
} }
......
...@@ -190,13 +190,6 @@ bool MP4Reader::close(MediaSource &sender,bool force){ ...@@ -190,13 +190,6 @@ bool MP4Reader::close(MediaSource &sender,bool force){
return true; return true;
} }
void MP4Reader::onNoneReader(MediaSource &sender) {
if(!_mediaMuxer || _mediaMuxer->totalReaderCount()){
return;
}
MediaSourceEvent::onNoneReader(sender);
}
int MP4Reader::totalReaderCount(MediaSource &sender) { int MP4Reader::totalReaderCount(MediaSource &sender) {
return _mediaMuxer ? _mediaMuxer->totalReaderCount() : sender.readerCount(); return _mediaMuxer ? _mediaMuxer->totalReaderCount() : sender.readerCount();
} }
......
...@@ -77,7 +77,6 @@ private: ...@@ -77,7 +77,6 @@ private:
//MediaSourceEvent override //MediaSourceEvent override
bool seekTo(MediaSource &sender,uint32_t ui32Stamp) override; bool seekTo(MediaSource &sender,uint32_t ui32Stamp) override;
bool close(MediaSource &sender,bool force) override; bool close(MediaSource &sender,bool force) override;
void onNoneReader(MediaSource &sender) override;
int totalReaderCount(MediaSource &sender) override; int totalReaderCount(MediaSource &sender) override;
#ifdef ENABLE_MP4V2 #ifdef ENABLE_MP4V2
void seek(uint32_t iSeekTime,bool bReStart = true); void seek(uint32_t iSeekTime,bool bReStart = true);
......
...@@ -161,7 +161,6 @@ public: ...@@ -161,7 +161,6 @@ public:
_track_stamps_map[pkt->typeId] = pkt->timeStamp; _track_stamps_map[pkt->typeId] = pkt->timeStamp;
//不存在视频,为了减少缓存延时,那么关闭GOP缓存 //不存在视频,为了减少缓存延时,那么关闭GOP缓存
_ring->write(pkt, _have_video ? pkt->isVideoKeyFrame() : true); _ring->write(pkt, _have_video ? pkt->isVideoKeyFrame() : true);
checkNoneReader();
} }
/** /**
...@@ -184,33 +183,15 @@ private: ...@@ -184,33 +183,15 @@ private:
* 每次增减消费者都会触发该函数 * 每次增减消费者都会触发该函数
*/ */
void onReaderChanged(int size) { void onReaderChanged(int size) {
//我们记录最后一次活动时间 if (size == 0) {
_reader_changed_ticker.resetTime();
if (size != 0 || totalReaderCount() != 0) {
//还有消费者正在观看该流
_async_emit_none_reader = false;
return;
}
_async_emit_none_reader = true;
}
/**
* 检查是否无人消费该流,
* 如果无人消费且超过一定时间会触发onNoneReader事件
*/
void checkNoneReader() {
GET_CONFIG(int, stream_none_reader_delay, General::kStreamNoneReaderDelayMS);
if (_async_emit_none_reader && _reader_changed_ticker.elapsedTime() > stream_none_reader_delay) {
_async_emit_none_reader = false;
onNoneReader(); onNoneReader();
} }
} }
protected:
private:
int _ring_size; int _ring_size;
bool _async_emit_none_reader = false;
bool _have_video = false; bool _have_video = false;
mutable recursive_mutex _mtx; mutable recursive_mutex _mtx;
Ticker _reader_changed_ticker;
AMFValue _metadata; AMFValue _metadata;
RingBuffer<RtmpPacket::Ptr>::Ptr _ring; RingBuffer<RtmpPacket::Ptr>::Ptr _ring;
unordered_map<int, uint32_t> _track_stamps_map; unordered_map<int, uint32_t> _track_stamps_map;
......
...@@ -544,14 +544,6 @@ bool RtmpSession::close(MediaSource &sender,bool force) { ...@@ -544,14 +544,6 @@ bool RtmpSession::close(MediaSource &sender,bool force) {
return true; return true;
} }
void RtmpSession::onNoneReader(MediaSource &sender) {
//此回调在其他线程触发
if(!_pPublisherSrc || _pPublisherSrc->totalReaderCount()){
return;
}
MediaSourceEvent::onNoneReader(sender);
}
int RtmpSession::totalReaderCount(MediaSource &sender) { int RtmpSession::totalReaderCount(MediaSource &sender) {
return _pPublisherSrc ? _pPublisherSrc->totalReaderCount() : sender.readerCount(); return _pPublisherSrc ? _pPublisherSrc->totalReaderCount() : sender.readerCount();
} }
......
...@@ -85,7 +85,6 @@ private: ...@@ -85,7 +85,6 @@ private:
//MediaSourceEvent override //MediaSourceEvent override
bool close(MediaSource &sender,bool force) override ; bool close(MediaSource &sender,bool force) override ;
void onNoneReader(MediaSource &sender) override;
int totalReaderCount(MediaSource &sender) override; int totalReaderCount(MediaSource &sender) override;
void setSocketFlags(); void setSocketFlags();
......
...@@ -142,13 +142,6 @@ bool RtpProcessHelper::close(MediaSource &sender, bool force) { ...@@ -142,13 +142,6 @@ bool RtpProcessHelper::close(MediaSource &sender, bool force) {
return true; return true;
} }
void RtpProcessHelper::onNoneReader(MediaSource &sender) {
if(!_process || _process->totalReaderCount()){
return;
}
MediaSourceEvent::onNoneReader(sender);
}
int RtpProcessHelper::totalReaderCount(MediaSource &sender) { int RtpProcessHelper::totalReaderCount(MediaSource &sender) {
return _process ? _process->totalReaderCount() : sender.totalReaderCount(); return _process ? _process->totalReaderCount() : sender.totalReaderCount();
} }
......
...@@ -47,8 +47,6 @@ public: ...@@ -47,8 +47,6 @@ public:
protected: protected:
// 通知其停止推流 // 通知其停止推流
bool close(MediaSource &sender,bool force) override; bool close(MediaSource &sender,bool force) override;
// 通知无人观看
void onNoneReader(MediaSource &sender) override;
// 观看总人数 // 观看总人数
int totalReaderCount(MediaSource &sender) override; int totalReaderCount(MediaSource &sender) override;
private: private:
......
...@@ -87,14 +87,6 @@ bool RtpSession::close(MediaSource &sender, bool force) { ...@@ -87,14 +87,6 @@ bool RtpSession::close(MediaSource &sender, bool force) {
return true; return true;
} }
void RtpSession::onNoneReader(MediaSource &sender) {
//此回调在其他线程触发
if(!_process || _process->totalReaderCount()){
return;
}
MediaSourceEvent::onNoneReader(sender);
}
int RtpSession::totalReaderCount(MediaSource &sender) { int RtpSession::totalReaderCount(MediaSource &sender) {
//此回调在其他线程触发 //此回调在其他线程触发
return _process ? _process->totalReaderCount() : sender.totalReaderCount(); return _process ? _process->totalReaderCount() : sender.totalReaderCount();
......
...@@ -46,8 +46,6 @@ public: ...@@ -46,8 +46,6 @@ public:
protected: protected:
// 通知其停止推流 // 通知其停止推流
bool close(MediaSource &sender,bool force) override; bool close(MediaSource &sender,bool force) override;
// 通知无人观看
void onNoneReader(MediaSource &sender) override;
// 观看总人数 // 观看总人数
int totalReaderCount(MediaSource &sender) override; int totalReaderCount(MediaSource &sender) override;
void onRtpPacket(const char *data,uint64_t len) override; void onRtpPacket(const char *data,uint64_t len) override;
......
...@@ -191,39 +191,19 @@ public: ...@@ -191,39 +191,19 @@ public:
} }
//不存在视频,为了减少缓存延时,那么关闭GOP缓存 //不存在视频,为了减少缓存延时,那么关闭GOP缓存
_ring->write(rtp, _have_video ? keyPos : true); _ring->write(rtp, _have_video ? keyPos : true);
checkNoneReader();
} }
private: private:
/** /**
* 每次增减消费者都会触发该函数 * 每次增减消费者都会触发该函数
*/ */
void onReaderChanged(int size) { void onReaderChanged(int size) {
//我们记录最后一次活动时间 if (size == 0) {
_reader_changed_ticker.resetTime();
if (size != 0 || totalReaderCount() != 0) {
//还有消费者正在观看该流
_async_emit_none_reader = false;
return;
}
_async_emit_none_reader = true;
}
/**
* 检查是否无人消费该流,
* 如果无人消费且超过一定时间会触发onNoneReader事件
*/
void checkNoneReader() {
GET_CONFIG(int, stream_none_reader_delay, General::kStreamNoneReaderDelayMS);
if (_async_emit_none_reader && _reader_changed_ticker.elapsedTime() > stream_none_reader_delay) {
_async_emit_none_reader = false;
onNoneReader(); onNoneReader();
} }
} }
protected: private:
int _ring_size; int _ring_size;
bool _async_emit_none_reader = false;
bool _have_video = false; bool _have_video = false;
Ticker _reader_changed_ticker;
SdpParser _sdp_parser; SdpParser _sdp_parser;
string _sdp; string _sdp;
RingType::Ptr _ring; RingType::Ptr _ring;
......
...@@ -1135,15 +1135,6 @@ bool RtspSession::close(MediaSource &sender,bool force) { ...@@ -1135,15 +1135,6 @@ bool RtspSession::close(MediaSource &sender,bool force) {
return true; return true;
} }
void RtspSession::onNoneReader(MediaSource &sender){
//此回调在其他线程触发
if(!_pushSrc || _pushSrc->totalReaderCount()){
return;
}
MediaSourceEvent::onNoneReader(sender);
}
int RtspSession::totalReaderCount(MediaSource &sender) { int RtspSession::totalReaderCount(MediaSource &sender) {
return _pushSrc ? _pushSrc->totalReaderCount() : sender.readerCount(); return _pushSrc ? _pushSrc->totalReaderCount() : sender.readerCount();
} }
......
...@@ -107,7 +107,6 @@ protected: ...@@ -107,7 +107,6 @@ protected:
void onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) override; void onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) override;
//MediaSourceEvent override //MediaSourceEvent override
bool close(MediaSource &sender,bool force) override ; bool close(MediaSource &sender,bool force) override ;
void onNoneReader(MediaSource &sender) override;
int totalReaderCount(MediaSource &sender) override; int totalReaderCount(MediaSource &sender) override;
//TcpSession override //TcpSession override
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论