Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
Z
ZLMediaKit
概览
Overview
Details
Activity
Cycle Analytics
版本库
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
问题
0
Issues
0
列表
Board
标记
里程碑
合并请求
0
Merge Requests
0
CI / CD
CI / CD
流水线
作业
日程表
图表
维基
Wiki
代码片段
Snippets
成员
Collapse sidebar
Close sidebar
活动
图像
聊天
创建新问题
作业
提交
Issue Boards
Open sidebar
张翔宇
ZLMediaKit
Commits
c5037493
Commit
c5037493
authored
Jun 06, 2019
by
xiongziliang
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
添加FFmpeg拉流功能
parent
ecc133d6
隐藏空白字符变更
内嵌
并排
正在显示
5 个修改的文件
包含
375 行增加
和
20 行删除
+375
-20
server/FFmpegSource.cpp
+212
-0
server/FFmpegSource.h
+47
-0
server/WebApi.cpp
+94
-20
server/WebApi.h
+21
-0
server/main.cpp
+1
-0
没有找到文件。
server/FFmpegSource.cpp
0 → 100644
查看文件 @
c5037493
//
// Created by xzl on 2018/5/24.
//
#include "FFmpegSource.h"
#include "Common/config.h"
#include "Common/MediaSource.h"
#include "Util/File.h"
#include "System.h"
namespace
FFmpeg
{
#define FFmpeg_FIELD "ffmpeg."
const
char
kBin
[]
=
FFmpeg_FIELD
"bin"
;
const
char
kCmd
[]
=
FFmpeg_FIELD
"cmd"
;
const
char
kLog
[]
=
FFmpeg_FIELD
"log"
;
onceToken
token
([]()
{
mINI
::
Instance
()[
kBin
]
=
trim
(
System
::
execute
(
"which ffmpeg"
));
mINI
::
Instance
()[
kCmd
]
=
"%s -i %s -c:a aac -strict -2 -ar 44100 -ab 48k -c:v libx264 -f flv %s"
;
mINI
::
Instance
()[
kLog
]
=
exeDir
()
+
"ffmpeg/ffmpeg.log"
;
});
}
FFmpegSource
::
FFmpegSource
()
{
_poller
=
EventPollerPool
::
Instance
().
getPoller
();
}
FFmpegSource
::~
FFmpegSource
()
{
NoticeCenter
::
Instance
().
delListener
(
this
,
Broadcast
::
kBroadcastStreamNoneReader
);
DebugL
;
}
void
FFmpegSource
::
play
(
const
string
&
src_url
,
const
string
&
dst_url
,
int
timeout_ms
,
const
onPlay
&
cb
)
{
GET_CONFIG_AND_REGISTER
(
string
,
ffmpeg_bin
,
FFmpeg
::
kBin
);
GET_CONFIG_AND_REGISTER
(
string
,
ffmpeg_cmd
,
FFmpeg
::
kCmd
);
GET_CONFIG_AND_REGISTER
(
string
,
ffmpeg_log
,
FFmpeg
::
kLog
);
_src_url
=
src_url
;
_dst_url
=
dst_url
;
_media_info
.
parse
(
dst_url
);
char
cmd
[
1024
]
=
{
0
};
snprintf
(
cmd
,
sizeof
(
cmd
),
ffmpeg_cmd
.
data
(),
ffmpeg_bin
.
data
(),
src_url
.
data
(),
dst_url
.
data
());
_process
.
run
(
cmd
,
ffmpeg_log
);
InfoL
<<
cmd
;
if
(
_media_info
.
_host
==
"127.0.0.1"
){
//推流给自己的,通过判断流是否注册上来判断是否正常
if
(
_media_info
.
_schema
!=
RTSP_SCHEMA
&&
_media_info
.
_schema
!=
RTMP_SCHEMA
){
cb
(
SockException
(
Err_other
,
"本服务只支持rtmp/rtsp推流"
));
return
;
}
weak_ptr
<
FFmpegSource
>
weakSelf
=
shared_from_this
();
findAsync
(
timeout_ms
,[
cb
,
weakSelf
,
timeout_ms
](
const
MediaSource
::
Ptr
&
src
){
auto
strongSelf
=
weakSelf
.
lock
();
if
(
!
strongSelf
){
//自己已经销毁
return
;
}
if
(
src
){
//推流给自己成功
cb
(
SockException
());
strongSelf
->
startTimer
(
timeout_ms
);
return
;
}
//推流失败
if
(
!
strongSelf
->
_process
.
wait
(
false
)){
//ffmpeg进程已经退出
cb
(
SockException
(
Err_other
,
StrPrinter
<<
"ffmpeg已经退出,exit code = "
<<
strongSelf
->
_process
.
exit_code
()));
return
;
}
//ffmpeg进程还在线,但是等待推流超时
cb
(
SockException
(
Err_other
,
"等待超时"
));
});
}
else
{
//推流给其他服务器的,通过判断FFmpeg进程是否在线判断是否成功
weak_ptr
<
FFmpegSource
>
weakSelf
=
shared_from_this
();
_timer
=
std
::
make_shared
<
Timer
>
(
timeout_ms
/
1000
,[
weakSelf
,
cb
,
timeout_ms
](){
auto
strongSelf
=
weakSelf
.
lock
();
if
(
!
strongSelf
){
//自身已经销毁
return
false
;
}
//FFmpeg还在线,那么我们认为推流成功
if
(
strongSelf
->
_process
.
wait
(
false
)){
cb
(
SockException
());
strongSelf
->
startTimer
(
timeout_ms
);
return
false
;
}
//ffmpeg进程已经退出
cb
(
SockException
(
Err_other
,
StrPrinter
<<
"ffmpeg已经退出,exit code = "
<<
strongSelf
->
_process
.
exit_code
()));
return
false
;
},
_poller
);
}
}
void
FFmpegSource
::
findAsync
(
int
maxWaitMS
,
const
function
<
void
(
const
MediaSource
::
Ptr
&
src
)
>
&
cb
)
{
auto
src
=
MediaSource
::
find
(
_media_info
.
_schema
,
_media_info
.
_vhost
,
_media_info
.
_app
,
_media_info
.
_streamid
,
false
);
if
(
src
||
!
maxWaitMS
){
cb
(
src
);
return
;
}
void
*
listener_tag
=
this
;
//若干秒后执行等待媒体注册超时回调
auto
onRegistTimeout
=
_poller
->
doDelayTask
(
maxWaitMS
,[
cb
,
listener_tag
](){
//取消监听该事件
NoticeCenter
::
Instance
().
delListener
(
listener_tag
,
Broadcast
::
kBroadcastMediaChanged
);
cb
(
nullptr
);
return
0
;
});
weak_ptr
<
FFmpegSource
>
weakSelf
=
shared_from_this
();
auto
onRegist
=
[
listener_tag
,
weakSelf
,
cb
,
onRegistTimeout
](
BroadcastMediaChangedArgs
)
{
auto
strongSelf
=
weakSelf
.
lock
();
if
(
!
strongSelf
)
{
//本身已经销毁,取消延时任务
onRegistTimeout
->
cancel
();
NoticeCenter
::
Instance
().
delListener
(
listener_tag
,
Broadcast
::
kBroadcastMediaChanged
);
return
;
}
if
(
!
bRegist
||
schema
!=
strongSelf
->
_media_info
.
_schema
||
vhost
!=
strongSelf
->
_media_info
.
_vhost
||
app
!=
strongSelf
->
_media_info
.
_app
||
stream
!=
strongSelf
->
_media_info
.
_streamid
){
//不是自己感兴趣的事件,忽略之
return
;
}
//查找的流终于注册上了;取消延时任务,防止多次回调
onRegistTimeout
->
cancel
();
//取消事件监听
NoticeCenter
::
Instance
().
delListener
(
listener_tag
,
Broadcast
::
kBroadcastMediaChanged
);
//切换到自己的线程再回复
strongSelf
->
_poller
->
async
([
listener_tag
,
weakSelf
,
cb
](){
auto
strongSelf
=
weakSelf
.
lock
();
if
(
!
strongSelf
)
{
return
;
}
//再找一遍媒体源,一般能找到
strongSelf
->
findAsync
(
0
,
cb
);
},
false
);
};
//监听媒体注册事件
NoticeCenter
::
Instance
().
addListener
(
listener_tag
,
Broadcast
::
kBroadcastMediaChanged
,
onRegist
);
}
/**
* 定时检查媒体是否在线
*/
void
FFmpegSource
::
startTimer
(
int
timeout_ms
)
{
weak_ptr
<
FFmpegSource
>
weakSelf
=
shared_from_this
();
_timer
=
std
::
make_shared
<
Timer
>
(
1
,
[
weakSelf
,
timeout_ms
]()
{
auto
strongSelf
=
weakSelf
.
lock
();
if
(
!
strongSelf
)
{
//自身已经销毁
return
false
;
}
if
(
strongSelf
->
_media_info
.
_host
==
"127.0.0.1"
)
{
//推流给自己的,我们通过检查是否已经注册来判断FFmpeg是否工作正常
strongSelf
->
findAsync
(
0
,
[
&
](
const
MediaSource
::
Ptr
&
src
)
{
//同步查找流
if
(
!
src
)
{
//流不在线,重新拉流
strongSelf
->
play
(
strongSelf
->
_src_url
,
strongSelf
->
_dst_url
,
timeout_ms
,
[](
const
SockException
&
)
{});
}
});
}
else
{
//推流给其他服务器的,我们通过判断FFmpeg进程是否在线,如果FFmpeg推流中断,那么它应该会自动退出
if
(
!
strongSelf
->
_process
.
wait
(
false
))
{
//ffmpeg不在线,重新拉流
strongSelf
->
play
(
strongSelf
->
_src_url
,
strongSelf
->
_dst_url
,
timeout_ms
,
[](
const
SockException
&
)
{});
}
}
return
true
;
},
_poller
);
NoticeCenter
::
Instance
().
delListener
(
this
,
Broadcast
::
kBroadcastStreamNoneReader
);
NoticeCenter
::
Instance
().
addListener
(
this
,
Broadcast
::
kBroadcastStreamNoneReader
,[
weakSelf
](
BroadcastStreamNoneReaderArgs
)
{
auto
strongSelf
=
weakSelf
.
lock
();
if
(
!
strongSelf
)
{
//自身已经销毁
return
;
}
if
(
sender
.
getVhost
()
!=
strongSelf
->
_media_info
.
_vhost
||
sender
.
getApp
()
!=
strongSelf
->
_media_info
.
_app
||
sender
.
getId
()
!=
strongSelf
->
_media_info
.
_streamid
){
//不是自己感兴趣的事件,忽略之
return
;
}
//该流无人观看,我们停止吧
if
(
strongSelf
->
_onClose
){
strongSelf
->
_onClose
();
}
});
}
void
FFmpegSource
::
setOnClose
(
const
function
<
void
()
>
&
cb
){
_onClose
=
cb
;
}
\ No newline at end of file
server/FFmpegSource.h
0 → 100644
查看文件 @
c5037493
//
// Created by xzl on 2018/5/24.
//
#ifndef FFMPEG_SOURCE_H
#define FFMPEG_SOURCE_H
#include <mutex>
#include <memory>
#include <functional>
#include "Process.h"
#include "Util/TimeTicker.h"
#include "Network/Socket.h"
#include "Common/MediaSource.h"
using
namespace
std
;
using
namespace
toolkit
;
using
namespace
mediakit
;
class
FFmpegSource
:
public
std
::
enable_shared_from_this
<
FFmpegSource
>
{
public
:
typedef
shared_ptr
<
FFmpegSource
>
Ptr
;
typedef
function
<
void
(
const
SockException
&
ex
)
>
onPlay
;
FFmpegSource
();
virtual
~
FFmpegSource
();
/**
* 设置主动关闭回调
* @param cb
*/
void
setOnClose
(
const
function
<
void
()
>
&
cb
);
void
play
(
const
string
&
src_url
,
const
string
&
dst_url
,
int
timeout_ms
,
const
onPlay
&
cb
);
private
:
void
findAsync
(
int
maxWaitMS
,
const
function
<
void
(
const
MediaSource
::
Ptr
&
src
)
>
&
cb
);
void
startTimer
(
int
timeout_ms
);
private
:
Process
_process
;
Timer
::
Ptr
_timer
;
EventPoller
::
Ptr
_poller
;
MediaInfo
_media_info
;
string
_src_url
;
string
_dst_url
;
function
<
void
()
>
_onClose
;
};
#endif //FFMPEG_SOURCE_H
server/WebApi.cpp
查看文件 @
c5037493
...
@@ -16,6 +16,9 @@
...
@@ -16,6 +16,9 @@
#include "Http/HttpSession.h"
#include "Http/HttpSession.h"
#include "Network/TcpServer.h"
#include "Network/TcpServer.h"
#include "Player/PlayerProxy.h"
#include "Player/PlayerProxy.h"
#include "FFmpegSource.h"
#include "Util/MD5.h"
#include "WebApi.h"
using
namespace
Json
;
using
namespace
Json
;
using
namespace
toolkit
;
using
namespace
toolkit
;
...
@@ -216,6 +219,9 @@ static inline string getProxyKey(const string &vhost,const string &app,const str
...
@@ -216,6 +219,9 @@ static inline string getProxyKey(const string &vhost,const string &app,const str
return
vhost
+
"/"
+
app
+
"/"
+
stream
;
return
vhost
+
"/"
+
app
+
"/"
+
stream
;
}
}
static
unordered_map
<
string
,
FFmpegSource
::
Ptr
>
s_ffmpegMap
;
static
recursive_mutex
s_ffmpegMapMtx
;
/**
/**
* 安装api接口
* 安装api接口
* 所有api都支持GET和POST两种方式
* 所有api都支持GET和POST两种方式
...
@@ -427,12 +433,11 @@ void installWebApi() {
...
@@ -427,12 +433,11 @@ void installWebApi() {
//指定RTP over TCP(播放rtsp时有效)
//指定RTP over TCP(播放rtsp时有效)
(
*
player
)[
kRtpType
]
=
rtp_type
;
(
*
player
)[
kRtpType
]
=
rtp_type
;
//开始播放,如果播放失败或者播放中止,将会自动重试若干次,默认一直重试
//开始播放,如果播放失败或者播放中止,将会自动重试若干次,默认一直重试
player
->
setPlayCallbackOnce
([
cb
,
player
,
key
](
const
SockException
&
ex
){
player
->
setPlayCallbackOnce
([
cb
,
key
](
const
SockException
&
ex
){
if
(
ex
){
if
(
ex
){
lock_guard
<
recursive_mutex
>
lck
(
s_proxyMapMtx
);
lock_guard
<
recursive_mutex
>
lck
(
s_proxyMapMtx
);
s_proxyMap
.
erase
(
key
);
s_proxyMap
.
erase
(
key
);
}
}
const_cast
<
PlayerProxy
::
Ptr
&>
(
player
).
reset
();
cb
(
ex
,
key
);
cb
(
ex
,
key
);
});
});
...
@@ -476,6 +481,62 @@ void installWebApi() {
...
@@ -476,6 +481,62 @@ void installWebApi() {
val
[
"data"
][
"flag"
]
=
s_proxyMap
.
erase
(
allArgs
[
"key"
])
==
1
;
val
[
"data"
][
"flag"
]
=
s_proxyMap
.
erase
(
allArgs
[
"key"
])
==
1
;
});
});
static
auto
addFFmepgSource
=
[](
const
string
&
src_url
,
const
string
&
dst_url
,
int
timeout_ms
,
const
function
<
void
(
const
SockException
&
ex
,
const
string
&
key
)
>
&
cb
){
auto
key
=
MD5
(
dst_url
).
hexdigest
();
lock_guard
<
decltype
(
s_ffmpegMapMtx
)
>
lck
(
s_ffmpegMapMtx
);
if
(
s_ffmpegMap
.
find
(
key
)
!=
s_ffmpegMap
.
end
()){
//已经在拉流了
cb
(
SockException
(
Err_success
),
key
);
return
;
}
FFmpegSource
::
Ptr
ffmpeg
=
std
::
make_shared
<
FFmpegSource
>
();
s_ffmpegMap
[
key
]
=
ffmpeg
;
ffmpeg
->
setOnClose
([
key
](){
lock_guard
<
decltype
(
s_ffmpegMapMtx
)
>
lck
(
s_ffmpegMapMtx
);
s_ffmpegMap
.
erase
(
key
);
});
ffmpeg
->
play
(
src_url
,
dst_url
,
timeout_ms
,[
cb
,
key
](
const
SockException
&
ex
){
if
(
ex
){
lock_guard
<
decltype
(
s_ffmpegMapMtx
)
>
lck
(
s_ffmpegMapMtx
);
s_ffmpegMap
.
erase
(
key
);
}
cb
(
ex
,
key
);
});
};
//动态添加rtsp/rtmp拉流代理
//测试url http://127.0.0.1/index/api/addFFmpegSource?src_url=http://live.hkstv.hk.lxdns.com/live/hks2/playlist.m3u8&dst_url=rtmp://127.0.0.1/live/hks2&timeout_ms=10000
API_REGIST_INVOKER
(
api
,
addFFmpegSource
,{
CHECK_SECRET
();
CHECK_ARGS
(
"src_url"
,
"dst_url"
,
"timeout_ms"
);
auto
src_url
=
allArgs
[
"src_url"
];
auto
dst_url
=
allArgs
[
"dst_url"
];
int
timeout_ms
=
allArgs
[
"timeout_ms"
];
addFFmepgSource
(
src_url
,
dst_url
,
timeout_ms
,[
invoker
,
val
,
headerOut
](
const
SockException
&
ex
,
const
string
&
key
){
if
(
ex
){
const_cast
<
Value
&>
(
val
)[
"code"
]
=
API
::
OtherFailed
;
const_cast
<
Value
&>
(
val
)[
"msg"
]
=
ex
.
what
();
}
else
{
const_cast
<
Value
&>
(
val
)[
"data"
][
"key"
]
=
key
;
}
invoker
(
"200 OK"
,
headerOut
,
val
.
toStyledString
());
});
});
//关闭拉流代理
//测试url http://127.0.0.1/index/api/delFFmepgSource?key=key
API_REGIST
(
api
,
delFFmepgSource
,{
CHECK_SECRET
();
CHECK_ARGS
(
"key"
);
lock_guard
<
decltype
(
s_ffmpegMapMtx
)
>
lck
(
s_ffmpegMapMtx
);
val
[
"data"
][
"flag"
]
=
s_ffmpegMap
.
erase
(
allArgs
[
"key"
])
==
1
;
});
////////////以下是注册的Hook API////////////
////////////以下是注册的Hook API////////////
API_REGIST
(
hook
,
on_publish
,{
API_REGIST
(
hook
,
on_publish
,{
...
@@ -517,22 +578,27 @@ void installWebApi() {
...
@@ -517,22 +578,27 @@ void installWebApi() {
//媒体未找到事件,我们都及时拉流hks作为替代品,目的是为了测试按需拉流
//媒体未找到事件,我们都及时拉流hks作为替代品,目的是为了测试按需拉流
CHECK_SECRET
();
CHECK_SECRET
();
CHECK_ARGS
(
"vhost"
,
"app"
,
"stream"
);
CHECK_ARGS
(
"vhost"
,
"app"
,
"stream"
);
addStreamProxy
(
allArgs
[
"vhost"
],
GET_CONFIG
(
int
,
rtmp_port
,
Rtmp
::
kPort
);
allArgs
[
"app"
],
allArgs
[
"stream"
],
string
dst_url
=
StrPrinter
"rtmp://live.hkstv.hk.lxdns.com/live/hks2"
,
<<
"rtmp://127.0.0.1:"
false
,
<<
rtmp_port
<<
"/"
false
,
<<
allArgs
[
"app"
]
<<
"/"
0
,
<<
allArgs
[
"stream"
]
<<
"?vhost="
[
invoker
,
val
,
headerOut
](
const
SockException
&
ex
,
const
string
&
key
){
<<
allArgs
[
"vhost"
];
if
(
ex
){
const_cast
<
Value
&>
(
val
)[
"code"
]
=
API
::
OtherFailed
;
addFFmepgSource
(
"http://live.hkstv.hk.lxdns.com/live/hks2/playlist.m3u8"
,
const_cast
<
Value
&>
(
val
)[
"msg"
]
=
ex
.
what
();
dst_url
,
}
else
{
10000
,
const_cast
<
Value
&>
(
val
)[
"data"
][
"key"
]
=
key
;
[
invoker
,
val
,
headerOut
](
const
SockException
&
ex
,
const
string
&
key
){
}
if
(
ex
){
invoker
(
"200 OK"
,
headerOut
,
val
.
toStyledString
());
const_cast
<
Value
&>
(
val
)[
"code"
]
=
API
::
OtherFailed
;
});
const_cast
<
Value
&>
(
val
)[
"msg"
]
=
ex
.
what
();
}
else
{
const_cast
<
Value
&>
(
val
)[
"data"
][
"key"
]
=
key
;
}
invoker
(
"200 OK"
,
headerOut
,
val
.
toStyledString
());
});
});
});
...
@@ -555,6 +621,13 @@ void installWebApi() {
...
@@ -555,6 +621,13 @@ void installWebApi() {
}
}
void
unInstallWebApi
(){
void
unInstallWebApi
(){
lock_guard
<
recursive_mutex
>
lck
(
s_proxyMapMtx
);
{
s_proxyMap
.
clear
();
lock_guard
<
recursive_mutex
>
lck
(
s_proxyMapMtx
);
s_proxyMap
.
clear
();
}
{
lock_guard
<
recursive_mutex
>
lck
(
s_ffmpegMapMtx
);
s_ffmpegMap
.
clear
();
}
}
}
\ No newline at end of file
server/WebApi.h
0 → 100644
查看文件 @
c5037493
//
// Created by xzl on 2019-06-06.
//
#ifndef ZLMEDIAKIT_WEBAPI_H
#define ZLMEDIAKIT_WEBAPI_H
namespace
mediakit
{
////////////RTSP服务器配置///////////
namespace
Rtsp
{
extern
const
char
kPort
[];
}
//namespace Rtsp
////////////RTMP服务器配置///////////
namespace
Rtmp
{
extern
const
char
kPort
[];
}
//namespace RTMP
}
// namespace mediakit
#endif //ZLMEDIAKIT_WEBAPI_H
server/main.cpp
查看文件 @
c5037493
...
@@ -44,6 +44,7 @@
...
@@ -44,6 +44,7 @@
#include "Player/PlayerProxy.h"
#include "Player/PlayerProxy.h"
#include "Http/WebSocketSession.h"
#include "Http/WebSocketSession.h"
#include "System.h"
#include "System.h"
#include "WebApi.h"
using
namespace
std
;
using
namespace
std
;
using
namespace
toolkit
;
using
namespace
toolkit
;
...
...
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论