Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
Z
ZLMediaKit
概览
Overview
Details
Activity
Cycle Analytics
版本库
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
问题
0
Issues
0
列表
Board
标记
里程碑
合并请求
0
Merge Requests
0
CI / CD
CI / CD
流水线
作业
日程表
图表
维基
Wiki
代码片段
Snippets
成员
Collapse sidebar
Close sidebar
活动
图像
聊天
创建新问题
作业
提交
Issue Boards
Open sidebar
张翔宇
ZLMediaKit
Commits
ea35002b
Commit
ea35002b
authored
Sep 21, 2022
by
xiongguangjie
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
copy srt official packet recve rate algorithm
parent
a226794c
隐藏空白字符变更
内嵌
并排
正在显示
6 个修改的文件
包含
103 行增加
和
47 行删除
+103
-47
srt/Packet.hpp
+14
-0
srt/SrtTransport.cpp
+20
-11
srt/SrtTransport.hpp
+8
-2
srt/SrtTransportImp.cpp
+2
-0
srt/Statistic.cpp
+47
-25
srt/Statistic.hpp
+12
-9
没有找到文件。
srt/Packet.hpp
查看文件 @
ea35002b
...
...
@@ -14,6 +14,20 @@
namespace
SRT
{
using
namespace
toolkit
;
static
const
size_t
HDR_SIZE
=
4
;
// packet header size = SRT_PH_E_SIZE * sizeof(uint32_t)
// Can also be calculated as: sizeof(struct ether_header) + sizeof(struct ip) + sizeof(struct udphdr).
static
const
size_t
UDP_HDR_SIZE
=
28
;
// 20 bytes IPv4 + 8 bytes of UDP { u16 sport, dport, len, csum }.
static
const
size_t
SRT_DATA_HDR_SIZE
=
UDP_HDR_SIZE
+
HDR_SIZE
;
// Maximum transmission unit size. 1500 in case of Ethernet II (RFC 1191).
static
const
size_t
ETH_MAX_MTU_SIZE
=
1500
;
// Maximum payload size of an SRT packet.
static
const
size_t
SRT_MAX_PAYLOAD_SIZE
=
ETH_MAX_MTU_SIZE
-
SRT_DATA_HDR_SIZE
;
/*
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
...
...
srt/SrtTransport.cpp
查看文件 @
ea35002b
...
...
@@ -22,7 +22,7 @@ SrtTransport::SrtTransport(const EventPoller::Ptr &poller)
_start_timestamp
=
SteadyClock
::
now
();
_socket_id
=
s_srt_socket_id_generate
.
fetch_add
(
1
);
_pkt_recv_rate_context
=
std
::
make_shared
<
PacketRecvRateContext
>
(
_start_timestamp
);
_recv_rate_context
=
std
::
make_shared
<
RecvRateContext
>
(
_start_timestamp
);
//
_recv_rate_context = std::make_shared<RecvRateContext>(_start_timestamp);
_estimated_link_capacity_context
=
std
::
make_shared
<
EstimatedLinkCapacityContext
>
(
_start_timestamp
);
}
...
...
@@ -105,11 +105,12 @@ void SrtTransport::inputSockData(uint8_t *buf, int len, struct sockaddr_storage
if
(
_handleshake_timer
){
_handleshake_timer
.
reset
();
}
_pkt_recv_rate_context
->
inputPacket
(
_now
);
_pkt_recv_rate_context
->
inputPacket
(
_now
,
len
-
HDR_SIZE
);
_estimated_link_capacity_context
->
inputPacket
(
_now
);
_recv_rate_context
->
inputPacket
(
_now
,
len
);
//
_recv_rate_context->inputPacket(_now, len);
handleDataPacket
(
buf
,
len
,
addr
);
checkAndSendAckNak
();
}
else
{
WarnL
<<
"DataPacket switch to other transport: "
<<
socketId
;
switchToOtherTransport
(
buf
,
len
,
socketId
,
addr
);
...
...
@@ -124,9 +125,10 @@ void SrtTransport::inputSockData(uint8_t *buf, int len, struct sockaddr_storage
switchToOtherTransport
(
buf
,
len
,
socketId
,
addr
);
return
;
}
_pkt_recv_rate_context
->
inputPacket
(
_now
);
_pkt_recv_rate_context
->
inputPacket
(
_now
,
len
);
_estimated_link_capacity_context
->
inputPacket
(
_now
);
_recv_rate_context
->
inputPacket
(
_now
,
len
);
//
_recv_rate_context->inputPacket(_now, len);
auto
it
=
s_control_functions
.
find
(
type
);
if
(
it
==
s_control_functions
.
end
())
{
...
...
@@ -135,6 +137,9 @@ void SrtTransport::inputSockData(uint8_t *buf, int len, struct sockaddr_storage
}
else
{
(
this
->*
(
it
->
second
))(
buf
,
len
,
addr
);
}
if
(
_is_handleshake_finished
&&
isPusher
()){
checkAndSendAckNak
();
}
}
else
{
// not reach
WarnL
<<
"not reach this"
;
...
...
@@ -195,7 +200,7 @@ void SrtTransport::handleHandshakeInduction(HandshakePacket &pkt, struct sockadd
registerSelfHandshake
();
sendControlPacket
(
res
,
true
);
_handleshake_timer
=
std
::
make_shared
<
Timer
>
(
0.
0
2
,[
this
]()
->
bool
{
_handleshake_timer
=
std
::
make_shared
<
Timer
>
(
0.2
,[
this
]()
->
bool
{
sendControlPacket
(
_handleshake_res
,
true
);
return
true
;
},
getPoller
());
...
...
@@ -389,6 +394,7 @@ void SrtTransport::handleDropReq(uint8_t *buf, int len, struct sockaddr_storage
std
::
list
<
DataPacket
::
Ptr
>
list
;
// TraceL<<"drop "<<pkt.first_pkt_seq_num<<" last "<<pkt.last_pkt_seq_num;
_recv_buf
->
drop
(
pkt
.
first_pkt_seq_num
,
pkt
.
last_pkt_seq_num
,
list
);
//checkAndSendAckNak();
if
(
list
.
empty
())
{
return
;
}
...
...
@@ -413,7 +419,8 @@ void SrtTransport::handleDropReq(uint8_t *buf, int len, struct sockaddr_storage
// TraceL << "check lost send nack";
}
*/
}
void
SrtTransport
::
checkAndSendAckNak
(){
auto
nak_interval
=
(
_rtt
+
_rtt_variance
*
4
)
/
2
;
if
(
nak_interval
<=
20
*
1000
)
{
nak_interval
=
20
*
1000
;
...
...
@@ -442,7 +449,6 @@ void SrtTransport::handleDropReq(uint8_t *buf, int len, struct sockaddr_storage
}
_light_ack_pkt_count
++
;
}
void
SrtTransport
::
handleUserDefinedType
(
uint8_t
*
buf
,
int
len
,
struct
sockaddr_storage
*
addr
)
{
TraceL
;
}
...
...
@@ -465,6 +471,8 @@ void SrtTransport::handlePeerError(uint8_t *buf, int len, struct sockaddr_storag
}
void
SrtTransport
::
sendACKPacket
()
{
uint32_t
recv_rate
=
0
;
ACKPacket
::
Ptr
pkt
=
std
::
make_shared
<
ACKPacket
>
();
pkt
->
dst_socket_id
=
_peer_socket_id
;
pkt
->
timestamp
=
DurationCountMicroseconds
(
_now
-
_start_timestamp
);
...
...
@@ -473,9 +481,9 @@ void SrtTransport::sendACKPacket() {
pkt
->
rtt
=
_rtt
;
pkt
->
rtt_variance
=
_rtt_variance
;
pkt
->
available_buf_size
=
_recv_buf
->
getAvailableBufferSize
();
pkt
->
pkt_recv_rate
=
_pkt_recv_rate_context
->
getPacketRecvRate
();
pkt
->
pkt_recv_rate
=
_pkt_recv_rate_context
->
getPacketRecvRate
(
recv_rate
);
pkt
->
estimated_link_capacity
=
_estimated_link_capacity_context
->
getEstimatedLinkCapacity
();
pkt
->
recv_rate
=
_recv_rate_context
->
getRecvRate
()
;
pkt
->
recv_rate
=
recv_rate
;
pkt
->
storeToData
();
_ack_send_timestamp
[
pkt
->
ack_number
]
=
_now
;
_last_ack_pkt_seq_num
=
pkt
->
last_ack_pkt_seq_number
;
...
...
@@ -584,7 +592,7 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora
sendNAKPacket(lost);
}
*/
/*
auto nak_interval = (_rtt + _rtt_variance * 4) / 2;
if (nak_interval <= 20 * 1000) {
nak_interval = 20 * 1000;
...
...
@@ -617,6 +625,7 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora
_light_ack_pkt_count = 0;
}
_light_ack_pkt_count++;
*/
// bufCheckInterval();
}
...
...
srt/SrtTransport.hpp
查看文件 @
ea35002b
...
...
@@ -53,7 +53,9 @@ protected:
virtual
bool
isPusher
()
{
return
true
;
};
virtual
void
onSRTData
(
DataPacket
::
Ptr
pkt
)
{};
virtual
void
onShutdown
(
const
SockException
&
ex
);
virtual
void
onHandShakeFinished
(
std
::
string
&
streamid
,
struct
sockaddr_storage
*
addr
)
{};
virtual
void
onHandShakeFinished
(
std
::
string
&
streamid
,
struct
sockaddr_storage
*
addr
)
{
_is_handleshake_finished
=
true
;
};
virtual
void
sendPacket
(
Buffer
::
Ptr
pkt
,
bool
flush
=
true
);
virtual
int
getLatencyMul
()
{
return
4
;
};
virtual
int
getPktBufSize
()
{
return
8192
;
};
...
...
@@ -91,6 +93,8 @@ private:
void
createTimerForCheckAlive
();
void
checkAndSendAckNak
();
protected
:
void
sendDataPacket
(
DataPacket
::
Ptr
pkt
,
char
*
buf
,
int
len
,
bool
flush
=
false
);
void
sendControlPacket
(
ControlPacket
::
Ptr
pkt
,
bool
flush
=
true
);
...
...
@@ -137,7 +141,7 @@ private:
std
::
shared_ptr
<
PacketRecvRateContext
>
_pkt_recv_rate_context
;
std
::
shared_ptr
<
EstimatedLinkCapacityContext
>
_estimated_link_capacity_context
;
std
::
shared_ptr
<
RecvRateContext
>
_recv_rate_context
;
//
std::shared_ptr<RecvRateContext> _recv_rate_context;
UTicker
_nak_ticker
;
...
...
@@ -152,6 +156,8 @@ private:
Timer
::
Ptr
_timer
;
//刷新计时器
Ticker
_alive_ticker
;
bool
_is_handleshake_finished
=
false
;
};
class
SrtTransportManager
{
...
...
srt/SrtTransportImp.cpp
查看文件 @
ea35002b
...
...
@@ -22,6 +22,7 @@ SrtTransportImp::~SrtTransportImp() {
}
void
SrtTransportImp
::
onHandShakeFinished
(
std
::
string
&
streamid
,
struct
sockaddr_storage
*
addr
)
{
SrtTransport
::
onHandShakeFinished
(
streamid
,
addr
);
// TODO parse stream id like this zlmediakit.com/live/test?token=1213444&type=push
if
(
!
_addr
)
{
_addr
.
reset
(
new
sockaddr_storage
(
*
((
sockaddr_storage
*
)
addr
)));
...
...
@@ -100,6 +101,7 @@ void SrtTransportImp::onSRTData(DataPacket::Ptr pkt) {
}
if
(
_decoder
)
{
_decoder
->
input
(
reinterpret_cast
<
const
uint8_t
*>
(
pkt
->
payloadData
()),
pkt
->
payloadSize
());
//TraceL<<" size "<<pkt->payloadSize();
}
else
{
WarnP
(
this
)
<<
" not reach this"
;
}
...
...
srt/Statistic.cpp
查看文件 @
ea35002b
...
...
@@ -4,36 +4,56 @@
namespace
SRT
{
void
PacketRecvRateContext
::
inputPacket
(
TimePoint
&
ts
)
{
if
(
_pkt_map
.
size
()
>
100
)
{
_pkt_map
.
erase
(
_pkt_map
.
begin
());
PacketRecvRateContext
::
PacketRecvRateContext
(
TimePoint
start
)
:
_last_arrive_time
(
start
)
{
for
(
size_t
i
=
0
;
i
<
SIZE
;
i
++
)
{
_ts_arr
[
i
]
=
1000000
;
_size_arr
[
i
]
=
SRT_MAX_PAYLOAD_SIZE
;
}
auto
tmp
=
DurationCountMicroseconds
(
ts
-
_start
);
_pkt_map
.
emplace
(
tmp
,
tmp
);
_cur_idx
=
0
;
};
void
PacketRecvRateContext
::
inputPacket
(
TimePoint
&
ts
,
size_t
len
)
{
auto
tmp
=
DurationCountMicroseconds
(
ts
-
_last_arrive_time
);
_ts_arr
[
_cur_idx
]
=
tmp
;
_size_arr
[
_cur_idx
]
=
len
;
_cur_idx
=
(
1
+
_cur_idx
)
%
SIZE
;
_last_arrive_time
=
ts
;
}
uint32_t
PacketRecvRateContext
::
getPacketRecvRate
()
{
if
(
_pkt_map
.
size
()
<
2
)
{
return
50000
;
}
int64_t
dur
=
1000
;
for
(
auto
it
=
_pkt_map
.
begin
();
it
!=
_pkt_map
.
end
();
++
it
)
{
auto
next
=
it
;
++
next
;
if
(
next
==
_pkt_map
.
end
())
{
break
;
}
uint32_t
PacketRecvRateContext
::
getPacketRecvRate
(
uint32_t
&
bytesps
)
{
int64_t
tmp_arry
[
SIZE
];
std
::
copy
(
_ts_arr
,
_ts_arr
+
SIZE
,
tmp_arry
);
std
::
nth_element
(
tmp_arry
,
tmp_arry
+
(
SIZE
/
2
),
tmp_arry
+
SIZE
);
int64_t
median
=
tmp_arry
[
SIZE
/
2
];
if
((
next
->
first
-
it
->
first
)
<
dur
)
{
dur
=
next
->
first
-
it
->
first
;
unsigned
count
=
0
;
int
sum
=
0
;
int64_t
upper
=
median
<<
3
;
int64_t
lower
=
median
>>
3
;
bytesps
=
0
;
size_t
bytes
=
0
;
const
size_t
*
bp
=
_size_arr
;
// median filtering
const
int64_t
*
p
=
_ts_arr
;
for
(
int
i
=
0
,
n
=
SIZE
;
i
<
n
;
++
i
)
{
if
((
*
p
<
upper
)
&&
(
*
p
>
lower
))
{
++
count
;
// packet counter
sum
+=
*
p
;
// usec counter
bytes
+=
*
bp
;
// byte counter
}
++
p
;
// advance packet pointer
++
bp
;
// advance bytes pointer
}
double
rate
=
1e6
/
(
double
)
dur
;
if
(
rate
<=
1000
)
{
return
50000
;
}
return
rate
;
// claculate speed, or return 0 if not enough valid value
bytesps
=
(
unsigned
long
)
ceil
(
1000000.0
/
(
double
(
sum
)
/
double
(
bytes
)));
auto
ret
=
(
uint32_t
)
ceil
(
1000000.0
/
(
sum
/
count
));
if
(
_cur_idx
==
0
)
TraceL
<<
bytesps
<<
" byte/sec "
<<
ret
<<
" pkt/sec"
;
return
ret
;
}
void
EstimatedLinkCapacityContext
::
inputPacket
(
TimePoint
&
ts
)
{
...
...
@@ -70,12 +90,13 @@ uint32_t EstimatedLinkCapacityContext::getEstimatedLinkCapacity() {
return
(
uint32_t
)(
1.0
/
dur
);
}
/*
void RecvRateContext::inputPacket(TimePoint &ts, size_t size) {
if (_pkt_map.size() > 100) {
_pkt_map.erase(_pkt_map.begin());
}
auto tmp = DurationCountMicroseconds(ts - _start);
_pkt_map
.
emplace
(
tmp
,
tmp
);
_pkt_map.emplace(tmp,
size
);
}
uint32_t RecvRateContext::getRecvRate() {
...
...
@@ -94,5 +115,5 @@ uint32_t RecvRateContext::getRecvRate() {
double rate = (double)bytes / dur;
return (uint32_t)rate;
}
*/
}
//
namespace
SRT
\ No newline at end of file
srt/Statistic.hpp
查看文件 @
ea35002b
...
...
@@ -6,18 +6,19 @@
#include "Packet.hpp"
namespace
SRT
{
class
PacketRecvRateContext
{
public
:
PacketRecvRateContext
(
TimePoint
start
)
:
_start
(
start
)
{};
PacketRecvRateContext
(
TimePoint
start
);
~
PacketRecvRateContext
()
=
default
;
void
inputPacket
(
TimePoint
&
ts
);
uint32_t
getPacketRecvRate
();
void
inputPacket
(
TimePoint
&
ts
,
size_t
len
=
0
);
uint32_t
getPacketRecvRate
(
uint32_t
&
bytesps
);
static
const
int
SIZE
=
16
;
private
:
TimePoint
_start
;
std
::
map
<
int64_t
,
int64_t
>
_pkt_map
;
TimePoint
_last_arrive_time
;
int64_t
_ts_arr
[
SIZE
];
size_t
_size_arr
[
SIZE
];
size_t
_cur_idx
;
//std::map<int64_t, int64_t> _pkt_map;
};
class
EstimatedLinkCapacityContext
{
...
...
@@ -32,6 +33,7 @@ private:
std
::
map
<
int64_t
,
int64_t
>
_pkt_map
;
};
/*
class RecvRateContext {
public:
RecvRateContext(TimePoint start)
...
...
@@ -44,6 +46,6 @@ private:
TimePoint _start;
std::map<int64_t, size_t> _pkt_map;
};
*/
}
// namespace SRT
#endif // ZLMEDIAKIT_SRT_STATISTIC_H
\ No newline at end of file
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论