Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
Z
ZLMediaKit
概览
Overview
Details
Activity
Cycle Analytics
版本库
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
问题
0
Issues
0
列表
Board
标记
里程碑
合并请求
0
Merge Requests
0
CI / CD
CI / CD
流水线
作业
日程表
图表
维基
Wiki
代码片段
Snippets
成员
Collapse sidebar
Close sidebar
活动
图像
聊天
创建新问题
作业
提交
Issue Boards
Open sidebar
张翔宇
ZLMediaKit
Commits
d8530751
Unverified
Commit
d8530751
authored
Sep 09, 2022
by
custompal
Committed by
GitHub
Sep 09, 2022
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
RtpServer新增tcp主动模式支持 (#1938)
parent
258a4dd1
隐藏空白字符变更
内嵌
并排
正在显示
7 个修改的文件
包含
203 行增加
和
42 行删除
+203
-42
api/include/mk_rtp_server.h
+18
-2
api/source/mk_rtp_server.cpp
+20
-9
postman/ZLMediaKit.postman_collection.json
+43
-2
server/WebApi.cpp
+35
-6
server/WebApi.h
+2
-1
src/Rtp/RtpServer.cpp
+64
-18
src/Rtp/RtpServer.h
+21
-4
没有找到文件。
api/include/mk_rtp_server.h
查看文件 @
d8530751
...
@@ -19,11 +19,27 @@ typedef void* mk_rtp_server;
...
@@ -19,11 +19,27 @@ typedef void* mk_rtp_server;
/**
/**
* 创建GB28181 RTP 服务器
* 创建GB28181 RTP 服务器
* @param port 监听端口,0则为随机
* @param port 监听端口,0则为随机
* @param
enable_tcp 创建udp端口时是否同时监听tcp端口
* @param
tcp_mode tcp模式(0: 不监听端口 1: 监听端口 2: 主动连接到服务端)
* @param stream_id 该端口绑定的流id
* @param stream_id 该端口绑定的流id
* @return
* @return
*/
*/
API_EXPORT
mk_rtp_server
API_CALL
mk_rtp_server_create
(
uint16_t
port
,
int
enable_tcp
,
const
char
*
stream_id
);
API_EXPORT
mk_rtp_server
API_CALL
mk_rtp_server_create
(
uint16_t
port
,
int
tcp_mode
,
const
char
*
stream_id
);
/**
* TCP 主动模式时连接到服务器是否成功的回调
*/
typedef
void
(
API_CALL
*
on_mk_rtp_server_connected
)(
void
*
user_data
,
int
err
,
const
char
*
what
,
int
sys_err
);
/**
* TCP 主动模式时连接到服务器
* @param @param ctx 服务器对象
* @param dst_url 服务端地址
* @param dst_port 服务端端口
* @param cb 连接到服务器是否成功的回调
* @param user_data 用户数据指针
* @return
*/
API_EXPORT
void
API_CALL
mk_rtp_server_connect
(
mk_rtp_server
ctx
,
const
char
*
dst_url
,
uint16_t
dst_port
,
on_mk_rtp_server_connected
cb
,
void
*
user_data
);
/**
/**
* 销毁GB28181 RTP 服务器
* 销毁GB28181 RTP 服务器
...
...
api/source/mk_rtp_server.cpp
查看文件 @
d8530751
...
@@ -16,23 +16,34 @@ using namespace toolkit;
...
@@ -16,23 +16,34 @@ using namespace toolkit;
#include "Rtp/RtpServer.h"
#include "Rtp/RtpServer.h"
using
namespace
mediakit
;
using
namespace
mediakit
;
API_EXPORT
mk_rtp_server
API_CALL
mk_rtp_server_create
(
uint16_t
port
,
int
enable_tcp
,
const
char
*
stream_id
)
{
API_EXPORT
mk_rtp_server
API_CALL
mk_rtp_server_create
(
uint16_t
port
,
int
tcp_mode
,
const
char
*
stream_id
)
{
RtpServer
::
Ptr
*
server
=
new
RtpServer
::
Ptr
(
new
RtpServer
);
RtpServer
::
Ptr
*
server
=
new
RtpServer
::
Ptr
(
new
RtpServer
);
(
*
server
)
->
start
(
port
,
stream_id
,
enable_tcp
);
(
*
server
)
->
start
(
port
,
stream_id
,
(
RtpServer
::
TcpMode
)
tcp_mode
);
return
server
;
return
server
;
}
}
API_EXPORT
void
API_CALL
mk_rtp_server_release
(
mk_rtp_server
ctx
){
API_EXPORT
void
API_CALL
mk_rtp_server_connect
(
mk_rtp_server
ctx
,
const
char
*
dst_url
,
uint16_t
dst_port
,
on_mk_rtp_server_connected
cb
,
void
*
user_data
)
{
RtpServer
::
Ptr
*
server
=
(
RtpServer
::
Ptr
*
)
ctx
;
if
(
server
)
{
(
*
server
)
->
connectToServer
(
dst_url
,
dst_port
,
[
cb
,
user_data
](
const
SockException
&
ex
)
{
if
(
cb
)
{
cb
(
user_data
,
ex
.
getErrCode
(),
ex
.
what
(),
ex
.
getCustomCode
());
}
});
}
}
API_EXPORT
void
API_CALL
mk_rtp_server_release
(
mk_rtp_server
ctx
)
{
RtpServer
::
Ptr
*
server
=
(
RtpServer
::
Ptr
*
)
ctx
;
RtpServer
::
Ptr
*
server
=
(
RtpServer
::
Ptr
*
)
ctx
;
delete
server
;
delete
server
;
}
}
API_EXPORT
uint16_t
API_CALL
mk_rtp_server_port
(
mk_rtp_server
ctx
){
API_EXPORT
uint16_t
API_CALL
mk_rtp_server_port
(
mk_rtp_server
ctx
)
{
RtpServer
::
Ptr
*
server
=
(
RtpServer
::
Ptr
*
)
ctx
;
RtpServer
::
Ptr
*
server
=
(
RtpServer
::
Ptr
*
)
ctx
;
return
(
*
server
)
->
getPort
();
return
(
*
server
)
->
getPort
();
}
}
API_EXPORT
void
API_CALL
mk_rtp_server_set_on_detach
(
mk_rtp_server
ctx
,
on_mk_rtp_server_detach
cb
,
void
*
user_data
){
API_EXPORT
void
API_CALL
mk_rtp_server_set_on_detach
(
mk_rtp_server
ctx
,
on_mk_rtp_server_detach
cb
,
void
*
user_data
)
{
RtpServer
::
Ptr
*
server
=
(
RtpServer
::
Ptr
*
)
ctx
;
RtpServer
::
Ptr
*
server
=
(
RtpServer
::
Ptr
*
)
ctx
;
if
(
cb
)
{
if
(
cb
)
{
(
*
server
)
->
setOnDetach
([
cb
,
user_data
]()
{
(
*
server
)
->
setOnDetach
([
cb
,
user_data
]()
{
...
@@ -45,21 +56,21 @@ API_EXPORT void API_CALL mk_rtp_server_set_on_detach(mk_rtp_server ctx, on_mk_rt
...
@@ -45,21 +56,21 @@ API_EXPORT void API_CALL mk_rtp_server_set_on_detach(mk_rtp_server ctx, on_mk_rt
#else
#else
API_EXPORT
mk_rtp_server
API_CALL
mk_rtp_server_create
(
uint16_t
port
,
int
enable_tcp
,
const
char
*
stream_id
){
API_EXPORT
mk_rtp_server
API_CALL
mk_rtp_server_create
(
uint16_t
port
,
int
enable_tcp
,
const
char
*
stream_id
)
{
WarnL
<<
"请打开ENABLE_RTPPROXY后再编译"
;
WarnL
<<
"请打开ENABLE_RTPPROXY后再编译"
;
return
nullptr
;
return
nullptr
;
}
}
API_EXPORT
void
API_CALL
mk_rtp_server_release
(
mk_rtp_server
ctx
){
API_EXPORT
void
API_CALL
mk_rtp_server_release
(
mk_rtp_server
ctx
)
{
WarnL
<<
"请打开ENABLE_RTPPROXY后再编译"
;
WarnL
<<
"请打开ENABLE_RTPPROXY后再编译"
;
}
}
API_EXPORT
uint16_t
API_CALL
mk_rtp_server_port
(
mk_rtp_server
ctx
){
API_EXPORT
uint16_t
API_CALL
mk_rtp_server_port
(
mk_rtp_server
ctx
)
{
WarnL
<<
"请打开ENABLE_RTPPROXY后再编译"
;
WarnL
<<
"请打开ENABLE_RTPPROXY后再编译"
;
return
0
;
return
0
;
}
}
API_EXPORT
void
API_CALL
mk_rtp_server_set_on_detach
(
mk_rtp_server
ctx
,
on_mk_rtp_server_detach
cb
,
void
*
user_data
){
API_EXPORT
void
API_CALL
mk_rtp_server_set_on_detach
(
mk_rtp_server
ctx
,
on_mk_rtp_server_detach
cb
,
void
*
user_data
)
{
WarnL
<<
"请打开ENABLE_RTPPROXY后再编译"
;
WarnL
<<
"请打开ENABLE_RTPPROXY后再编译"
;
}
}
...
...
postman/ZLMediaKit.postman_collection.json
查看文件 @
d8530751
...
@@ -1384,9 +1384,9 @@
...
@@ -1384,9 +1384,9 @@
"description"
:
"绑定的端口,0时为随机端口"
"description"
:
"绑定的端口,0时为随机端口"
},
},
{
{
"key"
:
"
enable_tcp
"
,
"key"
:
"
tcp_mode
"
,
"value"
:
"1"
,
"value"
:
"1"
,
"description"
:
"
创建 udp端口时是否同时监听tcp端口
"
"description"
:
"
tcp模式,0时为不启用tcp监听,1时为启用tcp监听,2时为tcp主动连接模式
"
},
},
{
{
"key"
:
"stream_id"
,
"key"
:
"stream_id"
,
...
@@ -1411,6 +1411,47 @@
...
@@ -1411,6 +1411,47 @@
"response"
:
[]
"response"
:
[]
},
},
{
{
"name"
:
"连接RTP服务器(connectRtpServer)"
,
"request"
:
{
"method"
:
"GET"
,
"header"
:
[],
"url"
:
{
"raw"
:
"{{ZLMediaKit_URL}}/index/api/connectRtpServer?secret={{ZLMediaKit_secret}}&dst_url=127.0.0.1&dst_port=10000&stream_id=test"
,
"host"
:
[
"{{ZLMediaKit_URL}}"
],
"path"
:
[
"index"
,
"api"
,
"connectRtpServer"
],
"query"
:
[
{
"key"
:
"secret"
,
"value"
:
"{{ZLMediaKit_secret}}"
,
"description"
:
"api操作密钥(配置文件配置),如果操作ip是127.0.0.1,则不需要此参数"
},
{
"key"
:
"dst_url"
,
"value"
:
"0"
,
"description"
:
"tcp主动模式时服务端地址"
},
{
"key"
:
"dst_port"
,
"value"
:
"1"
,
"description"
:
"tcp主动模式时服务端端口"
},
{
"key"
:
"stream_id"
,
"value"
:
"test"
,
"description"
:
"OpenRtpServer时绑定的流id
\n
"
}
]
}
},
"response"
:
[]
},
{
"name"
:
"关闭RTP服务器(closeRtpServer)"
,
"name"
:
"关闭RTP服务器(closeRtpServer)"
,
"request"
:
{
"request"
:
{
"method"
:
"GET"
,
"method"
:
"GET"
,
...
...
server/WebApi.cpp
查看文件 @
d8530751
...
@@ -383,7 +383,7 @@ Value makeMediaSourceJson(MediaSource &media){
...
@@ -383,7 +383,7 @@ Value makeMediaSourceJson(MediaSource &media){
}
}
#if defined(ENABLE_RTPPROXY)
#if defined(ENABLE_RTPPROXY)
uint16_t
openRtpServer
(
uint16_t
local_port
,
const
string
&
stream_id
,
bool
enable_tcp
,
const
string
&
local_ip
,
bool
re_use_port
,
uint32_t
ssrc
)
{
uint16_t
openRtpServer
(
uint16_t
local_port
,
const
string
&
stream_id
,
int
tcp_mode
,
const
string
&
local_ip
,
bool
re_use_port
,
uint32_t
ssrc
)
{
lock_guard
<
recursive_mutex
>
lck
(
s_rtpServerMapMtx
);
lock_guard
<
recursive_mutex
>
lck
(
s_rtpServerMapMtx
);
if
(
s_rtpServerMap
.
find
(
stream_id
)
!=
s_rtpServerMap
.
end
())
{
if
(
s_rtpServerMap
.
find
(
stream_id
)
!=
s_rtpServerMap
.
end
())
{
//为了防止RtpProcess所有权限混乱的问题,不允许重复添加相同的stream_id
//为了防止RtpProcess所有权限混乱的问题,不允许重复添加相同的stream_id
...
@@ -391,7 +391,7 @@ uint16_t openRtpServer(uint16_t local_port, const string &stream_id, bool enable
...
@@ -391,7 +391,7 @@ uint16_t openRtpServer(uint16_t local_port, const string &stream_id, bool enable
}
}
RtpServer
::
Ptr
server
=
std
::
make_shared
<
RtpServer
>
();
RtpServer
::
Ptr
server
=
std
::
make_shared
<
RtpServer
>
();
server
->
start
(
local_port
,
stream_id
,
enable_tcp
,
local_ip
.
c_str
(),
re_use_port
,
ssrc
);
server
->
start
(
local_port
,
stream_id
,
(
RtpServer
::
TcpMode
)
tcp_mode
,
local_ip
.
c_str
(),
re_use_port
,
ssrc
);
server
->
setOnDetach
([
stream_id
]()
{
server
->
setOnDetach
([
stream_id
]()
{
//设置rtp超时移除事件
//设置rtp超时移除事件
lock_guard
<
recursive_mutex
>
lck
(
s_rtpServerMapMtx
);
lock_guard
<
recursive_mutex
>
lck
(
s_rtpServerMapMtx
);
...
@@ -404,6 +404,16 @@ uint16_t openRtpServer(uint16_t local_port, const string &stream_id, bool enable
...
@@ -404,6 +404,16 @@ uint16_t openRtpServer(uint16_t local_port, const string &stream_id, bool enable
return
server
->
getPort
();
return
server
->
getPort
();
}
}
void
connectRtpServer
(
const
string
&
stream_id
,
const
string
&
dst_url
,
uint16_t
dst_port
,
const
function
<
void
(
const
SockException
&
ex
)
>
&
cb
)
{
lock_guard
<
recursive_mutex
>
lck
(
s_rtpServerMapMtx
);
auto
it
=
s_rtpServerMap
.
find
(
stream_id
);
if
(
it
==
s_rtpServerMap
.
end
())
{
cb
(
SockException
(
Err_other
,
"未找到rtp服务"
));
return
;
}
it
->
second
->
connectToServer
(
dst_url
,
dst_port
,
cb
);
}
bool
closeRtpServer
(
const
string
&
stream_id
)
{
bool
closeRtpServer
(
const
string
&
stream_id
)
{
lock_guard
<
recursive_mutex
>
lck
(
s_rtpServerMapMtx
);
lock_guard
<
recursive_mutex
>
lck
(
s_rtpServerMapMtx
);
auto
it
=
s_rtpServerMap
.
find
(
stream_id
);
auto
it
=
s_rtpServerMap
.
find
(
stream_id
);
...
@@ -1135,17 +1145,36 @@ void installWebApi() {
...
@@ -1135,17 +1145,36 @@ void installWebApi() {
api_regist
(
"/index/api/openRtpServer"
,[](
API_ARGS_MAP
){
api_regist
(
"/index/api/openRtpServer"
,[](
API_ARGS_MAP
){
CHECK_SECRET
();
CHECK_SECRET
();
CHECK_ARGS
(
"port"
,
"
enable_tcp"
,
"
stream_id"
);
CHECK_ARGS
(
"port"
,
"stream_id"
);
auto
stream_id
=
allArgs
[
"stream_id"
];
auto
stream_id
=
allArgs
[
"stream_id"
];
auto
port
=
openRtpServer
(
allArgs
[
"port"
],
stream_id
,
allArgs
[
"enable_tcp"
].
as
<
bool
>
(),
"::"
,
auto
tcp_mode
=
allArgs
[
"tcp_mode"
].
as
<
int
>
();
allArgs
[
"re_use_port"
].
as
<
bool
>
(),
allArgs
[
"ssrc"
].
as
<
uint32_t
>
());
if
(
allArgs
[
"enable_tcp"
].
as
<
int
>
()
&&
!
tcp_mode
)
{
if
(
port
==
0
)
{
//兼容老版本请求,新版本去除enable_tcp参数并新增tcp_mode参数
tcp_mode
=
1
;
}
auto
port
=
openRtpServer
(
allArgs
[
"port"
],
stream_id
,
tcp_mode
,
"::"
,
allArgs
[
"re_use_port"
].
as
<
bool
>
(),
allArgs
[
"ssrc"
].
as
<
uint32_t
>
());
if
(
port
==
0
)
{
throw
InvalidArgsException
(
"该stream_id已存在"
);
throw
InvalidArgsException
(
"该stream_id已存在"
);
}
}
//回复json
//回复json
val
[
"port"
]
=
port
;
val
[
"port"
]
=
port
;
});
});
api_regist
(
"/index/api/connectRtpServer"
,
[](
API_ARGS_MAP_ASYNC
)
{
CHECK_SECRET
();
CHECK_ARGS
(
"stream_id"
,
"dst_url"
,
"dst_port"
);
connectRtpServer
(
allArgs
[
"stream_id"
],
allArgs
[
"dst_url"
],
allArgs
[
"dst_port"
],
[
val
,
headerOut
,
invoker
](
const
SockException
&
ex
)
mutable
{
if
(
ex
)
{
val
[
"code"
]
=
API
::
OtherFailed
;
val
[
"msg"
]
=
ex
.
what
();
}
invoker
(
200
,
headerOut
,
val
.
toStyledString
());
});
});
api_regist
(
"/index/api/closeRtpServer"
,[](
API_ARGS_MAP
){
api_regist
(
"/index/api/closeRtpServer"
,[](
API_ARGS_MAP
){
CHECK_SECRET
();
CHECK_SECRET
();
CHECK_ARGS
(
"stream_id"
);
CHECK_ARGS
(
"stream_id"
);
...
...
server/WebApi.h
查看文件 @
d8530751
...
@@ -231,7 +231,8 @@ bool checkArgs(Args &args, const First &first, const KeyTypes &...keys) {
...
@@ -231,7 +231,8 @@ bool checkArgs(Args &args, const First &first, const KeyTypes &...keys) {
void
installWebApi
();
void
installWebApi
();
void
unInstallWebApi
();
void
unInstallWebApi
();
uint16_t
openRtpServer
(
uint16_t
local_port
,
const
std
::
string
&
stream_id
,
bool
enable_tcp
,
const
std
::
string
&
local_ip
,
bool
re_use_port
,
uint32_t
ssrc
);
uint16_t
openRtpServer
(
uint16_t
local_port
,
const
std
::
string
&
stream_id
,
int
tcp_mode
,
const
std
::
string
&
local_ip
,
bool
re_use_port
,
uint32_t
ssrc
);
void
connectRtpServer
(
const
std
::
string
&
stream_id
,
const
std
::
string
&
dst_url
,
uint16_t
dst_port
,
const
std
::
function
<
void
(
const
toolkit
::
SockException
&
ex
)
>
&
cb
);
bool
closeRtpServer
(
const
std
::
string
&
stream_id
);
bool
closeRtpServer
(
const
std
::
string
&
stream_id
);
Json
::
Value
makeMediaSourceJson
(
mediakit
::
MediaSource
&
media
);
Json
::
Value
makeMediaSourceJson
(
mediakit
::
MediaSource
&
media
);
void
getStatisticJson
(
const
std
::
function
<
void
(
Json
::
Value
&
val
)
>
&
cb
);
void
getStatisticJson
(
const
std
::
function
<
void
(
Json
::
Value
&
val
)
>
&
cb
);
...
...
src/Rtp/RtpServer.cpp
查看文件 @
d8530751
...
@@ -22,8 +22,8 @@ namespace mediakit{
...
@@ -22,8 +22,8 @@ namespace mediakit{
RtpServer
::
RtpServer
()
{}
RtpServer
::
RtpServer
()
{}
RtpServer
::~
RtpServer
()
{
RtpServer
::~
RtpServer
()
{
if
(
_on_clearup
)
{
if
(
_on_cleanup
)
{
_on_clea
r
up
();
_on_clea
n
up
();
}
}
}
}
...
@@ -90,7 +90,7 @@ private:
...
@@ -90,7 +90,7 @@ private:
std
::
shared_ptr
<
struct
sockaddr_storage
>
_rtcp_addr
;
std
::
shared_ptr
<
struct
sockaddr_storage
>
_rtcp_addr
;
};
};
void
RtpServer
::
start
(
uint16_t
local_port
,
const
string
&
stream_id
,
bool
enable_tcp
,
const
char
*
local_ip
,
bool
re_use_port
,
uint32_t
ssrc
)
{
void
RtpServer
::
start
(
uint16_t
local_port
,
const
string
&
stream_id
,
TcpMode
tcp_mode
,
const
char
*
local_ip
,
bool
re_use_port
,
uint32_t
ssrc
)
{
//创建udp服务器
//创建udp服务器
Socket
::
Ptr
rtp_socket
=
Socket
::
createSocket
(
nullptr
,
true
);
Socket
::
Ptr
rtp_socket
=
Socket
::
createSocket
(
nullptr
,
true
);
Socket
::
Ptr
rtcp_socket
=
Socket
::
createSocket
(
nullptr
,
true
);
Socket
::
Ptr
rtcp_socket
=
Socket
::
createSocket
(
nullptr
,
true
);
...
@@ -110,13 +110,19 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable_
...
@@ -110,13 +110,19 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable_
SockUtil
::
setRecvBuf
(
rtp_socket
->
rawFD
(),
4
*
1024
*
1024
);
SockUtil
::
setRecvBuf
(
rtp_socket
->
rawFD
(),
4
*
1024
*
1024
);
TcpServer
::
Ptr
tcp_server
;
TcpServer
::
Ptr
tcp_server
;
if
(
enable_tcp
)
{
_tcp_mode
=
tcp_mode
;
if
(
tcp_mode
==
PASSIVE
||
tcp_mode
==
ACTIVE
)
{
//创建tcp服务器
//创建tcp服务器
tcp_server
=
std
::
make_shared
<
TcpServer
>
(
rtp_socket
->
getPoller
());
tcp_server
=
std
::
make_shared
<
TcpServer
>
(
rtp_socket
->
getPoller
());
(
*
tcp_server
)[
RtpSession
::
kStreamID
]
=
stream_id
;
(
*
tcp_server
)[
RtpSession
::
kStreamID
]
=
stream_id
;
(
*
tcp_server
)[
RtpSession
::
kIsUDP
]
=
0
;
(
*
tcp_server
)[
RtpSession
::
kIsUDP
]
=
0
;
(
*
tcp_server
)[
RtpSession
::
kSSRC
]
=
ssrc
;
(
*
tcp_server
)[
RtpSession
::
kSSRC
]
=
ssrc
;
tcp_server
->
start
<
RtpSession
>
(
rtp_socket
->
get_local_port
(),
local_ip
);
if
(
tcp_mode
==
PASSIVE
)
{
tcp_server
->
start
<
RtpSession
>
(
rtp_socket
->
get_local_port
(),
local_ip
);
}
else
if
(
stream_id
.
empty
())
{
// tcp主动模式时只能一个端口一个流,必须指定流id; 创建TcpServer对象也仅用于传参
throw
std
::
runtime_error
(
StrPrinter
<<
"tcp主动模式时必需指定流id"
);
}
}
}
//创建udp服务器
//创建udp服务器
...
@@ -125,18 +131,21 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable_
...
@@ -125,18 +131,21 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable_
if
(
!
stream_id
.
empty
())
{
if
(
!
stream_id
.
empty
())
{
//指定了流id,那么一个端口一个流(不管是否包含多个ssrc的多个流,绑定rtp源后,会筛选掉ip端口不匹配的流)
//指定了流id,那么一个端口一个流(不管是否包含多个ssrc的多个流,绑定rtp源后,会筛选掉ip端口不匹配的流)
process
=
RtpSelector
::
Instance
().
getProcess
(
stream_id
,
true
);
process
=
RtpSelector
::
Instance
().
getProcess
(
stream_id
,
true
);
RtcpHelper
::
Ptr
helper
=
std
::
make_shared
<
RtcpHelper
>
(
std
::
move
(
rtcp_socket
),
process
);
if
(
tcp_mode
!=
ACTIVE
)
{
helper
->
startRtcp
();
RtcpHelper
::
Ptr
helper
=
std
::
make_shared
<
RtcpHelper
>
(
std
::
move
(
rtcp_socket
),
process
);
rtp_socket
->
setOnRead
([
rtp_socket
,
process
,
helper
,
ssrc
](
const
Buffer
::
Ptr
&
buf
,
struct
sockaddr
*
addr
,
int
addr_len
)
{
helper
->
startRtcp
();
RtpHeader
*
header
=
(
RtpHeader
*
)
buf
->
data
();
rtp_socket
->
setOnRead
(
auto
rtp_ssrc
=
ntohl
(
header
->
ssrc
);
[
rtp_socket
,
process
,
helper
,
ssrc
](
const
Buffer
::
Ptr
&
buf
,
struct
sockaddr
*
addr
,
int
addr_len
)
{
if
(
ssrc
&&
rtp_ssrc
!=
ssrc
)
{
RtpHeader
*
header
=
(
RtpHeader
*
)
buf
->
data
();
WarnL
<<
"ssrc不匹配,rtp已丢弃:"
<<
rtp_ssrc
<<
" != "
<<
ssrc
;
auto
rtp_ssrc
=
ntohl
(
header
->
ssrc
);
}
else
{
if
(
ssrc
&&
rtp_ssrc
!=
ssrc
)
{
process
->
inputRtp
(
true
,
rtp_socket
,
buf
->
data
(),
buf
->
size
(),
addr
);
WarnL
<<
"ssrc不匹配,rtp已丢弃:"
<<
rtp_ssrc
<<
" != "
<<
ssrc
;
helper
->
onRecvRtp
(
buf
,
addr
,
addr_len
);
}
else
{
}
process
->
inputRtp
(
true
,
rtp_socket
,
buf
->
data
(),
buf
->
size
(),
addr
);
});
helper
->
onRecvRtp
(
buf
,
addr
,
addr_len
);
}
});
}
}
else
{
}
else
{
#if 1
#if 1
//单端口多线程接收多个流,根据ssrc区分流
//单端口多线程接收多个流,根据ssrc区分流
...
@@ -153,7 +162,7 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable_
...
@@ -153,7 +162,7 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable_
#endif
#endif
}
}
_on_clea
r
up
=
[
rtp_socket
,
process
,
stream_id
]()
{
_on_clea
n
up
=
[
rtp_socket
,
process
,
stream_id
]()
{
if
(
rtp_socket
)
{
if
(
rtp_socket
)
{
//去除循环引用
//去除循环引用
rtp_socket
->
setOnRead
(
nullptr
);
rtp_socket
->
setOnRead
(
nullptr
);
...
@@ -180,5 +189,42 @@ uint16_t RtpServer::getPort() {
...
@@ -180,5 +189,42 @@ uint16_t RtpServer::getPort() {
return
_udp_server
?
_udp_server
->
getPort
()
:
_rtp_socket
->
get_local_port
();
return
_udp_server
?
_udp_server
->
getPort
()
:
_rtp_socket
->
get_local_port
();
}
}
void
RtpServer
::
connectToServer
(
const
std
::
string
&
url
,
uint16_t
port
,
const
function
<
void
(
const
SockException
&
ex
)
>
&
cb
)
{
if
(
_tcp_mode
!=
ACTIVE
||
!
_rtp_socket
)
{
cb
(
SockException
(
Err_other
,
"仅支持tcp主动模式"
));
return
;
}
weak_ptr
<
RtpServer
>
weak_self
=
shared_from_this
();
_rtp_socket
->
connect
(
url
,
port
,
[
url
,
port
,
cb
,
weak_self
](
const
SockException
&
err
)
{
auto
strong_self
=
weak_self
.
lock
();
if
(
!
strong_self
)
{
cb
(
SockException
(
Err_other
,
"服务对象已释放"
));
return
;
}
if
(
err
)
{
WarnL
<<
"连接到服务器 "
<<
url
<<
":"
<<
port
<<
" 失败 "
<<
err
.
what
();
}
else
{
InfoL
<<
"连接到服务器 "
<<
url
<<
":"
<<
port
<<
" 成功"
;
strong_self
->
onConnect
();
}
cb
(
err
);
},
5.0
F
,
"::"
,
_rtp_socket
->
get_local_port
());
}
void
RtpServer
::
onConnect
()
{
auto
rtp_session
=
std
::
make_shared
<
RtpSession
>
(
_rtp_socket
);
rtp_session
->
attachServer
(
*
_tcp_server
);
_rtp_socket
->
setOnRead
([
rtp_session
](
const
Buffer
::
Ptr
&
buf
,
struct
sockaddr
*
addr
,
int
addr_len
)
{
rtp_session
->
onRecv
(
buf
);
});
weak_ptr
<
RtpServer
>
weak_self
=
shared_from_this
();
_rtp_socket
->
setOnErr
([
weak_self
](
const
SockException
&
err
)
{
if
(
auto
strong_self
=
weak_self
.
lock
())
{
strong_self
->
_rtp_socket
->
setOnRead
(
nullptr
);
}
});
}
}
//namespace mediakit
}
//namespace mediakit
#endif//defined(ENABLE_RTPPROXY)
#endif//defined(ENABLE_RTPPROXY)
src/Rtp/RtpServer.h
查看文件 @
d8530751
...
@@ -23,10 +23,11 @@ namespace mediakit{
...
@@ -23,10 +23,11 @@ namespace mediakit{
/**
/**
* RTP服务器,支持UDP/TCP
* RTP服务器,支持UDP/TCP
*/
*/
class
RtpServer
{
class
RtpServer
:
public
std
::
enable_shared_from_this
<
RtpServer
>
{
public
:
public
:
using
Ptr
=
std
::
shared_ptr
<
RtpServer
>
;
using
Ptr
=
std
::
shared_ptr
<
RtpServer
>
;
using
onRecv
=
std
::
function
<
void
(
const
toolkit
::
Buffer
::
Ptr
&
buf
)
>
;
using
onRecv
=
std
::
function
<
void
(
const
toolkit
::
Buffer
::
Ptr
&
buf
)
>
;
enum
TcpMode
{
NONE
=
0
,
PASSIVE
,
ACTIVE
};
RtpServer
();
RtpServer
();
~
RtpServer
();
~
RtpServer
();
...
@@ -35,14 +36,23 @@ public:
...
@@ -35,14 +36,23 @@ public:
* 开启服务器,可能抛异常
* 开启服务器,可能抛异常
* @param local_port 本地端口,0时为随机端口
* @param local_port 本地端口,0时为随机端口
* @param stream_id 流id,置空则使用ssrc
* @param stream_id 流id,置空则使用ssrc
* @param
enable_tcp 是否启用tcp服务器
* @param
tcp_mode tcp服务模式
* @param local_ip 绑定的本地网卡ip
* @param local_ip 绑定的本地网卡ip
* @param re_use_port 是否设置socket为re_use属性
* @param re_use_port 是否设置socket为re_use属性
* @param ssrc 指定的ssrc
*/
*/
void
start
(
uint16_t
local_port
,
const
std
::
string
&
stream_id
=
""
,
bool
enable_tcp
=
true
,
void
start
(
uint16_t
local_port
,
const
std
::
string
&
stream_id
=
""
,
TcpMode
tcp_mode
=
PASSIVE
,
const
char
*
local_ip
=
"::"
,
bool
re_use_port
=
true
,
uint32_t
ssrc
=
0
);
const
char
*
local_ip
=
"::"
,
bool
re_use_port
=
true
,
uint32_t
ssrc
=
0
);
/**
/**
* 连接到tcp服务(tcp主动模式)
* @param url 服务器地址
* @param port 服务器端口
* @param cb 连接服务器是否成功的回调
*/
void
connectToServer
(
const
std
::
string
&
url
,
uint16_t
port
,
const
std
::
function
<
void
(
const
toolkit
::
SockException
&
ex
)
>
&
cb
);
/**
* 获取绑定的本地端口
* 获取绑定的本地端口
*/
*/
uint16_t
getPort
();
uint16_t
getPort
();
...
@@ -52,12 +62,19 @@ public:
...
@@ -52,12 +62,19 @@ public:
*/
*/
void
setOnDetach
(
const
std
::
function
<
void
()
>
&
cb
);
void
setOnDetach
(
const
std
::
function
<
void
()
>
&
cb
);
private
:
// tcp主动模式连接服务器成功回调
void
onConnect
();
protected
:
protected
:
toolkit
::
Socket
::
Ptr
_rtp_socket
;
toolkit
::
Socket
::
Ptr
_rtp_socket
;
toolkit
::
UdpServer
::
Ptr
_udp_server
;
toolkit
::
UdpServer
::
Ptr
_udp_server
;
toolkit
::
TcpServer
::
Ptr
_tcp_server
;
toolkit
::
TcpServer
::
Ptr
_tcp_server
;
RtpProcess
::
Ptr
_rtp_process
;
RtpProcess
::
Ptr
_rtp_process
;
std
::
function
<
void
()
>
_on_clearup
;
std
::
function
<
void
()
>
_on_cleanup
;
//用于tcp主动模式
TcpMode
_tcp_mode
=
NONE
;
};
};
}
//namespace mediakit
}
//namespace mediakit
...
...
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论