Commit 9e26a02f by xiongziliang

实现rtsp/rtmp按需转协议功能

parent aff9963f
......@@ -276,13 +276,11 @@ typedef void* mk_publish_auth_invoker;
/**
* 执行Broadcast::PublishAuthInvoker
* @param err_msg 为空或null则代表鉴权成功
* @param enable_rtxp rtmp推流时是否运行转rtsp;rtsp推流时,是否允许转rtmp
* @param enable_hls 是否允许转换hls
* @param enable_mp4 是否运行MP4录制
*/
API_EXPORT void API_CALL mk_publish_auth_invoker_do(const mk_publish_auth_invoker ctx,
const char *err_msg,
int enable_rtxp,
int enable_hls,
int enable_mp4);
......
......@@ -101,11 +101,10 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){
s_events.on_mk_media_publish((mk_media_info) &args,
(mk_publish_auth_invoker) &invoker,
(mk_sock_info) &sender);
}else{
GET_CONFIG(bool,toRtxp,General::kPublishToRtxp);
GET_CONFIG(bool,toHls,General::kPublishToHls);
GET_CONFIG(bool,toMP4,General::kPublishToMP4);
invoker("",toRtxp,toHls,toMP4);
} else {
GET_CONFIG(bool, toHls, General::kPublishToHls);
GET_CONFIG(bool, toMP4, General::kPublishToMP4);
invoker("", toHls, toMP4);
}
});
......
......@@ -382,12 +382,11 @@ API_EXPORT void API_CALL mk_rtsp_auth_invoker_clone_release(const mk_rtsp_auth_i
///////////////////////////////////////////Broadcast::PublishAuthInvoker/////////////////////////////////////////////
API_EXPORT void API_CALL mk_publish_auth_invoker_do(const mk_publish_auth_invoker ctx,
const char *err_msg,
int enable_rtxp,
int enable_hls,
int enable_mp4){
assert(ctx);
Broadcast::PublishAuthInvoker *invoker = (Broadcast::PublishAuthInvoker *)ctx;
(*invoker)(err_msg ? err_msg : "", enable_rtxp, enable_hls, enable_mp4);
(*invoker)(err_msg ? err_msg : "", enable_hls, enable_mp4);
}
API_EXPORT mk_publish_auth_invoker API_CALL mk_publish_auth_invoker_clone(const mk_publish_auth_invoker ctx){
......
......@@ -61,8 +61,8 @@ void API_CALL on_mk_media_publish(const mk_media_info url_info,
mk_media_info_get_stream(url_info),
mk_media_info_get_params(url_info));
//允许推流,并且允许转rtxp/hls/mp4
mk_publish_auth_invoker_do(invoker, NULL, 1, 1, 1);
//允许推流,并且允许转hls/mp4
mk_publish_auth_invoker_do(invoker, NULL, 1, 1);
}
/**
......
......@@ -40,8 +40,6 @@ addMuteAudio=1
#拉流代理时如果断流再重连成功是否删除前一次的媒体流数据,如果删除将重新开始,
#如果不删除将会接着上一次的数据继续写(录制hls/mp4时会继续在前一个文件后面写)
resetWhenRePlay=1
#是否默认推流时转换成rtsp或rtmp,hook接口(on_publish)中可以覆盖该设置
publishToRtxp=1
#是否默认推流时转换成hls,hook接口(on_publish)中可以覆盖该设置
publishToHls=1
#是否默认推流时mp4录像,hook接口(on_publish)中可以覆盖该设置
......
......@@ -195,11 +195,10 @@ void installWebHook(){
GET_CONFIG(string,hook_http_access,Hook::kOnHttpAccess);
NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastMediaPublish,[](BroadcastMediaPublishArgs){
GET_CONFIG(bool,toRtxp,General::kPublishToRtxp);
GET_CONFIG(bool,toHls,General::kPublishToHls);
GET_CONFIG(bool,toMP4,General::kPublishToMP4);
if(!hook_enable || args._param_strs == hook_adminparams || hook_publish.empty() || sender.get_peer_ip() == "127.0.0.1"){
invoker("",toRtxp,toHls,toMP4);
invoker("", toHls, toMP4);
return;
}
//异步执行该hook api,防止阻塞NoticeCenter
......@@ -211,27 +210,20 @@ void installWebHook(){
do_http_hook(hook_publish,body,[invoker](const Value &obj,const string &err){
if(err.empty()){
//推流鉴权成功
bool enableRtxp = toRtxp;
bool enableHls = toHls;
bool enableMP4 = toMP4;
//兼容用户不传递enableRtxp、enableHls、enableMP4参数
if(obj.isMember("enableRtxp")){
enableRtxp = obj["enableRtxp"].asBool();
}
if(obj.isMember("enableHls")){
//兼容用户不传递enableHls、enableMP4参数
if (obj.isMember("enableHls")) {
enableHls = obj["enableHls"].asBool();
}
if(obj.isMember("enableMP4")){
if (obj.isMember("enableMP4")) {
enableMP4 = obj["enableMP4"].asBool();
}
invoker(err,enableRtxp,enableHls,enableMP4);
}else{
invoker(err, enableHls, enableMP4);
} else {
//推流鉴权失败
invoker(err,false, false, false);
invoker(err, false, false);
}
});
......
......@@ -93,13 +93,10 @@ bool MediaSource::close(bool force) {
return listener->close(*this,force);
}
void MediaSource::onNoneReader(){
void MediaSource::onReaderChanged(int size) {
auto listener = _listener.lock();
if(!listener){
return;
}
if (listener->totalReaderCount(*this) == 0) {
listener->onNoneReader(*this);
if (listener) {
listener->onReaderChanged(*this, size);
}
}
......@@ -475,45 +472,47 @@ MediaSource::Ptr MediaSource::createFromMP4(const string &schema, const string &
/////////////////////////////////////MediaSourceEvent//////////////////////////////////////
void MediaSourceEvent::onNoneReader(MediaSource &sender){
void MediaSourceEvent::onReaderChanged(MediaSource &sender, int size){
if (size || totalReaderCount(sender)) {
//还有人观看该视频,不触发关闭事件
return;
}
//没有任何人观看该视频源,表明该源可以关闭了
GET_CONFIG(string, record_app, Record::kAppName);
GET_CONFIG(int, stream_none_reader_delay, General::kStreamNoneReaderDelayMS);
//如果mp4点播, 无人观看时我们强制关闭点播
bool is_mp4_vod = sender.getApp() == record_app;
weak_ptr<MediaSource> weak_sender = sender.shared_from_this();
//没有任何人观看该视频源,表明该源可以关闭了
weak_ptr<MediaSource> weakSender = sender.shared_from_this();
_async_close_timer = std::make_shared<Timer>(stream_none_reader_delay / 1000.0, [weakSender,is_mp4_vod]() {
auto strongSender = weakSender.lock();
if (!strongSender) {
_async_close_timer = std::make_shared<Timer>(stream_none_reader_delay / 1000.0, [weak_sender, is_mp4_vod]() {
auto strong_sender = weak_sender.lock();
if (!strong_sender) {
//对象已经销毁
return false;
}
if (strongSender->totalReaderCount() != 0) {
//还有人消费
if (strong_sender->totalReaderCount()) {
//还有人观看该视频,不触发关闭事件
return false;
}
if(!is_mp4_vod){
if (!is_mp4_vod) {
//直播时触发无人观看事件,让开发者自行选择是否关闭
WarnL << "无人观看事件:"
<< strongSender->getSchema() << "/"
<< strongSender->getVhost() << "/"
<< strongSender->getApp() << "/"
<< strongSender->getId();
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastStreamNoneReader, *strongSender);
}else{
<< strong_sender->getSchema() << "/"
<< strong_sender->getVhost() << "/"
<< strong_sender->getApp() << "/"
<< strong_sender->getId();
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastStreamNoneReader, *strong_sender);
} else {
//这个是mp4点播,我们自动关闭
WarnL << "MP4点播无人观看,自动关闭:"
<< strongSender->getSchema() << "/"
<< strongSender->getVhost() << "/"
<< strongSender->getApp() << "/"
<< strongSender->getId();
strongSender->close(false);
<< strong_sender->getSchema() << "/"
<< strong_sender->getVhost() << "/"
<< strong_sender->getApp() << "/"
<< strong_sender->getId();
strong_sender->close(false);
}
return false;
}, nullptr);
}
......@@ -542,13 +541,13 @@ int MediaSourceEventInterceptor::totalReaderCount(MediaSource &sender) {
return listener->totalReaderCount(sender);
}
void MediaSourceEventInterceptor::onNoneReader(MediaSource &sender) {
void MediaSourceEventInterceptor::onReaderChanged(MediaSource &sender, int size) {
auto listener = _listener.lock();
if (!listener) {
MediaSourceEvent::onNoneReader(sender);
return;
MediaSourceEvent::onReaderChanged(sender, size);
} else {
listener->onReaderChanged(sender, size);
}
listener->onNoneReader(sender);
}
void MediaSourceEventInterceptor::onRegist(MediaSource &sender, bool regist) {
......
......@@ -49,8 +49,8 @@ public:
virtual bool close(MediaSource &sender, bool force) { return false; }
// 获取观看总人数
virtual int totalReaderCount(MediaSource &sender) = 0;
// 通知无人观看
virtual void onNoneReader(MediaSource &sender);
// 通知观看人数变化
virtual void onReaderChanged(MediaSource &sender, int size);
//流注册或注销事件
virtual void onRegist(MediaSource &sender, bool regist) {};
......@@ -79,7 +79,7 @@ public:
bool seekTo(MediaSource &sender, uint32_t stamp) override;
bool close(MediaSource &sender, bool force) override;
int totalReaderCount(MediaSource &sender) override;
void onNoneReader(MediaSource &sender) override;
void onReaderChanged(MediaSource &sender, int size) override;
void onRegist(MediaSource &sender, bool regist) override;
bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) override;
bool isRecording(MediaSource &sender, Recorder::type type) override;
......@@ -160,8 +160,8 @@ public:
bool seekTo(uint32_t stamp);
// 关闭该流
bool close(bool force);
// 该流无人观看
void onNoneReader();
// 该流观看人数变化
void onReaderChanged(int size);
// 开启或关闭录制
bool setupRecord(Recorder::type type, bool start, const string &custom_path);
// 获取录制状态
......@@ -249,6 +249,10 @@ public:
}
}
virtual void clearCache() {
_cache->clear();
}
virtual void onFlush(std::shared_ptr<packet_list> &, bool key_pos) = 0;
private:
......
......@@ -18,21 +18,17 @@ MultiMuxerPrivate::MultiMuxerPrivate(const string &vhost, const string &app, con
bool enable_rtsp, bool enable_rtmp, bool enable_hls, bool enable_mp4) {
if (enable_rtmp) {
_rtmp = std::make_shared<RtmpMediaSourceMuxer>(vhost, app, stream, std::make_shared<TitleMeta>(dur_sec));
_enable_rtxp = true;
}
if (enable_rtsp) {
_rtsp = std::make_shared<RtspMediaSourceMuxer>(vhost, app, stream, std::make_shared<TitleSdp>(dur_sec));
_enable_rtxp = true;
}
if (enable_hls) {
_hls = Recorder::createRecorder(Recorder::type_hls, vhost, app, stream);
_enable_record = true;
}
if (enable_mp4) {
_mp4 = Recorder::createRecorder(Recorder::type_mp4, vhost, app, stream);
_enable_record = true;
}
}
......@@ -101,7 +97,6 @@ bool MultiMuxerPrivate::setupRecord(MediaSource &sender, Recorder::type type, bo
//停止录制
_hls = nullptr;
}
_enable_record = _hls || _mp4;
return true;
}
case Recorder::type_mp4 : {
......@@ -112,7 +107,6 @@ bool MultiMuxerPrivate::setupRecord(MediaSource &sender, Recorder::type type, bo
//停止录制
_mp4 = nullptr;
}
_enable_record = _hls || _mp4;
return true;
}
default : return false;
......@@ -164,7 +158,7 @@ void MultiMuxerPrivate::onTrackReady(const Track::Ptr &track) {
}
bool MultiMuxerPrivate::isEnabled(){
return _enable_rtxp || _enable_record;
return (_rtmp ? _rtmp->isEnabled() : false) || (_rtsp ? _rtsp->isEnabled() : false) || _hls || _mp4;
}
void MultiMuxerPrivate::onTrackFrame(const Frame::Ptr &frame) {
......
......@@ -49,8 +49,6 @@ private:
MediaSource::Ptr getHlsMediaSource() const;
private:
bool _enable_rtxp = false;
bool _enable_record = false;
Listener *_track_listener = nullptr;
RtmpMediaSourceMuxer::Ptr _rtmp;
RtspMediaSourceMuxer::Ptr _rtsp;
......
......@@ -63,7 +63,6 @@ const string kMaxStreamWaitTimeMS = GENERAL_FIELD"maxStreamWaitMS";
const string kEnableVhost = GENERAL_FIELD"enableVhost";
const string kAddMuteAudio = GENERAL_FIELD"addMuteAudio";
const string kResetWhenRePlay = GENERAL_FIELD"resetWhenRePlay";
const string kPublishToRtxp = GENERAL_FIELD"publishToRtxp";
const string kPublishToHls = GENERAL_FIELD"publishToHls";
const string kPublishToMP4 = GENERAL_FIELD"publishToMP4";
const string kMergeWriteMS = GENERAL_FIELD"mergeWriteMS";
......@@ -76,7 +75,6 @@ onceToken token([](){
mINI::Instance()[kEnableVhost] = 0;
mINI::Instance()[kAddMuteAudio] = 1;
mINI::Instance()[kResetWhenRePlay] = 1;
mINI::Instance()[kPublishToRtxp] = 1;
mINI::Instance()[kPublishToHls] = 1;
mINI::Instance()[kPublishToMP4] = 0;
mINI::Instance()[kMergeWriteMS] = 0;
......
......@@ -86,8 +86,7 @@ extern const string kBroadcastOnRtspAuth;
//如果errMessage为空则代表鉴权成功
//enableHls: 是否允许转换hls
//enableMP4: 是否运行MP4录制
//enableRtxp: rtmp推流时是否运行转rtsp;rtsp推流时,是否允许转rtmp
typedef std::function<void(const string &errMessage,bool enableRtxp,bool enableHls,bool enableMP4)> PublishAuthInvoker;
typedef std::function<void(const string &errMessage, bool enableHls, bool enableMP4)> PublishAuthInvoker;
//收到rtsp/rtmp推流事件广播,通过该事件控制推流鉴权
extern const string kBroadcastMediaPublish;
......@@ -165,8 +164,6 @@ extern const string kAddMuteAudio;
//拉流代理时如果断流再重连成功是否删除前一次的媒体流数据,如果删除将重新开始,
//如果不删除将会接着上一次的数据继续写(录制hls/mp4时会继续在前一个文件后面写)
extern const string kResetWhenRePlay;
//是否默认推流时转换成rtsp或rtmp,hook接口(on_publish)中可以覆盖该设置
extern const string kPublishToRtxp ;
//是否默认推流时转换成hls,hook接口(on_publish)中可以覆盖该设置
extern const string kPublishToHls ;
//是否默认推流时mp4录像,hook接口(on_publish)中可以覆盖该设置
......
......@@ -26,7 +26,7 @@ public:
_ring = std::make_shared<RingType>();
}
virtual ~HlsMediaSource() = default;
~HlsMediaSource() override = default;
/**
* 获取媒体源的环形缓冲
......@@ -47,10 +47,10 @@ public:
* 注册hls
*/
void registHls(){
if(!_registed){
regist();
if (!_registed) {
_registed = true;
onNoneReader();
onReaderChanged(0);
regist();
}
}
......@@ -62,12 +62,10 @@ private:
void modifyReaderCount(bool add) {
if (add) {
++_readerCount;
return;
}
if (--_readerCount == 0) {
onNoneReader();
} else {
--_readerCount;
}
onReaderChanged(_readerCount);
}
private:
atomic_int _readerCount;
......
......@@ -60,7 +60,7 @@ public:
MediaSource(RTMP_SCHEMA, vhost, app, stream_id), _ring_size(ring_size) {
}
virtual ~RtmpMediaSource() {}
~RtmpMediaSource() override{}
/**
* 获取媒体源的环形缓冲
......@@ -134,7 +134,7 @@ public:
if (!_ring) {
weak_ptr<RtmpMediaSource> weakSelf = dynamic_pointer_cast<RtmpMediaSource>(shared_from_this());
auto lam = [weakSelf](const EventPoller::Ptr &, int size, bool) {
auto lam = [weakSelf](int size) {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
return;
......@@ -174,6 +174,11 @@ public:
return ret;
}
void clearCache() override{
PacketCache<RtmpPacket>::clearCache();
_ring->clearCache();
}
private:
/**
* 批量flush rtmp包时触发该函数
......@@ -185,15 +190,6 @@ private:
_ring->write(rtmp_list, _have_video ? key_pos : true);
}
/**
* 每次增减消费者都会触发该函数
*/
void onReaderChanged(int size) {
if (size == 0) {
onNoneReader();
}
}
private:
bool _have_video = false;
int _ring_size;
......
......@@ -77,13 +77,12 @@ public:
/**
* 设置协议转换
* @param enableRtsp 是否转换成rtsp
* @param enableHls 是否转换成hls
* @param enableMP4 是否mp4录制
*/
void setProtocolTranslation(bool enableRtsp, bool enableHls, bool enableMP4) {
void setProtocolTranslation(bool enableHls, bool enableMP4) {
//不重复生成rtmp
_muxer = std::make_shared<MultiMediaSourceMuxer>(getVhost(), getApp(), getId(), _demuxer->getDuration(), enableRtsp, false, enableHls, enableMP4);
_muxer = std::make_shared<MultiMediaSourceMuxer>(getVhost(), getApp(), getId(), _demuxer->getDuration(), true, false, enableHls, enableMP4);
_muxer->setMediaListener(getListener());
_muxer->setTrackListener(static_pointer_cast<RtmpMediaSourceImp>(shared_from_this()));
//让_muxer对象拦截一部分事件(比如说录像相关事件)
......
......@@ -16,7 +16,8 @@
namespace mediakit {
class RtmpMediaSourceMuxer : public RtmpMuxer {
class RtmpMediaSourceMuxer : public RtmpMuxer, public MediaSourceEventInterceptor,
public std::enable_shared_from_this<RtmpMediaSourceMuxer> {
public:
typedef std::shared_ptr<RtmpMediaSourceMuxer> Ptr;
......@@ -27,10 +28,11 @@ public:
_media_src = std::make_shared<RtmpMediaSource>(vhost, strApp, strId);
getRtmpRing()->setDelegate(_media_src);
}
virtual ~RtmpMediaSourceMuxer(){}
~RtmpMediaSourceMuxer() override{}
void setListener(const std::weak_ptr<MediaSourceEvent> &listener){
_media_src->setListener(listener);
_listener = listener;
}
void setTimeStamp(uint32_t stamp){
......@@ -43,10 +45,36 @@ public:
void onAllTrackReady(){
makeConfigPacket();
_media_src->setListener(shared_from_this());
_media_src->setMetaData(getMetadata());
}
void onReaderChanged(MediaSource &sender, int size) override {
_enabled = size;
if (!size) {
_clear_cache = true;
}
MediaSourceEventInterceptor::onReaderChanged(sender, size);
}
void inputFrame(const Frame::Ptr &frame) override {
if (_clear_cache) {
_clear_cache = false;
_media_src->clearCache();
}
if (_enabled) {
RtmpMuxer::inputFrame(frame);
}
}
bool isEnabled() {
//缓存尚未清空时,还允许触发inputFrame函数,以便及时清空缓存
return _clear_cache ? true : _enabled;
}
private:
bool _enabled = true;
bool _clear_cache = false;
RtmpMediaSource::Ptr _media_src;
};
......
......@@ -126,7 +126,7 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
_media_info.parse(_tc_url + "/" + getStreamId(dec.load<std::string>()));
_media_info._schema = RTMP_SCHEMA;
auto on_res = [this,pToken](const string &err, bool enableRtxp, bool enableHls, bool enableMP4){
auto on_res = [this,pToken](const string &err, bool enableHls, bool enableMP4){
auto src = dynamic_pointer_cast<RtmpMediaSource>(MediaSource::find(RTMP_SCHEMA,
_media_info._vhost,
_media_info._app,
......@@ -150,7 +150,7 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
_publisher_src.reset(new RtmpMediaSourceImp(_media_info._vhost, _media_info._app, _media_info._streamid));
_publisher_src->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
//设置转协议
_publisher_src->setProtocolTranslation(enableRtxp, enableHls, enableMP4);
_publisher_src->setProtocolTranslation(enableHls, enableMP4);
//如果是rtmp推流客户端,那么加大TCP接收缓存,这样能提升接收性能
getSock()->setReadBuffer(std::make_shared<BufferRaw>(256 * 1024));
......@@ -159,30 +159,29 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
if(_media_info._app.empty() || _media_info._streamid.empty()){
//不允许莫名其妙的推流url
on_res("rtmp推流url非法", false, false, false);
on_res("rtmp推流url非法", false, false);
return;
}
Broadcast::PublishAuthInvoker invoker = [weak_self,on_res,pToken](const string &err, bool enableRtxp, bool enableHls, bool enableMP4){
Broadcast::PublishAuthInvoker invoker = [weak_self, on_res, pToken](const string &err, bool enableHls, bool enableMP4) {
auto strongSelf = weak_self.lock();
if(!strongSelf){
if (!strongSelf) {
return;
}
strongSelf->async([weak_self,on_res,err,pToken,enableRtxp,enableHls,enableMP4](){
strongSelf->async([weak_self, on_res, err, pToken, enableHls, enableMP4]() {
auto strongSelf = weak_self.lock();
if(!strongSelf){
if (!strongSelf) {
return;
}
on_res(err, enableRtxp, enableHls, enableMP4);
on_res(err, enableHls, enableMP4);
});
};
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, _media_info, invoker, static_cast<SockInfo &>(*this));
if(!flag){
//该事件无人监听,默认鉴权成功
GET_CONFIG(bool,to_rtxp,General::kPublishToRtxp);
GET_CONFIG(bool,to_hls,General::kPublishToHls);
GET_CONFIG(bool,to_mp4,General::kPublishToMP4);
on_res("", to_rtxp, to_hls, to_mp4);
on_res("", to_hls, to_mp4);
}
}
......
......@@ -243,7 +243,7 @@ void RtpProcess::setListener(const std::weak_ptr<MediaSourceEvent> &listener){
void RtpProcess::emitOnPublish() {
weak_ptr<RtpProcess> weak_self = shared_from_this();
Broadcast::PublishAuthInvoker invoker = [weak_self](const string &err, bool enableRtxp, bool enableHls, bool enableMP4) {
Broadcast::PublishAuthInvoker invoker = [weak_self](const string &err, bool enableHls, bool enableMP4) {
auto strongSelf = weak_self.lock();
if (!strongSelf) {
return;
......@@ -252,7 +252,7 @@ void RtpProcess::emitOnPublish() {
strongSelf->_muxer = std::make_shared<MultiMediaSourceMuxer>(strongSelf->_media_info._vhost,
strongSelf->_media_info._app,
strongSelf->_media_info._streamid, 0,
enableRtxp, enableRtxp, enableHls, enableMP4);
true, true, enableHls, enableMP4);
strongSelf->_muxer->setMediaListener(strongSelf->_listener);
InfoP(strongSelf) << "允许RTP推流";
} else {
......@@ -264,10 +264,9 @@ void RtpProcess::emitOnPublish() {
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, _media_info, invoker, static_cast<SockInfo &>(*this));
if(!flag){
//该事件无人监听,默认不鉴权
GET_CONFIG(bool, toRtxp, General::kPublishToRtxp);
GET_CONFIG(bool, toHls, General::kPublishToHls);
GET_CONFIG(bool, toMP4, General::kPublishToMP4);
invoker("", toRtxp, toHls, toMP4);
invoker("", toHls, toMP4);
}
}
......
......@@ -56,7 +56,7 @@ public:
int ring_size = RTP_GOP_SIZE) :
MediaSource(RTSP_SCHEMA, vhost, app, stream_id), _ring_size(ring_size) {}
virtual ~RtspMediaSource() {}
~RtspMediaSource() override{}
/**
* 获取媒体源的环形缓冲
......@@ -166,7 +166,7 @@ public:
}
if (!_ring) {
weak_ptr<RtspMediaSource> weakSelf = dynamic_pointer_cast<RtspMediaSource>(shared_from_this());
auto lam = [weakSelf](const EventPoller::Ptr &, int size, bool) {
auto lam = [weakSelf](int size) {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
return;
......@@ -184,6 +184,11 @@ public:
PacketCache<RtpPacket>::inputPacket(rtp->type == TrackVideo, rtp, keyPos);
}
void clearCache() override{
PacketCache<RtpPacket>::clearCache();
_ring->clearCache();
}
private:
/**
* 批量flush rtp包时触发该函数
......@@ -195,15 +200,6 @@ private:
_ring->write(rtp_list, _have_video ? key_pos : true);
}
/**
* 每次增减消费者都会触发该函数
*/
void onReaderChanged(int size) {
if (size == 0) {
onNoneReader();
}
}
private:
bool _have_video = false;
int _ring_size;
......
......@@ -68,13 +68,12 @@ public:
/**
* 设置协议转换
* @param enableRtmp 是否转换成rtmp
* @param enableHls 是否转换成hls
* @param enableMP4 是否mp4录制
*/
void setProtocolTranslation(bool enableRtmp,bool enableHls,bool enableMP4){
void setProtocolTranslation(bool enableHls,bool enableMP4){
//不重复生成rtsp
_muxer = std::make_shared<MultiMediaSourceMuxer>(getVhost(), getApp(), getId(), _demuxer->getDuration(), false, enableRtmp, enableHls, enableMP4);
_muxer = std::make_shared<MultiMediaSourceMuxer>(getVhost(), getApp(), getId(), _demuxer->getDuration(), false, true, enableHls, enableMP4);
_muxer->setMediaListener(getListener());
_muxer->setTrackListener(static_pointer_cast<RtspMediaSourceImp>(shared_from_this()));
//让_muxer对象拦截一部分事件(比如说录像相关事件)
......
......@@ -16,7 +16,8 @@
namespace mediakit {
class RtspMediaSourceMuxer : public RtspMuxer {
class RtspMediaSourceMuxer : public RtspMuxer, public MediaSourceEventInterceptor,
public std::enable_shared_from_this<RtspMediaSourceMuxer> {
public:
typedef std::shared_ptr<RtspMediaSourceMuxer> Ptr;
......@@ -24,29 +25,56 @@ public:
const string &strApp,
const string &strId,
const TitleSdp::Ptr &title = nullptr) : RtspMuxer(title){
_mediaSouce = std::make_shared<RtspMediaSource>(vhost,strApp,strId);
getRtpRing()->setDelegate(_mediaSouce);
_media_src = std::make_shared<RtspMediaSource>(vhost,strApp,strId);
getRtpRing()->setDelegate(_media_src);
}
virtual ~RtspMediaSourceMuxer(){}
~RtspMediaSourceMuxer() override{}
void setListener(const std::weak_ptr<MediaSourceEvent> &listener){
_mediaSouce->setListener(listener);
_listener = listener;
}
int readerCount() const{
return _mediaSouce->readerCount();
return _media_src->readerCount();
}
void setTimeStamp(uint32_t stamp){
_mediaSouce->setTimeStamp(stamp);
_media_src->setTimeStamp(stamp);
}
void onAllTrackReady(){
_mediaSouce->setSdp(getSdp());
_media_src->setListener(shared_from_this());
_media_src->setSdp(getSdp());
}
void onReaderChanged(MediaSource &sender, int size) override {
_enabled = size;
if (!size) {
_clear_cache = true;
}
MediaSourceEventInterceptor::onReaderChanged(sender, size);
}
void inputFrame(const Frame::Ptr &frame) override {
if (_clear_cache) {
_clear_cache = false;
_media_src->clearCache();
}
if (_enabled) {
RtspMuxer::inputFrame(frame);
}
}
bool isEnabled() {
//缓存尚未清空时,还允许触发inputFrame函数,以便及时清空缓存
return _clear_cache ? true : _enabled;
}
private:
RtspMediaSource::Ptr _mediaSouce;
bool _enabled = true;
bool _clear_cache = false;
RtspMediaSource::Ptr _media_src;
};
......
......@@ -252,7 +252,7 @@ void RtspSession::handleReq_RECORD(const Parser &parser){
send_SessionNotFound();
throw SockException(Err_shutdown, _sdp_track.empty() ? "can not find any availabe track when record" : "session not found when record");
}
auto onRes = [this](const string &err,bool enableRtxp,bool enableHls,bool enableMP4){
auto onRes = [this](const string &err, bool enableHls, bool enableMP4){
bool authSuccess = err.empty();
if(!authSuccess){
sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err);
......@@ -261,7 +261,7 @@ void RtspSession::handleReq_RECORD(const Parser &parser){
}
//设置转协议
_push_src->setProtocolTranslation(enableRtxp, enableHls, enableMP4);
_push_src->setProtocolTranslation(enableHls, enableMP4);
_StrPrinter rtp_info;
for(auto &track : _sdp_track){
......@@ -283,17 +283,17 @@ void RtspSession::handleReq_RECORD(const Parser &parser){
};
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
Broadcast::PublishAuthInvoker invoker = [weakSelf,onRes](const string &err,bool enableRtxp,bool enableHls,bool enableMP4){
Broadcast::PublishAuthInvoker invoker = [weakSelf, onRes](const string &err, bool enableHls, bool enableMP4) {
auto strongSelf = weakSelf.lock();
if(!strongSelf){
if (!strongSelf) {
return;
}
strongSelf->async([weakSelf,onRes,err,enableRtxp,enableHls,enableMP4](){
strongSelf->async([weakSelf, onRes, err, enableHls, enableMP4]() {
auto strongSelf = weakSelf.lock();
if(!strongSelf){
if (!strongSelf) {
return;
}
onRes(err,enableRtxp,enableHls,enableMP4);
onRes(err, enableHls, enableMP4);
});
};
......@@ -301,10 +301,9 @@ void RtspSession::handleReq_RECORD(const Parser &parser){
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, _media_info, invoker, static_cast<SockInfo &>(*this));
if(!flag){
//该事件无人监听,默认不鉴权
GET_CONFIG(bool,toRtxp,General::kPublishToRtxp);
GET_CONFIG(bool,toHls,General::kPublishToHls);
GET_CONFIG(bool,toMP4,General::kPublishToMP4);
onRes("",toRtxp,toHls,toMP4);
onRes("",toHls,toMP4);
}
}
......
......@@ -141,7 +141,7 @@ void initEventListener() {
NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastMediaPublish, [](BroadcastMediaPublishArgs) {
DebugL << "推流鉴权:" << args._schema << " " << args._vhost << " " << args._app << " " << args._streamid << " "
<< args._param_strs;
invoker("", true, true, false);//鉴权成功
invoker("", true, false);//鉴权成功
//invoker("this is auth failed message");//鉴权失败
});
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论