Commit 9f0c15a4 by xiongziliang

startSendRtp接口支持rtcp接收超时主动停止

parent b7ef7667
ZLToolKit @ 658271fd
Subproject commit 4e127f53a93dd1b7665f04a0ec7312d67a34ab6f Subproject commit 658271fdf4fb497b0665c06544627789844a003a
...@@ -1562,6 +1562,12 @@ ...@@ -1562,6 +1562,12 @@
"value": "1", "value": "1",
"description": "rtp es方式打包时,是否只打包音频,该参数非必选参数", "description": "rtp es方式打包时,是否只打包音频,该参数非必选参数",
"disabled": true "disabled": true
},
{
"key": "udp_rtcp_timeout",
"value": "0",
"description": "udp方式推流时,是否开启rtcp发送和rtcp接收超时判断,开启后(默认关闭),如果接收rr rtcp超时,将导致主动停止rtp发送",
"disabled": true
} }
] ]
} }
......
...@@ -1147,7 +1147,8 @@ void installWebApi() { ...@@ -1147,7 +1147,8 @@ void installWebApi() {
args.src_port = allArgs["src_port"]; args.src_port = allArgs["src_port"];
args.pt = allArgs["pt"].empty() ? 96 : allArgs["pt"].as<int>(); args.pt = allArgs["pt"].empty() ? 96 : allArgs["pt"].as<int>();
args.use_ps = allArgs["use_ps"].empty() ? true : allArgs["use_ps"].as<bool>(); args.use_ps = allArgs["use_ps"].empty() ? true : allArgs["use_ps"].as<bool>();
args.only_audio = allArgs["only_audio"].empty() ? false : allArgs["only_audio"].as<bool>(); args.only_audio = allArgs["only_audio"].as<bool>();
args.udp_rtcp_timeout = allArgs["udp_rtcp_timeout"];
TraceL << "startSendRtp, pt " << int(args.pt) << " ps " << args.use_ps << " audio " << args.only_audio; TraceL << "startSendRtp, pt " << int(args.pt) << " ps " << args.use_ps << " audio " << args.only_audio;
src->getOwnerPoller()->async([=]() mutable { src->getOwnerPoller()->async([=]() mutable {
...@@ -1178,7 +1179,7 @@ void installWebApi() { ...@@ -1178,7 +1179,7 @@ void installWebApi() {
args.src_port = allArgs["src_port"]; args.src_port = allArgs["src_port"];
args.pt = allArgs["pt"].empty() ? 96 : allArgs["pt"].as<int>(); args.pt = allArgs["pt"].empty() ? 96 : allArgs["pt"].as<int>();
args.use_ps = allArgs["use_ps"].empty() ? true : allArgs["use_ps"].as<bool>(); args.use_ps = allArgs["use_ps"].empty() ? true : allArgs["use_ps"].as<bool>();
args.only_audio = allArgs["only_audio"].empty() ? false : allArgs["only_audio"].as<bool>(); args.only_audio = allArgs["only_audio"].as<bool>();
TraceL << "startSendRtpPassive, pt " << int(args.pt) << " ps " << args.use_ps << " audio " << args.only_audio; TraceL << "startSendRtpPassive, pt " << int(args.pt) << " ps " << args.use_ps << " audio " << args.only_audio;
src->getOwnerPoller()->async([=]() mutable { src->getOwnerPoller()->async([=]() mutable {
......
...@@ -111,6 +111,15 @@ public: ...@@ -111,6 +111,15 @@ public:
uint16_t dst_port; uint16_t dst_port;
// 发送目标主机地址,可以是ip或域名 // 发送目标主机地址,可以是ip或域名
std::string dst_url; std::string dst_url;
//udp发送时,是否开启rr rtcp接收超时判断
bool udp_rtcp_timeout = false;
//tcp被动发送服务器延时关闭事件,单位毫秒
uint32_t tcp_passive_close_delay_ms = 5 * 1000;
//udp 发送时,rr rtcp包接收超时时间,单位毫秒
uint32_t rtcp_timeout_ms = 30 * 1000;
//udp 发送时,发送sr rtcp包间隔,单位毫秒
uint32_t rtcp_send_interval_ms = 5 * 1000;
}; };
// 开始发送ps-rtp // 开始发送ps-rtp
......
...@@ -223,7 +223,7 @@ void MultiMediaSourceMuxer::startSendRtp(MediaSource &, const MediaSourceEvent:: ...@@ -223,7 +223,7 @@ void MultiMediaSourceMuxer::startSendRtp(MediaSource &, const MediaSourceEvent::
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
auto rtp_sender = std::make_shared<RtpSender>(); auto rtp_sender = std::make_shared<RtpSender>();
weak_ptr<MultiMediaSourceMuxer> weak_self = shared_from_this(); weak_ptr<MultiMediaSourceMuxer> weak_self = shared_from_this();
rtp_sender->startSend(args, [args, weak_self, rtp_sender, cb](uint16_t local_port, const SockException &ex) { rtp_sender->startSend(args, [args, weak_self, rtp_sender, cb](uint16_t local_port, const SockException &ex) mutable {
cb(local_port, ex); cb(local_port, ex);
auto strong_self = weak_self.lock(); auto strong_self = weak_self.lock();
if (!strong_self || ex) { if (!strong_self || ex) {
...@@ -233,7 +233,15 @@ void MultiMediaSourceMuxer::startSendRtp(MediaSource &, const MediaSourceEvent:: ...@@ -233,7 +233,15 @@ void MultiMediaSourceMuxer::startSendRtp(MediaSource &, const MediaSourceEvent::
rtp_sender->addTrack(track); rtp_sender->addTrack(track);
} }
rtp_sender->addTrackCompleted(); rtp_sender->addTrackCompleted();
strong_self->_rtp_sender[args.ssrc] = rtp_sender;
auto ssrc = args.ssrc;
rtp_sender->setOnClose([weak_self, ssrc]() {
if (auto strong_self = weak_self.lock()) {
strong_self->_rtp_sender.erase(ssrc);
WarnL << "stream:" << strong_self->_get_origin_url() << " stop send rtp:" << ssrc;
}
});
strong_self->_rtp_sender[args.ssrc] = std::move(rtp_sender);
}); });
#else #else
cb(0, SockException(Err_other, "该功能未启用,编译时请打开ENABLE_RTPPROXY宏")); cb(0, SockException(Err_other, "该功能未启用,编译时请打开ENABLE_RTPPROXY宏"));
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
#include "PSEncoder.h" #include "PSEncoder.h"
#include "Extension/CommonRtp.h" #include "Extension/CommonRtp.h"
#include "Rtcp/RtcpContext.h"
namespace mediakit{ namespace mediakit{
...@@ -53,6 +54,8 @@ public: ...@@ -53,6 +54,8 @@ public:
*/ */
virtual void resetTracks() override; virtual void resetTracks() override;
void setOnClose(std::function<void()> on_close);
private: private:
//合并写输出 //合并写输出
void onFlushRtpList(std::shared_ptr<toolkit::List<toolkit::Buffer::Ptr> > rtp_list); void onFlushRtpList(std::shared_ptr<toolkit::List<toolkit::Buffer::Ptr> > rtp_list);
...@@ -60,14 +63,23 @@ private: ...@@ -60,14 +63,23 @@ private:
void onConnect(); void onConnect();
//异常断开socket事件 //异常断开socket事件
void onErr(const toolkit::SockException &ex, bool is_connect = false); void onErr(const toolkit::SockException &ex, bool is_connect = false);
void createRtcpSocket();
void onRecvRtcp(RtcpHeader *rtcp);
void onSendRtpUdp(const toolkit::Buffer::Ptr &buf, bool check);
void onClose();
private: private:
bool _is_connect = false; bool _is_connect = false;
MediaSourceEvent::SendRtpArgs _args; MediaSourceEvent::SendRtpArgs _args;
toolkit::Socket::Ptr _socket; toolkit::Socket::Ptr _socket_rtp;
toolkit::Socket::Ptr _socket_rtcp;
toolkit::EventPoller::Ptr _poller; toolkit::EventPoller::Ptr _poller;
toolkit::Timer::Ptr _connect_timer; toolkit::Timer::Ptr _connect_timer;
MediaSinkInterface::Ptr _interface; MediaSinkInterface::Ptr _interface;
std::shared_ptr<RtcpContext> _rtcp_context;
toolkit::Ticker _rtcp_send_ticker;
toolkit::Ticker _rtcp_recv_ticker;
std::function<void()> _on_close;
}; };
}//namespace mediakit }//namespace mediakit
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论