Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
Z
ZLMediaKit
概览
Overview
Details
Activity
Cycle Analytics
版本库
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
问题
0
Issues
0
列表
Board
标记
里程碑
合并请求
0
Merge Requests
0
CI / CD
CI / CD
流水线
作业
日程表
图表
维基
Wiki
代码片段
Snippets
成员
Collapse sidebar
Close sidebar
活动
图像
聊天
创建新问题
作业
提交
Issue Boards
Open sidebar
张翔宇
ZLMediaKit
Commits
a981ce8c
Commit
a981ce8c
authored
Feb 25, 2023
by
ziyue
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
优化webrtc nack算法:#2249
parent
a15053ff
隐藏空白字符变更
内嵌
并排
正在显示
3 个修改的文件
包含
124 行增加
和
72 行删除
+124
-72
tests/test_rtcp_nack.cpp
+27
-9
webrtc/Nack.cpp
+91
-60
webrtc/Nack.h
+6
-3
没有找到文件。
tests/test_rtcp_nack.cpp
查看文件 @
a981ce8c
...
@@ -15,20 +15,38 @@ using namespace std;
...
@@ -15,20 +15,38 @@ using namespace std;
using
namespace
toolkit
;
using
namespace
toolkit
;
using
namespace
mediakit
;
using
namespace
mediakit
;
extern
void
testFCI
();
int
main
()
{
int
main
()
{
Logger
::
Instance
().
add
(
std
::
make_shared
<
ConsoleChannel
>
());
Logger
::
Instance
().
add
(
std
::
make_shared
<
ConsoleChannel
>
());
Logger
::
Instance
().
setWriter
(
std
::
make_shared
<
AsyncLogWriter
>
());
srand
((
unsigned
)
time
(
NULL
));
srand
((
unsigned
)
time
(
NULL
));
NackContext
ctx
;
NackContext
ctx
;
for
(
int
i
=
1
;
i
<
1000
;
++
i
)
{
ctx
.
setOnNack
([](
const
FCI_NACK
&
nack
){
if
(
i
%
(
1
+
(
rand
()
%
30
))
==
0
)
{
InfoL
<<
nack
.
dumpString
();
DebugL
<<
"drop:"
<<
i
;
});
auto
drop_start
=
0
;
auto
drop_len
=
0
;
uint16_t
offset
=
0xFFFF
-
200
-
50
;
for
(
int
i
=
1
;
i
<
10000
;
++
i
)
{
if
(
i
%
100
==
0
)
{
drop_start
=
i
+
rand
()
%
16
;
drop_len
=
4
+
rand
()
%
16
;
InfoL
<<
"start drop:"
<<
(
uint16_t
)(
drop_start
+
offset
)
<<
" -> "
<<
(
uint16_t
)(
drop_start
+
offset
+
drop_len
);
}
uint16_t
seq
=
i
+
offset
;
if
((
i
>=
drop_start
&&
i
<=
drop_start
+
drop_len
)
||
seq
==
65535
||
seq
==
0
||
seq
==
1
)
{
TraceL
<<
"drop:"
<<
(
uint16_t
)(
i
+
offset
);
}
else
{
}
else
{
ctx
.
received
(
i
);
static
auto
last_seq
=
seq
;
if
(
seq
-
last_seq
>
16
)
{
ctx
.
received
(
last_seq
);
ctx
.
received
(
seq
);
DebugL
<<
"seq reduce:"
<<
last_seq
;
last_seq
=
seq
;
}
else
{
ctx
.
received
(
seq
);
}
}
}
}
}
sleep
(
1
);
sleep
(
1
);
...
...
webrtc/Nack.cpp
查看文件 @
a981ce8c
...
@@ -27,7 +27,7 @@ void NackList::pushBack(RtpPacket::Ptr rtp) {
...
@@ -27,7 +27,7 @@ void NackList::pushBack(RtpPacket::Ptr rtp) {
}
}
_cache_ms_check
=
0
;
_cache_ms_check
=
0
;
while
(
getCacheMS
()
>=
kMaxNackMS
)
{
while
(
getCacheMS
()
>=
kMaxNackMS
)
{
//需要清除部分nack缓存
//
需要清除部分nack缓存
popFront
();
popFront
();
}
}
}
}
...
@@ -36,7 +36,7 @@ void NackList::forEach(const FCI_NACK &nack, const function<void(const RtpPacket
...
@@ -36,7 +36,7 @@ void NackList::forEach(const FCI_NACK &nack, const function<void(const RtpPacket
auto
seq
=
nack
.
getPid
();
auto
seq
=
nack
.
getPid
();
for
(
auto
bit
:
nack
.
getBitArray
())
{
for
(
auto
bit
:
nack
.
getBitArray
())
{
if
(
bit
)
{
if
(
bit
)
{
//丢包
//
丢包
RtpPacket
::
Ptr
*
ptr
=
getRtp
(
seq
);
RtpPacket
::
Ptr
*
ptr
=
getRtp
(
seq
);
if
(
ptr
)
{
if
(
ptr
)
{
func
(
*
ptr
);
func
(
*
ptr
);
...
@@ -79,7 +79,7 @@ uint32_t NackList::getCacheMS() {
...
@@ -79,7 +79,7 @@ uint32_t NackList::getCacheMS() {
if
(
back_stamp
>=
front_stamp
)
{
if
(
back_stamp
>=
front_stamp
)
{
return
back_stamp
-
front_stamp
;
return
back_stamp
-
front_stamp
;
}
}
//很有可能回环了
//
很有可能回环了
return
back_stamp
+
(
UINT32_MAX
-
front_stamp
);
return
back_stamp
+
(
UINT32_MAX
-
front_stamp
);
}
}
return
0
;
return
0
;
...
@@ -95,90 +95,122 @@ int64_t NackList::getRtpStamp(uint16_t seq) {
...
@@ -95,90 +95,122 @@ int64_t NackList::getRtpStamp(uint16_t seq) {
////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////
NackContext
::
NackContext
()
{
setOnNack
(
nullptr
);
}
void
NackContext
::
received
(
uint16_t
seq
,
bool
is_rtx
)
{
void
NackContext
::
received
(
uint16_t
seq
,
bool
is_rtx
)
{
if
(
!
_last_max_seq
&&
_seq
.
empty
())
{
if
(
!
_started
)
{
_last_max_seq
=
seq
-
1
;
// 记录第一个seq
_started
=
true
;
_nack_seq
=
seq
-
1
;
}
}
if
(
is_rtx
||
(
seq
<
_last_max_seq
&&
!
(
seq
<
1024
&&
_last_max_seq
>
UINT16_MAX
-
1024
)))
{
//重传包或
if
(
seq
<
_nack_seq
&&
_nack_seq
!=
UINT16_MAX
&&
seq
<
1024
&&
_nack_seq
>
UINT16_MAX
-
1024
)
{
// seq回退,且非回环,那么这个应该是重传包
// seq回环,清空回环前状态
onRtx
(
seq
);
makeNack
(
UINT16_MAX
,
true
);
// 等待下一个seq为0的包
_nack_seq
=
UINT16_MAX
;
_seq
.
emplace
(
seq
);
return
;
}
if
(
is_rtx
||
(
seq
<
_nack_seq
&&
_nack_seq
!=
UINT16_MAX
))
{
// seq非回环回退包,猜测其为重传包,清空其nack状态
clearNackStatus
(
seq
);
return
;
}
auto
pr
=
_seq
.
emplace
(
seq
);
if
(
!
pr
.
second
)
{
// seq重复, 忽略
return
;
return
;
}
}
_seq
.
emplace
(
seq
);
auto
max_seq
=
*
_seq
.
rbegin
();
auto
max_seq
=
*
_seq
.
rbegin
();
auto
min_seq
=
*
_seq
.
begin
();
auto
min_seq
=
*
_seq
.
begin
();
auto
diff
=
max_seq
-
min_seq
;
auto
diff
=
max_seq
-
min_seq
;
if
(
!
diff
)
{
if
(
diff
>
(
UINT16_MAX
>>
1
))
{
// 回环后,收到回环前的大值seq, 忽略掉
_seq
.
erase
(
max_seq
);
return
;
return
;
}
}
if
(
min_seq
==
(
uint16_t
)(
_nack_seq
+
1
)
&&
_seq
.
size
()
==
diff
+
1
)
{
if
(
diff
>
UINT16_MAX
/
2
)
{
// 都是连续的seq,未丢包
//回环
_seq
.
clear
();
_seq
.
clear
();
_last_max_seq
=
min_seq
;
_nack_seq
=
max_seq
;
_nack_send_status
.
clear
();
}
else
{
// seq不连续,有丢包
makeNack
(
max_seq
,
false
);
}
}
void
NackContext
::
makeNack
(
uint16_t
max_seq
,
bool
flush
)
{
// 尝试移除前面部分连续的seq
eraseFrontSeq
();
if
(
max_seq
==
_nack_seq
)
{
// 完成所有seq丢包判断
return
;
return
;
}
}
if
(
_seq
.
size
()
==
(
size_t
)
diff
+
1
&&
_last_max_seq
+
1
==
min_seq
)
{
// 有丢包,丢包从_last_max_seq开始统计丢包
//都是连续的seq,未丢包
uint16_t
nack_rtp_count
;
_seq
.
clear
();
if
(
flush
)
{
_last_max_seq
=
max_seq
;
nack_rtp_count
=
max_seq
-
_nack_seq
-
1
;
nack_rtp_count
=
MIN
(
FCI_NACK
::
kBitSize
,
nack_rtp_count
);
}
else
{
}
else
{
// seq不连续,有丢包
nack_rtp_count
=
FCI_NACK
::
kBitSize
;
if
(
min_seq
==
_last_max_seq
+
1
)
{
}
//前面部分seq是连续的,未丢包,移除之
auto
max_nack
=
5u
;
eraseFrontSeq
();
// 最多生成5个nack包,防止seq大幅跳跃导致一直循环
while
(
max_nack
--
&&
max_seq
>
(
uint16_t
)(
nack_rtp_count
+
_nack_seq
))
{
vector
<
bool
>
vec
;
vec
.
resize
(
nack_rtp_count
,
false
);
for
(
size_t
i
=
0
;
i
<
nack_rtp_count
;
++
i
)
{
vec
[
i
]
=
_seq
.
find
((
uint16_t
)(
_nack_seq
+
i
+
2
))
==
_seq
.
end
();
}
}
doNack
(
FCI_NACK
(
_nack_seq
+
1
,
vec
),
true
);
//有丢包,丢包从_last_max_seq开始
_nack_seq
+=
nack_rtp_count
+
1
;
auto
nack_rtp_count
=
FCI_NACK
::
kBitSize
;
if
(
_nack_seq
>=
max_seq
)
{
if
(
max_seq
>
nack_rtp_count
+
_last_max_seq
)
{
_seq
.
clear
();
vector
<
bool
>
vec
;
}
else
{
vec
.
resize
(
FCI_NACK
::
kBitSize
,
false
);
// 返回第一个比_last_max_seq大的元素
for
(
size_t
i
=
0
;
i
<
nack_rtp_count
;
++
i
)
{
auto
it
=
_seq
.
upper_bound
(
_nack_seq
);
vec
[
i
]
=
_seq
.
find
(
_last_max_seq
+
i
+
2
)
==
_seq
.
end
();
// 移除 <=_last_max_seq 的seq
}
_seq
.
erase
(
_seq
.
begin
(),
it
);
doNack
(
FCI_NACK
(
_last_max_seq
+
1
,
vec
),
true
);
_last_max_seq
+=
nack_rtp_count
+
1
;
if
(
_last_max_seq
>=
max_seq
)
{
_seq
.
clear
();
}
else
{
auto
it
=
_seq
.
emplace_hint
(
_seq
.
begin
(),
_last_max_seq
+
1
);
_seq
.
erase
(
_seq
.
begin
(),
it
);
}
}
}
}
}
}
}
void
NackContext
::
setOnNack
(
onNack
cb
)
{
void
NackContext
::
setOnNack
(
onNack
cb
)
{
_cb
=
std
::
move
(
cb
);
if
(
cb
)
{
_cb
=
std
::
move
(
cb
);
}
else
{
_cb
=
[](
const
FCI_NACK
&
nack
)
{};
}
}
}
void
NackContext
::
doNack
(
const
FCI_NACK
&
nack
,
bool
record_nack
)
{
void
NackContext
::
doNack
(
const
FCI_NACK
&
nack
,
bool
record_nack
)
{
if
(
record_nack
)
{
if
(
record_nack
)
{
recordNack
(
nack
);
recordNack
(
nack
);
}
}
if
(
_cb
)
{
_cb
(
nack
);
_cb
(
nack
);
}
}
}
void
NackContext
::
eraseFrontSeq
()
{
void
NackContext
::
eraseFrontSeq
()
{
//前面部分seq是连续的,未丢包,移除之
//
前面部分seq是连续的,未丢包,移除之
for
(
auto
it
=
_seq
.
begin
();
it
!=
_seq
.
end
();)
{
for
(
auto
it
=
_seq
.
begin
();
it
!=
_seq
.
end
();)
{
if
(
*
it
!=
_last_max_seq
+
1
)
{
if
(
*
it
!=
(
uint16_t
)(
_nack_seq
+
1
)
)
{
// seq不连续,丢包了
// seq不连续,丢包了
break
;
break
;
}
}
_
last_max
_seq
=
*
it
;
_
nack
_seq
=
*
it
;
it
=
_seq
.
erase
(
it
);
it
=
_seq
.
erase
(
it
);
}
}
}
}
void
NackContext
::
onRtx
(
uint16_t
seq
)
{
void
NackContext
::
clearNackStatus
(
uint16_t
seq
)
{
auto
it
=
_nack_send_status
.
find
(
seq
);
auto
it
=
_nack_send_status
.
find
(
seq
);
if
(
it
==
_nack_send_status
.
end
())
{
if
(
it
==
_nack_send_status
.
end
())
{
return
;
return
;
...
@@ -187,9 +219,8 @@ void NackContext::onRtx(uint16_t seq) {
...
@@ -187,9 +219,8 @@ void NackContext::onRtx(uint16_t seq) {
_nack_send_status
.
erase
(
it
);
_nack_send_status
.
erase
(
it
);
if
(
rtt
>=
0
)
{
if
(
rtt
>=
0
)
{
// rtt不
肯
小于0
// rtt不
能
小于0
_rtt
=
rtt
;
_rtt
=
rtt
;
// InfoL << "rtt:" << rtt;
}
}
}
}
...
@@ -205,7 +236,7 @@ void NackContext::recordNack(const FCI_NACK &nack) {
...
@@ -205,7 +236,7 @@ void NackContext::recordNack(const FCI_NACK &nack) {
}
}
++
i
;
++
i
;
}
}
//记录太多了,移除一部分早期的记录
//
记录太多了,移除一部分早期的记录
while
(
_nack_send_status
.
size
()
>
kNackMaxSize
)
{
while
(
_nack_send_status
.
size
()
>
kNackMaxSize
)
{
_nack_send_status
.
erase
(
_nack_send_status
.
begin
());
_nack_send_status
.
erase
(
_nack_send_status
.
begin
());
}
}
...
@@ -216,18 +247,18 @@ uint64_t NackContext::reSendNack() {
...
@@ -216,18 +247,18 @@ uint64_t NackContext::reSendNack() {
auto
now
=
getCurrentMillisecond
();
auto
now
=
getCurrentMillisecond
();
for
(
auto
it
=
_nack_send_status
.
begin
();
it
!=
_nack_send_status
.
end
();)
{
for
(
auto
it
=
_nack_send_status
.
begin
();
it
!=
_nack_send_status
.
end
();)
{
if
(
now
-
it
->
second
.
first_stamp
>
kNackMaxMS
)
{
if
(
now
-
it
->
second
.
first_stamp
>
kNackMaxMS
)
{
//该rtp丢失太久了,不再要求重传
//
该rtp丢失太久了,不再要求重传
it
=
_nack_send_status
.
erase
(
it
);
it
=
_nack_send_status
.
erase
(
it
);
continue
;
continue
;
}
}
if
(
now
-
it
->
second
.
update_stamp
<
kNackIntervalRatio
*
_rtt
)
{
if
(
now
-
it
->
second
.
update_stamp
<
kNackIntervalRatio
*
_rtt
)
{
//距离上次nack不足2倍的rtt,不用再发送nack
//
距离上次nack不足2倍的rtt,不用再发送nack
++
it
;
++
it
;
continue
;
continue
;
}
}
//此rtp需要请求重传
//
此rtp需要请求重传
nack_rtp
.
emplace
(
it
->
first
);
nack_rtp
.
emplace
(
it
->
first
);
//更新nack发送时间戳
//
更新nack发送时间戳
it
->
second
.
update_stamp
=
now
;
it
->
second
.
update_stamp
=
now
;
if
(
++
(
it
->
second
.
nack_count
)
==
kNackMaxCount
)
{
if
(
++
(
it
->
second
.
nack_count
)
==
kNackMaxCount
)
{
// nack次数太多,移除之
// nack次数太多,移除之
...
@@ -238,7 +269,7 @@ uint64_t NackContext::reSendNack() {
...
@@ -238,7 +269,7 @@ uint64_t NackContext::reSendNack() {
}
}
if
(
_nack_send_status
.
empty
())
{
if
(
_nack_send_status
.
empty
())
{
//不需要再发送nack
//
不需要再发送nack
return
0
;
return
0
;
}
}
...
@@ -253,12 +284,12 @@ uint64_t NackContext::reSendNack() {
...
@@ -253,12 +284,12 @@ uint64_t NackContext::reSendNack() {
}
}
auto
inc
=
*
it
-
pid
;
auto
inc
=
*
it
-
pid
;
if
(
inc
>
(
ssize_t
)
FCI_NACK
::
kBitSize
)
{
if
(
inc
>
(
ssize_t
)
FCI_NACK
::
kBitSize
)
{
//新的nack包
//
新的nack包
doNack
(
FCI_NACK
(
pid
,
vec
),
false
);
doNack
(
FCI_NACK
(
pid
,
vec
),
false
);
pid
=
-
1
;
pid
=
-
1
;
continue
;
continue
;
}
}
//这个包丢了
//
这个包丢了
vec
[
inc
-
1
]
=
true
;
vec
[
inc
-
1
]
=
true
;
++
it
;
++
it
;
}
}
...
@@ -266,7 +297,7 @@ uint64_t NackContext::reSendNack() {
...
@@ -266,7 +297,7 @@ uint64_t NackContext::reSendNack() {
doNack
(
FCI_NACK
(
pid
,
vec
),
false
);
doNack
(
FCI_NACK
(
pid
,
vec
),
false
);
}
}
//重传间隔不得低于5ms
//
重传间隔不得低于5ms
return
max
(
_rtt
,
5
);
return
max
(
_rtt
,
5
);
}
}
...
...
webrtc/Nack.h
查看文件 @
a981ce8c
...
@@ -53,7 +53,7 @@ public:
...
@@ -53,7 +53,7 @@ public:
// nack重传频率,rtt的倍数
// nack重传频率,rtt的倍数
static
constexpr
auto
kNackIntervalRatio
=
1
.
0
f
;
static
constexpr
auto
kNackIntervalRatio
=
1
.
0
f
;
NackContext
()
=
default
;
NackContext
();
~
NackContext
()
=
default
;
~
NackContext
()
=
default
;
void
received
(
uint16_t
seq
,
bool
is_rtx
=
false
);
void
received
(
uint16_t
seq
,
bool
is_rtx
=
false
);
...
@@ -64,13 +64,16 @@ private:
...
@@ -64,13 +64,16 @@ private:
void
eraseFrontSeq
();
void
eraseFrontSeq
();
void
doNack
(
const
FCI_NACK
&
nack
,
bool
record_nack
);
void
doNack
(
const
FCI_NACK
&
nack
,
bool
record_nack
);
void
recordNack
(
const
FCI_NACK
&
nack
);
void
recordNack
(
const
FCI_NACK
&
nack
);
void
onRtx
(
uint16_t
seq
);
void
clearNackStatus
(
uint16_t
seq
);
void
makeNack
(
uint16_t
max
,
bool
flush
=
false
);
private
:
private
:
bool
_started
=
false
;
int
_rtt
=
50
;
int
_rtt
=
50
;
onNack
_cb
;
onNack
_cb
;
std
::
set
<
uint16_t
>
_seq
;
std
::
set
<
uint16_t
>
_seq
;
uint16_t
_last_max_seq
=
0
;
// 最新nack包中的rtp seq值
uint16_t
_nack_seq
=
0
;
struct
NackStatus
{
struct
NackStatus
{
uint64_t
first_stamp
;
uint64_t
first_stamp
;
...
...
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论