Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
Z
ZLMediaKit
概览
Overview
Details
Activity
Cycle Analytics
版本库
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
问题
0
Issues
0
列表
Board
标记
里程碑
合并请求
0
Merge Requests
0
CI / CD
CI / CD
流水线
作业
日程表
图表
维基
Wiki
代码片段
Snippets
成员
Collapse sidebar
Close sidebar
活动
图像
聊天
创建新问题
作业
提交
Issue Boards
Open sidebar
张翔宇
ZLMediaKit
Commits
c367df73
Commit
c367df73
authored
Sep 30, 2017
by
xiongziliang
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
整理代码,添加注释
parent
e6bdfbce
显示空白字符变更
内嵌
并排
正在显示
9 个修改的文件
包含
246 行增加
和
140 行删除
+246
-140
src/.DS_Store
+0
-0
src/Rtmp/RtmpPusher.cpp
+13
-6
src/Rtmp/RtmpPusher.h
+2
-0
tests/test_benchmark.cpp
+4
-5
tests/test_httpClient.cpp
+61
-34
tests/test_player.cpp
+4
-7
tests/test_rtmpPusher.cpp
+59
-38
tests/test_rtmpPusherMp4.cpp
+73
-29
tests/test_server.cpp
+30
-21
没有找到文件。
src/.DS_Store
查看文件 @
c367df73
No preview for this file type
src/Rtmp/RtmpPusher.cpp
查看文件 @
c367df73
...
@@ -37,17 +37,24 @@ namespace Rtmp {
...
@@ -37,17 +37,24 @@ namespace Rtmp {
unordered_map
<
string
,
RtmpPusher
::
rtmpCMDHandle
>
RtmpPusher
::
g_mapCmd
;
unordered_map
<
string
,
RtmpPusher
::
rtmpCMDHandle
>
RtmpPusher
::
g_mapCmd
;
RtmpPusher
::
RtmpPusher
(
const
char
*
strApp
,
const
char
*
strStream
)
{
RtmpPusher
::
RtmpPusher
(
const
char
*
strApp
,
const
char
*
strStream
)
{
static
onceToken
token
([]()
{
g_mapCmd
.
emplace
(
"_error"
,
&
RtmpPusher
::
onCmd_result
);
g_mapCmd
.
emplace
(
"_result"
,
&
RtmpPusher
::
onCmd_result
);
g_mapCmd
.
emplace
(
"onStatus"
,
&
RtmpPusher
::
onCmd_onStatus
);
},
[]()
{});
auto
src
=
RtmpMediaSource
::
find
(
strApp
,
strStream
);
auto
src
=
RtmpMediaSource
::
find
(
strApp
,
strStream
);
if
(
!
src
)
{
if
(
!
src
)
{
auto
strErr
=
StrPrinter
<<
"media source:"
<<
strApp
<<
"/"
<<
strStream
<<
"not found!"
<<
endl
;
auto
strErr
=
StrPrinter
<<
"media source:"
<<
strApp
<<
"/"
<<
strStream
<<
"not found!"
<<
endl
;
throw
std
::
runtime_error
(
strErr
);
throw
std
::
runtime_error
(
strErr
);
}
}
m_pMediaSrc
=
src
;
init
(
src
);
}
RtmpPusher
::
RtmpPusher
(
const
RtmpMediaSource
::
Ptr
&
src
){
init
(
src
);
}
void
RtmpPusher
::
init
(
const
RtmpMediaSource
::
Ptr
&
src
){
static
onceToken
token
([]()
{
g_mapCmd
.
emplace
(
"_error"
,
&
RtmpPusher
::
onCmd_result
);
g_mapCmd
.
emplace
(
"_result"
,
&
RtmpPusher
::
onCmd_result
);
g_mapCmd
.
emplace
(
"onStatus"
,
&
RtmpPusher
::
onCmd_onStatus
);
},
[]()
{});
m_pMediaSrc
=
src
;
}
}
RtmpPusher
::~
RtmpPusher
()
{
RtmpPusher
::~
RtmpPusher
()
{
...
...
src/Rtmp/RtmpPusher.h
查看文件 @
c367df73
...
@@ -39,6 +39,7 @@ public:
...
@@ -39,6 +39,7 @@ public:
typedef
std
::
shared_ptr
<
RtmpPusher
>
Ptr
;
typedef
std
::
shared_ptr
<
RtmpPusher
>
Ptr
;
typedef
std
::
function
<
void
(
const
SockException
&
ex
)
>
Event
;
typedef
std
::
function
<
void
(
const
SockException
&
ex
)
>
Event
;
RtmpPusher
(
const
char
*
strApp
,
const
char
*
strStream
);
RtmpPusher
(
const
char
*
strApp
,
const
char
*
strStream
);
RtmpPusher
(
const
RtmpMediaSource
::
Ptr
&
src
);
virtual
~
RtmpPusher
();
virtual
~
RtmpPusher
();
void
publish
(
const
char
*
strUrl
);
void
publish
(
const
char
*
strUrl
);
...
@@ -65,6 +66,7 @@ protected:
...
@@ -65,6 +66,7 @@ protected:
send
(
pcRawData
,
iSize
);
send
(
pcRawData
,
iSize
);
}
}
private
:
private
:
void
init
(
const
RtmpMediaSource
::
Ptr
&
src
);
void
onShutdown
(
const
SockException
&
ex
)
{
void
onShutdown
(
const
SockException
&
ex
)
{
m_pPublishTimer
.
reset
();
m_pPublishTimer
.
reset
();
if
(
m_onShutdown
){
if
(
m_onShutdown
){
...
...
tests/test_benchmark.cpp
查看文件 @
c367df73
...
@@ -42,14 +42,13 @@ using namespace ZL::Rtsp;
...
@@ -42,14 +42,13 @@ using namespace ZL::Rtsp;
using
namespace
ZL
::
Thread
;
using
namespace
ZL
::
Thread
;
using
namespace
ZL
::
Network
;
using
namespace
ZL
::
Network
;
void
programExit
(
int
arg
)
{
EventPoller
::
Instance
().
shutdown
();
}
int
main
(
int
argc
,
char
*
argv
[]){
int
main
(
int
argc
,
char
*
argv
[]){
//设置退出信号处理函数
signal
(
SIGINT
,
[](
int
){
EventPoller
::
Instance
().
shutdown
();});
//设置日志
Logger
::
Instance
().
add
(
std
::
make_shared
<
ConsoleChannel
>
(
"stdout"
,
LTrace
));
Logger
::
Instance
().
add
(
std
::
make_shared
<
ConsoleChannel
>
(
"stdout"
,
LTrace
));
//Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>());
Logger
::
Instance
().
setWriter
(
std
::
make_shared
<
AsyncLogWriter
>
());
signal
(
SIGINT
,
programExit
);
if
(
argc
!=
5
){
if
(
argc
!=
5
){
FatalL
<<
"
\r\n
测试方法:./test_benchmark player_count play_interval rtxp_url rtp_type
\r\n
"
FatalL
<<
"
\r\n
测试方法:./test_benchmark player_count play_interval rtxp_url rtp_type
\r\n
"
...
...
tests/test_httpClient.cpp
查看文件 @
c367df73
...
@@ -25,14 +25,15 @@
...
@@ -25,14 +25,15 @@
*/
*/
#include <signal.h>
#include <signal.h>
#include <string>
#include <iostream>
#include <iostream>
#include "
Http/HttpDownloader
.h"
#include "
Util/MD5
.h"
#include "
Http/HttpRequester
.h"
#include "
Util/File
.h"
#include "Util/logger.h"
#include "Util/logger.h"
#include "Util/onceToken.h"
#include "Util/onceToken.h"
#include "Util/File.h"
#include "Poller/EventPoller.h"
#include "Poller/EventPoller.h"
#include <list>
#include "Http/HttpRequester.h"
#include "Http/HttpDownloader.h"
using
namespace
std
;
using
namespace
std
;
using
namespace
ZL
::
Util
;
using
namespace
ZL
::
Util
;
...
@@ -40,90 +41,116 @@ using namespace ZL::Http;
...
@@ -40,90 +41,116 @@ using namespace ZL::Http;
using
namespace
ZL
::
Poller
;
using
namespace
ZL
::
Poller
;
using
namespace
ZL
::
Network
;
using
namespace
ZL
::
Network
;
void
programExit
(
int
arg
)
{
EventPoller
::
Instance
().
shutdown
();
}
int
main
(
int
argc
,
char
*
argv
[]){
int
main
(
int
argc
,
char
*
argv
[]){
signal
(
SIGINT
,
programExit
);
//设置退出信号处理函数
signal
(
SIGINT
,
[](
int
){
EventPoller
::
Instance
().
shutdown
();});
//设置日志
Logger
::
Instance
().
add
(
std
::
make_shared
<
ConsoleChannel
>
(
"stdout"
,
LTrace
));
Logger
::
Instance
().
add
(
std
::
make_shared
<
ConsoleChannel
>
(
"stdout"
,
LTrace
));
Logger
::
Instance
().
setWriter
(
std
::
make_shared
<
AsyncLogWriter
>
());
///////////////////////////////http downloader///////////////////////
///////////////////////////////http downloader///////////////////////
list
<
HttpDownloader
::
Ptr
>
downloaderList
;
//下载器map
map
<
string
,
HttpDownloader
::
Ptr
>
downloaderMap
;
//下载两个文件,一个是http下载,一个https下载
auto
urlList
=
{
"http://img3.imgtn.bdimg.com/it/u=158031390,1321729164&fm=214&gp=0.jpg"
,
auto
urlList
=
{
"http://img3.imgtn.bdimg.com/it/u=158031390,1321729164&fm=214&gp=0.jpg"
,
"https://media-cdn.tripadvisor.com/media/photo-s/06/c3/2f/64/de-notre-chambre.jpg"
};
"https://ss0.bdstatic.com/70cFvHSh_Q1YnxGkpoWK1HF6hhy/it/u=931786003,1029770543&fm=27&gp=0.jpg"
};
int
i
=
0
;
for
(
auto
url
:
urlList
){
for
(
auto
&
url
:
urlList
){
//创建下载器
HttpDownloader
::
Ptr
downloader
(
new
HttpDownloader
());
HttpDownloader
::
Ptr
downloader
(
new
HttpDownloader
());
downloader
->
setOnResult
([](
ErrCode
code
,
const
char
*
errMsg
,
const
char
*
filePath
){
downloader
->
setOnResult
([](
ErrCode
code
,
const
char
*
errMsg
,
const
char
*
filePath
){
DebugL
<<
"=====================HttpDownloader result======================="
;
//下载结果回调
if
(
code
==
Err_success
){
if
(
code
==
Err_success
){
//文件下载成功
InfoL
<<
"download file success:"
<<
filePath
;
InfoL
<<
"download file success:"
<<
filePath
;
}
else
{
}
else
{
//下载失败
WarnL
<<
"code:"
<<
code
<<
" msg:"
<<
errMsg
;
WarnL
<<
"code:"
<<
code
<<
" msg:"
<<
errMsg
;
}
}
});
});
//断点续传功能,开启后可能会遇到416的错误(因为上次文件已经下载完全)
//断点续传功能,开启后可能会遇到416的错误(因为上次文件已经下载完全)
downloader
->
startDownload
(
url
,
exeDir
()
+
to_string
(
i
++
)
+
".jpg"
,
true
);
downloader
->
startDownload
(
url
,
exeDir
()
+
MD5
(
url
).
hexdigest
()
+
".jpg"
,
true
);
downloaderList
.
push_back
(
downloader
);
//下载器必须被强引用,否则作用域一失效就会导致对象销毁
downloaderMap
.
emplace
(
url
,
downloader
);
}
}
///////////////////////////////http get///////////////////////
///////////////////////////////http get///////////////////////
//创建一个Http请求器
HttpRequester
::
Ptr
requesterGet
(
new
HttpRequester
());
HttpRequester
::
Ptr
requesterGet
(
new
HttpRequester
());
//使用GET方式请求
requesterGet
->
setMethod
(
"GET"
);
requesterGet
->
setMethod
(
"GET"
);
//设置http
头,我们假设设置cookie
//设置http
请求头,我们假设设置cookie,当然你也可以设置其他http头
requesterGet
->
addHeader
(
"Cookie"
,
"SESSIONID=e1aa89b3-f79f-4ac6-8ae2-0cea9ae8e2d7"
);
requesterGet
->
addHeader
(
"Cookie"
,
"SESSIONID=e1aa89b3-f79f-4ac6-8ae2-0cea9ae8e2d7"
);
requesterGet
->
startRequester
(
"http://pv.sohu.com/cityjson?ie=utf-8"
,
//开启请求,该api会返回当前主机外网ip等信息
[](
const
SockException
&
ex
,
requesterGet
->
startRequester
(
"http://pv.sohu.com/cityjson?ie=utf-8"
,
//url地址
const
string
&
status
,
[](
const
SockException
&
ex
,
//网络相关的失败信息,如果为空就代表成功
const
HttpClient
::
HttpHeader
&
header
,
const
string
&
status
,
//http回复的状态码,比如说200/404
const
string
&
strRecvBody
){
const
HttpClient
::
HttpHeader
&
header
,
//http回复头
const
string
&
strRecvBody
){
//http回复body
DebugL
<<
"=====================HttpRequester GET==========================="
;
if
(
ex
){
if
(
ex
){
//网络相关的错误
WarnL
<<
"network err:"
<<
ex
.
getErrCode
()
<<
" "
<<
ex
.
what
();
WarnL
<<
"network err:"
<<
ex
.
getErrCode
()
<<
" "
<<
ex
.
what
();
}
else
{
}
else
{
//打印http回复信息
_StrPrinter
printer
;
_StrPrinter
printer
;
for
(
auto
&
pr
:
header
){
for
(
auto
&
pr
:
header
){
printer
<<
pr
.
first
<<
":"
<<
pr
.
second
<<
"
\r\n
"
;
printer
<<
pr
.
first
<<
":"
<<
pr
.
second
<<
"
\r\n
"
;
}
}
InfoL
<<
"
\r\n
http status:"
<<
status
<<
"
\r\n
\r\n
"
InfoL
<<
"
status:"
<<
status
<<
"
\r\n
"
<<
"header:
"
<<
(
printer
<<
endl
)
<<
"header:
\r\n
"
<<
(
printer
<<
endl
)
<<
"
\r\n
body:"
<<
strRecvBody
;
<<
"
\r\n
body:"
<<
strRecvBody
;
}
}
});
});
///////////////////////////////http post///////////////////////
///////////////////////////////http post///////////////////////
//创建一个Http请求器
HttpRequester
::
Ptr
requesterPost
(
new
HttpRequester
());
HttpRequester
::
Ptr
requesterPost
(
new
HttpRequester
());
//使用POST方式请求
requesterPost
->
setMethod
(
"POST"
);
requesterPost
->
setMethod
(
"POST"
);
//设置http请求头
requesterPost
->
addHeader
(
"X-Requested-With"
,
"XMLHttpRequest"
);
requesterPost
->
addHeader
(
"Origin"
,
"http://fanyi.baidu.com"
);
//设置POST参数列表
HttpArgs
args
;
HttpArgs
args
;
args
[
"query"
]
=
"test"
;
args
[
"query"
]
=
"test"
;
args
[
"from"
]
=
"en"
;
args
[
"from"
]
=
"en"
;
args
[
"to"
]
=
"zh"
;
args
[
"to"
]
=
"zh"
;
args
[
"transtype"
]
=
"translang"
;
args
[
"transtype"
]
=
"translang"
;
args
[
"simple_means_flag"
]
=
"3"
;
args
[
"simple_means_flag"
]
=
"3"
;
requesterPost
->
addHeader
(
"X-Requested-With"
,
"XMLHttpRequest"
);
requesterPost
->
addHeader
(
"Origin"
,
"http://fanyi.baidu.com"
);
requesterPost
->
setBody
(
args
.
make
());
requesterPost
->
setBody
(
args
.
make
());
requesterPost
->
startRequester
(
"http://fanyi.baidu.com/langdetect"
,
[](
const
SockException
&
ex
,
//开启请求
const
string
&
status
,
requesterPost
->
startRequester
(
"http://fanyi.baidu.com/langdetect"
,
//url地址
const
HttpClient
::
HttpHeader
&
header
,
[](
const
SockException
&
ex
,
//网络相关的失败信息,如果为空就代表成功
const
string
&
strRecvBody
){
const
string
&
status
,
//http回复的状态码,比如说200/404
const
HttpClient
::
HttpHeader
&
header
,
//http回复头
const
string
&
strRecvBody
){
//http回复body
DebugL
<<
"=====================HttpRequester POST=========================="
;
if
(
ex
){
if
(
ex
){
//网络相关的错误
WarnL
<<
"network err:"
<<
ex
.
getErrCode
()
<<
" "
<<
ex
.
what
();
WarnL
<<
"network err:"
<<
ex
.
getErrCode
()
<<
" "
<<
ex
.
what
();
}
else
{
}
else
{
//打印http回复信息
_StrPrinter
printer
;
_StrPrinter
printer
;
for
(
auto
&
pr
:
header
){
for
(
auto
&
pr
:
header
){
printer
<<
pr
.
first
<<
":"
<<
pr
.
second
<<
"
\r\n
"
;
printer
<<
pr
.
first
<<
":"
<<
pr
.
second
<<
"
\r\n
"
;
}
}
InfoL
<<
"
\r\n
http status:"
<<
status
<<
"
\r\n
\r\n
"
InfoL
<<
"
status:"
<<
status
<<
"
\r\n
"
<<
"header:
"
<<
(
printer
<<
endl
)
<<
"header:
\r\n
"
<<
(
printer
<<
endl
)
<<
"
\r\n
body:"
<<
strRecvBody
;
<<
"
\r\n
body:"
<<
strRecvBody
;
}
}
});
});
//事件轮询
EventPoller
::
Instance
().
runLoop
();
EventPoller
::
Instance
().
runLoop
();
static
onceToken
token
(
nullptr
,[](){
//清空下载器
downloaderMap
.
clear
();
requesterGet
.
reset
();
requesterPost
.
reset
();
//程序开始退出
EventPoller
::
Destory
();
EventPoller
::
Destory
();
Logger
::
Destory
();
Logger
::
Destory
();
});
return
0
;
return
0
;
}
}
tests/test_player.cpp
查看文件 @
c367df73
...
@@ -44,15 +44,12 @@ using namespace ZL::Network;
...
@@ -44,15 +44,12 @@ using namespace ZL::Network;
using
namespace
ZL
::
Rtsp
;
using
namespace
ZL
::
Rtsp
;
using
namespace
ZL
::
Player
;
using
namespace
ZL
::
Player
;
void
programExit
(
int
arg
)
{
EventPoller
::
Instance
().
shutdown
();
}
int
main
(
int
argc
,
char
*
argv
[]){
int
main
(
int
argc
,
char
*
argv
[]){
//设置退出信号处理函数
signal
(
SIGINT
,
[](
int
){
EventPoller
::
Instance
().
shutdown
();});
//设置日志
Logger
::
Instance
().
add
(
std
::
make_shared
<
ConsoleChannel
>
(
"stdout"
,
LTrace
));
Logger
::
Instance
().
add
(
std
::
make_shared
<
ConsoleChannel
>
(
"stdout"
,
LTrace
));
//Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>());
Logger
::
Instance
().
setWriter
(
std
::
make_shared
<
AsyncLogWriter
>
());
signal
(
SIGINT
,
programExit
);
if
(
argc
!=
3
){
if
(
argc
!=
3
){
FatalL
<<
"
\r\n
测试方法:./test_player rtxp_url rtp_type
\r\n
"
FatalL
<<
"
\r\n
测试方法:./test_player rtxp_url rtp_type
\r\n
"
...
...
tests/test_rtmpPusher.cpp
查看文件 @
c367df73
...
@@ -27,7 +27,6 @@
...
@@ -27,7 +27,6 @@
#include <signal.h>
#include <signal.h>
#include <iostream>
#include <iostream>
#include "Util/logger.h"
#include "Util/logger.h"
#include "Util/onceToken.h"
#include "Util/NoticeCenter.h"
#include "Util/NoticeCenter.h"
#include "Poller/EventPoller.h"
#include "Poller/EventPoller.h"
#include "Device/PlayerProxy.h"
#include "Device/PlayerProxy.h"
...
@@ -41,57 +40,79 @@ using namespace ZL::Thread;
...
@@ -41,57 +40,79 @@ using namespace ZL::Thread;
using
namespace
ZL
::
Network
;
using
namespace
ZL
::
Network
;
using
namespace
ZL
::
DEV
;
using
namespace
ZL
::
DEV
;
//推流器,保持强引用
RtmpPusher
::
Ptr
pusher
;
//声明函数
void
rePushDelay
(
const
string
&
app
,
const
string
&
stream
,
const
string
&
url
);
//创建推流器并开始推流
void
createPusher
(
const
string
&
app
,
const
string
&
stream
,
const
string
&
url
){
//创建推流器并绑定一个RtmpMediaSource
pusher
.
reset
(
new
RtmpPusher
(
app
.
data
(),
stream
.
data
()));
//设置推流中断处理逻辑
pusher
->
setOnShutdown
([
app
,
stream
,
url
](
const
SockException
&
ex
)
{
WarnL
<<
"Server connection is closed:"
<<
ex
.
getErrCode
()
<<
" "
<<
ex
.
what
();
//重试
rePushDelay
(
app
,
stream
,
url
);
});
//设置发布结果处理逻辑
pusher
->
setOnPublished
([
app
,
stream
,
url
](
const
SockException
&
ex
)
{
if
(
ex
)
{
WarnL
<<
"Publish fail:"
<<
ex
.
getErrCode
()
<<
" "
<<
ex
.
what
();
//如果发布失败,就重试
rePushDelay
(
app
,
stream
,
url
);
}
else
{
InfoL
<<
"Publish success,Please play with player:"
<<
url
;
}
});
pusher
->
publish
(
url
.
data
());
}
//推流失败或断开延迟2秒后重试推流
void
rePushDelay
(
const
string
&
app
,
const
string
&
stream
,
const
string
&
url
){
//上次延时两秒的任务可能还没执行,所以我们要先取消上次任务
AsyncTaskThread
::
Instance
().
CancelTask
(
0
);
//2秒后执行重新推流的任务
AsyncTaskThread
::
Instance
().
DoTaskDelay
(
0
,
2000
,
[
app
,
stream
,
url
]()
{
InfoL
<<
"Re-Publishing..."
;
//重新推流
createPusher
(
app
,
stream
,
url
);
//此任务不重复
return
false
;
});
}
//这里才是真正执行main函数,你可以把函数名(domain)改成main,然后就可以输入自定义url了
int
domain
(
int
argc
,
const
char
*
argv
[])
{
int
domain
(
int
argc
,
const
char
*
argv
[])
{
//设置退出信号处理函数
signal
(
SIGINT
,
[](
int
){
EventPoller
::
Instance
().
shutdown
();});
signal
(
SIGINT
,
[](
int
){
EventPoller
::
Instance
().
shutdown
();});
//设置日志
Logger
::
Instance
().
add
(
std
::
make_shared
<
ConsoleChannel
>
(
"stdout"
,
LTrace
));
Logger
::
Instance
().
add
(
std
::
make_shared
<
ConsoleChannel
>
(
"stdout"
,
LTrace
));
Logger
::
Instance
().
setWriter
(
std
::
make_shared
<
AsyncLogWriter
>
());
string
playUrl
=
argv
[
1
];
string
pushUrl
=
argv
[
2
];
PlayerProxy
::
Ptr
player
(
new
PlayerProxy
(
"app"
,
"stream"
));
//拉一个流,生成一个RtmpMediaSource,源的名称是"app/stream"
//拉一个流,生成一个RtmpMediaSource,源的名称是"app/stream"
//你也可以以其他方式生成RtmpMediaSource,比如说MP4文件(请研读MediaReader代码)
//你也可以以其他方式生成RtmpMediaSource,比如说MP4文件(请查看test_rtmpPusherMp4.cpp代码)
player
->
play
(
argv
[
1
]);
PlayerProxy
::
Ptr
player
(
new
PlayerProxy
(
"app"
,
"stream"
));
player
->
play
(
playUrl
.
data
());
RtmpPusher
::
Ptr
pusher
;
//监听RtmpMediaSource注册事件,在PlayerProxy播放成功后触发
//监听RtmpMediaSource注册事件,在PlayerProxy播放成功后触发。
NoticeCenter
::
Instance
().
addListener
(
nullptr
,
Config
::
Broadcast
::
kBroadcastRtmpSrcRegisted
,
[
pushUrl
](
BroadcastRtmpSrcRegistedArgs
)
{
NoticeCenter
::
Instance
().
addListener
(
nullptr
,
Config
::
Broadcast
::
kBroadcastRtmpSrcRegisted
,
[
&
pusher
,
argv
](
BroadcastRtmpSrcRegistedArgs
)
{
//媒体源"app/stream"已经注册,这时方可新建一个RtmpPusher对象并绑定该媒体源
//媒体源"app/stream"已经注册,这时方可新建一个RtmpPusher对象并绑定该媒体源
const_cast
<
RtmpPusher
::
Ptr
&>
(
pusher
).
reset
(
new
RtmpPusher
(
app
,
stream
));
createPusher
(
app
,
stream
,
pushUrl
);
string
appTmp
(
app
),
streamTmp
(
stream
);
pusher
->
setOnShutdown
([
appTmp
,
streamTmp
,
argv
](
const
SockException
&
ex
)
{
WarnL
<<
"已断开与服务器连接(Server connection is closed):"
<<
ex
.
getErrCode
()
<<
" "
<<
ex
.
what
();
AsyncTaskThread
::
Instance
().
CancelTask
(
0
);
AsyncTaskThread
::
Instance
().
DoTaskDelay
(
0
,
2000
,
[
appTmp
,
streamTmp
,
argv
]()
{
InfoL
<<
"正在重新发布(Re-Publish Steam)..."
;
NoticeCenter
::
Instance
().
emitEvent
(
Config
::
Broadcast
::
kBroadcastRtmpSrcRegisted
,
appTmp
.
data
(),
streamTmp
.
data
());
return
false
;
});
});
pusher
->
setOnPublished
([
appTmp
,
streamTmp
,
argv
](
const
SockException
&
ex
)
{
if
(
ex
)
{
WarnL
<<
"发布失败(Publish fail):"
<<
ex
.
getErrCode
()
<<
" "
<<
ex
.
what
();
AsyncTaskThread
::
Instance
().
CancelTask
(
0
);
AsyncTaskThread
::
Instance
().
DoTaskDelay
(
0
,
2000
,
[
appTmp
,
streamTmp
,
argv
]()
{
InfoL
<<
"正在重新发布(Re-Publish Steam)..."
;
NoticeCenter
::
Instance
().
emitEvent
(
Config
::
Broadcast
::
kBroadcastRtmpSrcRegisted
,
appTmp
.
data
(),
streamTmp
.
data
());
return
false
;
});
}
else
{
InfoL
<<
"发布成功,请用播放器打开(Publish success,Please use play with player):"
<<
argv
[
2
];
}
});
//开始推流
pusher
->
publish
(
argv
[
2
]);
});
});
//事件轮询
EventPoller
::
Instance
().
runLoop
();
EventPoller
::
Instance
().
runLoop
();
//删除事件监听
NoticeCenter
::
Instance
().
delListener
(
nullptr
);
NoticeCenter
::
Instance
().
delListener
(
nullptr
);
//销毁代理播放器、推流器
player
.
reset
();
player
.
reset
();
pusher
.
reset
();
pusher
.
reset
();
//清理程序
EventPoller
::
Destory
();
EventPoller
::
Destory
();
Logger
::
Destory
();
Logger
::
Destory
();
return
0
;
return
0
;
...
@@ -101,7 +122,7 @@ int domain(int argc, const char *argv[]) {
...
@@ -101,7 +122,7 @@ int domain(int argc, const char *argv[]) {
int
main
(
int
argc
,
char
*
argv
[]){
int
main
(
int
argc
,
char
*
argv
[]){
const
char
*
argList
[]
=
{
argv
[
0
],
"rtmp://live.hkstv.hk.lxdns.com/live/hks"
,
"rtmp://jizan.iok.la/live/test"
};
const
char
*
argList
[]
=
{
argv
[
0
],
"rtmp://live.hkstv.hk.lxdns.com/live/hks"
,
"rtmp://jizan.iok.la/live/test"
};
return
domain
(
argc
,
argList
);
return
domain
(
3
,
argList
);
}
}
...
...
tests/test_rtmpPusherMp4.cpp
查看文件 @
c367df73
...
@@ -27,7 +27,6 @@
...
@@ -27,7 +27,6 @@
#include <signal.h>
#include <signal.h>
#include <iostream>
#include <iostream>
#include "Util/logger.h"
#include "Util/logger.h"
#include "Util/onceToken.h"
#include "Util/NoticeCenter.h"
#include "Util/NoticeCenter.h"
#include "Poller/EventPoller.h"
#include "Poller/EventPoller.h"
#include "Device/PlayerProxy.h"
#include "Device/PlayerProxy.h"
...
@@ -42,48 +41,85 @@ using namespace ZL::Thread;
...
@@ -42,48 +41,85 @@ using namespace ZL::Thread;
using
namespace
ZL
::
Network
;
using
namespace
ZL
::
Network
;
using
namespace
ZL
::
DEV
;
using
namespace
ZL
::
DEV
;
void
programExit
(
int
arg
)
{
//推流器,保持强引用
EventPoller
::
Instance
().
shutdown
();
RtmpPusher
::
Ptr
pusher
;
}
int
main
(
int
argc
,
char
*
argv
[]){
setExePath
(
argv
[
0
]);
signal
(
SIGINT
,
programExit
);
Logger
::
Instance
().
add
(
std
::
make_shared
<
ConsoleChannel
>
(
"stdout"
,
LTrace
));
RtmpPusher
::
Ptr
pusher
;
//声明函数
//监听RtmpMediaSource注册事件,在流媒体化MP4文件后触发。
void
rePushDelay
(
const
string
&
app
,
const
string
&
stream
,
const
string
&
url
);
NoticeCenter
::
Instance
().
addListener
(
nullptr
,
Config
::
Broadcast
::
kBroadcastRtmpSrcRegisted
,
void
createPusher
(
const
string
&
app
,
const
string
&
stream
,
const
string
&
url
);
[
&
pusher
](
BroadcastRtmpSrcRegistedArgs
){
//媒体源"app/stream"已经注册,这时方可新建一个RtmpPusher对象并绑定该媒体源
const_cast
<
RtmpPusher
::
Ptr
&>
(
pusher
).
reset
(
new
RtmpPusher
(
app
,
stream
));
pusher
->
setOnShutdown
([](
const
SockException
&
ex
){
WarnL
<<
"已断开与服务器连接:"
<<
ex
.
getErrCode
()
<<
" "
<<
ex
.
what
();
});
pusher
->
setOnPublished
([](
const
SockException
&
ex
){
//创建推流器并开始推流
if
(
ex
){
void
createPusher
(
const
string
&
app
,
const
string
&
stream
,
const
string
&
url
){
WarnL
<<
"发布失败:"
<<
ex
.
getErrCode
()
<<
" "
<<
ex
.
what
();
auto
rtmpSrc
=
MediaReader
::
onMakeRtmp
(
app
,
stream
);
}
else
{
if
(
!
rtmpSrc
){
InfoL
<<
"发布成功,请用播放器打开:rtmp://jizan.iok.la/live/test"
;
//文件不存在
WarnL
<<
"MP4 file not exited!"
;
return
;
}
//创建推流器并绑定一个RtmpMediaSource
pusher
.
reset
(
new
RtmpPusher
(
rtmpSrc
));
//设置推流中断处理逻辑
pusher
->
setOnShutdown
([
app
,
stream
,
url
](
const
SockException
&
ex
)
{
WarnL
<<
"Server connection is closed:"
<<
ex
.
getErrCode
()
<<
" "
<<
ex
.
what
();
//重新推流
rePushDelay
(
app
,
stream
,
url
);
});
//设置发布结果处理逻辑
pusher
->
setOnPublished
([
app
,
stream
,
url
](
const
SockException
&
ex
)
{
if
(
ex
)
{
WarnL
<<
"Publish fail:"
<<
ex
.
getErrCode
()
<<
" "
<<
ex
.
what
();
//如果发布失败,就重试
rePushDelay
(
app
,
stream
,
url
);
}
else
{
InfoL
<<
"Publish success,Please play with player:"
<<
url
;
}
}
});
});
pusher
->
publish
(
url
.
data
());
}
//推流地址,请改成你自己的服务器。
//推流失败或断开延迟2秒后重试推流
//这个范例地址(也是基于mediakit)是可用的,但是带宽只有1mb,访问可能很卡顿。
void
rePushDelay
(
const
string
&
app
,
const
string
&
stream
,
const
string
&
url
){
InfoL
<<
"start publish rtmp!"
;
//上次延时两秒的任务可能还没执行,所以我们要先取消上次任务
pusher
->
publish
(
"rtmp://jizan.iok.la/live/test"
);
AsyncTaskThread
::
Instance
().
CancelTask
(
0
);
//2秒后执行重新推流的任务
AsyncTaskThread
::
Instance
().
DoTaskDelay
(
0
,
2000
,
[
app
,
stream
,
url
]()
{
InfoL
<<
"Re-Publishing..."
;
//重新推流
createPusher
(
app
,
stream
,
url
);
//此任务不重复
return
false
;
});
});
}
//这里才是真正执行main函数,你可以把函数名(domain)改成main,然后就可以输入自定义url了
int
domain
(
int
argc
,
const
char
*
argv
[]){
//设置退出信号处理函数
signal
(
SIGINT
,
[](
int
){
EventPoller
::
Instance
().
shutdown
();});
//设置日志
Logger
::
Instance
().
add
(
std
::
make_shared
<
ConsoleChannel
>
(
"stdout"
,
LTrace
));
Logger
::
Instance
().
setWriter
(
std
::
make_shared
<
AsyncLogWriter
>
());
//filePath同时也是流id
string
filePath
=
argv
[
1
];
//推流地址
string
pushUrl
=
argv
[
2
];
//流媒体化MP4文件,该文件需要放置在 httpRoot/record目录下
//录像应用名称默认为record
//app必须为“record”,stream为相对于httpRoot/record的路径
string
appName
=
mINI
::
Instance
()[
Config
::
Record
::
kAppName
];
MediaReader
::
onMakeRtmp
(
"record"
,
"live/0/2017-08-22/10-08-44.mp4"
);
//app必须record,filePath(流id)为相对于httpRoot/record的路径,否则MediaReader会找到不该文件
//限制app为record是为了防止服务器上的文件被肆意访问
createPusher
(
appName
,
filePath
,
pushUrl
);
//开始事件轮询
EventPoller
::
Instance
().
runLoop
();
EventPoller
::
Instance
().
runLoop
();
//删除事件监听
NoticeCenter
::
Instance
().
delListener
(
nullptr
);
NoticeCenter
::
Instance
().
delListener
(
nullptr
);
//销毁推流器
pusher
.
reset
();
pusher
.
reset
();
//程序清理
EventPoller
::
Destory
();
EventPoller
::
Destory
();
Logger
::
Destory
();
Logger
::
Destory
();
return
0
;
return
0
;
...
@@ -91,6 +127,14 @@ int main(int argc,char *argv[]){
...
@@ -91,6 +127,14 @@ int main(int argc,char *argv[]){
int
main
(
int
argc
,
char
*
argv
[]){
//MP4文件需要放置在 httpRoot/record目录下,文件负载必须为h264+aac
//可以使用test_server生成的mp4文件
const
char
*
argList
[]
=
{
argv
[
0
],
"app/stream/2017-09-30/12-55-38.mp4"
,
"rtmp://jizan.iok.la/live/test"
};
return
domain
(
3
,
argList
);
}
tests/test_server.cpp
查看文件 @
c367df73
...
@@ -48,6 +48,7 @@
...
@@ -48,6 +48,7 @@
#include "Device/PlayerProxy.h"
#include "Device/PlayerProxy.h"
using
namespace
std
;
using
namespace
std
;
using
namespace
ZL
::
DEV
;
using
namespace
ZL
::
Util
;
using
namespace
ZL
::
Util
;
using
namespace
ZL
::
Http
;
using
namespace
ZL
::
Http
;
using
namespace
ZL
::
Rtsp
;
using
namespace
ZL
::
Rtsp
;
...
@@ -55,37 +56,39 @@ using namespace ZL::Rtmp;
...
@@ -55,37 +56,39 @@ using namespace ZL::Rtmp;
using
namespace
ZL
::
Shell
;
using
namespace
ZL
::
Shell
;
using
namespace
ZL
::
Thread
;
using
namespace
ZL
::
Thread
;
using
namespace
ZL
::
Network
;
using
namespace
ZL
::
Network
;
using
namespace
ZL
::
DEV
;
void
programExit
(
int
arg
)
{
EventPoller
::
Instance
().
shutdown
();
}
int
main
(
int
argc
,
char
*
argv
[]){
int
main
(
int
argc
,
char
*
argv
[]){
setExePath
(
argv
[
0
]);
//设置退出信号处理函数
signal
(
SIGINT
,
programExit
);
signal
(
SIGINT
,
[](
int
){
EventPoller
::
Instance
().
shutdown
();});
//设置日志
Logger
::
Instance
().
add
(
std
::
make_shared
<
ConsoleChannel
>
(
"stdout"
,
LTrace
));
Logger
::
Instance
().
add
(
std
::
make_shared
<
ConsoleChannel
>
(
"stdout"
,
LTrace
));
Logger
::
Instance
().
setWriter
(
std
::
make_shared
<
AsyncLogWriter
>
());
//加载配置文件,如果配置文件不存在就创建一个
Config
::
loadIniConfig
();
Config
::
loadIniConfig
();
DebugL
<<
exePath
();
//这里是拉流地址,支持rtmp/rtsp协议,负载必须是H264+AAC
//support rtmp and rtsp url
//如果是其他不识别的音视频将会被忽略(譬如说h264+adpcm转发后会去除音频)
//just support H264+AAC
auto
urlList
=
{
"rtmp://live.hkstv.hk.lxdns.com/live/hks"
,
auto
urlList
=
{
"rtmp://live.hkstv.hk.lxdns.com/live/hks"
,
//rtsp链接支持输入用户名密码
"rtsp://admin:jzan123456@192.168.0.122/"
};
"rtsp://admin:jzan123456@192.168.0.122/"
};
map
<
string
,
PlayerProxy
::
Ptr
>
proxyMap
;
map
<
string
,
PlayerProxy
::
Ptr
>
proxyMap
;
int
i
=
0
;
int
i
=
0
;
for
(
auto
url
:
urlList
){
for
(
auto
&
url
:
urlList
){
//PlayerProxy构造函数前两个参数分别为应用名(app),流id(streamId)
//PlayerProxy构造函数前两个参数分别为应用名(app),流id(streamId)
//比如说应用为live,流id为0,那么直播地址为:
//比如说应用为live,流id为0,那么直播地址为:
//http://127.0.0.1/live/0/hls.m3u8
//http://127.0.0.1/live/0/hls.m3u8
//rtsp://127.0.0.1/live/0
//rtsp://127.0.0.1/live/0
//rtmp://127.0.0.1/live/0
//rtmp://127.0.0.1/live/0
//录像地址为:
//录像地址为
(当然vlc不支持这么多级的rtmp url,可以用test_player测试rtmp点播)
:
//http://127.0.0.1/record/live/0/2017-04-11/11-09-38.mp4
//http://127.0.0.1/record/live/0/2017-04-11/11-09-38.mp4
//rtsp://127.0.0.1/record/live/0/2017-04-11/11-09-38.mp4
//rtsp://127.0.0.1/record/live/0/2017-04-11/11-09-38.mp4
//rtmp://127.0.0.1/record/live/0/2017-04-11/11-09-38.mp4
//rtmp://127.0.0.1/record/live/0/2017-04-11/11-09-38.mp4
PlayerProxy
::
Ptr
player
(
new
PlayerProxy
(
"live"
,
to_string
(
i
++
).
data
()));
PlayerProxy
::
Ptr
player
(
new
PlayerProxy
(
"live"
,
to_string
(
i
++
).
data
()));
(
*
player
)[
PlayerProxy
::
kAliveSecond
]
=
10
;
//录制10秒
//指定RTP over TCP(播放rtsp时有效)
(
*
player
)[
RtspPlayer
::
kRtpType
]
=
PlayerBase
::
RTP_TCP
;
//开始播放,如果播放失败或者播放中止,将会自动重试若干次,重试次数在配置文件中配置,默认一直重试
player
->
play
(
url
);
player
->
play
(
url
);
proxyMap
.
emplace
(
string
(
url
),
player
);
//需要保存PlayerProxy,否则作用域结束就会销毁该对象
proxyMap
.
emplace
(
url
,
player
);
}
}
#ifdef ENABLE_OPENSSL
#ifdef ENABLE_OPENSSL
...
@@ -94,31 +97,35 @@ int main(int argc,char *argv[]){
...
@@ -94,31 +97,35 @@ int main(int argc,char *argv[]){
SSL_Initor
::
Instance
().
loadServerPem
((
exePath
()
+
".pem"
).
data
());
SSL_Initor
::
Instance
().
loadServerPem
((
exePath
()
+
".pem"
).
data
());
}
catch
(...){
}
catch
(...){
FatalL
<<
"请把证书:"
<<
(
exeName
()
+
".pem"
)
<<
"放置在本程序可执行程序同目录下:"
<<
exeDir
()
<<
endl
;
FatalL
<<
"请把证书:"
<<
(
exeName
()
+
".pem"
)
<<
"放置在本程序可执行程序同目录下:"
<<
exeDir
()
<<
endl
;
proxyMap
.
clear
();
return
0
;
return
0
;
}
}
#endif //ENABLE_OPENSSL
#endif //ENABLE_OPENSSL
//简单的telnet服务器,可用于服务器调试,但是不能使用23端口
//简单的telnet服务器,可用于服务器调试,但是不能使用23端口
,否则telnet上了莫名其妙的现象
//测试方法:telnet 127.0.0.1 8023
//测试方法:telnet 127.0.0.1 8023
//输入用户名和密码登录(user:test,pwd:123456),输入help命令查看帮助
//输入用户名和密码登录(user:test,pwd:123456),输入help命令查看帮助
TcpServer
<
ShellSession
>::
Ptr
shellSrv
(
new
TcpServer
<
ShellSession
>
());
TcpServer
<
ShellSession
>::
Ptr
shellSrv
(
new
TcpServer
<
ShellSession
>
());
ShellSession
::
addUser
(
"test"
,
"123456"
);
shellSrv
->
start
(
8023
);
//开启rtsp/rtmp/http服务器
TcpServer
<
RtspSession
>::
Ptr
rtspSrv
(
new
TcpServer
<
RtspSession
>
());
TcpServer
<
RtspSession
>::
Ptr
rtspSrv
(
new
TcpServer
<
RtspSession
>
());
TcpServer
<
RtmpSession
>::
Ptr
rtmpSrv
(
new
TcpServer
<
RtmpSession
>
());
TcpServer
<
RtmpSession
>::
Ptr
rtmpSrv
(
new
TcpServer
<
RtmpSession
>
());
TcpServer
<
HttpSession
>::
Ptr
httpSrv
(
new
TcpServer
<
HttpSession
>
());
TcpServer
<
HttpSession
>::
Ptr
httpSrv
(
new
TcpServer
<
HttpSession
>
());
rtspSrv
->
start
(
mINI
::
Instance
()[
Config
::
Rtsp
::
kPort
]);
//默认554
ShellSession
::
addUser
(
"test"
,
"123456"
);
rtmpSrv
->
start
(
mINI
::
Instance
()[
Config
::
Rtmp
::
kPort
]);
//默认1935
shellSrv
->
start
(
8023
);
httpSrv
->
start
(
mINI
::
Instance
()[
Config
::
Http
::
kPort
]);
//默认80
rtspSrv
->
start
(
mINI
::
Instance
()[
Config
::
Rtsp
::
kPort
]);
rtmpSrv
->
start
(
mINI
::
Instance
()[
Config
::
Rtmp
::
kPort
]);
httpSrv
->
start
(
mINI
::
Instance
()[
Config
::
Http
::
kPort
]);
#ifdef ENABLE_OPENSSL
#ifdef ENABLE_OPENSSL
//如果支持ssl,还可以开启https服务器
TcpServer
<
HttpsSession
>::
Ptr
httpsSrv
(
new
TcpServer
<
HttpsSession
>
());
TcpServer
<
HttpsSession
>::
Ptr
httpsSrv
(
new
TcpServer
<
HttpsSession
>
());
httpsSrv
->
start
(
mINI
::
Instance
()[
Config
::
Http
::
kSSLPort
]);
httpsSrv
->
start
(
mINI
::
Instance
()[
Config
::
Http
::
kSSLPort
]);
//默认443
#endif //ENABLE_OPENSSL
#endif //ENABLE_OPENSSL
EventPoller
::
Instance
().
runLoop
();
EventPoller
::
Instance
().
runLoop
();
//销毁拉流客户端
proxyMap
.
clear
();
proxyMap
.
clear
();
//销毁服务器
shellSrv
.
reset
();
shellSrv
.
reset
();
rtspSrv
.
reset
();
rtspSrv
.
reset
();
rtmpSrv
.
reset
();
rtmpSrv
.
reset
();
...
@@ -128,7 +135,9 @@ int main(int argc,char *argv[]){
...
@@ -128,7 +135,9 @@ int main(int argc,char *argv[]){
httpsSrv
.
reset
();
httpsSrv
.
reset
();
#endif //ENABLE_OPENSSL
#endif //ENABLE_OPENSSL
//rtsp服务器用到udp端口分配器了
UDPServer
::
Destory
();
UDPServer
::
Destory
();
//TcpServer用到了WorkThreadPool
WorkThreadPool
::
Destory
();
WorkThreadPool
::
Destory
();
EventPoller
::
Destory
();
EventPoller
::
Destory
();
Logger
::
Destory
();
Logger
::
Destory
();
...
...
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论