Commit b0bf216b by xiongziliang

添加无人观看主动断开事件

parent 3917b645
......@@ -343,7 +343,7 @@ void installWebApi() {
});
//主动关断流,包括关断拉流、推流
//测试url http://127.0.0.1/index/api/close_stream?schema=rtsp&vhost=__defaultVhost__&app=live&stream=obs
//测试url http://127.0.0.1/index/api/close_stream?schema=rtsp&vhost=__defaultVhost__&app=live&stream=obs&force=1
API_REGIST(api,close_stream,{
CHECK_SECRET();
CHECK_ARGS("schema","vhost","app","stream");
......@@ -353,7 +353,7 @@ void installWebApi() {
allArgs["app"],
allArgs["stream"]);
if(src){
bool flag = src->close();
bool flag = src->close(allArgs["force"].as<bool>());
val["code"] = flag ? 0 : -1;
val["msg"] = flag ? "success" : "close failed";
}else{
......@@ -511,6 +511,13 @@ void installWebApi() {
//shell登录调试事件
throw SuccessException();
});
API_REGIST(hook,on_stream_none_reader,{
//无人观看流默认关闭
val["close"] = true;
});
}
void unInstallWebApi(){
......
......@@ -42,6 +42,7 @@ const char kOnStreamChanged[] = HOOK_FIELD"on_stream_changed";
const char kOnStreamNotFound[] = HOOK_FIELD"on_stream_not_found";
const char kOnRecordMp4[] = HOOK_FIELD"on_record_mp4";
const char kOnShellLogin[] = HOOK_FIELD"on_shell_login";
const char kOnStreamNoneReader[] = HOOK_FIELD"on_stream_none_reader";
const char kAdminParams[] = HOOK_FIELD"admin_params";
onceToken token([](){
......@@ -56,6 +57,7 @@ onceToken token([](){
mINI::Instance()[kOnStreamNotFound] = "https://127.0.0.1/index/hook/on_stream_not_found";
mINI::Instance()[kOnRecordMp4] = "https://127.0.0.1/index/hook/on_record_mp4";
mINI::Instance()[kOnShellLogin] = "https://127.0.0.1/index/hook/on_shell_login";
mINI::Instance()[kOnStreamNoneReader] = "https://127.0.0.1/index/hook/on_stream_none_reader";
mINI::Instance()[kAdminParams] = "secret=035c73f7-bb6b-4889-a715-d9eb2d1925cc";
},nullptr);
}//namespace Hook
......@@ -159,7 +161,7 @@ void installWebHook(){
GET_CONFIG_AND_REGISTER(string,hook_stream_not_found,Hook::kOnStreamNotFound);
GET_CONFIG_AND_REGISTER(string,hook_record_mp4,Hook::kOnRecordMp4);
GET_CONFIG_AND_REGISTER(string,hook_shell_login,Hook::kOnShellLogin);
GET_CONFIG_AND_REGISTER(string,hook_stream_none_reader,Hook::kOnStreamNoneReader);
NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastMediaPublish,[](BroadcastMediaPublishArgs){
if(!hook_enable || args._param_strs == hook_adminparams || hook_publish.empty()){
......@@ -325,6 +327,29 @@ void installWebHook(){
});
});
NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastStreamNoneReader,[](BroadcastStreamNoneReaderArgs){
if(!hook_enable || hook_stream_none_reader.empty()){
return;
}
ArgsType body;
body["schema"] = sender.getSchema();
body["vhost"] = sender.getVhost();
body["app"] = sender.getApp();
body["stream"] = sender.getId();
weak_ptr<MediaSource> weakSrc = sender.shared_from_this();
//执行hook
do_http_hook(hook_stream_none_reader,body, [weakSrc](const Value &obj,const string &err){
bool flag = obj["close"].asBool();
auto strongSrc = weakSrc.lock();
if(!flag || !err.empty() || !strongSrc){
return;
}
strongSrc->close(false);
});
});
}
void unInstallWebHook(){
......
......@@ -44,8 +44,8 @@ using namespace toolkit;
namespace mediakit {
class MediaSourceEvent
{
class MediaSource;
class MediaSourceEvent{
public:
MediaSourceEvent(){};
virtual ~MediaSourceEvent(){};
......@@ -55,15 +55,18 @@ public:
return false;
}
virtual bool close() {
virtual bool close(bool force) {
//通知其停止推流
return false;
}
virtual void onReaderChanged(const EventPoller::Ptr &poller,int size,bool add_flag){}
virtual void onNoneReader(MediaSource &sender){
//没有任何读取器消费该源,表明该源可以关闭了
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastStreamNoneReader,sender);
}
};
class MediaInfo
{
class MediaInfo{
public:
MediaInfo(){}
MediaInfo(const string &url){
......@@ -85,7 +88,6 @@ public:
string _streamid;
StrCaseMap _params;
string _param_strs;
};
......@@ -144,12 +146,12 @@ public:
virtual uint32_t getTimeStamp(TrackType trackType) = 0;
bool close() {
bool close(bool force) {
auto listener = _listener.lock();
if(!listener){
return false;
}
return listener->close();
return listener->close(force);
}
virtual void setListener(const std::weak_ptr<MediaSourceEvent> &listener){
_listener = listener;
......
......@@ -65,11 +65,15 @@ const char kBroadcastFlowReport[] = "kBroadcastFlowReport";
const char kBroadcastReloadConfig[] = "kBroadcastReloadConfig";
const char kBroadcastShellLogin[] = "kBroadcastShellLogin";
const char kBroadcastNotFoundStream[] = "kBroadcastNotFoundStream";
const char kBroadcastStreamNoneReader[] = "kBroadcastStreamNoneReader";
const char kFlowThreshold[] = "broadcast.flowThreshold";
const char kStreamNoneReaderDelayMS[] = "broadcast.streamNoneReaderDelayMS";
onceToken token([](){
mINI::Instance()[kFlowThreshold] = 1024;
mINI::Instance()[kStreamNoneReaderDelayMS] = 5 * 1000;
},nullptr);
} //namespace Broadcast
......
......@@ -116,9 +116,18 @@ extern const char kBroadcastFlowReport[];
extern const char kBroadcastNotFoundStream[];
#define BroadcastNotFoundStreamArgs const MediaInfo &args,TcpSession &sender
//某个流无人消费时触发,目的为了实现无人观看时主动断开拉流等业务逻辑
extern const char kBroadcastStreamNoneReader[];
#define BroadcastStreamNoneReaderArgs MediaSource &sender
//流量汇报事件流量阈值,单位KB,默认1MB
extern const char kFlowThreshold[];
//流无人观看并且超过若干时间后才触发kBroadcastStreamNoneReader事件
//默认连续5秒无人观看然后触发kBroadcastStreamNoneReader事件
extern const char kStreamNoneReaderDelayMS[];
//更新配置文件事件广播,执行loadIniConfig函数加载配置文件成功后会触发该广播
extern const char kBroadcastReloadConfig[];
#define BroadcastReloadConfigArgs void
......
......@@ -166,11 +166,21 @@ void MediaReader::startReadMP4() {
seek(ui32Stamp);
return true;
}
bool MediaReader::close(){
bool MediaReader::close(bool force){
if(!force && _mediaMuxer->readerCount() != 0 ){
return false;
}
_timer.reset();
return true;
}
void MediaReader::onNoneReader(MediaSource &sender) {
if(_mediaMuxer->readerCount() != 0){
return;
}
MediaSourceEvent::onNoneReader(sender);
}
bool MediaReader::readSample(int iTimeInc,bool justSeekSyncFrame) {
TimeTicker();
lock_guard<recursive_mutex> lck(_mtx);
......
......@@ -68,7 +68,7 @@ public:
* 关闭MediaReader的流化进程,会触发该对象放弃自持有
* @return
*/
bool close() override;
bool close(bool force) override;
/**
* 自动生成MediaReader对象然后查找相关的MediaSource对象
......@@ -86,9 +86,11 @@ public:
const string &strId,
const string &filePath = "",
bool checkApp = true);
#ifdef ENABLE_MP4V2
private:
void seek(uint32_t iSeekTime,bool bReStart = true);
void onNoneReader(MediaSource &sender) override;
#ifdef ENABLE_MP4V2
void seek(uint32_t iSeekTime,bool bReStart = true);
inline void setSeekTime(uint32_t iSeekTime);
inline uint32_t getVideoCurrentTime();
inline MP4SampleId getVideoSampleId(int iTimeInc = 0);
......
......@@ -145,7 +145,11 @@ void PlayerProxy::rePlay(const string &strUrl,int iFailedCnt){
return false;
}, getPoller());
}
bool PlayerProxy::close() {
bool PlayerProxy::close(bool force) {
if(!force && _mediaMuxer->readerCount() != 0){
return false;
}
//通知其停止推流
weak_ptr<PlayerProxy> weakSlef = dynamic_pointer_cast<PlayerProxy>(shared_from_this());
getPoller()->async_first([weakSlef]() {
......@@ -161,6 +165,12 @@ bool PlayerProxy::close() {
return true;
}
void PlayerProxy::onNoneReader(MediaSource &sender) {
if(_mediaMuxer->readerCount() != 0){
return;
}
MediaSourceEvent::onNoneReader(sender);
}
class MuteAudioMaker : public FrameRingInterfaceDelegate{
public:
......
......@@ -79,8 +79,9 @@ public:
* 被主动关闭
* @return
*/
bool close() override;
bool close(bool force) override;
private:
void onNoneReader(MediaSource &sender) override;
void rePlay(const string &strUrl,int iFailedCnt);
void onPlaySuccess();
private:
......
......@@ -53,10 +53,13 @@ public:
typedef std::shared_ptr<RtmpMediaSource> Ptr;
typedef RingBuffer<RtmpPacket::Ptr> RingType;
RtmpMediaSource(const string &vhost,const string &strApp, const string &strId,int ringSize = 0) :
RtmpMediaSource(const string &vhost,
const string &strApp,
const string &strId,
int ringSize = 0) :
MediaSource(RTMP_SCHEMA,vhost,strApp,strId),
_pRing(new RingBuffer<RtmpPacket::Ptr>(ringSize)) {
}
_ringSize(ringSize) {}
virtual ~RtmpMediaSource() {}
const RingType::Ptr &getRing() const {
......@@ -65,7 +68,7 @@ public:
}
int readerCount() override {
return _pRing->readerCount();
return _pRing ? _pRing->readerCount() : 0;
}
const AMFValue &getMetaData() const {
......@@ -89,15 +92,26 @@ public:
lock_guard<recursive_mutex> lock(_mtxMap);
if (pkt->isCfgFrame()) {
_mapCfgFrame[pkt->typeId] = pkt;
} else{
if(!_bRegisted){
regist();
_bRegisted = true;
}
_mapStamp[pkt->typeId] = pkt->timeStamp;
_pRing->write(pkt,pkt->isVideoKeyFrame());
return;
}
}
_mapStamp[pkt->typeId] = pkt->timeStamp;
if(!_pRing){
weak_ptr<RtmpMediaSource> weakSelf = dynamic_pointer_cast<RtmpMediaSource>(shared_from_this());
_pRing = std::make_shared<RingType>(_ringSize,[weakSelf](const EventPoller::Ptr &,int size,bool){
auto strongSelf = weakSelf.lock();
if(!strongSelf){
return;
}
strongSelf->onReaderChanged(size);
});
onReaderChanged(0);
regist();
}
_pRing->write(pkt,pkt->isVideoKeyFrame());
checkNoneReader();
}
uint32_t getTimeStamp(TrackType trackType) override {
lock_guard<recursive_mutex> lock(_mtxMap);
......@@ -110,13 +124,38 @@ public:
return MAX(_mapStamp[MSG_VIDEO],_mapStamp[MSG_AUDIO]);
}
}
private:
void onReaderChanged(int size){
if(size != 0 || readerCount() != 0){
//还有消费者正在观看该流,我们记录最后一次活动时间
_readerTicker.resetTime();
_asyncEmitNoneReader = false;
return;
}
_asyncEmitNoneReader = true;
}
void checkNoneReader(){
GET_CONFIG_AND_REGISTER(int,stream_none_reader_delay,Broadcast::kStreamNoneReaderDelayMS);
if(_asyncEmitNoneReader && _readerTicker.elapsedTime() > stream_none_reader_delay){
_asyncEmitNoneReader = false;
auto listener = _listener.lock();
if(!listener){
return;
}
listener->onNoneReader(*this);
}
}
protected:
AMFValue _metadata;
unordered_map<int, RtmpPacket::Ptr> _mapCfgFrame;
unordered_map<int,uint32_t> _mapStamp;
mutable recursive_mutex _mtxMap;
RingBuffer<RtmpPacket::Ptr>::Ptr _pRing; //rtp环形缓冲
bool _bRegisted = false;
int _ringSize;
Ticker _readerTicker;
bool _asyncEmitNoneReader = false;
};
} /* namespace mediakit */
......
......@@ -360,7 +360,7 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr
strongSelf->shutdown();
});
_pPlayerSrc = src;
if (src->getRing()->readerCount() == 1) {
if (src->readerCount() == 1) {
src->seekTo(0);
}
......
......@@ -81,7 +81,10 @@ private:
sendResponse(MSG_CMD, invoke.data());
}
bool close() override {
bool close(bool force) override {
if(!force && _pPublisherSrc->readerCount() != 0){
return false;
}
InfoL << "kick out:" << _mediaInfo._vhost << " " << _mediaInfo._app << " " << _mediaInfo._streamid;
safeShutdown();
return true;
......
......@@ -91,7 +91,7 @@ public:
}
int readerCount() override {
return RtmpMediaSource::readerCount() + _rtspMuxer->readerCount();
return RtmpMediaSource::readerCount() + (_rtspMuxer ? _rtspMuxer->readerCount() : 0);
}
private:
RtmpDemuxer::Ptr _rtmpDemuxer;
......
......@@ -49,7 +49,7 @@ public:
_mediaSouce->setListener(listener);
}
int readerCount() const{
return _mediaSouce->getRing()->readerCount();
return _mediaSouce->readerCount();
}
private:
void onAllTrackReady() override {
......
......@@ -55,10 +55,13 @@ public:
typedef std::shared_ptr<RtspMediaSource> Ptr;
typedef RingBuffer<RtpPacket::Ptr> RingType;
RtspMediaSource(const string &strVhost,const string &strApp, const string &strId,int ringSize = 0) :
RtspMediaSource(const string &strVhost,
const string &strApp,
const string &strId,
int ringSize = 0) :
MediaSource(RTSP_SCHEMA,strVhost,strApp,strId),
_pRing(new RingBuffer<RtpPacket::Ptr>(ringSize)) {
}
_ringSize(ringSize){}
virtual ~RtspMediaSource() {}
const RingType::Ptr &getRing() const {
......@@ -67,7 +70,7 @@ public:
}
int readerCount() override {
return _pRing->readerCount();
return _pRing ? _pRing->readerCount() : 0;
}
const string& getSdp() const {
......@@ -114,7 +117,6 @@ public:
//派生类设置该媒体源媒体描述信息
_strSdp = sdp;
_sdpAttr.load(sdp);
regist();
}
void onWrite(const RtpPacket::Ptr &rtppt, bool keyPos) override {
......@@ -124,12 +126,50 @@ public:
track->_time_stamp = rtppt->timeStamp;
track->_ssrc = rtppt->ssrc;
}
if(!_pRing){
weak_ptr<RtspMediaSource> weakSelf = dynamic_pointer_cast<RtspMediaSource>(shared_from_this());
_pRing = std::make_shared<RingType>(_ringSize,[weakSelf](const EventPoller::Ptr &,int size,bool){
auto strongSelf = weakSelf.lock();
if(!strongSelf){
return;
}
strongSelf->onReaderChanged(size);
});
onReaderChanged(0);
regist();
}
_pRing->write(rtppt,keyPos);
checkNoneReader();
}
private:
void onReaderChanged(int size){
if(size != 0 || readerCount() != 0){
//还有消费者正在观看该流,我们记录最后一次活动时间
_readerTicker.resetTime();
_asyncEmitNoneReader = false;
return;
}
_asyncEmitNoneReader = true;
}
void checkNoneReader(){
GET_CONFIG_AND_REGISTER(int,stream_none_reader_delay,Broadcast::kStreamNoneReaderDelayMS);
if(_asyncEmitNoneReader && _readerTicker.elapsedTime() > stream_none_reader_delay){
_asyncEmitNoneReader = false;
auto listener = _listener.lock();
if(!listener){
return;
}
listener->onNoneReader(*this);
}
}
protected:
SdpAttr _sdpAttr;
string _strSdp; //媒体描述信息
RingType::Ptr _pRing; //rtp环形缓冲
int _ringSize;
Ticker _readerTicker;
bool _asyncEmitNoneReader = false;
};
} /* namespace mediakit */
......
......@@ -1161,7 +1161,10 @@ inline int RtspSession::getTrackIndexByInterleaved(int interleaved){
return -1;
}
bool RtspSession::close() {
bool RtspSession::close(bool force) {
if(!force && _pushSrc->readerCount() != 0){
return false;
}
InfoL << "kick out:" << _mediaInfo._vhost << " " << _mediaInfo._app << " " << _mediaInfo._streamid;
safeShutdown();
return true;
......
......@@ -105,7 +105,7 @@ protected:
//RtpReceiver override
void onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) override;
//MediaSourceEvent override
bool close() override ;
bool close(bool force) override ;
//TcpSession override
int send(const Buffer::Ptr &pkt) override;
......
......@@ -65,8 +65,7 @@ public:
_rtmpMuxer = std::make_shared<RtmpMediaSourceMuxer>(getVhost(),
getApp(),
getId(),
std::make_shared<TitleMete>(
_rtspDemuxer->getDuration()));
std::make_shared<TitleMete>(_rtspDemuxer->getDuration()));
for (auto &track : _rtspDemuxer->getTracks(false)) {
_rtmpMuxer->addTrack(track);
_recorder->addTrack(track);
......@@ -86,7 +85,7 @@ public:
}
}
int readerCount() override {
return RtspMediaSource::readerCount() + _rtmpMuxer->readerCount();
return RtspMediaSource::readerCount() + (_rtmpMuxer ? _rtmpMuxer->readerCount() : 0);
}
private:
RtspDemuxer::Ptr _rtspDemuxer;
......
......@@ -39,8 +39,9 @@ public:
RtspMediaSourceMuxer(const string &vhost,
const string &strApp,
const string &strId,
const TitleSdp::Ptr &title = nullptr) : RtspMuxer(title){
_mediaSouce = std::make_shared<RtspMediaSource>(vhost,strApp,strId);
const TitleSdp::Ptr &title = nullptr,
bool masterSrc = true) : RtspMuxer(title){
_mediaSouce = std::make_shared<RtspMediaSource>(vhost,strApp,strId,0,masterSrc);
getRtpRing()->setDelegate(_mediaSouce);
}
virtual ~RtspMediaSourceMuxer(){}
......@@ -49,7 +50,7 @@ public:
_mediaSouce->setListener(listener);
}
int readerCount() const{
return _mediaSouce->getRing()->readerCount();
return _mediaSouce->readerCount();
}
void setTimeStamp(uint32_t stamp){
_mediaSouce->setTimeStamp(stamp);
......
......@@ -51,7 +51,7 @@ public:
if(!media) {
break;
}
if(!media->close()) {
if(!media->close(true)) {
break;
}
(*stream) << "\t踢出成功:"
......@@ -86,9 +86,12 @@ public:
}
};
static onceToken s_token([]() {
REGIST_CMD(media);
}, nullptr);
void installShellCMD(){
static onceToken s_token([]() {
REGIST_CMD(media);
}, nullptr);
}
} /* namespace mediakit */
\ No newline at end of file
......@@ -33,7 +33,10 @@ using namespace toolkit;
namespace mediakit {
extern void installShellCMD();
ShellSession::ShellSession(const Socket::Ptr &_sock) : TcpSession(_sock) {
installShellCMD();
pleaseInputUser();
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论