Commit b23cbaa0 by xiongziliang

on_publish hook新增continue_push_ms参数,用于断连续推延时控制

parent b0beea77
...@@ -323,6 +323,9 @@ void installWebHook(){ ...@@ -323,6 +323,9 @@ void installWebHook(){
if (obj.isMember("enable_fmp4")) { if (obj.isMember("enable_fmp4")) {
option.enable_fmp4 = obj["enable_fmp4"].asBool(); option.enable_fmp4 = obj["enable_fmp4"].asBool();
} }
if (obj.isMember("continue_push_ms")) {
option.continue_push_ms = obj["continue_push_ms"].asUInt();
}
invoker(err, option); invoker(err, option);
} else { } else {
//推流鉴权失败 //推流鉴权失败
......
...@@ -26,11 +26,14 @@ ProtocolOption::ProtocolOption() { ...@@ -26,11 +26,14 @@ ProtocolOption::ProtocolOption() {
GET_CONFIG(bool, s_to_mp4, General::kPublishToMP4); GET_CONFIG(bool, s_to_mp4, General::kPublishToMP4);
GET_CONFIG(bool, s_enabel_audio, General::kEnableAudio); GET_CONFIG(bool, s_enabel_audio, General::kEnableAudio);
GET_CONFIG(bool, s_add_mute_audio, General::kAddMuteAudio); GET_CONFIG(bool, s_add_mute_audio, General::kAddMuteAudio);
GET_CONFIG(uint32_t, s_continue_push_ms, General::kContinuePushMS);
enable_hls = s_to_hls; enable_hls = s_to_hls;
enable_mp4 = s_to_mp4; enable_mp4 = s_to_mp4;
enable_audio = s_enabel_audio; enable_audio = s_enabel_audio;
add_mute_audio = s_add_mute_audio; add_mute_audio = s_add_mute_audio;
continue_push_ms = s_continue_push_ms;
} }
static std::shared_ptr<MediaSinkInterface> makeRecorder(MediaSource &sender, const vector<Track::Ptr> &tracks, Recorder::type type, const string &custom_path, size_t max_second){ static std::shared_ptr<MediaSinkInterface> makeRecorder(MediaSource &sender, const vector<Track::Ptr> &tracks, Recorder::type type, const string &custom_path, size_t max_second){
......
...@@ -52,6 +52,9 @@ public: ...@@ -52,6 +52,9 @@ public:
//hls录制保存路径 //hls录制保存路径
std::string hls_save_path; std::string hls_save_path;
//断连续推延时,单位毫秒,默认采用配置文件
uint32_t continue_push_ms;
}; };
class MultiMediaSourceMuxer : public MediaSourceEventInterceptor, public MediaSink, public std::enable_shared_from_this<MultiMediaSourceMuxer>{ class MultiMediaSourceMuxer : public MediaSourceEventInterceptor, public MediaSink, public std::enable_shared_from_this<MultiMediaSourceMuxer>{
......
...@@ -44,14 +44,13 @@ void RtmpSession::onError(const SockException& err) { ...@@ -44,14 +44,13 @@ void RtmpSession::onError(const SockException& err) {
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, is_player, static_cast<SockInfo &>(*this)); NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, is_player, static_cast<SockInfo &>(*this));
} }
GET_CONFIG(uint32_t, continue_push_ms, General::kContinuePushMS);
//如果是主动关闭的,那么不延迟注销 //如果是主动关闭的,那么不延迟注销
if (_push_src && continue_push_ms && err.getErrCode() != Err_shutdown) { if (_push_src && _continue_push_ms && err.getErrCode() != Err_shutdown) {
//取消所有权 //取消所有权
_push_src_ownership = nullptr; _push_src_ownership = nullptr;
//延时10秒注销流 //延时10秒注销流
auto push_src = std::move(_push_src); auto push_src = std::move(_push_src);
getPoller()->doDelayTask(continue_push_ms, [push_src]() { return 0; }); getPoller()->doDelayTask(_continue_push_ms, [push_src]() { return 0; });
} }
} }
...@@ -184,7 +183,7 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { ...@@ -184,7 +183,7 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
} }
_push_src->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this())); _push_src->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
_continue_push_ms = option.continue_push_ms;
sendStatus({"level", "status", sendStatus({"level", "status",
"code", "NetStream.Publish.Start", "code", "NetStream.Publish.Start",
"description", "Started publishing stream.", "description", "Started publishing stream.",
......
...@@ -89,6 +89,8 @@ private: ...@@ -89,6 +89,8 @@ private:
private: private:
bool _set_meta_data = false; bool _set_meta_data = false;
double _recv_req_id = 0; double _recv_req_id = 0;
//断连续推延时
uint32_t _continue_push_ms = 0;
//消耗的总流量 //消耗的总流量
uint64_t _total_bytes = 0; uint64_t _total_bytes = 0;
std::string _tc_url; std::string _tc_url;
......
...@@ -86,14 +86,13 @@ void RtspSession::onError(const SockException &err) { ...@@ -86,14 +86,13 @@ void RtspSession::onError(const SockException &err) {
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _bytes_usage, duration, is_player, static_cast<SockInfo &>(*this)); NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _bytes_usage, duration, is_player, static_cast<SockInfo &>(*this));
} }
GET_CONFIG(uint32_t, continue_push_ms, General::kContinuePushMS);
//如果是主动关闭的,那么不延迟注销 //如果是主动关闭的,那么不延迟注销
if (_push_src && continue_push_ms && err.getErrCode() != Err_shutdown) { if (_push_src && _continue_push_ms && err.getErrCode() != Err_shutdown) {
//取消所有权 //取消所有权
_push_src_ownership = nullptr; _push_src_ownership = nullptr;
//延时10秒注销流 //延时10秒注销流
auto push_src = std::move(_push_src); auto push_src = std::move(_push_src);
getPoller()->doDelayTask(continue_push_ms, [push_src]() { return 0; }); getPoller()->doDelayTask(_continue_push_ms, [push_src]() { return 0; });
} }
} }
...@@ -280,6 +279,7 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) { ...@@ -280,6 +279,7 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) {
} }
_push_src->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this())); _push_src->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
_continue_push_ms = option.continue_push_ms;
sendRtspResponse("200 OK"); sendRtspResponse("200 OK");
}; };
......
...@@ -163,6 +163,9 @@ private: ...@@ -163,6 +163,9 @@ private:
private: private:
//是否已经触发on_play事件 //是否已经触发on_play事件
bool _emit_on_play = false; bool _emit_on_play = false;
bool _send_sr_rtcp[2] = {true, true};
//断连续推延时
uint32_t _continue_push_ms = 0;
//推流或拉流客户端采用的rtp传输方式 //推流或拉流客户端采用的rtp传输方式
Rtsp::eRtpType _rtp_type = Rtsp::RTP_Invalid; Rtsp::eRtpType _rtp_type = Rtsp::RTP_Invalid;
//收到的seq,回复时一致 //收到的seq,回复时一致
...@@ -213,7 +216,6 @@ private: ...@@ -213,7 +216,6 @@ private:
toolkit::Ticker _rtcp_send_tickers[2]; toolkit::Ticker _rtcp_send_tickers[2];
//统计rtp并发送rtcp //统计rtp并发送rtcp
std::vector<RtcpContext::Ptr> _rtcp_context; std::vector<RtcpContext::Ptr> _rtcp_context;
bool _send_sr_rtcp[2] = {true, true};
}; };
/** /**
......
...@@ -16,8 +16,9 @@ using namespace mediakit; ...@@ -16,8 +16,9 @@ using namespace mediakit;
WebRtcPusher::Ptr WebRtcPusher::create(const EventPoller::Ptr &poller, WebRtcPusher::Ptr WebRtcPusher::create(const EventPoller::Ptr &poller,
const RtspMediaSourceImp::Ptr &src, const RtspMediaSourceImp::Ptr &src,
const std::shared_ptr<void> &ownership, const std::shared_ptr<void> &ownership,
const MediaInfo &info) { const MediaInfo &info,
WebRtcPusher::Ptr ret(new WebRtcPusher(poller, src, ownership, info), [](WebRtcPusher *ptr) { const mediakit::ProtocolOption &option) {
WebRtcPusher::Ptr ret(new WebRtcPusher(poller, src, ownership, info, option), [](WebRtcPusher *ptr) {
ptr->onDestory(); ptr->onDestory();
delete ptr; delete ptr;
}); });
...@@ -28,10 +29,12 @@ WebRtcPusher::Ptr WebRtcPusher::create(const EventPoller::Ptr &poller, ...@@ -28,10 +29,12 @@ WebRtcPusher::Ptr WebRtcPusher::create(const EventPoller::Ptr &poller,
WebRtcPusher::WebRtcPusher(const EventPoller::Ptr &poller, WebRtcPusher::WebRtcPusher(const EventPoller::Ptr &poller,
const RtspMediaSourceImp::Ptr &src, const RtspMediaSourceImp::Ptr &src,
const std::shared_ptr<void> &ownership, const std::shared_ptr<void> &ownership,
const MediaInfo &info) : WebRtcTransportImp(poller) { const MediaInfo &info,
const mediakit::ProtocolOption &option) : WebRtcTransportImp(poller) {
_media_info = info; _media_info = info;
_push_src = src; _push_src = src;
_push_src_ownership = ownership; _push_src_ownership = ownership;
_continue_push_ms = option.continue_push_ms;
CHECK(_push_src); CHECK(_push_src);
} }
...@@ -130,13 +133,12 @@ void WebRtcPusher::onDestory() { ...@@ -130,13 +133,12 @@ void WebRtcPusher::onDestory() {
} }
} }
GET_CONFIG(uint32_t, continue_push_ms, General::kContinuePushMS); if (_push_src && _continue_push_ms) {
if (_push_src && continue_push_ms) {
//取消所有权 //取消所有权
_push_src_ownership = nullptr; _push_src_ownership = nullptr;
//延时10秒注销流 //延时10秒注销流
auto push_src = std::move(_push_src); auto push_src = std::move(_push_src);
getPoller()->doDelayTask(continue_push_ms, [push_src]() { return 0; }); getPoller()->doDelayTask(_continue_push_ms, [push_src]() { return 0; });
} }
} }
......
...@@ -18,7 +18,7 @@ public: ...@@ -18,7 +18,7 @@ public:
using Ptr = std::shared_ptr<WebRtcPusher>; using Ptr = std::shared_ptr<WebRtcPusher>;
~WebRtcPusher() override = default; ~WebRtcPusher() override = default;
static Ptr create(const EventPoller::Ptr &poller, const mediakit::RtspMediaSourceImp::Ptr &src, static Ptr create(const EventPoller::Ptr &poller, const mediakit::RtspMediaSourceImp::Ptr &src,
const std::shared_ptr<void> &ownership, const mediakit::MediaInfo &info); const std::shared_ptr<void> &ownership, const mediakit::MediaInfo &info, const mediakit::ProtocolOption &option);
protected: protected:
///////WebRtcTransportImp override/////// ///////WebRtcTransportImp override///////
...@@ -42,10 +42,12 @@ protected: ...@@ -42,10 +42,12 @@ protected:
private: private:
WebRtcPusher(const EventPoller::Ptr &poller, const mediakit::RtspMediaSourceImp::Ptr &src, WebRtcPusher(const EventPoller::Ptr &poller, const mediakit::RtspMediaSourceImp::Ptr &src,
const std::shared_ptr<void> &ownership, const mediakit::MediaInfo &info); const std::shared_ptr<void> &ownership, const mediakit::MediaInfo &info, const mediakit::ProtocolOption &option);
private: private:
bool _simulcast = false; bool _simulcast = false;
//断连续推延时
uint32_t _continue_push_ms = 0;
//媒体相关元数据 //媒体相关元数据
mediakit::MediaInfo _media_info; mediakit::MediaInfo _media_info;
//推流的rtsp源 //推流的rtsp源
......
...@@ -1086,7 +1086,7 @@ void push_plugin(Session &sender, const string &offer_sdp, const WebRtcArgs &arg ...@@ -1086,7 +1086,7 @@ void push_plugin(Session &sender, const string &offer_sdp, const WebRtcArgs &arg
push_src_ownership = push_src->getOwnership(); push_src_ownership = push_src->getOwnership();
push_src->setProtocolOption(option); push_src->setProtocolOption(option);
} }
auto rtc = WebRtcPusher::create(EventPollerPool::Instance().getPoller(), push_src, push_src_ownership, info); auto rtc = WebRtcPusher::create(EventPollerPool::Instance().getPoller(), push_src, push_src_ownership, info, option);
push_src->setListener(rtc); push_src->setListener(rtc);
cb(*rtc); cb(*rtc);
}; };
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论