Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
Z
ZLMediaKit
概览
Overview
Details
Activity
Cycle Analytics
版本库
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
问题
0
Issues
0
列表
Board
标记
里程碑
合并请求
0
Merge Requests
0
CI / CD
CI / CD
流水线
作业
日程表
图表
维基
Wiki
代码片段
Snippets
成员
Collapse sidebar
Close sidebar
活动
图像
聊天
创建新问题
作业
提交
Issue Boards
Open sidebar
张翔宇
ZLMediaKit
Commits
9fadc22c
Commit
9fadc22c
authored
Jun 03, 2022
by
xiongguangjie
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
srt can push but bandwith estimate has error
parent
1891c4e3
隐藏空白字符变更
内嵌
并排
正在显示
5 个修改的文件
包含
300 行增加
和
6 行删除
+300
-6
srt/SrtSession.cpp
+2
-2
srt/SrtTransport.cpp
+2
-2
srt/SrtTransport.hpp
+2
-2
srt/SrtTransportImp.cpp
+221
-0
srt/SrtTransportImp.hpp
+73
-0
没有找到文件。
srt/SrtSession.cpp
查看文件 @
9fadc22c
#
include
"SrtSession.hpp"
#include "Packet.hpp"
#include "SrtTransport.hpp"
#include "SrtTransport
Imp
.hpp"
#include "Common/config.h"
...
...
@@ -70,7 +70,7 @@ void SrtSession::onRecv(const Buffer::Ptr &buffer) {
auto
type
=
HandshakePacket
::
getHandshakeType
(
data
,
size
);
if
(
type
==
HandshakePacket
::
HS_TYPE_INDUCTION
)
{
// 握手第一阶段
_transport
=
std
::
make_shared
<
SrtTransport
>
(
getPoller
());
_transport
=
std
::
make_shared
<
SrtTransport
Imp
>
(
getPoller
());
}
else
if
(
type
==
HandshakePacket
::
HS_TYPE_CONCLUSION
)
{
// 握手第二阶段
...
...
srt/SrtTransport.cpp
查看文件 @
9fadc22c
...
...
@@ -189,7 +189,7 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad
sendControlPacket
(
res
,
true
);
TraceL
<<
" buf size = "
<<
res
->
max_flow_window_size
<<
" init seq ="
<<
_init_seq_number
<<
" lantency="
<<
req
->
recv_tsbpd_delay
;
_recv_buf
=
std
::
make_shared
<
PacketQueue
>
(
res
->
max_flow_window_size
,
_init_seq_number
,
req
->
recv_tsbpd_delay
*
1e6
);
onHandShakeFinished
(
_stream_id
);
onHandShakeFinished
(
_stream_id
,
addr
);
}
else
{
TraceL
<<
getIdentifier
()
<<
" CONCLUSION handle repeate "
;
sendControlPacket
(
_handleshake_res
,
true
);
...
...
@@ -356,7 +356,7 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora
auto
list
=
_recv_buf
->
tryGetPacket
();
for
(
auto
data
:
list
){
onSRTData
(
std
::
move
(
data
));
onSRTData
(
std
::
move
(
data
)
,
addr
);
}
}
...
...
srt/SrtTransport.hpp
查看文件 @
9fadc22c
...
...
@@ -44,8 +44,8 @@ public:
void
unregisterSelfHandshake
();
void
unregisterSelf
();
protected
:
virtual
void
onHandShakeFinished
(
std
::
string
&
streamid
){};
virtual
void
onSRTData
(
DataPacket
::
Ptr
pkt
){};
virtual
void
onHandShakeFinished
(
std
::
string
&
streamid
,
struct
sockaddr_storage
*
addr
){};
virtual
void
onSRTData
(
DataPacket
::
Ptr
pkt
,
struct
sockaddr_storage
*
addr
){};
virtual
void
onShutdown
(
const
SockException
&
ex
);
private
:
...
...
srt/SrtTransportImp.cpp
0 → 100644
查看文件 @
9fadc22c
#
include
<
memory
>
#include "Util/util.h"
#include "SrtTransportImp.hpp"
namespace
SRT
{
SrtTransportImp
::
SrtTransportImp
(
const
EventPoller
::
Ptr
&
poller
)
:
SrtTransport
(
poller
)
{}
SrtTransportImp
::~
SrtTransportImp
()
{
InfoP
(
this
);
}
void
SrtTransportImp
::
onHandShakeFinished
(
std
::
string
&
streamid
,
struct
sockaddr_storage
*
addr
)
{
// TODO parse streamid like this zlmediakit.com/live/test?token=1213444&type=pusher
if
(
!
_addr
){
_addr
.
reset
(
new
sockaddr_storage
(
*
((
sockaddr_storage
*
)
addr
)));
}
_media_info
.
parse
(
"srt://"
+
streamid
);
auto
params
=
Parser
::
parseArgs
(
_media_info
.
_param_strs
);
if
(
params
[
"type"
]
==
"push"
){
_is_pusher
=
true
;
_decoder
=
DecoderImp
::
createDecoder
(
DecoderImp
::
decoder_ts
,
this
);
emitOnPublish
();
}
else
{
_is_pusher
=
false
;
}
}
void
SrtTransportImp
::
onSRTData
(
DataPacket
::
Ptr
pkt
,
struct
sockaddr_storage
*
addr
)
{
if
(
!
_is_pusher
){
WarnP
(
this
)
<<
"this is a player data ignore"
;
return
;
}
if
(
!
_addr
){
_addr
.
reset
(
new
sockaddr_storage
(
*
((
sockaddr_storage
*
)
addr
)));
}
if
(
_decoder
)
{
_decoder
->
input
(
reinterpret_cast
<
const
uint8_t
*>
(
pkt
->
payloadData
()),
pkt
->
payloadSize
());
}
else
{
WarnP
(
this
)
<<
" not reach this"
;
}
}
void
SrtTransportImp
::
onShutdown
(
const
SockException
&
ex
)
{
SrtTransport
::
onShutdown
(
ex
);
}
bool
SrtTransportImp
::
close
(
mediakit
::
MediaSource
&
sender
,
bool
force
){
if
(
!
force
&&
totalReaderCount
(
sender
))
{
return
false
;
}
std
::
string
err
=
StrPrinter
<<
"close media:"
<<
sender
.
getSchema
()
<<
"/"
<<
sender
.
getVhost
()
<<
"/"
<<
sender
.
getApp
()
<<
"/"
<<
sender
.
getId
()
<<
" "
<<
force
;
weak_ptr
<
SrtTransportImp
>
weak_self
=
static_pointer_cast
<
SrtTransportImp
>
(
shared_from_this
());
getPoller
()
->
async
([
weak_self
,
err
]()
{
auto
strong_self
=
weak_self
.
lock
();
if
(
strong_self
)
{
strong_self
->
onShutdown
(
SockException
(
Err_shutdown
,
err
));
//主动关闭推流,那么不延时注销
strong_self
->
_muxer
=
nullptr
;
}
});
return
true
;
}
// 播放总人数
int
SrtTransportImp
::
totalReaderCount
(
mediakit
::
MediaSource
&
sender
){
return
_muxer
?
_muxer
->
totalReaderCount
()
:
sender
.
readerCount
();
}
// 获取媒体源类型
mediakit
::
MediaOriginType
SrtTransportImp
::
getOriginType
(
mediakit
::
MediaSource
&
sender
)
const
{
return
MediaOriginType
::
srt_push
;
}
// 获取媒体源url或者文件路径
std
::
string
SrtTransportImp
::
getOriginUrl
(
mediakit
::
MediaSource
&
sender
)
const
{
return
_media_info
.
_full_url
;
}
// 获取媒体源客户端相关信息
std
::
shared_ptr
<
SockInfo
>
SrtTransportImp
::
getOriginSock
(
mediakit
::
MediaSource
&
sender
)
const
{
return
static_pointer_cast
<
SockInfo
>
(
getSession
());
}
void
SrtTransportImp
::
emitOnPublish
()
{
std
::
weak_ptr
<
SrtTransportImp
>
weak_self
=
static_pointer_cast
<
SrtTransportImp
>
(
shared_from_this
());
Broadcast
::
PublishAuthInvoker
invoker
=
[
weak_self
](
const
std
::
string
&
err
,
const
ProtocolOption
&
option
)
{
auto
strong_self
=
weak_self
.
lock
();
if
(
!
strong_self
)
{
return
;
}
if
(
err
.
empty
())
{
strong_self
->
_muxer
=
std
::
make_shared
<
MultiMediaSourceMuxer
>
(
strong_self
->
_media_info
.
_vhost
,
strong_self
->
_media_info
.
_app
,
strong_self
->
_media_info
.
_streamid
,
0.0
f
,
option
);
strong_self
->
_muxer
->
setMediaListener
(
strong_self
);
strong_self
->
doCachedFunc
();
InfoP
(
strong_self
)
<<
"允许 srt 推流"
;
}
else
{
WarnP
(
strong_self
)
<<
"禁止 srt 推流:"
<<
err
;
strong_self
->
onShutdown
(
SockException
(
Err_refused
,
err
));
}
};
//触发推流鉴权事件
auto
flag
=
NoticeCenter
::
Instance
().
emitEvent
(
Broadcast
::
kBroadcastMediaPublish
,
MediaOriginType
::
rtp_push
,
_media_info
,
invoker
,
static_cast
<
SockInfo
&>
(
*
this
));
if
(
!
flag
)
{
//该事件无人监听,默认不鉴权
invoker
(
""
,
ProtocolOption
());
}
}
void
SrtTransportImp
::
emitOnPlay
(){
std
::
weak_ptr
<
SrtTransportImp
>
weak_self
=
static_pointer_cast
<
SrtTransportImp
>
(
shared_from_this
());
Broadcast
::
AuthInvoker
invoker
=
[
weak_self
](
const
string
&
err
){
auto
strong_self
=
weak_self
.
lock
();
if
(
!
strong_self
)
{
return
;
}
strong_self
->
getPoller
()
->
async
([
strong_self
,
err
]{
if
(
err
!=
""
){
strong_self
->
onShutdown
(
SockException
(
Err_refused
,
err
));
}
else
{
strong_self
->
doPlay
();
}
});
};
auto
flag
=
NoticeCenter
::
Instance
().
emitEvent
(
Broadcast
::
kBroadcastMediaPlayed
,
_media_info
,
invoker
,
static_cast
<
SockInfo
&>
(
*
this
));
if
(
!
flag
){
doPlay
();
}
}
void
SrtTransportImp
::
doPlay
(){
}
std
::
string
SrtTransportImp
::
get_peer_ip
()
{
if
(
!
_addr
)
{
return
"::"
;
}
return
SockUtil
::
inet_ntoa
((
sockaddr
*
)
_addr
.
get
());
}
uint16_t
SrtTransportImp
::
get_peer_port
()
{
if
(
!
_addr
)
{
return
0
;
}
return
SockUtil
::
inet_port
((
sockaddr
*
)
_addr
.
get
());
}
std
::
string
SrtTransportImp
::
get_local_ip
()
{
auto
s
=
getSession
();
if
(
s
)
{
return
s
->
get_local_ip
();
}
return
"::"
;
}
uint16_t
SrtTransportImp
::
get_local_port
()
{
auto
s
=
getSession
();
if
(
s
)
{
return
s
->
get_local_port
();
}
return
0
;
}
std
::
string
SrtTransportImp
::
getIdentifier
()
const
{
return
_media_info
.
_streamid
;
}
bool
SrtTransportImp
::
inputFrame
(
const
Frame
::
Ptr
&
frame
)
{
if
(
_muxer
)
{
return
_muxer
->
inputFrame
(
frame
);
}
if
(
_cached_func
.
size
()
>
200
)
{
WarnL
<<
"cached frame of track("
<<
frame
->
getCodecName
()
<<
") is too much, now dropped"
;
return
false
;
}
auto
frame_cached
=
Frame
::
getCacheAbleFrame
(
frame
);
lock_guard
<
recursive_mutex
>
lck
(
_func_mtx
);
_cached_func
.
emplace_back
([
this
,
frame_cached
]()
{
_muxer
->
inputFrame
(
frame_cached
);
});
return
true
;
}
bool
SrtTransportImp
::
addTrack
(
const
Track
::
Ptr
&
track
)
{
if
(
_muxer
)
{
return
_muxer
->
addTrack
(
track
);
}
lock_guard
<
recursive_mutex
>
lck
(
_func_mtx
);
_cached_func
.
emplace_back
([
this
,
track
]()
{
_muxer
->
addTrack
(
track
);
});
return
true
;
}
void
SrtTransportImp
::
addTrackCompleted
()
{
if
(
_muxer
)
{
_muxer
->
addTrackCompleted
();
}
else
{
lock_guard
<
recursive_mutex
>
lck
(
_func_mtx
);
_cached_func
.
emplace_back
([
this
]()
{
_muxer
->
addTrackCompleted
();
});
}
}
void
SrtTransportImp
::
doCachedFunc
()
{
lock_guard
<
recursive_mutex
>
lck
(
_func_mtx
);
for
(
auto
&
func
:
_cached_func
)
{
func
();
}
_cached_func
.
clear
();
}
}
//
namespace
SRT
\ No newline at end of file
srt/SrtTransportImp.hpp
0 → 100644
查看文件 @
9fadc22c
#
ifndef
ZLMEDIAKIT_SRT_TRANSPORT_IMP_H
#define ZLMEDIAKIT_SRT_TRANSPORT_IMP_H
#include "Common/MultiMediaSourceMuxer.h"
#include "Rtp/Decoder.h"
#include "SrtTransport.hpp"
namespace
SRT
{
using
namespace
toolkit
;
using
namespace
mediakit
;
using
namespace
std
;
class
SrtTransportImp
:
public
SrtTransport
,
public
toolkit
::
SockInfo
,
public
MediaSinkInterface
,
public
mediakit
::
MediaSourceEvent
{
public
:
SrtTransportImp
(
const
EventPoller
::
Ptr
&
poller
);
~
SrtTransportImp
();
/// SockInfo override
std
::
string
get_local_ip
()
override
;
uint16_t
get_local_port
()
override
;
std
::
string
get_peer_ip
()
override
;
uint16_t
get_peer_port
()
override
;
std
::
string
getIdentifier
()
const
override
;
protected
:
///////SrtTransport override///////
void
onHandShakeFinished
(
std
::
string
&
streamid
,
struct
sockaddr_storage
*
addr
)
override
;
void
onSRTData
(
DataPacket
::
Ptr
pkt
,
struct
sockaddr_storage
*
addr
)
override
;
void
onShutdown
(
const
SockException
&
ex
)
override
;
///////MediaSourceEvent override///////
// 关闭
bool
close
(
mediakit
::
MediaSource
&
sender
,
bool
force
)
override
;
// 播放总人数
int
totalReaderCount
(
mediakit
::
MediaSource
&
sender
)
override
;
// 获取媒体源类型
mediakit
::
MediaOriginType
getOriginType
(
mediakit
::
MediaSource
&
sender
)
const
override
;
// 获取媒体源url或者文件路径
std
::
string
getOriginUrl
(
mediakit
::
MediaSource
&
sender
)
const
override
;
// 获取媒体源客户端相关信息
std
::
shared_ptr
<
SockInfo
>
getOriginSock
(
mediakit
::
MediaSource
&
sender
)
const
override
;
bool
inputFrame
(
const
Frame
::
Ptr
&
frame
)
override
;
bool
addTrack
(
const
Track
::
Ptr
&
track
)
override
;
void
addTrackCompleted
()
override
;
void
resetTracks
()
override
{};
private
:
void
emitOnPublish
();
void
emitOnPlay
();
void
doPlay
();
void
doCachedFunc
();
private
:
bool
_is_pusher
=
true
;
MediaInfo
_media_info
;
std
::
unique_ptr
<
sockaddr_storage
>
_addr
;
// for pusher
MultiMediaSourceMuxer
::
Ptr
_muxer
;
DecoderImp
::
Ptr
_decoder
;
std
::
recursive_mutex
_func_mtx
;
std
::
deque
<
std
::
function
<
void
()
>
>
_cached_func
;
};
}
// namespace SRT
#endif // ZLMEDIAKIT_SRT_TRANSPORT_IMP_H
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论