Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
Z
ZLMediaKit
概览
Overview
Details
Activity
Cycle Analytics
版本库
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
问题
0
Issues
0
列表
Board
标记
里程碑
合并请求
0
Merge Requests
0
CI / CD
CI / CD
流水线
作业
日程表
图表
维基
Wiki
代码片段
Snippets
成员
Collapse sidebar
Close sidebar
活动
图像
聊天
创建新问题
作业
提交
Issue Boards
Open sidebar
张翔宇
ZLMediaKit
Commits
f58211fb
Commit
f58211fb
authored
Jun 09, 2022
by
xiongguangjie
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
optimize send nack when check packet lost send nack immediately
parent
e5ca3aa0
隐藏空白字符变更
内嵌
并排
正在显示
6 个修改的文件
包含
191 行增加
和
28 行删除
+191
-28
srt/Common.hpp
+8
-3
srt/NackContext.cpp
+109
-0
srt/NackContext.hpp
+31
-0
srt/PacketQueue.cpp
+2
-9
srt/SrtTransport.cpp
+35
-11
srt/SrtTransport.hpp
+6
-5
没有找到文件。
srt/Common.hpp
查看文件 @
f58211fb
#
ifndef
ZLMEDIAKIT_SRT_COMMON_H
#define ZLMEDIAKIT_SRT_COMMON_H
#if defined(_WIN32)
#include <Iphlpapi.h>
#include <winsock2.h>
#include <ws2tcpip.h>
#include <Iphlpapi.h>
#pragma comment (lib, "Ws2_32.lib")
#pragma comment(lib,"Iphlpapi.lib")
#pragma comment(lib, "Ws2_32.lib")
#pragma comment(lib, "Iphlpapi.lib")
#else
#include <netdb.h>
#include <sys/socket.h>
#endif // defined(_WIN32)
#include <chrono>
#define MAX_SEQ 0x7fffffff
#define MAX_TS 0xffffffff
namespace
SRT
{
using
SteadyClock
=
std
::
chrono
::
steady_clock
;
...
...
@@ -59,6 +61,9 @@ static inline void storeUint16LE(uint8_t *buf, uint16_t val) {
static
inline
uint32_t
srtVersion
(
int
major
,
int
minor
,
int
patch
)
{
return
patch
+
minor
*
0x100
+
major
*
0x10000
;
}
static
inline
uint32_t
genExpectedSeq
(
uint32_t
seq
)
{
return
MAX_SEQ
&
seq
;
}
class
UTicker
{
public
:
...
...
srt/NackContext.cpp
0 → 100644
查看文件 @
f58211fb
#include "NackContext.hpp"
namespace
SRT
{
void
NackContext
::
update
(
TimePoint
now
,
std
::
list
<
PacketQueue
::
LostPair
>
&
lostlist
)
{
for
(
auto
item
:
lostlist
)
{
mergeItem
(
now
,
item
);
}
}
void
NackContext
::
getLostList
(
TimePoint
now
,
uint32_t
rtt
,
uint32_t
rtt_variance
,
std
::
list
<
PacketQueue
::
LostPair
>
&
lostlist
)
{
lostlist
.
clear
();
std
::
list
<
uint32_t
>
tmp_list
;
for
(
auto
it
=
_nack_map
.
begin
();
it
!=
_nack_map
.
end
();
++
it
)
{
if
(
!
it
->
second
.
_is_nack
)
{
tmp_list
.
push_back
(
it
->
first
);
it
->
second
.
_ts
=
now
;
it
->
second
.
_is_nack
=
true
;
}
else
{
if
(
DurationCountMicroseconds
(
now
-
it
->
second
.
_ts
)
>
rtt
)
{
tmp_list
.
push_back
(
it
->
first
);
it
->
second
.
_ts
=
now
;
}
}
}
tmp_list
.
sort
();
if
(
tmp_list
.
empty
())
{
return
;
}
uint32_t
min
=
*
tmp_list
.
begin
();
uint32_t
max
=
*
tmp_list
.
rbegin
();
if
((
max
-
min
)
>=
(
MAX_SEQ
>>
1
))
{
while
((
max
-
tmp_list
.
front
())
>
(
MAX_SEQ
>>
1
))
{
tmp_list
.
push_back
(
tmp_list
.
front
());
tmp_list
.
pop_front
();
}
}
PacketQueue
::
LostPair
lost
;
bool
finish
=
true
;
for
(
auto
cur
=
tmp_list
.
begin
();
cur
!=
tmp_list
.
end
();
++
cur
)
{
if
(
finish
)
{
lost
.
first
=
*
cur
;
lost
.
second
=
genExpectedSeq
(
*
cur
+
1
);
finish
=
false
;
}
else
{
if
(
lost
.
second
==
*
cur
)
{
lost
.
second
=
genExpectedSeq
(
*
cur
+
1
);
}
else
{
finish
=
true
;
lostlist
.
push_back
(
lost
);
}
}
}
}
void
NackContext
::
drop
(
uint32_t
seq
)
{
if
(
_nack_map
.
empty
())
return
;
uint32_t
min
=
_nack_map
.
begin
()
->
first
;
uint32_t
max
=
_nack_map
.
rbegin
()
->
first
;
bool
is_cycle
=
false
;
if
((
max
-
min
)
>=
(
MAX_SEQ
>>
1
))
{
is_cycle
=
true
;
}
for
(
auto
it
=
_nack_map
.
begin
();
it
!=
_nack_map
.
end
();)
{
if
(
!
is_cycle
)
{
// 不回环
if
(
it
->
first
<=
seq
)
{
it
=
_nack_map
.
erase
(
it
);
}
else
{
it
++
;
}
}
else
{
if
(
it
->
first
<=
seq
)
{
if
((
seq
-
it
->
first
)
>=
(
MAX_SEQ
>>
1
))
{
WarnL
<<
"cycle seq "
<<
seq
<<
" "
<<
it
->
first
;
it
++
;
}
else
{
it
=
_nack_map
.
erase
(
it
);
}
}
else
{
if
((
it
->
first
-
seq
)
>=
(
MAX_SEQ
>>
1
))
{
it
=
_nack_map
.
erase
(
it
);
WarnL
<<
"cycle seq "
<<
seq
<<
" "
<<
it
->
first
;
}
else
{
it
++
;
}
}
}
}
}
void
NackContext
::
mergeItem
(
TimePoint
now
,
PacketQueue
::
LostPair
&
item
)
{
for
(
uint32_t
i
=
item
.
first
;
i
<
item
.
second
;
++
i
)
{
auto
it
=
_nack_map
.
find
(
i
);
if
(
it
!=
_nack_map
.
end
())
{
}
else
{
NackItem
tmp
;
tmp
.
_is_nack
=
false
;
_nack_map
.
emplace
(
i
,
tmp
);
}
}
}
}
//
namespace
SRT
\ No newline at end of file
srt/NackContext.hpp
0 → 100644
查看文件 @
f58211fb
#ifndef ZLMEDIAKIT_SRT_NACK_CONTEXT_H
#define ZLMEDIAKIT_SRT_NACK_CONTEXT_H
#include "Common.hpp"
#include "PacketQueue.hpp"
#include <list>
namespace
SRT
{
class
NackContext
{
public
:
NackContext
()
=
default
;
~
NackContext
()
=
default
;
void
update
(
TimePoint
now
,
std
::
list
<
PacketQueue
::
LostPair
>
&
lostlist
);
void
getLostList
(
TimePoint
now
,
uint32_t
rtt
,
uint32_t
rtt_variance
,
std
::
list
<
PacketQueue
::
LostPair
>
&
lostlist
);
void
drop
(
uint32_t
seq
);
private
:
void
mergeItem
(
TimePoint
now
,
PacketQueue
::
LostPair
&
item
);
private
:
class
NackItem
{
public
:
bool
_is_nack
=
false
;
TimePoint
_ts
;
// send nak time
};
std
::
map
<
uint32_t
,
NackItem
>
_nack_map
;
};
}
// namespace SRT
#endif // ZLMEDIAKIT_SRT_NACK_CONTEXT_H
\ No newline at end of file
srt/PacketQueue.cpp
查看文件 @
f58211fb
...
...
@@ -2,13 +2,6 @@
namespace
SRT
{
#define MAX_SEQ 0x7fffffff
#define MAX_TS 0xffffffff
static
inline
uint32_t
genExpectedSeq
(
uint32_t
seq
)
{
return
MAX_SEQ
&
seq
;
}
static
inline
bool
isSeqEdge
(
uint32_t
seq
,
uint32_t
cap
)
{
if
(
seq
>
(
MAX_SEQ
-
cap
))
{
return
true
;
...
...
@@ -160,9 +153,9 @@ std::list<PacketQueue::LostPair> PacketQueue::getLostSeq() {
if
(
finish
)
{
finish
=
false
;
lost
.
first
=
i
;
lost
.
second
=
i
+
1
;
lost
.
second
=
genExpectedSeq
(
i
+
1
)
;
}
else
{
lost
.
second
=
i
+
1
;
lost
.
second
=
genExpectedSeq
(
i
+
1
)
;
}
}
else
{
if
(
!
finish
)
{
...
...
srt/SrtTransport.cpp
查看文件 @
f58211fb
#
include
<
stdlib
.
h
>
#include
"Util/onceToken.h"
#
include
"Util/onceToken.h"
#include
<stdlib.h>
#include "Ack.hpp"
#include "Packet.hpp"
...
...
@@ -203,7 +203,8 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad
unregisterSelfHandshake
();
registerSelf
();
sendControlPacket
(
res
,
true
);
TraceL
<<
" buf size = "
<<
res
->
max_flow_window_size
<<
" init seq ="
<<
_init_seq_number
<<
" latency="
<<
delay
;
TraceL
<<
" buf size = "
<<
res
->
max_flow_window_size
<<
" init seq ="
<<
_init_seq_number
<<
" latency="
<<
delay
;
_recv_buf
=
std
::
make_shared
<
PacketQueue
>
(
res
->
max_flow_window_size
,
_init_seq_number
,
delay
*
1e3
);
_send_buf
=
std
::
make_shared
<
PacketSendQueue
>
(
res
->
max_flow_window_size
,
delay
*
1e3
);
_send_packet_seq_number
=
_init_seq_number
;
...
...
@@ -314,10 +315,21 @@ void SrtTransport::handleDropReq(uint8_t *buf, int len, struct sockaddr_storage
if
(
list
.
empty
())
{
return
;
}
uint32_t
max_seq
=
0
;
for
(
auto
data
:
list
)
{
max_seq
=
data
->
packet_seq_number
;
onSRTData
(
std
::
move
(
data
));
}
_recv_nack
.
drop
(
max_seq
);
auto
lost
=
_recv_buf
->
getLostSeq
();
_recv_nack
.
update
(
_now
,
lost
);
lost
.
clear
();
_recv_nack
.
getLostList
(
_now
,
_rtt
,
_rtt_variance
,
lost
);
if
(
!
lost
.
empty
())
{
sendNAKPacket
(
lost
);
// TraceL << "check lost send nack";
}
auto
nak_interval
=
(
_rtt
+
_rtt_variance
*
4
)
/
2
;
if
(
nak_interval
<=
20
*
1000
)
{
...
...
@@ -436,8 +448,24 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora
//TraceL<<" seq="<< pkt->packet_seq_number<<" ts="<<pkt->timestamp<<" size="<<pkt->payloadSize()<<\
//" PP="<<(int)pkt->PP<<" O="<<(int)pkt->O<<" kK="<<(int)pkt->KK<<" R="<<(int)pkt->R;
_recv_buf
->
inputPacket
(
pkt
,
list
);
for
(
auto
data
:
list
)
{
onSRTData
(
std
::
move
(
data
));
if
(
list
.
empty
())
{
// when no data ok send nack to sender immediately
}
else
{
uint32_t
last_seq
;
for
(
auto
data
:
list
)
{
last_seq
=
data
->
packet_seq_number
;
onSRTData
(
std
::
move
(
data
));
}
_recv_nack
.
drop
(
last_seq
);
}
auto
lost
=
_recv_buf
->
getLostSeq
();
_recv_nack
.
update
(
_now
,
lost
);
lost
.
clear
();
_recv_nack
.
getLostList
(
_now
,
_rtt
,
_rtt_variance
,
lost
);
if
(
!
lost
.
empty
())
{
// TraceL << "check lost send nack immediately";
sendNAKPacket
(
lost
);
}
auto
nak_interval
=
(
_rtt
+
_rtt_variance
*
4
)
/
2
;
...
...
@@ -445,12 +473,8 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora
nak_interval
=
20
*
1000
;
}
if
(
list
.
empty
())
{
// TraceL<<_recv_buf->dump()<<" nake interval:"<<nak_interval/1000<<"
// ticker:"<<_nak_ticker.elapsedTime(_now)/1000;
}
if
(
_nak_ticker
.
elapsedTime
(
_now
)
>
nak_interval
)
{
// Periodic NAK reports
auto
lost
=
_recv_buf
->
getLostSeq
();
if
(
!
lost
.
empty
())
{
sendNAKPacket
(
lost
);
...
...
srt/SrtTransport.hpp
查看文件 @
f58211fb
...
...
@@ -11,11 +11,11 @@
#include "Poller/Timer.h"
#include "Common.hpp"
#include "NackContext.hpp"
#include "Packet.hpp"
#include "PacketQueue.hpp"
#include "PacketSendQueue.hpp"
#include "Statistic.hpp"
namespace
SRT
{
using
namespace
toolkit
;
...
...
@@ -45,7 +45,7 @@ public:
virtual
void
onSendTSData
(
const
Buffer
::
Ptr
&
buffer
,
bool
flush
);
std
::
string
getIdentifier
();
void
unregisterSelf
();
void
unregisterSelf
();
void
unregisterSelfHandshake
();
protected
:
...
...
@@ -91,9 +91,9 @@ protected:
void
sendControlPacket
(
ControlPacket
::
Ptr
pkt
,
bool
flush
=
true
);
private
:
//当前选中的udp链接
//
当前选中的udp链接
Session
::
Ptr
_selected_session
;
//链接迁移前后使用过的udp链接
//
链接迁移前后使用过的udp链接
std
::
unordered_map
<
Session
*
,
std
::
weak_ptr
<
Session
>>
_history_sessions
;
EventPoller
::
Ptr
_poller
;
...
...
@@ -119,6 +119,7 @@ private:
PacketSendQueue
::
Ptr
_send_buf
;
uint32_t
_buf_delay
=
120
;
PacketQueue
::
Ptr
_recv_buf
;
NackContext
_recv_nack
;
uint32_t
_rtt
=
100
*
1000
;
uint32_t
_rtt_variance
=
50
*
1000
;
uint32_t
_light_ack_pkt_count
=
0
;
...
...
@@ -133,7 +134,7 @@ private:
UTicker
_nak_ticker
;
//保持发送的握手消息,防止丢失重发
//
保持发送的握手消息,防止丢失重发
HandshakePacket
::
Ptr
_handleshake_res
;
ResourcePool
<
BufferRaw
>
_packet_pool
;
...
...
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论