Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
Z
ZLMediaKit
概览
Overview
Details
Activity
Cycle Analytics
版本库
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
问题
0
Issues
0
列表
Board
标记
里程碑
合并请求
0
Merge Requests
0
CI / CD
CI / CD
流水线
作业
日程表
图表
维基
Wiki
代码片段
Snippets
成员
Collapse sidebar
Close sidebar
活动
图像
聊天
创建新问题
作业
提交
Issue Boards
Open sidebar
张翔宇
ZLMediaKit
Commits
e4126809
Commit
e4126809
authored
4 years ago
by
xia-chu
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
实现28181 rtcp保活:#429
parent
bba18156
隐藏空白字符变更
内嵌
并排
正在显示
1 个修改的文件
包含
73 行增加
和
3 行删除
+73
-3
src/Rtp/RtpServer.cpp
+73
-3
没有找到文件。
src/Rtp/RtpServer.cpp
查看文件 @
e4126809
...
@@ -11,6 +11,7 @@
...
@@ -11,6 +11,7 @@
#if defined(ENABLE_RTPPROXY)
#if defined(ENABLE_RTPPROXY)
#include "RtpServer.h"
#include "RtpServer.h"
#include "RtpSelector.h"
#include "RtpSelector.h"
#include "Rtcp/RtcpContext.h"
namespace
mediakit
{
namespace
mediakit
{
RtpServer
::
RtpServer
()
{
RtpServer
::
RtpServer
()
{
...
@@ -22,20 +23,86 @@ RtpServer::~RtpServer() {
...
@@ -22,20 +23,86 @@ RtpServer::~RtpServer() {
}
}
}
}
class
RtcpHelper
:
public
RtcpContext
,
public
std
::
enable_shared_from_this
<
RtcpHelper
>
{
public
:
using
Ptr
=
std
::
shared_ptr
<
RtcpHelper
>
;
RtcpHelper
(
Socket
::
Ptr
rtcp_sock
,
uint32_t
sample_rate
)
:
RtcpContext
(
sample_rate
,
true
){
_rtcp_sock
=
std
::
move
(
rtcp_sock
);
_sample_rate
=
sample_rate
;
}
void
onRecvRtp
(
const
Buffer
::
Ptr
&
buf
,
struct
sockaddr
*
addr
,
int
addr_len
){
//统计rtp接受情况,用于发送rr包
auto
header
=
(
RtpHeader
*
)
buf
->
data
();
onRtp
(
ntohs
(
header
->
seq
),
ntohl
(
header
->
stamp
)
*
uint64_t
(
1000
)
/
_sample_rate
,
buf
->
size
());
sendRtcp
(
ntohl
(
header
->
ssrc
),
addr
,
addr_len
);
}
void
startRtcp
(){
weak_ptr
<
RtcpHelper
>
weak_self
=
shared_from_this
();
_rtcp_sock
->
setOnRead
([
weak_self
](
const
Buffer
::
Ptr
&
buf
,
struct
sockaddr
*
addr
,
int
addr_len
)
{
//用于接受rtcp打洞包
auto
strong_self
=
weak_self
.
lock
();
if
(
!
strong_self
)
{
return
;
}
if
(
!
strong_self
->
_rtcp_addr
)
{
//只设置一次rtcp对端端口
strong_self
->
_rtcp_addr
=
std
::
make_shared
<
struct
sockaddr
>
();
memcpy
(
strong_self
->
_rtcp_addr
.
get
(),
addr
,
addr_len
);
}
auto
rtcps
=
RtcpHeader
::
loadFromBytes
(
buf
->
data
(),
buf
->
size
());
for
(
auto
&
rtcp
:
rtcps
)
{
strong_self
->
onRtcp
(
rtcp
);
}
});
}
private
:
void
sendRtcp
(
uint32_t
rtp_ssrc
,
struct
sockaddr
*
addr
,
int
addr_len
){
//每5秒发送一次rtcp
if
(
_ticker
.
elapsedTime
()
<
5000
)
{
return
;
}
_ticker
.
resetTime
();
auto
rtcp_addr
=
_rtcp_addr
.
get
();
if
(
!
rtcp_addr
)
{
//默认的,rtcp端口为rtp端口+1
((
sockaddr_in
*
)
addr
)
->
sin_port
=
htons
(
ntohs
(((
sockaddr_in
*
)
addr
)
->
sin_port
)
+
1
);
//未收到rtcp打洞包时,采用默认的rtcp端口
rtcp_addr
=
addr
;
}
_rtcp_sock
->
send
(
createRtcpRR
(
rtp_ssrc
+
1
,
rtp_ssrc
),
rtcp_addr
,
addr_len
);
}
private
:
Ticker
_ticker
;
Socket
::
Ptr
_rtcp_sock
;
uint32_t
_sample_rate
;
std
::
shared_ptr
<
struct
sockaddr
>
_rtcp_addr
;
};
void
RtpServer
::
start
(
uint16_t
local_port
,
const
string
&
stream_id
,
bool
enable_tcp
,
const
char
*
local_ip
)
{
void
RtpServer
::
start
(
uint16_t
local_port
,
const
string
&
stream_id
,
bool
enable_tcp
,
const
char
*
local_ip
)
{
//创建udp服务器
//创建udp服务器
Socket
::
Ptr
udp_server
=
Socket
::
createSocket
(
nullptr
,
true
);
Socket
::
Ptr
udp_server
=
Socket
::
createSocket
(
nullptr
,
true
);
Socket
::
Ptr
rtcp_server
=
Socket
::
createSocket
(
nullptr
,
true
);
if
(
local_port
==
0
)
{
if
(
local_port
==
0
)
{
//随机端口,rtp端口采用偶数
//随机端口,rtp端口采用偶数
Socket
::
Ptr
rtcp_server
=
Socket
::
createSocket
(
nullptr
,
true
);
auto
pair
=
std
::
make_pair
(
udp_server
,
rtcp_server
);
auto
pair
=
std
::
make_pair
(
udp_server
,
rtcp_server
);
makeSockPair
(
pair
,
local_ip
);
makeSockPair
(
pair
,
local_ip
);
//取偶数端口
//取偶数端口
udp_server
=
pair
.
first
;
udp_server
=
pair
.
first
;
rtcp_server
=
pair
.
second
;
}
else
if
(
!
udp_server
->
bindUdpSock
(
local_port
,
local_ip
))
{
}
else
if
(
!
udp_server
->
bindUdpSock
(
local_port
,
local_ip
))
{
//用户指定端口
//用户指定端口
throw
std
::
runtime_error
(
StrPrinter
<<
"bindUdpSock on "
<<
local_ip
<<
":"
<<
local_port
<<
" failed:"
<<
get_uv_errmsg
(
true
));
throw
std
::
runtime_error
(
StrPrinter
<<
"创建rtp端口 "
<<
local_ip
<<
":"
<<
local_port
<<
" 失败:"
<<
get_uv_errmsg
(
true
));
}
else
if
(
!
rtcp_server
->
bindUdpSock
(
udp_server
->
get_local_port
()
+
1
,
local_ip
))
{
// rtcp端口
throw
std
::
runtime_error
(
StrPrinter
<<
"创建rtcp端口 "
<<
local_ip
<<
":"
<<
local_port
<<
" 失败:"
<<
get_uv_errmsg
(
true
));
}
}
//设置udp socket读缓存
//设置udp socket读缓存
SockUtil
::
setRecvBuf
(
udp_server
->
rawFD
(),
4
*
1024
*
1024
);
SockUtil
::
setRecvBuf
(
udp_server
->
rawFD
(),
4
*
1024
*
1024
);
...
@@ -51,7 +118,10 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable
...
@@ -51,7 +118,10 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable
if
(
!
stream_id
.
empty
())
{
if
(
!
stream_id
.
empty
())
{
//指定了流id,那么一个端口一个流(不管是否包含多个ssrc的多个流,绑定rtp源后,会筛选掉ip端口不匹配的流)
//指定了流id,那么一个端口一个流(不管是否包含多个ssrc的多个流,绑定rtp源后,会筛选掉ip端口不匹配的流)
process
=
RtpSelector
::
Instance
().
getProcess
(
stream_id
,
true
);
process
=
RtpSelector
::
Instance
().
getProcess
(
stream_id
,
true
);
udp_server
->
setOnRead
([
udp_server
,
process
](
const
Buffer
::
Ptr
&
buf
,
struct
sockaddr
*
addr
,
int
)
{
RtcpHelper
::
Ptr
helper
=
std
::
make_shared
<
RtcpHelper
>
(
std
::
move
(
rtcp_server
),
90000
);
helper
->
startRtcp
();
udp_server
->
setOnRead
([
udp_server
,
process
,
helper
](
const
Buffer
::
Ptr
&
buf
,
struct
sockaddr
*
addr
,
int
addr_len
)
{
helper
->
onRecvRtp
(
buf
,
addr
,
addr_len
);
process
->
inputRtp
(
true
,
udp_server
,
buf
->
data
(),
buf
->
size
(),
addr
);
process
->
inputRtp
(
true
,
udp_server
,
buf
->
data
(),
buf
->
size
(),
addr
);
});
});
}
else
{
}
else
{
...
...
This diff is collapsed.
Click to expand it.
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论