Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
Z
ZLMediaKit
概览
Overview
Details
Activity
Cycle Analytics
版本库
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
问题
0
Issues
0
列表
Board
标记
里程碑
合并请求
0
Merge Requests
0
CI / CD
CI / CD
流水线
作业
日程表
图表
维基
Wiki
代码片段
Snippets
成员
Collapse sidebar
Close sidebar
活动
图像
聊天
创建新问题
作业
提交
Issue Boards
Open sidebar
张翔宇
ZLMediaKit
Commits
43d7479c
Commit
43d7479c
authored
Apr 17, 2023
by
xiongguangjie
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'master' into dev
parents
772fccba
f949c6de
隐藏空白字符变更
内嵌
并排
正在显示
19 个修改的文件
包含
307 行增加
和
147 行删除
+307
-147
README.md
+4
-1
conf/config.ini
+3
-0
server/WebApi.cpp
+23
-0
server/WebHook.cpp
+21
-11
src/Common/MediaSink.cpp
+6
-0
src/Common/MediaSink.h
+6
-0
src/Common/MediaSource.cpp
+0
-2
src/Common/MultiMediaSourceMuxer.cpp
+88
-63
src/Common/MultiMediaSourceMuxer.h
+8
-2
src/Common/config.cpp
+2
-0
src/Common/config.h
+2
-0
src/Common/macros.cpp
+1
-1
src/Common/macros.h
+7
-0
src/Extension/H265Rtp.cpp
+106
-45
src/Extension/H265Rtp.h
+16
-0
src/Rtp/PSDecoder.cpp
+1
-1
src/Rtp/RtpCache.cpp
+0
-14
src/Rtp/RtpCache.h
+0
-3
tests/test_rtp.cpp
+13
-4
没有找到文件。
README.md
查看文件 @
43d7479c
...
...
@@ -198,7 +198,9 @@ bash build_docker_images.sh
## 联系方式
-
邮箱:
<1213642868@qq.com>
(本项目相关或流媒体相关问题请走issue流程,否则恕不邮件答复)
-
QQ群:两个qq群已满员(共4000人),后续将不再新建qq群,用户可加入
[
知识星球
](
https://t.zsxq.com/0cVcuquPJ
)
提问以支持本项目。
-
QQ群:两个qq群已满员(共4000人),后续将不再新建qq群,用户可加入
[
知识星球
](
https://github.com/ZLMediaKit/ZLMediaKit/issues/2364
)
提问以支持本项目。
-
关注微信公众号:
<img
src=
https://user-images.githubusercontent.com/11495632/232451702-4c50bc72-84d8-4c94-af2b-57290088ba7a.png
width=
15%
/>
## 怎么提问?
...
...
@@ -208,6 +210,7 @@ bash build_docker_images.sh
-
2、如果您的问题还没解决,可以提issue.
-
3、有些问题,如果不具备参考性的,无需在issue提的,可以在qq群提.
-
4、QQ私聊一般不接受无偿技术咨询和支持(
[
为什么不提倡QQ私聊
](
https://github.com/ZLMediaKit/ZLMediaKit/wiki/%E4%B8%BA%E4%BB%80%E4%B9%88%E4%B8%8D%E5%BB%BA%E8%AE%AEQQ%E7%A7%81%E8%81%8A%E5%92%A8%E8%AF%A2%E9%97%AE%E9%A2%98%EF%BC%9F
)
).
-
5、如果需要获取更及时贴心的技术支持,可以有偿加入
[
知识星球
](
https://github.com/ZLMediaKit/ZLMediaKit/issues/2364
)
.
## 特别感谢
...
...
conf/config.ini
查看文件 @
43d7479c
...
...
@@ -295,6 +295,9 @@ h265_pt=99
ps_pt
=
96
#rtp opus 负载的pt
opus_pt
=
100
#RtpSender相关功能是否提前开启gop缓存优化级联秒开体验,默认开启
#如果不调用startSendRtp相关接口,可以置0节省内存
gop_cache
=
1
[rtc]
#rtc播放推流、播放超时时间
...
...
server/WebApi.cpp
查看文件 @
43d7479c
...
...
@@ -1615,6 +1615,29 @@ void installWebApi() {
}
});
});
static
auto
whip_whep_func
=
[](
const
char
*
type
,
API_ARGS_STRING_ASYNC
)
{
auto
offer
=
allArgs
.
getArgs
();
CHECK
(
!
offer
.
empty
(),
"http body(webrtc offer sdp) is empty"
);
WebRtcPluginManager
::
Instance
().
getAnswerSdp
(
*
(
static_cast
<
Session
*>
(
&
sender
)),
type
,
WebRtcArgsImp
(
allArgs
,
sender
.
getIdentifier
()),
[
invoker
,
offer
,
headerOut
](
const
WebRtcInterface
&
exchanger
)
mutable
{
// 设置跨域
headerOut
[
"Access-Control-Allow-Origin"
]
=
"*"
;
try
{
// 设置返回类型
headerOut
[
"Content-Type"
]
=
"application/sdp"
;
invoker
(
201
,
headerOut
,
const_cast
<
WebRtcInterface
&>
(
exchanger
).
getAnswerSdp
(
offer
));
}
catch
(
std
::
exception
&
ex
)
{
headerOut
[
"Content-Type"
]
=
"text/plain"
;
invoker
(
406
,
headerOut
,
ex
.
what
());
}
});
};
api_regist
(
"/index/api/whip"
,
[](
API_ARGS_STRING_ASYNC
)
{
whip_whep_func
(
"push"
,
API_ARGS_VALUE
,
invoker
);
});
api_regist
(
"/index/api/whep"
,
[](
API_ARGS_STRING_ASYNC
)
{
whip_whep_func
(
"play"
,
API_ARGS_VALUE
,
invoker
);
});
#endif
#if defined(ENABLE_VERSION)
...
...
server/WebHook.cpp
查看文件 @
43d7479c
...
...
@@ -94,15 +94,16 @@ static onceToken token([]() {
}
//namespace Cluster
static
void
parse_http_response
(
const
SockException
&
ex
,
const
Parser
&
res
,
const
function
<
void
(
const
Value
&
,
const
string
&
)
>
&
fun
){
const
function
<
void
(
const
Value
&
,
const
string
&
,
const
bool
&
)
>
&
fun
){
bool
should_retry
=
true
;
if
(
ex
)
{
auto
errStr
=
StrPrinter
<<
"[network err]:"
<<
ex
.
what
()
<<
endl
;
fun
(
Json
::
nullValue
,
errStr
);
fun
(
Json
::
nullValue
,
errStr
,
should_retry
);
return
;
}
if
(
res
.
Url
()
!=
"200"
)
{
auto
errStr
=
StrPrinter
<<
"[bad http status code]:"
<<
res
.
Url
()
<<
endl
;
fun
(
Json
::
nullValue
,
errStr
);
fun
(
Json
::
nullValue
,
errStr
,
should_retry
);
return
;
}
Value
result
;
...
...
@@ -111,20 +112,29 @@ static void parse_http_response(const SockException &ex, const Parser &res,
ss
>>
result
;
}
catch
(
std
::
exception
&
ex
)
{
auto
errStr
=
StrPrinter
<<
"[parse json failed]:"
<<
ex
.
what
()
<<
endl
;
fun
(
Json
::
nullValue
,
errStr
);
fun
(
Json
::
nullValue
,
errStr
,
should_retry
);
return
;
}
if
(
result
[
"code"
].
asInt
()
!=
0
)
{
auto
errStr
=
StrPrinter
<<
"[json code]:"
<<
"code="
<<
result
[
"code"
]
<<
",msg="
<<
result
[
"msg"
]
<<
endl
;
fun
(
Json
::
nullValue
,
errStr
);
auto
code
=
result
[
"code"
];
if
(
!
code
.
isInt64
())
{
auto
errStr
=
StrPrinter
<<
"[json code]:"
<<
"code not int :"
<<
code
<<
endl
;
fun
(
Json
::
nullValue
,
errStr
,
should_retry
);
return
;
}
should_retry
=
false
;
if
(
code
.
asInt64
()
!=
0
){
auto
errStr
=
StrPrinter
<<
"[auth failed]: code:"
<<
code
<<
" msg:"
<<
result
[
"msg"
]
<<
endl
;
fun
(
Json
::
nullValue
,
errStr
,
should_retry
);
return
;
}
try
{
fun
(
result
,
""
);
fun
(
result
,
""
,
should_retry
);
}
catch
(
std
::
exception
&
ex
)
{
auto
errStr
=
StrPrinter
<<
"[do hook invoker failed]:"
<<
ex
.
what
()
<<
endl
;
//如果还是抛异常,那么再上抛异常
fun
(
Json
::
nullValue
,
errStr
);
fun
(
Json
::
nullValue
,
errStr
,
should_retry
);
}
}
...
...
@@ -173,12 +183,12 @@ void do_http_hook(const string &url, const ArgsType &body, const function<void(c
Ticker
ticker
;
requester
->
startRequester
(
url
,
[
url
,
func
,
bodyStr
,
body
,
requester
,
ticker
,
retry
](
const
SockException
&
ex
,
const
Parser
&
res
)
mutable
{
onceToken
token
(
nullptr
,
[
&
]()
mutable
{
requester
.
reset
();
});
parse_http_response
(
ex
,
res
,
[
&
](
const
Value
&
obj
,
const
string
&
err
)
{
parse_http_response
(
ex
,
res
,
[
&
](
const
Value
&
obj
,
const
string
&
err
,
const
bool
&
should_retry
)
{
if
(
!
err
.
empty
())
{
// hook失败
WarnL
<<
"hook "
<<
url
<<
" "
<<
ticker
.
elapsedTime
()
<<
"ms,failed"
<<
err
<<
":"
<<
bodyStr
;
if
(
retry
--
>
0
)
{
if
(
retry
--
>
0
&&
should_retry
)
{
requester
->
getPoller
()
->
doDelayTask
(
MAX
(
retry_delay
,
0.0
)
*
1000
,
[
url
,
body
,
func
,
retry
]
{
do_http_hook
(
url
,
body
,
func
,
retry
);
return
0
;
...
...
src/Common/MediaSink.cpp
查看文件 @
43d7479c
...
...
@@ -63,6 +63,7 @@ bool MediaSink::addTrack(const Track::Ptr &track_in) {
void
MediaSink
::
resetTracks
()
{
_all_track_ready
=
false
;
_have_video
=
false
;
_track_map
.
clear
();
_track_ready_callback
.
clear
();
_ticker
.
resetTime
();
...
...
@@ -186,6 +187,7 @@ void MediaSink::onAllTrackReady_l() {
}
onAllTrackReady
();
_all_track_ready
=
true
;
_have_video
=
(
bool
)
getTrack
(
TrackVideo
);
}
vector
<
Track
::
Ptr
>
MediaSink
::
getTracks
(
bool
ready
)
const
{
...
...
@@ -292,6 +294,10 @@ void MediaSink::enableMuteAudio(bool flag) {
_add_mute_audio
=
flag
;
}
bool
MediaSink
::
haveVideo
()
const
{
return
_have_video
;
}
///////////////////////////DemuxerSink//////////////////////////////
void
MediaSinkDelegate
::
setTrackListener
(
TrackListener
*
listener
)
{
...
...
src/Common/MediaSink.h
查看文件 @
43d7479c
...
...
@@ -131,6 +131,11 @@ public:
*/
void
enableMuteAudio
(
bool
flag
);
/**
* 是否有视频track
*/
bool
haveVideo
()
const
;
protected
:
/**
* 某track已经准备好,其ready()状态返回true,
...
...
@@ -171,6 +176,7 @@ private:
bool
_only_audio
=
false
;
bool
_add_mute_audio
=
true
;
bool
_all_track_ready
=
false
;
bool
_have_video
=
false
;
size_t
_max_track_size
=
2
;
std
::
unordered_map
<
int
,
std
::
pair
<
Track
::
Ptr
,
bool
/*got frame*/
>
>
_track_map
;
std
::
unordered_map
<
int
,
toolkit
::
List
<
Frame
::
Ptr
>
>
_frame_unread
;
...
...
src/Common/MediaSource.cpp
查看文件 @
43d7479c
...
...
@@ -153,8 +153,6 @@ std::shared_ptr<void> MediaSource::getOwnership() {
//已经被所有
return
nullptr
;
}
// 关闭所有rtp推流,确保线程安全
stopSendRtp
(
""
);
weak_ptr
<
MediaSource
>
weak_self
=
shared_from_this
();
//确保返回的Ownership智能指针不为空,0x01无实际意义
return
std
::
shared_ptr
<
void
>
((
void
*
)
0x01
,
[
weak_self
](
void
*
ptr
)
{
...
...
src/Common/MultiMediaSourceMuxer.cpp
查看文件 @
43d7479c
...
...
@@ -21,6 +21,17 @@ namespace toolkit {
namespace
mediakit
{
namespace
{
class
MediaSourceForMuxer
:
public
MediaSource
{
public
:
MediaSourceForMuxer
(
const
MultiMediaSourceMuxer
::
Ptr
&
muxer
)
:
MediaSource
(
"muxer"
,
muxer
->
getVhost
(),
muxer
->
getApp
(),
muxer
->
getStreamId
())
{
MediaSource
::
setListener
(
muxer
);
}
int
readerCount
()
override
{
return
0
;
}
};
}
// namespace
static
std
::
shared_ptr
<
MediaSinkInterface
>
makeRecorder
(
MediaSource
&
sender
,
const
vector
<
Track
::
Ptr
>
&
tracks
,
Recorder
::
type
type
,
const
ProtocolOption
&
option
){
auto
recorder
=
Recorder
::
createRecorder
(
type
,
sender
.
getVhost
(),
sender
.
getApp
(),
sender
.
getId
(),
option
);
for
(
auto
&
track
:
tracks
)
{
...
...
@@ -148,20 +159,15 @@ std::weak_ptr<MultiMediaSourceMuxer::Listener> MultiMediaSourceMuxer::getTrackLi
int
MultiMediaSourceMuxer
::
totalReaderCount
()
const
{
auto
hls
=
_hls
;
auto
ret
=
(
_rtsp
?
_rtsp
->
readerCount
()
:
0
)
+
(
_rtmp
?
_rtmp
->
readerCount
()
:
0
)
+
(
_ts
?
_ts
->
readerCount
()
:
0
)
+
#if defined(ENABLE_MP4)
(
_fmp4
?
_fmp4
->
readerCount
()
:
0
)
+
#endif
(
_mp4
?
_option
.
mp4_as_player
:
0
)
+
(
hls
?
hls
->
readerCount
()
:
0
);
#if defined(ENABLE_RTPPROXY)
return
ret
+
(
int
)
_rtp_sender
.
size
();
#else
return
ret
;
#endif
return
(
_rtsp
?
_rtsp
->
readerCount
()
:
0
)
+
(
_rtmp
?
_rtmp
->
readerCount
()
:
0
)
+
(
_ts
?
_ts
->
readerCount
()
:
0
)
+
#if defined(ENABLE_MP4)
(
_fmp4
?
_fmp4
->
readerCount
()
:
0
)
+
#endif
(
_mp4
?
_option
.
mp4_as_player
:
0
)
+
(
hls
?
hls
->
readerCount
()
:
0
)
+
(
_ring
?
_ring
->
readerCount
()
:
0
);
}
void
MultiMediaSourceMuxer
::
setTimeStamp
(
uint32_t
stamp
)
{
...
...
@@ -241,42 +247,45 @@ bool MultiMediaSourceMuxer::isRecording(MediaSource &sender, Recorder::type type
void
MultiMediaSourceMuxer
::
startSendRtp
(
MediaSource
&
sender
,
const
MediaSourceEvent
::
SendRtpArgs
&
args
,
const
std
::
function
<
void
(
uint16_t
,
const
toolkit
::
SockException
&
)
>
cb
)
{
#if defined(ENABLE_RTPPROXY)
createGopCacheIfNeed
();
auto
ring
=
_ring
;
auto
ssrc
=
args
.
ssrc
;
auto
tracks
=
getTracks
(
false
);
auto
rtp_sender
=
std
::
make_shared
<
RtpSender
>
(
getOwnerPoller
(
sender
));
weak_ptr
<
MediaSource
>
weak_sender
=
sender
.
shared_from_this
();
weak_ptr
<
MultiMediaSourceMuxer
>
weak_self
=
shared_from_this
();
rtp_sender
->
startSend
(
args
,
[
args
,
weak_self
,
rtp_sender
,
cb
,
weak_sender
](
uint16_t
local_port
,
const
SockException
&
ex
)
mutable
{
rtp_sender
->
startSend
(
args
,
[
ssrc
,
weak_self
,
rtp_sender
,
cb
,
tracks
,
ring
](
uint16_t
local_port
,
const
SockException
&
ex
)
mutable
{
cb
(
local_port
,
ex
);
auto
strong_self
=
weak_self
.
lock
();
if
(
!
strong_self
||
ex
)
{
return
;
}
if
(
!
strong_self
->
getOwnerPoller
(
MediaSource
::
NullMediaSource
())
->
isCurrentThread
())
{
// poller线程发生变更了
return
;
}
for
(
auto
&
track
:
strong_self
->
getTracks
(
false
))
{
for
(
auto
&
track
:
tracks
)
{
rtp_sender
->
addTrack
(
track
);
}
rtp_sender
->
addTrackCompleted
();
auto
ssrc
=
args
.
ssrc
;
rtp_sender
->
setOnClose
([
weak_self
,
ssrc
,
weak_sender
](
const
toolkit
::
SockException
&
ex
)
{
rtp_sender
->
setOnClose
([
weak_self
,
ssrc
](
const
toolkit
::
SockException
&
ex
)
{
if
(
auto
strong_self
=
weak_self
.
lock
())
{
WarnL
<<
"stream:"
<<
strong_self
->
shortUrl
()
<<
" stop send rtp:"
<<
ssrc
<<
", reason:"
<<
ex
.
what
();
strong_self
->
_rtp_sender
.
erase
(
ssrc
);
//触发观看人数统计
auto
strong_sender
=
weak_sender
.
lock
();
if
(
strong_sender
)
{
strong_self
->
onReaderChanged
(
*
strong_sender
,
strong_self
->
totalReaderCount
());
}
NoticeCenter
::
Instance
().
emitEvent
(
Broadcast
::
kBroadcastSendRtpStopped
,
*
strong_self
,
ssrc
,
ex
);
// 可能归属线程发生变更
strong_self
->
getOwnerPoller
(
MediaSource
::
NullMediaSource
())
->
async
([
=
]()
{
WarnL
<<
"stream:"
<<
strong_self
->
shortUrl
()
<<
" stop send rtp:"
<<
ssrc
<<
", reason:"
<<
ex
.
what
();
strong_self
->
_rtp_sender
.
erase
(
ssrc
);
NoticeCenter
::
Instance
().
emitEvent
(
Broadcast
::
kBroadcastSendRtpStopped
,
*
strong_self
,
ssrc
,
ex
);
});
}
});
strong_self
->
_rtp_sender
[
args
.
ssrc
]
=
std
::
move
(
rtp_sender
);
auto
strong_sender
=
weak_sender
.
lock
();
if
(
strong_sender
)
{
strong_self
->
onReaderChanged
(
*
strong_sender
,
strong_self
->
totalReaderCount
());
}
auto
reader
=
ring
->
attach
(
EventPoller
::
getCurrentPoller
());
reader
->
setReadCB
([
rtp_sender
](
const
Frame
::
Ptr
&
frame
)
{
rtp_sender
->
inputFrame
(
frame
);
});
// 可能归属线程发生变更
strong_self
->
getOwnerPoller
(
MediaSource
::
NullMediaSource
())
->
async
([
=
]()
{
strong_self
->
_rtp_sender
[
ssrc
]
=
std
::
move
(
reader
);
});
});
#else
cb
(
0
,
SockException
(
Err_other
,
"该功能未启用,编译时请打开ENABLE_RTPPROXY宏"
));
...
...
@@ -285,10 +294,6 @@ void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const MediaSourceE
bool
MultiMediaSourceMuxer
::
stopSendRtp
(
MediaSource
&
sender
,
const
string
&
ssrc
)
{
#if defined(ENABLE_RTPPROXY)
onceToken
token
(
nullptr
,
[
&
]()
{
//关闭rtp推流,可能触发无人观看事件
onReaderChanged
(
sender
,
totalReaderCount
());
});
if
(
ssrc
.
empty
())
{
//关闭全部
auto
size
=
_rtp_sender
.
size
();
...
...
@@ -373,9 +378,33 @@ void MultiMediaSourceMuxer::onAllTrackReady() {
if
(
listener
)
{
listener
->
onAllTrackReady
();
}
#if defined(ENABLE_RTPPROXY)
GET_CONFIG
(
bool
,
gop_cache
,
RtpProxy
::
kGopCache
);
if
(
gop_cache
)
{
createGopCacheIfNeed
();
}
#endif
InfoL
<<
"stream: "
<<
shortUrl
()
<<
" , codec info: "
<<
getTrackInfoStr
(
this
);
}
void
MultiMediaSourceMuxer
::
createGopCacheIfNeed
()
{
if
(
_ring
)
{
return
;
}
weak_ptr
<
MultiMediaSourceMuxer
>
weak_self
=
shared_from_this
();
_ring
=
std
::
make_shared
<
RingType
>
(
1024
,
[
weak_self
](
int
size
)
{
auto
strong_self
=
weak_self
.
lock
();
if
(
strong_self
)
{
// 切换到归属线程
strong_self
->
getOwnerPoller
(
MediaSource
::
NullMediaSource
())
->
async
([
=
]()
{
auto
src
=
std
::
make_shared
<
MediaSourceForMuxer
>
(
strong_self
);
strong_self
->
onReaderChanged
(
*
src
,
strong_self
->
totalReaderCount
());
});
}
});
}
void
MultiMediaSourceMuxer
::
resetTracks
()
{
MediaSink
::
resetTracks
();
...
...
@@ -394,12 +423,6 @@ void MultiMediaSourceMuxer::resetTracks() {
}
#endif
#if defined(ENABLE_RTPPROXY)
for
(
auto
&
pr
:
_rtp_sender
)
{
pr
.
second
->
resetTracks
();
}
#endif
//拷贝智能指针,目的是为了防止跨线程调用设置录像相关api导致的线程竞争问题
auto
hls
=
_hls
;
if
(
hls
)
{
...
...
@@ -447,11 +470,17 @@ bool MultiMediaSourceMuxer::onTrackFrame(const Frame::Ptr &frame_in) {
}
#endif
#if defined(ENABLE_RTPPROXY)
for
(
auto
&
pr
:
_rtp_sender
)
{
ret
=
pr
.
second
->
inputFrame
(
frame
)
?
true
:
ret
;
if
(
_ring
)
{
if
(
frame
->
getTrackType
()
==
TrackVideo
)
{
// 视频时,遇到第一帧配置帧或关键帧则标记为gop开始处
auto
video_key_pos
=
frame
->
keyFrame
()
||
frame
->
configFrame
();
_ring
->
write
(
frame
,
video_key_pos
&&
!
_video_key_pos
);
_video_key_pos
=
video_key_pos
;
}
else
{
// 没有视频时,设置is_key为true,目的是关闭gop缓存
_ring
->
write
(
frame
,
!
haveVideo
());
}
}
#endif //ENABLE_RTPPROXY
return
ret
;
}
...
...
@@ -461,19 +490,15 @@ bool MultiMediaSourceMuxer::isEnabled(){
//无人观看时,每次检查是否真的无人观看
//有人观看时,则延迟一定时间检查一遍是否无人观看了(节省性能)
auto
hls
=
_hls
;
auto
flag
=
(
_rtmp
?
_rtmp
->
isEnabled
()
:
false
)
||
(
_rtsp
?
_rtsp
->
isEnabled
()
:
false
)
||
(
_ts
?
_ts
->
isEnabled
()
:
false
)
||
#if defined(ENABLE_MP4)
(
_fmp4
?
_fmp4
->
isEnabled
()
:
false
)
||
#endif
(
hls
?
hls
->
isEnabled
()
:
false
)
||
_mp4
;
_is_enable
=
(
_rtmp
?
_rtmp
->
isEnabled
()
:
false
)
||
(
_rtsp
?
_rtsp
->
isEnabled
()
:
false
)
||
(
_ts
?
_ts
->
isEnabled
()
:
false
)
||
#if defined(ENABLE_MP4)
(
_fmp4
?
_fmp4
->
isEnabled
()
:
false
)
||
#endif
(
_ring
?
(
bool
)
_ring
->
readerCount
()
:
false
)
||
(
hls
?
hls
->
isEnabled
()
:
false
)
||
_mp4
;
#if defined(ENABLE_RTPPROXY)
_is_enable
=
flag
||
_rtp_sender
.
size
();
#else
_is_enable
=
flag
;
#endif //ENABLE_RTPPROXY
if
(
_is_enable
)
{
//无人观看时,不刷新计时器,因为无人观看时每次都会检查一遍,所以刷新计数器无意义且浪费cpu
_last_check
.
resetTime
();
...
...
src/Common/MultiMediaSourceMuxer.h
查看文件 @
43d7479c
...
...
@@ -28,6 +28,7 @@ namespace mediakit {
class
MultiMediaSourceMuxer
:
public
MediaSourceEventInterceptor
,
public
MediaSink
,
public
std
::
enable_shared_from_this
<
MultiMediaSourceMuxer
>
{
public
:
using
Ptr
=
std
::
shared_ptr
<
MultiMediaSourceMuxer
>
;
using
RingType
=
toolkit
::
RingBuffer
<
Frame
::
Ptr
>
;
class
Listener
{
public
:
...
...
@@ -46,7 +47,7 @@ public:
void
setMediaListener
(
const
std
::
weak_ptr
<
MediaSourceEvent
>
&
listener
);
/**
*
随着
Track就绪事件监听器
*
设置
Track就绪事件监听器
* @param listener 事件监听器
*/
void
setTrackListener
(
const
std
::
weak_ptr
<
Listener
>
&
listener
);
...
...
@@ -157,8 +158,12 @@ protected:
bool
onTrackFrame
(
const
Frame
::
Ptr
&
frame
)
override
;
private
:
void
createGopCacheIfNeed
();
private
:
bool
_is_enable
=
false
;
bool
_create_in_poller
=
false
;
bool
_video_key_pos
=
false
;
std
::
string
_vhost
;
std
::
string
_app
;
std
::
string
_stream_id
;
...
...
@@ -167,7 +172,7 @@ private:
Stamp
_stamp
[
2
];
std
::
weak_ptr
<
Listener
>
_track_listener
;
#if defined(ENABLE_RTPPROXY)
std
::
unordered_map
<
std
::
string
,
R
tpSen
der
::
Ptr
>
_rtp_sender
;
std
::
unordered_map
<
std
::
string
,
R
ingType
::
RingRea
der
::
Ptr
>
_rtp_sender
;
#endif //ENABLE_RTPPROXY
#if defined(ENABLE_MP4)
...
...
@@ -179,6 +184,7 @@ private:
MediaSinkInterface
::
Ptr
_mp4
;
HlsRecorder
::
Ptr
_hls
;
toolkit
::
EventPoller
::
Ptr
_poller
;
RingType
::
Ptr
_ring
;
//对象个数统计
toolkit
::
ObjectStatistic
<
MultiMediaSourceMuxer
>
_statistic
;
...
...
src/Common/config.cpp
查看文件 @
43d7479c
...
...
@@ -318,6 +318,7 @@ const string kH264PT = RTP_PROXY_FIELD "h264_pt";
const
string
kH265PT
=
RTP_PROXY_FIELD
"h265_pt"
;
const
string
kPSPT
=
RTP_PROXY_FIELD
"ps_pt"
;
const
string
kOpusPT
=
RTP_PROXY_FIELD
"opus_pt"
;
const
string
kGopCache
=
RTP_PROXY_FIELD
"gop_cache"
;
static
onceToken
token
([]()
{
mINI
::
Instance
()[
kDumpDir
]
=
""
;
...
...
@@ -327,6 +328,7 @@ static onceToken token([]() {
mINI
::
Instance
()[
kH265PT
]
=
99
;
mINI
::
Instance
()[
kPSPT
]
=
96
;
mINI
::
Instance
()[
kOpusPT
]
=
100
;
mINI
::
Instance
()[
kGopCache
]
=
1
;
});
}
// namespace RtpProxy
...
...
src/Common/config.h
查看文件 @
43d7479c
...
...
@@ -352,6 +352,8 @@ extern const std::string kH265PT;
extern
const
std
::
string
kPSPT
;
// rtp server opus 的pt
extern
const
std
::
string
kOpusPT
;
// RtpSender相关功能是否提前开启gop缓存优化级联秒开体验,默认开启
extern
const
std
::
string
kGopCache
;
}
// namespace RtpProxy
/**
...
...
src/Common/macros.cpp
查看文件 @
43d7479c
...
...
@@ -26,7 +26,7 @@ void Assert_Throw(int failed, const char *exp, const char *func, const char *fil
printer
<<
", "
<<
str
;
}
printer
<<
"), function "
<<
func
<<
", file "
<<
file
<<
", line "
<<
line
<<
"."
;
throw
std
::
runtime_error
(
printer
);
throw
mediakit
::
AssertFailedException
(
printer
);
}
}
}
...
...
src/Common/macros.h
查看文件 @
43d7479c
...
...
@@ -80,6 +80,13 @@ extern void Assert_Throw(int failed, const char *exp, const char *func, const ch
namespace
mediakit
{
class
AssertFailedException
:
public
std
::
runtime_error
{
public
:
template
<
typename
...
T
>
AssertFailedException
(
T
&&
...
args
)
:
std
::
runtime_error
(
std
::
forward
<
T
>
(
args
)...)
{}
~
AssertFailedException
()
override
=
default
;
};
extern
const
char
kServerName
[];
template
<
typename
...
ARGS
>
...
...
src/Extension/H265Rtp.cpp
查看文件 @
43d7479c
...
...
@@ -9,7 +9,7 @@
*/
#include "H265Rtp.h"
#include "Common/config.h"
namespace
mediakit
{
//https://datatracker.ietf.org/doc/rfc7798/
...
...
@@ -258,58 +258,119 @@ H265RtpEncoder::H265RtpEncoder(uint32_t ui32Ssrc,
ui8Interleaved
)
{
}
void
H265RtpEncoder
::
packRtpFu
(
const
char
*
ptr
,
size_t
len
,
uint64_t
pts
,
bool
is_mark
,
bool
gop_pos
){
auto
max_size
=
getMaxSize
()
-
3
;
auto
nal_type
=
H265_TYPE
(
ptr
[
0
]);
//获取NALU的5bit 帧类型
unsigned
char
s_e_flags
;
bool
fu_start
=
true
;
bool
mark_bit
=
false
;
size_t
offset
=
2
;
while
(
!
mark_bit
)
{
if
(
len
<=
offset
+
max_size
)
{
// FU end
mark_bit
=
true
;
max_size
=
len
-
offset
;
s_e_flags
=
(
1
<<
6
)
|
nal_type
;
}
else
if
(
fu_start
)
{
// FU start
s_e_flags
=
(
1
<<
7
)
|
nal_type
;
}
else
{
// FU mid
s_e_flags
=
nal_type
;
}
{
// 传入nullptr先不做payload的内存拷贝
auto
rtp
=
makeRtp
(
getTrackType
(),
nullptr
,
max_size
+
3
,
mark_bit
,
pts
);
// rtp payload 负载部分
uint8_t
*
payload
=
rtp
->
getPayload
();
// FU 第1个字节,表明为FU
payload
[
0
]
=
49
<<
1
;
// FU 第2个字节貌似固定为1
payload
[
1
]
=
ptr
[
1
];
// 1;
// FU 第3个字节
payload
[
2
]
=
s_e_flags
;
// H265 数据
memcpy
(
payload
+
3
,
ptr
+
offset
,
max_size
);
// 输入到rtp环形缓存
RtpCodec
::
inputRtp
(
rtp
,
fu_start
&&
gop_pos
);
}
offset
+=
max_size
;
fu_start
=
false
;
}
}
void
H265RtpEncoder
::
packRtp
(
const
char
*
ptr
,
size_t
len
,
uint64_t
pts
,
bool
is_mark
,
bool
gop_pos
){
if
(
len
+
3
<=
getMaxSize
())
{
//signal-nalu
RtpCodec
::
inputRtp
(
makeRtp
(
getTrackType
(),
ptr
,
len
,
is_mark
,
pts
),
gop_pos
);
}
else
{
//FU-A模式
packRtpFu
(
ptr
,
len
,
pts
,
is_mark
,
gop_pos
);
}
}
void
H265RtpEncoder
::
insertConfigFrame
(
uint64_t
pts
){
if
(
!
_sps
||
!
_pps
||
!
_vps
)
{
WarnL
<<
" not ok"
;
return
;
}
//gop缓存从vps 开始,vps ,sps、pps后面还有时间戳相同的关键帧,所以mark bit为false
packRtp
(
_vps
->
data
()
+
_vps
->
prefixSize
(),
_vps
->
size
()
-
_vps
->
prefixSize
(),
pts
,
false
,
true
);
packRtp
(
_sps
->
data
()
+
_sps
->
prefixSize
(),
_sps
->
size
()
-
_sps
->
prefixSize
(),
pts
,
false
,
false
);
packRtp
(
_pps
->
data
()
+
_pps
->
prefixSize
(),
_pps
->
size
()
-
_pps
->
prefixSize
(),
pts
,
false
,
false
);
}
bool
H265RtpEncoder
::
inputFrame_l
(
const
Frame
::
Ptr
&
frame
,
bool
is_mark
){
if
(
frame
->
keyFrame
())
{
//保证每一个关键帧前都有SPS PPS VPS
insertConfigFrame
(
frame
->
pts
());
}
packRtp
(
frame
->
data
()
+
frame
->
prefixSize
(),
frame
->
size
()
-
frame
->
prefixSize
(),
frame
->
pts
(),
is_mark
,
false
);
return
true
;
}
bool
H265RtpEncoder
::
inputFrame
(
const
Frame
::
Ptr
&
frame
)
{
auto
ptr
=
(
uint8_t
*
)
frame
->
data
()
+
frame
->
prefixSize
();
auto
len
=
frame
->
size
()
-
frame
->
prefixSize
();
auto
pts
=
frame
->
pts
();
auto
nal_type
=
H265_TYPE
(
ptr
[
0
]);
//获取NALU的5bit 帧类型
auto
max_size
=
getMaxSize
()
-
3
;
//超过MTU,按照FU方式打包
if
(
len
>
max_size
+
2
)
{
//获取帧头数据,1byte
unsigned
char
s_e_flags
;
bool
fu_start
=
true
;
bool
mark_bit
=
false
;
size_t
offset
=
2
;
while
(
!
mark_bit
)
{
if
(
len
<=
offset
+
max_size
)
{
//FU end
mark_bit
=
true
;
max_size
=
len
-
offset
;
s_e_flags
=
(
1
<<
6
)
|
nal_type
;
}
else
if
(
fu_start
)
{
//FU start
s_e_flags
=
(
1
<<
7
)
|
nal_type
;
}
else
{
//FU mid
s_e_flags
=
nal_type
;
}
{
//传入nullptr先不做payload的内存拷贝
auto
rtp
=
makeRtp
(
getTrackType
(),
nullptr
,
max_size
+
3
,
mark_bit
,
pts
);
//rtp payload 负载部分
uint8_t
*
payload
=
rtp
->
getPayload
();
//FU 第1个字节,表明为FU
payload
[
0
]
=
49
<<
1
;
//FU 第2个字节貌似固定为1
payload
[
1
]
=
ptr
[
1
];
// 1;
//FU 第3个字节
payload
[
2
]
=
s_e_flags
;
//H265 数据
memcpy
(
payload
+
3
,
ptr
+
offset
,
max_size
);
//输入到rtp环形缓存
RtpCodec
::
inputRtp
(
rtp
,
fu_start
&&
frame
->
keyFrame
());
}
switch
(
nal_type
)
{
case
H265Frame
:
:
NAL_SPS
:
{
_sps
=
Frame
::
getCacheAbleFrame
(
frame
);
return
true
;
}
case
H265Frame
:
:
NAL_PPS
:
{
_pps
=
Frame
::
getCacheAbleFrame
(
frame
);
return
true
;
}
case
H265Frame
:
:
NAL_VPS
:
{
_vps
=
Frame
::
getCacheAbleFrame
(
frame
);
return
true
;
}
default
:
break
;
}
offset
+=
max_size
;
fu_start
=
false
;
GET_CONFIG
(
int
,
lowLatency
,
Rtp
::
kLowLatency
);
if
(
lowLatency
)
{
// 低延迟模式
if
(
_last_frame
)
{
flush
();
}
inputFrame_l
(
frame
,
true
);
}
else
{
RtpCodec
::
inputRtp
(
makeRtp
(
getTrackType
(),
ptr
,
len
,
false
,
pts
),
frame
->
keyFrame
());
if
(
_last_frame
)
{
//如果时间戳发生了变化,那么markbit才置true
inputFrame_l
(
_last_frame
,
_last_frame
->
pts
()
!=
frame
->
pts
());
}
_last_frame
=
Frame
::
getCacheAbleFrame
(
frame
);
}
return
true
;
}
void
H265RtpEncoder
::
flush
()
{
if
(
_last_frame
)
{
// 如果时间戳发生了变化,那么markbit才置true
inputFrame_l
(
_last_frame
,
true
);
_last_frame
=
nullptr
;
}
return
len
>
0
;
}
}
//namespace mediakit
src/Extension/H265Rtp.h
查看文件 @
43d7479c
...
...
@@ -85,6 +85,22 @@ public:
* @param frame 帧数据,必须
*/
bool
inputFrame
(
const
Frame
::
Ptr
&
frame
)
override
;
/**
* 刷新输出所有frame缓存
*/
void
flush
()
override
;
private
:
void
packRtp
(
const
char
*
ptr
,
size_t
len
,
uint64_t
pts
,
bool
is_mark
,
bool
gop_pos
);
void
packRtpFu
(
const
char
*
ptr
,
size_t
len
,
uint64_t
pts
,
bool
is_mark
,
bool
gop_pos
);
void
insertConfigFrame
(
uint64_t
pts
);
bool
inputFrame_l
(
const
Frame
::
Ptr
&
frame
,
bool
is_mark
);
private
:
Frame
::
Ptr
_sps
;
Frame
::
Ptr
_pps
;
Frame
::
Ptr
_vps
;
Frame
::
Ptr
_last_frame
;
};
}
//namespace mediakit{
...
...
src/Rtp/PSDecoder.cpp
查看文件 @
43d7479c
...
...
@@ -63,7 +63,7 @@ const char *PSDecoder::onSearchPacketTail(const char *data, size_t len) {
//解析失败,丢弃所有数据
return
data
+
len
;
}
catch
(
std
::
e
xception
&
ex
)
{
}
catch
(
AssertFailedE
xception
&
ex
)
{
InfoL
<<
"解析 ps 异常: bytes="
<<
len
<<
", exception="
<<
ex
.
what
()
<<
", hex="
<<
hexdump
(
data
,
MIN
(
len
,
32
));
...
...
src/Rtp/RtpCache.cpp
查看文件 @
43d7479c
...
...
@@ -20,14 +20,6 @@ RtpCache::RtpCache(onFlushed cb) {
_cb
=
std
::
move
(
cb
);
}
bool
RtpCache
::
firstKeyReady
(
bool
in
)
{
if
(
_first_key
)
{
return
_first_key
;
}
_first_key
=
in
;
return
_first_key
;
}
void
RtpCache
::
onFlush
(
std
::
shared_ptr
<
List
<
Buffer
::
Ptr
>>
rtp_list
,
bool
)
{
_cb
(
std
::
move
(
rtp_list
));
}
...
...
@@ -42,9 +34,6 @@ void RtpCachePS::flush() {
}
void
RtpCachePS
::
onRTP
(
Buffer
::
Ptr
buffer
,
bool
is_key
)
{
if
(
!
firstKeyReady
(
is_key
))
{
return
;
}
auto
rtp
=
std
::
static_pointer_cast
<
RtpPacket
>
(
buffer
);
auto
stamp
=
rtp
->
getStampMS
();
input
(
stamp
,
std
::
move
(
buffer
),
is_key
);
...
...
@@ -56,9 +45,6 @@ void RtpCacheRaw::flush() {
}
void
RtpCacheRaw
::
onRTP
(
Buffer
::
Ptr
buffer
,
bool
is_key
)
{
if
(
!
firstKeyReady
(
is_key
))
{
return
;
}
auto
rtp
=
std
::
static_pointer_cast
<
RtpPacket
>
(
buffer
);
auto
stamp
=
rtp
->
getStampMS
();
input
(
stamp
,
std
::
move
(
buffer
),
is_key
);
...
...
src/Rtp/RtpCache.h
查看文件 @
43d7479c
...
...
@@ -32,13 +32,10 @@ protected:
*/
void
input
(
uint64_t
stamp
,
toolkit
::
Buffer
::
Ptr
buffer
,
bool
is_key
=
false
);
bool
firstKeyReady
(
bool
in
);
protected
:
void
onFlush
(
std
::
shared_ptr
<
toolkit
::
List
<
toolkit
::
Buffer
::
Ptr
>
>
rtp_list
,
bool
)
override
;
private
:
bool
_first_key
=
false
;
onFlushed
_cb
;
};
...
...
tests/test_rtp.cpp
查看文件 @
43d7479c
...
...
@@ -26,8 +26,10 @@ using namespace std;
using
namespace
toolkit
;
using
namespace
mediakit
;
static
semaphore
sem
;
#if defined(ENABLE_RTPPROXY)
static
bool
loadFile
(
const
char
*
path
){
static
bool
loadFile
(
const
char
*
path
,
const
EventPoller
::
Ptr
&
poller
){
FILE
*
fp
=
fopen
(
path
,
"rb"
);
if
(
!
fp
)
{
WarnL
<<
"open file failed:"
<<
path
;
...
...
@@ -40,7 +42,7 @@ static bool loadFile(const char *path){
struct
sockaddr_storage
addr
;
memset
(
&
addr
,
0
,
sizeof
(
addr
));
addr
.
ss_family
=
AF_INET
;
auto
sock
=
Socket
::
createSocket
();
auto
sock
=
Socket
::
createSocket
(
poller
);
size_t
total_size
=
0
;
RtpProcess
::
Ptr
process
;
uint32_t
ssrc
=
0
;
...
...
@@ -108,8 +110,15 @@ int main(int argc,char *argv[]) {
//此处选择是否导出调试文件
// mINI::Instance()[RtpProxy::kDumpDir] = "/Users/xzl/Desktop/";
if
(
argc
==
2
)
loadFile
(
argv
[
1
]);
if
(
argc
==
2
){
auto
poller
=
EventPollerPool
::
Instance
().
getPoller
();
poller
->
async_first
([
poller
,
argv
](){
loadFile
(
argv
[
1
],
poller
);
sem
.
post
();
});
sem
.
wait
();
sleep
(
1
);
}
else
ErrorL
<<
"parameter error."
;
#else
...
...
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论