Commit 539d6299 by xiongziliang

rtp服务器单端口模式收到相同ssrc推流时延时关闭RtpSession对象(防止频繁创建销毁对象)

parent c2e10834
...@@ -42,7 +42,7 @@ RtpProcess::Ptr RtpSelector::getProcess(const string &stream_id,bool makeNew) { ...@@ -42,7 +42,7 @@ RtpProcess::Ptr RtpSelector::getProcess(const string &stream_id,bool makeNew) {
} }
if (it != _map_rtp_process.end() && makeNew) { if (it != _map_rtp_process.end() && makeNew) {
//已经被其他线程持有了,不得再被持有,否则会存在线程安全的问题 //已经被其他线程持有了,不得再被持有,否则会存在线程安全的问题
throw std::runtime_error(StrPrinter << "RtpProcess(" << stream_id << ") already existed"); throw ProcessExisted(StrPrinter << "RtpProcess(" << stream_id << ") already existed");
} }
RtpProcessHelper::Ptr &ref = _map_rtp_process[stream_id]; RtpProcessHelper::Ptr &ref = _map_rtp_process[stream_id];
if (!ref) { if (!ref) {
......
...@@ -44,6 +44,13 @@ public: ...@@ -44,6 +44,13 @@ public:
RtpSelector() = default; RtpSelector() = default;
~RtpSelector() = default; ~RtpSelector() = default;
class ProcessExisted : public std::runtime_error {
public:
template<typename ...T>
ProcessExisted(T && ...args) : std::runtime_error(std::forward<T>(args)...) {}
~ProcessExisted() override = default;
};
static bool getSSRC(const char *data,size_t data_len, uint32_t &ssrc); static bool getSSRC(const char *data,size_t data_len, uint32_t &ssrc);
static RtpSelector &Instance(); static RtpSelector &Instance();
......
...@@ -76,6 +76,15 @@ void RtpSession::onManager() { ...@@ -76,6 +76,15 @@ void RtpSession::onManager() {
} }
void RtpSession::onRtpPacket(const char *data, size_t len) { void RtpSession::onRtpPacket(const char *data, size_t len) {
if (_delay_close) {
// 正在延时关闭中,忽略所有数据
return;
}
if (!isRtp(data, len)) {
// 忽略非rtp数据
WarnP(this) << "Not rtp packet";
return;
}
if (!_is_udp) { if (!_is_udp) {
if (_search_rtp) { if (_search_rtp) {
//搜索上下文期间,数据丢弃 //搜索上下文期间,数据丢弃
...@@ -94,10 +103,6 @@ void RtpSession::onRtpPacket(const char *data, size_t len) { ...@@ -94,10 +103,6 @@ void RtpSession::onRtpPacket(const char *data, size_t len) {
} }
} }
if (!_process) { if (!_process) {
if (!isRtp(data, len)) {
WarnP(this) << "Not rtp packet";
return;
}
//未设置ssrc时,尝试获取ssrc //未设置ssrc时,尝试获取ssrc
if (!_ssrc && !RtpSelector::getSSRC(data, len, _ssrc)) { if (!_ssrc && !RtpSelector::getSSRC(data, len, _ssrc)) {
return; return;
...@@ -106,8 +111,18 @@ void RtpSession::onRtpPacket(const char *data, size_t len) { ...@@ -106,8 +111,18 @@ void RtpSession::onRtpPacket(const char *data, size_t len) {
//未指定流id就使用ssrc为流id //未指定流id就使用ssrc为流id
_stream_id = printSSRC(_ssrc); _stream_id = printSSRC(_ssrc);
} }
//tcp情况下,一个tcp链接只可能是一路流,不需要通过多个ssrc来区分,所以不需要频繁getProcess try {
_process = RtpSelector::Instance().getProcess(_stream_id, true); _process = RtpSelector::Instance().getProcess(_stream_id, true);
} catch (RtpSelector::ProcessExisted &ex) {
if (!_is_udp) {
// tcp情况下立即断开连接
throw;
}
// udp情况下延时断开连接(等待超时自动关闭),防止频繁创建销毁RtpSession对象
WarnP(this) << ex.what();
_delay_close = true;
return;
}
_process->setOnlyAudio(_only_audio); _process->setOnlyAudio(_only_audio);
_process->setDelegate(dynamic_pointer_cast<RtpSession>(shared_from_this())); _process->setDelegate(dynamic_pointer_cast<RtpSession>(shared_from_this()));
} }
......
...@@ -43,6 +43,7 @@ protected: ...@@ -43,6 +43,7 @@ protected:
const char *onSearchPacketTail(const char *data, size_t len) override; const char *onSearchPacketTail(const char *data, size_t len) override;
private: private:
bool _delay_close = false;
bool _is_udp = false; bool _is_udp = false;
bool _search_rtp = false; bool _search_rtp = false;
bool _search_rtp_finished = false; bool _search_rtp_finished = false;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论