Commit a3c8cb11 by baiyfcu Committed by GitHub

Merge pull request #5 from zlmediakit/master

update
parents d3a4fc1f 3c4d7498
release/ filter=lfs diff=lfs merge=lfs -text
...@@ -34,3 +34,4 @@ ...@@ -34,3 +34,4 @@
/cmake-build-debug/ /cmake-build-debug/
/.idea/ /.idea/
/c_wrapper/.idea/ /c_wrapper/.idea/
/release/mac/Debug/
\ No newline at end of file
Subproject commit 2bb234006c852b1d1a61a0e9a7f39dde7105fe34 Subproject commit 57e7c83d5667b1e06fb8f5ca73dbe3f04a9fc67f
Subproject commit 6df71e01c174cdfe69e597cc4acb766a20b28620 Subproject commit 40edf6243d9d99676062062efdec203b24a178aa
...@@ -2,13 +2,34 @@ ...@@ -2,13 +2,34 @@
cmake_minimum_required(VERSION 3.1.3) cmake_minimum_required(VERSION 3.1.3)
#使能c++11 #使能c++11
set(CMAKE_CXX_STANDARD 11) set(CMAKE_CXX_STANDARD 11)
#加载自定义模块 #加载自定义模块
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${PROJECT_SOURCE_DIR}/cmake") set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${PROJECT_SOURCE_DIR}/cmake")
#设置库文件路径
set(LIBRARY_OUTPUT_PATH ${PROJECT_BINARY_DIR}/lib) #set(CMAKE_BUILD_TYPE "Release")
#设置可执行程序路径
set(EXECUTABLE_OUTPUT_PATH ${PROJECT_BINARY_DIR}/bin) if(${CMAKE_BUILD_TYPE} MATCHES "Release")
message(STATUS "Release版本")
set(BuildType "Release")
else()
set(BuildType "Debug")
message(STATUS "Debug版本")
endif()
#设置bin和lib库目录
set(RELEASE_DIR ${CMAKE_SOURCE_DIR}/release)
if (CMAKE_SYSTEM_NAME MATCHES "Linux")
SET(LIBRARY_OUTPUT_PATH ${RELEASE_DIR}/linux/${BuildType})
SET(EXECUTABLE_OUTPUT_PATH ${RELEASE_DIR}/linux/${BuildType})
elseif (CMAKE_SYSTEM_NAME MATCHES "Windows")
SET(LIBRARY_OUTPUT_PATH ${RELEASE_DIR}/windows/${BuildType})
SET(EXECUTABLE_OUTPUT_PATH ${RELEASE_DIR}/windows/${BuildType})
elseif (CMAKE_SYSTEM_NAME MATCHES "Darwin")
SET(LIBRARY_OUTPUT_PATH ${RELEASE_DIR}/mac/${BuildType})
SET(EXECUTABLE_OUTPUT_PATH ${RELEASE_DIR}/mac/${BuildType})
endif ()
LINK_DIRECTORIES(${LIBRARY_OUTPUT_PATH})
#设置工程源码根目录 #设置工程源码根目录
set(ToolKit_Root ${CMAKE_CURRENT_SOURCE_DIR}/3rdpart/ZLToolKit/src) set(ToolKit_Root ${CMAKE_CURRENT_SOURCE_DIR}/3rdpart/ZLToolKit/src)
...@@ -106,7 +127,7 @@ add_library(zltoolkit STATIC ${ToolKit_src_list}) ...@@ -106,7 +127,7 @@ add_library(zltoolkit STATIC ${ToolKit_src_list})
add_library(zlmediakit STATIC ${MediaKit_src_list}) add_library(zlmediakit STATIC ${MediaKit_src_list})
set(VS_FALGS "/wd4819 /wd4996 /wd4018 /wd4267 /wd4244 /wd4101 /wd4828 /wd4309 /wd4573 /utf-8" ) set(VS_FALGS "/wd4819 /wd4996 /wd4018 /wd4267 /wd4244 /wd4101 /wd4828 /wd4309 /wd4573" )
#libmpeg #libmpeg
if(ENABLE_HLS) if(ENABLE_HLS)
aux_source_directory(${MediaServer_Root}/libmpeg/include src_mpeg) aux_source_directory(${MediaServer_Root}/libmpeg/include src_mpeg)
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
- Well performance and stable test,can be used commercially. - Well performance and stable test,can be used commercially.
- Support linux, macos, ios, android, Windows Platforms. - Support linux, macos, ios, android, Windows Platforms.
- Very low latency(lower then one second), video opened immediately. - Very low latency(lower then one second), video opened immediately.
- **Now Support websocket-flv!**
## Features ## Features
...@@ -116,7 +117,12 @@ ...@@ -116,7 +117,12 @@
- Apple OSX(Darwin), both 32 and 64bits. - Apple OSX(Darwin), both 32 and 64bits.
- All hardware with x86/x86_64/arm/mips cpu. - All hardware with x86/x86_64/arm/mips cpu.
- Windows. - Windows.
- **You must use git to clone the complete code. Do not download the source code by downloading zip package. Otherwise, the sub-module code will not be downloaded by default.** - **You must use git to clone the complete code. Do not download the source code by downloading zip package. Otherwise, the sub-module code will not be downloaded by default.You can do it like this:**
```
git clone https://github.com/zlmediakit/ZLMediaKit.git
cd ZLMediaKit
git submodule update --init
```
...@@ -231,7 +237,7 @@ It is recommended to compile on Ubuntu or MacOS,compiling on windows is cumber ...@@ -231,7 +237,7 @@ It is recommended to compile on Ubuntu or MacOS,compiling on windows is cumber
## Usage ## Usage
- As server: - As server:
``` ```cpp
TcpServer::Ptr rtspSrv(new TcpServer()); TcpServer::Ptr rtspSrv(new TcpServer());
TcpServer::Ptr rtmpSrv(new TcpServer()); TcpServer::Ptr rtmpSrv(new TcpServer());
TcpServer::Ptr httpSrv(new TcpServer()); TcpServer::Ptr httpSrv(new TcpServer());
...@@ -244,7 +250,7 @@ It is recommended to compile on Ubuntu or MacOS,compiling on windows is cumber ...@@ -244,7 +250,7 @@ It is recommended to compile on Ubuntu or MacOS,compiling on windows is cumber
``` ```
- As player: - As player:
``` ```cpp
MediaPlayer::Ptr player(new MediaPlayer()); MediaPlayer::Ptr player(new MediaPlayer());
weak_ptr<MediaPlayer> weakPlayer = player; weak_ptr<MediaPlayer> weakPlayer = player;
player->setOnPlayResult([weakPlayer](const SockException &ex) { player->setOnPlayResult([weakPlayer](const SockException &ex) {
...@@ -273,7 +279,7 @@ It is recommended to compile on Ubuntu or MacOS,compiling on windows is cumber ...@@ -273,7 +279,7 @@ It is recommended to compile on Ubuntu or MacOS,compiling on windows is cumber
player->play("rtsp://admin:jzan123456@192.168.0.122/"); player->play("rtsp://admin:jzan123456@192.168.0.122/");
``` ```
- As proxy server: - As proxy server:
``` ```cpp
//support rtmp and rtsp url //support rtmp and rtsp url
//just support H264+AAC //just support H264+AAC
auto urlList = {"rtmp://live.hkstv.hk.lxdns.com/live/hks", auto urlList = {"rtmp://live.hkstv.hk.lxdns.com/live/hks",
...@@ -288,7 +294,7 @@ It is recommended to compile on Ubuntu or MacOS,compiling on windows is cumber ...@@ -288,7 +294,7 @@ It is recommended to compile on Ubuntu or MacOS,compiling on windows is cumber
``` ```
- As puser: - As puser:
``` ```cpp
PlayerProxy::Ptr player(new PlayerProxy("app","stream")); PlayerProxy::Ptr player(new PlayerProxy("app","stream"));
player->play("rtmp://live.hkstv.hk.lxdns.com/live/hks"); player->play("rtmp://live.hkstv.hk.lxdns.com/live/hks");
......
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
- 代码经过大量的稳定性、性能测试,可满足商用服务器项目。 - 代码经过大量的稳定性、性能测试,可满足商用服务器项目。
- 支持linux、macos、ios、android、windows平台 - 支持linux、macos、ios、android、windows平台
- 支持画面秒开(GOP缓存)、极低延时(1秒内) - 支持画面秒开(GOP缓存)、极低延时(1秒内)
- **支持websocket-flv直播**
- [ZLMediaKit高并发实现原理](https://github.com/xiongziliang/ZLMediaKit/wiki/ZLMediaKit%E9%AB%98%E5%B9%B6%E5%8F%91%E5%AE%9E%E7%8E%B0%E5%8E%9F%E7%90%86) - [ZLMediaKit高并发实现原理](https://github.com/xiongziliang/ZLMediaKit/wiki/ZLMediaKit%E9%AB%98%E5%B9%B6%E5%8F%91%E5%AE%9E%E7%8E%B0%E5%8E%9F%E7%90%86)
## 项目定位 ## 项目定位
...@@ -127,7 +128,12 @@ ...@@ -127,7 +128,12 @@
## 编译要求 ## 编译要求
- 编译器支持C++11,GCC4.8/Clang3.3/VC2015或以上 - 编译器支持C++11,GCC4.8/Clang3.3/VC2015或以上
- cmake3.2或以上 - cmake3.2或以上
- **必须使用git下载完整的代码,不要使用下载zip包的方式下载源码,否则子模块代码默认不下载!** - **必须使用git下载完整的代码,不要使用下载zip包的方式下载源码,否则子模块代码默认不下载!你可以像以下这样操作:**
```
git clone https://github.com/zlmediakit/ZLMediaKit.git
cd ZLMediaKit
git submodule update --init
```
## 编译(Linux) ## 编译(Linux)
- 我的编译环境 - 我的编译环境
...@@ -219,7 +225,7 @@ ...@@ -219,7 +225,7 @@
``` ```
## 使用方法 ## 使用方法
- 作为服务器: - 作为服务器:
``` ```cpp
TcpServer::Ptr rtspSrv(new TcpServer()); TcpServer::Ptr rtspSrv(new TcpServer());
TcpServer::Ptr rtmpSrv(new TcpServer()); TcpServer::Ptr rtmpSrv(new TcpServer());
TcpServer::Ptr httpSrv(new TcpServer()); TcpServer::Ptr httpSrv(new TcpServer());
...@@ -232,7 +238,7 @@ ...@@ -232,7 +238,7 @@
``` ```
- 作为播放器: - 作为播放器:
``` ```cpp
MediaPlayer::Ptr player(new MediaPlayer()); MediaPlayer::Ptr player(new MediaPlayer());
weak_ptr<MediaPlayer> weakPlayer = player; weak_ptr<MediaPlayer> weakPlayer = player;
player->setOnPlayResult([weakPlayer](const SockException &ex) { player->setOnPlayResult([weakPlayer](const SockException &ex) {
...@@ -261,7 +267,7 @@ ...@@ -261,7 +267,7 @@
player->play("rtsp://admin:jzan123456@192.168.0.122/"); player->play("rtsp://admin:jzan123456@192.168.0.122/");
``` ```
- 作为代理服务器: - 作为代理服务器:
``` ```cpp
//support rtmp and rtsp url //support rtmp and rtsp url
//just support H264+AAC //just support H264+AAC
auto urlList = {"rtmp://live.hkstv.hk.lxdns.com/live/hks", auto urlList = {"rtmp://live.hkstv.hk.lxdns.com/live/hks",
...@@ -285,7 +291,7 @@ ...@@ -285,7 +291,7 @@
``` ```
- 作为推流客户端器: - 作为推流客户端器:
``` ```cpp
PlayerProxy::Ptr player(new PlayerProxy("app","stream")); PlayerProxy::Ptr player(new PlayerProxy("app","stream"));
//拉一个流,生成一个RtmpMediaSource,源的名称是"app/stream" //拉一个流,生成一个RtmpMediaSource,源的名称是"app/stream"
//你也可以以其他方式生成RtmpMediaSource,比如说MP4文件(请研读MediaReader代码) //你也可以以其他方式生成RtmpMediaSource,比如说MP4文件(请研读MediaReader代码)
......
[api]
#是否调试http api,启用调试后,会打印每次http请求的内容和回复
apiDebug=1
#一些比较敏感的http api在访问时需要提供secret,否则无权限调用
#如果是通过127.0.0.1访问,那么可以不提供secret
secret=035c73f7-bb6b-4889-a715-d9eb2d1925cc
[ffmpeg]
#FFmpeg可执行程序路径
bin=/usr/local/bin/ffmpeg
#FFmpeg拉流再推流的命令模板,通过该模板可以设置再编码的一些参数
cmd=%s -i %s -c:a aac -strict -2 -ar 44100 -ab 48k -c:v libx264 -f flv %s
#FFmpeg日志的路径,如果置空则不生成FFmpeg日志
log=/Users/xzl/git/ZLMediaKit/release/mac/Release/ffmpeg/ffmpeg.log
[general]
#是否启用虚拟主机
enableVhost=1
#播放器或推流器在断开后会触发hook.on_flow_report事件(使用多少流量事件),
#flowThreshold参数控制触发hook.on_flow_report事件阈值,使用流量超过该阈值后才触发,单位KB
flowThreshold=1024
#播放最多等待时间,单位毫秒
#播放在播放某个流时,如果该流不存在,
#ZLMediaKit会最多让播放器等待maxStreamWaitMS毫秒
#如果在这个时间内,该流注册成功,那么会立即返回播放器播放成功
#否则返回播放器未找到该流,该机制的目的是可以先播放再推流
maxStreamWaitMS=5000
#某个流无人观看时,触发hook.on_stream_none_reader事件的最大等待时间,单位毫秒
#在配合hook.on_stream_none_reader事件时,可以做到无人观看自动停止拉流或停止接收推流
streamNoneReaderDelayMS=5000
[hls]
#hls写文件的buf大小,调整参数可以提高文件io性能
fileBufSize=65536
#hls保存文件路径
filePath=/Users/xzl/git/ZLMediaKit/release/mac/Release/httpRoot
#hls最大切片时间
segDur=3
#m3u8索引中,hls保留切片个数(实际保留切片个数大2~3个)
segNum=3
[hook]
#在推流时,如果url参数匹对admin_params,那么可以不经过hook鉴权直接推流成功,播放时亦然
#该配置项的目的是为了开发者自己调试测试,该参数暴露后会有泄露隐私的安全隐患
admin_params=secret=035c73f7-bb6b-4889-a715-d9eb2d1925cc
#是否启用hook事件,启用后,推拉流都将进行鉴权
enable=0
#播放器或推流器使用流量事件,置空则关闭
on_flow_report=https://127.0.0.1/index/hook/on_flow_report
#访问http文件鉴权事件,置空则关闭鉴权
on_http_access=https://127.0.0.1/index/hook/on_http_access
#播放鉴权事件,置空则关闭鉴权
on_play=https://127.0.0.1/index/hook/on_play
#推流鉴权事件,置空则关闭鉴权
on_publish=https://127.0.0.1/index/hook/on_publish
#录制mp4切片完成事件
on_record_mp4=https://127.0.0.1/index/hook/on_record_mp4
#rtsp播放鉴权事件,此事件中比对rtsp的用户名密码
on_rtsp_auth=https://127.0.0.1/index/hook/on_rtsp_auth
#rtsp播放是否开启鉴权事件,置空则关闭rtsp鉴权。rtsp播放鉴权还支持url方式鉴权
#建议开发者统一采用url参数方式鉴权,rtsp用户名密码鉴权一般在设备上用的比较多
on_rtsp_realm=https://127.0.0.1/index/hook/on_rtsp_realm
#远程telnet调试鉴权事件
on_shell_login=https://127.0.0.1/index/hook/on_shell_login
#直播流注册或注销事件
on_stream_changed=https://127.0.0.1/index/hook/on_stream_changed
#无人观看流事件,通过该事件,可以选择是否关闭无人观看的流。配合general.streamNoneReaderDelayMS选项一起使用
on_stream_none_reader=https://127.0.0.1/index/hook/on_stream_none_reader
#播放时,未找到流事件,通过配合hook.on_stream_none_reader事件可以完成按需拉流
on_stream_not_found=https://127.0.0.1/index/hook/on_stream_not_found
#hook api最大等待回复时间,单位秒
timeoutSec=10
[http]
#http服务器字符编码,windows上默认gb2312
charSet=utf-8
#http链接超时时间
keepAliveSecond=10
#keep-alive类型的链接最多复用次数
maxReqCount=100
#http请求体最大字节数,如果post的body太大,则不适合缓存body在内存
maxReqSize=4096
#404网页内容,用户可以自定义404网页
notFound=<html><head><title>404 Not Found</title></head><body bgcolor="white"><center><h1>您访问的资源不存在!</h1></center><hr><center>ZLMediaKit-4.0</center></body></html>
#http服务器监听端口
port=80
#http文件服务器根目录
rootPath=/Users/xzl/git/ZLMediaKit/release/mac/Release/httpRoot
#http文件服务器读文件缓存大小,单位BYTE,调整该参数可以优化文件io性能
sendBufSize=65536
#https服务器监听端口
sslport=443
[multicast]
#rtp组播截止组播ip地址
addrMax=239.255.255.255
#rtp组播起始组播ip地址
addrMin=239.0.0.0
#组播udp ttl
udpTTL=64
[record]
#mp4录制或mp4点播的应用名,通过限制应用名,可以防止随意点播
appName=record
#mp4录制写文件缓存,单位BYTE,调整参数可以提高文件io性能
fileBufSize=65536
#mp4录制保存路径
filePath=/Users/xzl/git/ZLMediaKit/release/mac/Release/httpRoot
#mp4录制切片时间,单位秒
fileSecond=3600
#mp4点播每次流化数据量,单位毫秒,
#减少该值可以让点播数据发送量更平滑,增大该值则更节省cpu资源
sampleMS=100
[rtmp]
#rtmp必须在此时间内完成握手,否则服务器会断开链接,单位秒
handshakeSecond=15
#rtmp超时时间,如果该时间内未收到客户端的数据,
#或者tcp发送缓存超过这个时间,则会断开连接,单位秒
keepAliveSecond=15
#在接收rtmp推流时,是否重新生成时间戳(很多推流器的时间戳着实很烂)
modifyStamp=1
#rtmp服务器监听端口
port=1935
[rtp]
#音频mtu大小,该参数限制rtp最大字节数,推荐不要超过1400
#加大该值会明显增加直播延时
audioMtuSize=600
#如果rtp的序列号连续clearCount次有序,那么rtp将不再排序(目的减少rtp排序导致的延时)
clearCount=10
#rtp时间戳回环时间,单位毫秒
cycleMS=46800000
#rtp排序map缓存大小,加大该值可能会增大延时,但是rtp乱序问题会减小
maxRtpCount=50
#视频mtu大小,该参数限制rtp最大字节数,推荐不要超过1400
videoMtuSize=1400
[rtsp]
#rtsp专有鉴权方式是采用base64还是md5方式
authBasic=0
#rtsp拉流代理是否是直接代理模式
#直接代理后支持任意编码格式,但是会导致GOP缓存无法定位到I帧,可能会导致开播花屏
#并且如果是tcp方式拉流,如果rtp大于mtu会导致无法使用udp方式代理
#假定您的拉流源地址不是264或265或AAC,那么你可以使用直接代理的方式来支持rtsp代理
#默认开启rtsp直接代理,rtmp由于没有这些问题,是强制开启直接代理的
directProxy=1
#rtsp必须在此时间内完成握手,否则服务器会断开链接,单位秒
handshakeSecond=15
#rtsp超时时间,如果该时间内未收到客户端的数据,
#或者tcp发送缓存超过这个时间,则会断开连接,单位秒
keepAliveSecond=15
#rtsp服务器监听地址
port=554
#rtsps服务器监听地址
sslport=322
[shell]
#调试telnet服务器接受最大bufffer大小
maxReqSize=1024
#调试telnet服务器监听端口
port=9000
执行可执行程序时,请在终端输入:
```
export LD_LIBRARY_PATH=./
./MediaServer -d &
```
如果由于so动态库链接失败导致运行不起来,请重建so库软链接
如果由于端口权限问题导致启动失败,请修改配置文件中端口号,或者以root权限运行
执行可执行程序时,请在终端输入:
```
export DYLD_LIBRARY_PATH=./
./MediaServer -d &
```
如果由于so动态库链接失败导致运行不起来,请重建so库软链接
如果由于端口权限问题导致启动失败,请修改配置文件中端口号,或者以root权限运行
执行可执行程序时,可以直接双击MediaServer运行
如果由于端口权限问题导致启动失败,请修改配置文件(.ini后缀的文件)中端口号,然后再运行
...@@ -74,7 +74,7 @@ const string kOnHttpAccess = HOOK_FIELD"on_http_access"; ...@@ -74,7 +74,7 @@ const string kOnHttpAccess = HOOK_FIELD"on_http_access";
const string kAdminParams = HOOK_FIELD"admin_params"; const string kAdminParams = HOOK_FIELD"admin_params";
onceToken token([](){ onceToken token([](){
mINI::Instance()[kEnable] = true; mINI::Instance()[kEnable] = false;
mINI::Instance()[kTimeoutSec] = 10; mINI::Instance()[kTimeoutSec] = 10;
mINI::Instance()[kOnPublish] = "https://127.0.0.1/index/hook/on_publish"; mINI::Instance()[kOnPublish] = "https://127.0.0.1/index/hook/on_publish";
mINI::Instance()[kOnPlay] = "https://127.0.0.1/index/hook/on_play"; mINI::Instance()[kOnPlay] = "https://127.0.0.1/index/hook/on_play";
...@@ -318,7 +318,7 @@ void installWebHook(){ ...@@ -318,7 +318,7 @@ void installWebHook(){
do_http_hook(hook_stream_not_found,body, nullptr); do_http_hook(hook_stream_not_found,body, nullptr);
}); });
#ifdef ENABLE_MP4V2 #ifdef ENABLE_MP4RECORD
//录制mp4文件成功后广播 //录制mp4文件成功后广播
NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastRecordMP4,[](BroadcastRecordMP4Args){ NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastRecordMP4,[](BroadcastRecordMP4Args){
if(!hook_enable || hook_record_mp4.empty()){ if(!hook_enable || hook_record_mp4.empty()){
...@@ -338,7 +338,7 @@ void installWebHook(){ ...@@ -338,7 +338,7 @@ void installWebHook(){
//执行hook //执行hook
do_http_hook(hook_record_mp4,body, nullptr); do_http_hook(hook_record_mp4,body, nullptr);
}); });
#endif //ENABLE_MP4V2 #endif //ENABLE_MP4RECORD
NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastShellLogin,[](BroadcastShellLoginArgs){ NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastShellLogin,[](BroadcastShellLoginArgs){
if(!hook_enable || hook_shell_login.empty() || sender.get_peer_ip() == "127.0.0.1"){ if(!hook_enable || hook_shell_login.empty() || sender.get_peer_ip() == "127.0.0.1"){
......
...@@ -70,7 +70,7 @@ void MediaSink::inputFrame(const Frame::Ptr &frame) { ...@@ -70,7 +70,7 @@ void MediaSink::inputFrame(const Frame::Ptr &frame) {
it->second->inputFrame(frame); it->second->inputFrame(frame);
if(!_allTrackReady && !_trackReadyCallback.empty() && it->second->ready()){ if(!_allTrackReady && !_trackReadyCallback.empty() && it->second->ready()){
//Track由未就绪状态换成就绪状态,我们就触发onTrackReady回调 //Track由未就绪状态换成就绪状态,我们就触发onTrackReady回调
auto it_callback = _trackReadyCallback.find(codec_id); auto it_callback = _trackReadyCallback.find(codec_id);
if(it_callback != _trackReadyCallback.end()){ if(it_callback != _trackReadyCallback.end()){
it_callback->second(); it_callback->second();
......
...@@ -123,6 +123,7 @@ MediaSource::Ptr MediaSource::find( ...@@ -123,6 +123,7 @@ MediaSource::Ptr MediaSource::find(
lock_guard<recursive_mutex> lock(g_mtxMediaSrc); lock_guard<recursive_mutex> lock(g_mtxMediaSrc);
MediaSource::Ptr ret; MediaSource::Ptr ret;
//查找某一媒体源,找到后返回
searchMedia(schema, vhost, app, id, searchMedia(schema, vhost, app, id,
[&](SchemaVhostAppStreamMap::iterator &it0 , [&](SchemaVhostAppStreamMap::iterator &it0 ,
VhostAppStreamMap::iterator &it1, VhostAppStreamMap::iterator &it1,
...@@ -138,7 +139,7 @@ MediaSource::Ptr MediaSource::find( ...@@ -138,7 +139,7 @@ MediaSource::Ptr MediaSource::find(
return true; return true;
}); });
if(!ret && bMake){ if(!ret && bMake){
//查找某一媒体源,找到后返回 //未查找媒体源,则创建一个
ret = MediaReader::onMakeMediaSource(schema, vhost,app,id); ret = MediaReader::onMakeMediaSource(schema, vhost,app,id);
} }
return ret; return ret;
......
...@@ -186,6 +186,14 @@ public: ...@@ -186,6 +186,14 @@ public:
} }
virtual int readerCount() = 0; virtual int readerCount() = 0;
/**
* 获取track
* @return
*/
virtual vector<Track::Ptr> getTracks(bool trackReady) const{
return vector<Track::Ptr>(0);
}
protected: protected:
void regist() ; void regist() ;
bool unregist() ; bool unregist() ;
......
...@@ -161,13 +161,16 @@ namespace Rtsp { ...@@ -161,13 +161,16 @@ namespace Rtsp {
const string kAuthBasic = RTSP_FIELD"authBasic"; const string kAuthBasic = RTSP_FIELD"authBasic";
const string kHandshakeSecond = RTSP_FIELD"handshakeSecond"; const string kHandshakeSecond = RTSP_FIELD"handshakeSecond";
const string kKeepAliveSecond = RTSP_FIELD"keepAliveSecond"; const string kKeepAliveSecond = RTSP_FIELD"keepAliveSecond";
const string kDirectProxy = RTSP_FIELD"directProxy";; const string kDirectProxy = RTSP_FIELD"directProxy";
const string kModifyStamp = RTSP_FIELD"modifyStamp";
onceToken token([](){ onceToken token([](){
//默认Md5方式认证 //默认Md5方式认证
mINI::Instance()[kAuthBasic] = 0; mINI::Instance()[kAuthBasic] = 0;
mINI::Instance()[kHandshakeSecond] = 15; mINI::Instance()[kHandshakeSecond] = 15;
mINI::Instance()[kKeepAliveSecond] = 15; mINI::Instance()[kKeepAliveSecond] = 15;
mINI::Instance()[kDirectProxy] = 1; mINI::Instance()[kDirectProxy] = 1;
mINI::Instance()[kModifyStamp] = true;
},nullptr); },nullptr);
} //namespace Rtsp } //namespace Rtsp
......
...@@ -209,6 +209,8 @@ extern const string kKeepAliveSecond; ...@@ -209,6 +209,8 @@ extern const string kKeepAliveSecond;
//假定您的拉流源地址不是264或265或AAC,那么你可以使用直接代理的方式来支持rtsp代理 //假定您的拉流源地址不是264或265或AAC,那么你可以使用直接代理的方式来支持rtsp代理
//默认开启rtsp直接代理,rtmp由于没有这些问题,是强制开启直接代理的 //默认开启rtsp直接代理,rtmp由于没有这些问题,是强制开启直接代理的
extern const string kDirectProxy; extern const string kDirectProxy;
//rtsp推流是否修改时间戳
extern const string kModifyStamp;
} //namespace Rtsp } //namespace Rtsp
////////////RTMP服务器配置/////////// ////////////RTMP服务器配置///////////
......
...@@ -48,7 +48,7 @@ string makeAdtsConfig(const uint8_t *pcAdts); ...@@ -48,7 +48,7 @@ string makeAdtsConfig(const uint8_t *pcAdts);
void getAACInfo(const AACFrame &adts,int &iSampleRate,int &iChannel); void getAACInfo(const AACFrame &adts,int &iSampleRate,int &iChannel);
/** /**
* aac帧,包含adts头 * aac帧,包含adts头
*/ */
class AACFrame : public Frame { class AACFrame : public Frame {
...@@ -156,10 +156,10 @@ public: ...@@ -156,10 +156,10 @@ public:
* @param aac_cfg aac两个字节的配置信息 * @param aac_cfg aac两个字节的配置信息
*/ */
AACTrack(const string &aac_cfg){ AACTrack(const string &aac_cfg){
if(aac_cfg.size() != 2){ if(aac_cfg.size() < 2){
throw std::invalid_argument("adts配置必须2个字节"); throw std::invalid_argument("adts配置必须最少2个字节");
} }
_cfg = aac_cfg; _cfg = aac_cfg.substr(0,2);
onReady(); onReady();
} }
......
...@@ -36,6 +36,7 @@ using namespace toolkit; ...@@ -36,6 +36,7 @@ using namespace toolkit;
namespace mediakit{ namespace mediakit{
/** /**
* h264 Rtmp解码类 * h264 Rtmp解码类
* 将 h264 over rtmp 解复用出 h264-Frame
*/ */
class H264RtmpDecoder : public RtmpCodec ,public ResourcePoolHelper<H264Frame> { class H264RtmpDecoder : public RtmpCodec ,public ResourcePoolHelper<H264Frame> {
public: public:
......
...@@ -90,7 +90,24 @@ bool H264RtpDecoder::decodeRtp(const RtpPacket::Ptr &rtppack) { ...@@ -90,7 +90,24 @@ bool H264RtpDecoder::decodeRtp(const RtpPacket::Ptr &rtppack) {
* Type==7:SPS frame * Type==7:SPS frame
* Type==8:PPS frame * Type==8:PPS frame
*/ */
/*
RTF3984 5.2节 Common Structure of the RTP Payload Format
Table 1. Summary of NAL unit types and their payload structures
Type Packet Type name Section
---------------------------------------------------------
0 undefined -
1-23 NAL unit Single NAL unit packet per H.264 5.6
24 STAP-A Single-time aggregation packet 5.7.1
25 STAP-B Single-time aggregation packet 5.7.1
26 MTAP16 Multi-time aggregation packet 5.7.2
27 MTAP24 Multi-time aggregation packet 5.7.2
28 FU-A Fragmentation unit 5.8
29 FU-B Fragmentation unit 5.8
30-31 undefined -
*/
const uint8_t *frame = (uint8_t *) rtppack->data() + rtppack->offset; const uint8_t *frame = (uint8_t *) rtppack->data() + rtppack->offset;
int length = rtppack->size() - rtppack->offset; int length = rtppack->size() - rtppack->offset;
NALU nal; NALU nal;
...@@ -145,7 +162,7 @@ bool H264RtpDecoder::decodeRtp(const RtpPacket::Ptr &rtppack) { ...@@ -145,7 +162,7 @@ bool H264RtpDecoder::decodeRtp(const RtpPacket::Ptr &rtppack) {
FU fu; FU fu;
MakeFU(frame[1], fu); MakeFU(frame[1], fu);
if (fu.S) { if (fu.S) {
//该帧的第一个rtp包 //该帧的第一个rtp包 FU-A start
char tmp = (nal.forbidden_zero_bit << 7 | nal.nal_ref_idc << 5 | fu.type); char tmp = (nal.forbidden_zero_bit << 7 | nal.nal_ref_idc << 5 | fu.type);
_h264frame->buffer.assign("\x0\x0\x0\x1", 4); _h264frame->buffer.assign("\x0\x0\x0\x1", 4);
_h264frame->buffer.push_back(tmp); _h264frame->buffer.push_back(tmp);
...@@ -164,14 +181,14 @@ bool H264RtpDecoder::decodeRtp(const RtpPacket::Ptr &rtppack) { ...@@ -164,14 +181,14 @@ bool H264RtpDecoder::decodeRtp(const RtpPacket::Ptr &rtppack) {
} }
if (!fu.E) { if (!fu.E) {
//该帧的中间rtp包 //该帧的中间rtp包 FU-A mid
_h264frame->buffer.append((char *)frame + 2, length - 2); _h264frame->buffer.append((char *)frame + 2, length - 2);
//该函数return时,保存下当前sequence,以便下次对比seq是否连续 //该函数return时,保存下当前sequence,以便下次对比seq是否连续
_lastSeq = rtppack->sequence; _lastSeq = rtppack->sequence;
return false; return false;
} }
//该帧最后一个rtp包 //该帧最后一个rtp包 FU-A end
_h264frame->buffer.append((char *)frame + 2, length - 2); _h264frame->buffer.append((char *)frame + 2, length - 2);
_h264frame->timeStamp = rtppack->timeStamp; _h264frame->timeStamp = rtppack->timeStamp;
auto key = _h264frame->keyFrame(); auto key = _h264frame->keyFrame();
......
...@@ -36,6 +36,8 @@ namespace mediakit{ ...@@ -36,6 +36,8 @@ namespace mediakit{
/** /**
* h264 rtp解码类 * h264 rtp解码类
* 将 h264 over rtsp-rtp 解复用出 h264-Frame
* rfc3984
*/ */
class H264RtpDecoder : public RtpCodec , public ResourcePoolHelper<H264Frame> { class H264RtpDecoder : public RtpCodec , public ResourcePoolHelper<H264Frame> {
public: public:
......
...@@ -37,6 +37,8 @@ namespace mediakit{ ...@@ -37,6 +37,8 @@ namespace mediakit{
/** /**
* h265 rtp解码类 * h265 rtp解码类
* 将 h265 over rtsp-rtp 解复用出 h265-Frame
* 《草案(H265-over-RTP)draft-ietf-payload-rtp-h265-07.pdf》
*/ */
class H265RtpDecoder : public RtpCodec , public ResourcePoolHelper<H265Frame> { class H265RtpDecoder : public RtpCodec , public ResourcePoolHelper<H265Frame> {
public: public:
......
...@@ -440,6 +440,7 @@ static inline unsigned int showBitsLong(void *pvHandle, int iN) ...@@ -440,6 +440,7 @@ static inline unsigned int showBitsLong(void *pvHandle, int iN)
if (iN <= 32) { if (iN <= 32) {
return showBits(ptPtr, iN); return showBits(ptPtr, iN);
} }
return 0;
} }
......
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
#include <memory> #include <memory>
#include <unordered_map> #include <unordered_map>
#include "Util/mini.h" #include "Util/mini.h"
#include "Util/util.h"
#include "Util/TimeTicker.h" #include "Util/TimeTicker.h"
#include "Network/Socket.h" #include "Network/Socket.h"
#include "Common/Parser.h" #include "Common/Parser.h"
...@@ -47,7 +48,7 @@ class HttpCookieManager; ...@@ -47,7 +48,7 @@ class HttpCookieManager;
/** /**
* cookie对象,用于保存cookie的一些相关属性 * cookie对象,用于保存cookie的一些相关属性
*/ */
class HttpServerCookie : public map<string,string> , public noncopyable{ class HttpServerCookie : public AnyStorage , public noncopyable{
public: public:
typedef std::shared_ptr<HttpServerCookie> Ptr; typedef std::shared_ptr<HttpServerCookie> Ptr;
/** /**
...@@ -108,6 +109,8 @@ public: ...@@ -108,6 +109,8 @@ public:
* @return * @return
*/ */
std::shared_ptr<lock_guard<mutex> > getLock(); std::shared_ptr<lock_guard<mutex> > getLock();
private: private:
string cookieExpireTime() const ; string cookieExpireTime() const ;
private: private:
......
...@@ -211,13 +211,25 @@ inline bool HttpSession::checkWebSocket(){ ...@@ -211,13 +211,25 @@ inline bool HttpSession::checkWebSocket(){
if(!_parser["Sec-WebSocket-Protocol"].empty()){ if(!_parser["Sec-WebSocket-Protocol"].empty()){
headerOut["Sec-WebSocket-Protocol"] = _parser["Sec-WebSocket-Protocol"]; headerOut["Sec-WebSocket-Protocol"] = _parser["Sec-WebSocket-Protocol"];
} }
auto res_cb = [this,headerOut](){
_flv_over_websocket = true;
sendResponse("101 Switching Protocols",headerOut,"");
};
//判断是否为websocket-flv
if(checkLiveFlvStream(res_cb)){
//这里是websocket-flv直播请求
return true;
}
//如果checkLiveFlvStream返回false,则代表不是websocket-flv,而是普通的websocket连接
sendResponse("101 Switching Protocols",headerOut,""); sendResponse("101 Switching Protocols",headerOut,"");
checkLiveFlvStream(true);
return true; return true;
} }
//http-flv 链接格式:http://vhost-url:port/app/streamid.flv?key1=value1&key2=value2 //http-flv 链接格式:http://vhost-url:port/app/streamid.flv?key1=value1&key2=value2
//如果url(除去?以及后面的参数)后缀是.flv,那么表明该url是一个http-flv直播。 //如果url(除去?以及后面的参数)后缀是.flv,那么表明该url是一个http-flv直播。
inline bool HttpSession::checkLiveFlvStream(bool over_websocket){ inline bool HttpSession::checkLiveFlvStream(const function<void()> &cb){
auto pos = strrchr(_parser.Url().data(),'.'); auto pos = strrchr(_parser.Url().data(),'.');
if(!pos){ if(!pos){
//未找到".flv"后缀 //未找到".flv"后缀
...@@ -240,7 +252,7 @@ inline bool HttpSession::checkLiveFlvStream(bool over_websocket){ ...@@ -240,7 +252,7 @@ inline bool HttpSession::checkLiveFlvStream(bool over_websocket){
bool bClose = (strcasecmp(_parser["Connection"].data(),"close") == 0) || ( ++_iReqCnt > reqCnt); bool bClose = (strcasecmp(_parser["Connection"].data(),"close") == 0) || ( ++_iReqCnt > reqCnt);
weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this()); weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this());
MediaSource::findAsync(_mediaInfo,weakSelf.lock(), true,[weakSelf,bClose,this,over_websocket](const MediaSource::Ptr &src){ MediaSource::findAsync(_mediaInfo,weakSelf.lock(), true,[weakSelf,bClose,this,cb](const MediaSource::Ptr &src){
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(!strongSelf){ if(!strongSelf){
//本对象已经销毁 //本对象已经销毁
...@@ -249,35 +261,32 @@ inline bool HttpSession::checkLiveFlvStream(bool over_websocket){ ...@@ -249,35 +261,32 @@ inline bool HttpSession::checkLiveFlvStream(bool over_websocket){
auto rtmp_src = dynamic_pointer_cast<RtmpMediaSource>(src); auto rtmp_src = dynamic_pointer_cast<RtmpMediaSource>(src);
if(!rtmp_src){ if(!rtmp_src){
//未找到该流 //未找到该流
if(!over_websocket){
sendNotFound(bClose); sendNotFound(bClose);
}
if(bClose){ if(bClose){
shutdown(SockException(Err_shutdown,"flv stream not found")); shutdown(SockException(Err_shutdown,"flv stream not found"));
} }
return; return;
} }
//找到流了 //找到流了
auto onRes = [this,rtmp_src,over_websocket](const string &err){ auto onRes = [this,rtmp_src,cb](const string &err){
bool authSuccess = err.empty(); bool authSuccess = err.empty();
if(!authSuccess){ if(!authSuccess){
if(!over_websocket){
sendResponse("401 Unauthorized", makeHttpHeader(true,err.size()),err); sendResponse("401 Unauthorized", makeHttpHeader(true,err.size()),err);
}
shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << err)); shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << err));
return ; return ;
} }
if(!over_websocket) { if(!cb) {
//找到rtmp源,发送http头,负载后续发送 //找到rtmp源,发送http头,负载后续发送
sendResponse("200 OK", makeHttpHeader(false, 0, get_mime_type(".flv")), ""); sendResponse("200 OK", makeHttpHeader(false, 0, get_mime_type(".flv")), "");
}else{
cb();
} }
//开始发送rtmp负载 //开始发送rtmp负载
//关闭tcp_nodelay ,优化性能 //关闭tcp_nodelay ,优化性能
SockUtil::setNoDelay(_sock->rawFD(),false); SockUtil::setNoDelay(_sock->rawFD(),false);
(*this) << SocketFlags(kSockFlags); (*this) << SocketFlags(kSockFlags);
_flv_over_websocket = over_websocket;
try{ try{
start(getPoller(),rtmp_src); start(getPoller(),rtmp_src);
}catch (std::exception &ex){ }catch (std::exception &ex){
...@@ -403,7 +412,7 @@ inline void HttpSession::canAccessPath(const string &path_in,bool is_dir,const f ...@@ -403,7 +412,7 @@ inline void HttpSession::canAccessPath(const string &path_in,bool is_dir,const f
//上次鉴权失败,如果url发生变更,那么也重新鉴权 //上次鉴权失败,如果url发生变更,那么也重新鉴权
if (_parser.Params().empty() || _parser.Params() == cookie->getUid()) { if (_parser.Params().empty() || _parser.Params() == cookie->getUid()) {
//url参数未变,那么判断无权限访问 //url参数未变,那么判断无权限访问
callback(accessErr.empty() ? "无权限访问该目录" : accessErr, nullptr); callback(accessErr.empty() ? "无权限访问该目录" : accessErr.get<string>(), nullptr);
return; return;
} }
} }
...@@ -427,9 +436,9 @@ inline void HttpSession::canAccessPath(const string &path_in,bool is_dir,const f ...@@ -427,9 +436,9 @@ inline void HttpSession::canAccessPath(const string &path_in,bool is_dir,const f
//对cookie上锁 //对cookie上锁
auto lck = cookie->getLock(); auto lck = cookie->getLock();
//记录用户能访问的路径 //记录用户能访问的路径
(*cookie)[kCookiePathKey] = cookie_path; (*cookie)[kCookiePathKey].set<string>(cookie_path);
//记录能否访问 //记录能否访问
(*cookie)[kAccessErrKey] = errMsg; (*cookie)[kAccessErrKey].set<string>(errMsg);
} }
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
...@@ -480,7 +489,8 @@ inline void HttpSession::Handle_Req_GET(int64_t &content_len) { ...@@ -480,7 +489,8 @@ inline void HttpSession::Handle_Req_GET(int64_t &content_len) {
} }
//再看看是否为http-flv直播请求 //再看看是否为http-flv直播请求
if(checkLiveFlvStream(false)){ if(checkLiveFlvStream()){
//若是,return!
return; return;
} }
...@@ -520,7 +530,7 @@ inline void HttpSession::Handle_Req_GET(int64_t &content_len) { ...@@ -520,7 +530,7 @@ inline void HttpSession::Handle_Req_GET(int64_t &content_len) {
} }
auto headerOut = makeHttpHeader(bClose,strMeun.size()); auto headerOut = makeHttpHeader(bClose,strMeun.size());
if(cookie){ if(cookie){
headerOut["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey]); headerOut["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get<string>());
} }
sendResponse(errMsg.empty() ? "200 OK" : "401 Unauthorized" , headerOut, strMeun); sendResponse(errMsg.empty() ? "200 OK" : "401 Unauthorized" , headerOut, strMeun);
throw SockException(bClose ? Err_shutdown : Err_success,"close connection after access folder"); throw SockException(bClose ? Err_shutdown : Err_success,"close connection after access folder");
...@@ -555,7 +565,7 @@ inline void HttpSession::Handle_Req_GET(int64_t &content_len) { ...@@ -555,7 +565,7 @@ inline void HttpSession::Handle_Req_GET(int64_t &content_len) {
if(!errMsg.empty()){ if(!errMsg.empty()){
auto headerOut = makeHttpHeader(bClose,errMsg.size()); auto headerOut = makeHttpHeader(bClose,errMsg.size());
if(cookie){ if(cookie){
headerOut["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey]); headerOut["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get<string>());
} }
sendResponse("401 Unauthorized" , headerOut, errMsg); sendResponse("401 Unauthorized" , headerOut, errMsg);
throw SockException(bClose ? Err_shutdown : Err_success,"close connection after access file failed"); throw SockException(bClose ? Err_shutdown : Err_success,"close connection after access file failed");
...@@ -954,12 +964,12 @@ void HttpSession::onWrite(const Buffer::Ptr &buffer) { ...@@ -954,12 +964,12 @@ void HttpSession::onWrite(const Buffer::Ptr &buffer) {
header._reserved = 0; header._reserved = 0;
header._opcode = WebSocketHeader::BINARY; header._opcode = WebSocketHeader::BINARY;
header._mask_flag = false; header._mask_flag = false;
WebSocketSplitter::encode(header,(uint8_t *)buffer->data(),buffer->size()); WebSocketSplitter::encode(header,buffer);
} }
void HttpSession::onWebSocketEncodeData(const uint8_t *ptr,uint64_t len){ void HttpSession::onWebSocketEncodeData(const Buffer::Ptr &buffer){
_ui64TotalBytes += len; _ui64TotalBytes += buffer->size();
SocketHelper::send((char *)ptr,len); send(buffer);
} }
void HttpSession::onDetach() { void HttpSession::onDetach() {
......
...@@ -104,14 +104,13 @@ protected: ...@@ -104,14 +104,13 @@ protected:
/** /**
* 发送数据进行websocket协议打包后回调 * 发送数据进行websocket协议打包后回调
* @param ptr * @param buffer
* @param len
*/ */
void onWebSocketEncodeData(const uint8_t *ptr,uint64_t len) override; void onWebSocketEncodeData(const Buffer::Ptr &buffer) override;
private: private:
inline void Handle_Req_GET(int64_t &content_len); inline void Handle_Req_GET(int64_t &content_len);
inline void Handle_Req_POST(int64_t &content_len); inline void Handle_Req_POST(int64_t &content_len);
inline bool checkLiveFlvStream(bool over_websocket = false); inline bool checkLiveFlvStream(const function<void()> &cb = nullptr);
inline bool checkWebSocket(); inline bool checkWebSocket();
inline bool emitHttpEvent(bool doInvoke); inline bool emitHttpEvent(bool doInvoke);
inline void urlDecode(Parser &parser); inline void urlDecode(Parser &parser);
......
...@@ -89,7 +89,7 @@ protected: ...@@ -89,7 +89,7 @@ protected:
header._reserved = 0; header._reserved = 0;
header._opcode = WebSocketHeader::TEXT; header._opcode = WebSocketHeader::TEXT;
header._mask_flag = false; header._mask_flag = false;
strongSelf->WebSocketSplitter::encode(header,(uint8_t *)buf->data(),buf->size()); strongSelf->WebSocketSplitter::encode(header,buf);
} }
return buf->size(); return buf->size();
}); });
...@@ -118,12 +118,12 @@ protected: ...@@ -118,12 +118,12 @@ protected:
switch (header._opcode){ switch (header._opcode){
case WebSocketHeader::CLOSE:{ case WebSocketHeader::CLOSE:{
HttpSessionType::encode(header,nullptr,0); HttpSessionType::encode(header,nullptr);
} }
break; break;
case WebSocketHeader::PING:{ case WebSocketHeader::PING:{
const_cast<WebSocketHeader&>(header)._opcode = WebSocketHeader::PONG; const_cast<WebSocketHeader&>(header)._opcode = WebSocketHeader::PONG;
HttpSessionType::encode(header,(uint8_t *)_remian_data.data(),_remian_data.size()); HttpSessionType::encode(header,std::make_shared<BufferString>(_remian_data));
} }
break; break;
case WebSocketHeader::CONTINUATION:{ case WebSocketHeader::CONTINUATION:{
...@@ -132,8 +132,7 @@ protected: ...@@ -132,8 +132,7 @@ protected:
break; break;
case WebSocketHeader::TEXT: case WebSocketHeader::TEXT:
case WebSocketHeader::BINARY:{ case WebSocketHeader::BINARY:{
BufferString::Ptr buffer = std::make_shared<BufferString>(_remian_data); _session->onRecv(std::make_shared<BufferString>(_remian_data));
_session->onRecv(buffer);
} }
break; break;
default: default:
...@@ -145,11 +144,10 @@ protected: ...@@ -145,11 +144,10 @@ protected:
/** /**
* 发送数据进行websocket协议打包后回调 * 发送数据进行websocket协议打包后回调
* @param ptr * @param buffer
* @param len
*/ */
void onWebSocketEncodeData(const uint8_t *ptr,uint64_t len) override{ void onWebSocketEncodeData(const Buffer::Ptr &buffer) override{
SocketHelper::send((char *)ptr,len); SocketHelper::send(buffer);
} }
private: private:
typedef function<int(const Buffer::Ptr &buf)> onBeforeSendCB; typedef function<int(const Buffer::Ptr &buf)> onBeforeSendCB;
......
...@@ -164,9 +164,9 @@ void WebSocketSplitter::onPlayloadData(uint8_t *ptr, uint64_t len) { ...@@ -164,9 +164,9 @@ void WebSocketSplitter::onPlayloadData(uint8_t *ptr, uint64_t len) {
onWebSocketDecodePlayload(*this, _mask_flag ? ptr - len : ptr, len, _playload_offset); onWebSocketDecodePlayload(*this, _mask_flag ? ptr - len : ptr, len, _playload_offset);
} }
void WebSocketSplitter::encode(const WebSocketHeader &header,uint8_t *data, const uint64_t len) { void WebSocketSplitter::encode(const WebSocketHeader &header,const Buffer::Ptr &buffer) {
string ret; string ret;
uint64_t len = buffer ? buffer->size() : 0;
uint8_t byte = header._fin << 7 | ((header._reserved & 0x07) << 4) | (header._opcode & 0x0F) ; uint8_t byte = header._fin << 7 | ((header._reserved & 0x07) << 4) | (header._opcode & 0x0F) ;
ret.push_back(byte); ret.push_back(byte);
...@@ -195,16 +195,16 @@ void WebSocketSplitter::encode(const WebSocketHeader &header,uint8_t *data, cons ...@@ -195,16 +195,16 @@ void WebSocketSplitter::encode(const WebSocketHeader &header,uint8_t *data, cons
ret.append((char *)header._mask.data(),4); ret.append((char *)header._mask.data(),4);
} }
onWebSocketEncodeData((uint8_t*)ret.data(),ret.size()); onWebSocketEncodeData(std::make_shared<BufferString>(std::move(ret)));
if(len > 0){ if(len > 0){
if(mask_flag){ if(mask_flag){
uint8_t *ptr = data; uint8_t *ptr = (uint8_t*)buffer->data();
for(int i = 0; i < len ; ++i,++ptr){ for(int i = 0; i < len ; ++i,++ptr){
*(ptr) ^= header._mask[i % 4]; *(ptr) ^= header._mask[i % 4];
} }
} }
onWebSocketEncodeData(data,len); onWebSocketEncodeData(buffer);
} }
} }
......
...@@ -31,8 +31,10 @@ ...@@ -31,8 +31,10 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include <memory> #include <memory>
using namespace std; #include "Network/Buffer.h"
using namespace std;
using namespace toolkit;
namespace mediakit { namespace mediakit {
...@@ -85,12 +87,10 @@ public: ...@@ -85,12 +87,10 @@ public:
/** /**
* 编码一个数据包 * 编码一个数据包
* 将触发2次onWebSocketEncodeData回调 * 将触发2次onWebSocketEncodeData回调
* 第一次是数据头,第二次是负载数据
* @param header 数据头 * @param header 数据头
* @param data 负载数据 * @param buffer 负载数据
* @param len 负载数据长度
*/ */
void encode(const WebSocketHeader &header,uint8_t *data,const uint64_t len); void encode(const WebSocketHeader &header,const Buffer::Ptr &buffer);
protected: protected:
/** /**
* 收到一个webSocket数据包包头,后续将继续触发onWebSocketDecodePlayload回调 * 收到一个webSocket数据包包头,后续将继续触发onWebSocketDecodePlayload回调
...@@ -119,7 +119,7 @@ protected: ...@@ -119,7 +119,7 @@ protected:
* @param ptr 数据指针 * @param ptr 数据指针
* @param len 数据指针长度 * @param len 数据指针长度
*/ */
virtual void onWebSocketEncodeData(const uint8_t *ptr,uint64_t len){}; virtual void onWebSocketEncodeData(const Buffer::Ptr &buffer){};
private: private:
void onPlayloadData(uint8_t *data,uint64_t len); void onPlayloadData(uint8_t *data,uint64_t len);
private: private:
......
...@@ -135,9 +135,7 @@ void MP4Muxer::onTrackReady(const Track::Ptr &track) { ...@@ -135,9 +135,7 @@ void MP4Muxer::onTrackReady(const Track::Ptr &track) {
WarnL << "添加AAC Track失败:" << track_id; WarnL << "添加AAC Track失败:" << track_id;
return; return;
} }
track_info info; _codec_to_trackid[track->getCodecId()].track_id = track_id;
info.track_id = track_id;
_codec_to_trackid[track->getCodecId()] = info;
} }
break; break;
case CodecH264: { case CodecH264: {
...@@ -170,9 +168,7 @@ void MP4Muxer::onTrackReady(const Track::Ptr &track) { ...@@ -170,9 +168,7 @@ void MP4Muxer::onTrackReady(const Track::Ptr &track) {
WarnL << "添加H264 Track失败:" << track_id; WarnL << "添加H264 Track失败:" << track_id;
return; return;
} }
track_info info; _codec_to_trackid[track->getCodecId()].track_id = track_id;
info.track_id = track_id;
_codec_to_trackid[track->getCodecId()] = info;
} }
break; break;
case CodecH265: { case CodecH265: {
...@@ -205,9 +201,7 @@ void MP4Muxer::onTrackReady(const Track::Ptr &track) { ...@@ -205,9 +201,7 @@ void MP4Muxer::onTrackReady(const Track::Ptr &track) {
WarnL << "添加H265 Track失败:" << track_id; WarnL << "添加H265 Track失败:" << track_id;
return; return;
} }
track_info info; _codec_to_trackid[track->getCodecId()].track_id = track_id;
info.track_id = track_id;
_codec_to_trackid[track->getCodecId()] = info;
} }
break; break;
default: default:
......
...@@ -28,7 +28,7 @@ ...@@ -28,7 +28,7 @@
#include <ctime> #include <ctime>
#include <sys/stat.h> #include <sys/stat.h>
#include "Common/config.h" #include "Common/config.h"
#include "Mp4Maker.h" #include "MP4Recorder.h"
#include "Util/util.h" #include "Util/util.h"
#include "Util/NoticeCenter.h" #include "Util/NoticeCenter.h"
#include "Thread/WorkThreadPool.h" #include "Thread/WorkThreadPool.h"
...@@ -53,7 +53,7 @@ string timeStr(const char *fmt) { ...@@ -53,7 +53,7 @@ string timeStr(const char *fmt) {
return buffer; return buffer;
} }
Mp4Maker::Mp4Maker(const string& strPath, MP4Recorder::MP4Recorder(const string& strPath,
const string &strVhost, const string &strVhost,
const string &strApp, const string &strApp,
const string &strStreamId) { const string &strStreamId) {
...@@ -64,11 +64,11 @@ Mp4Maker::Mp4Maker(const string& strPath, ...@@ -64,11 +64,11 @@ Mp4Maker::Mp4Maker(const string& strPath,
_info.strVhost = strVhost; _info.strVhost = strVhost;
_info.strFolder = strPath; _info.strFolder = strPath;
} }
Mp4Maker::~Mp4Maker() { MP4Recorder::~MP4Recorder() {
closeFile(); closeFile();
} }
void Mp4Maker::createFile() { void MP4Recorder::createFile() {
closeFile(); closeFile();
auto strDate = timeStr("%Y-%m-%d"); auto strDate = timeStr("%Y-%m-%d");
auto strTime = timeStr("%H-%M-%S"); auto strTime = timeStr("%H-%M-%S");
...@@ -100,7 +100,7 @@ void Mp4Maker::createFile() { ...@@ -100,7 +100,7 @@ void Mp4Maker::createFile() {
} }
} }
void Mp4Maker::asyncClose() { void MP4Recorder::asyncClose() {
auto muxer = _muxer; auto muxer = _muxer;
auto strFileTmp = _strFileTmp; auto strFileTmp = _strFileTmp;
auto strFile = _strFile; auto strFile = _strFile;
...@@ -121,14 +121,14 @@ void Mp4Maker::asyncClose() { ...@@ -121,14 +121,14 @@ void Mp4Maker::asyncClose() {
}); });
} }
void Mp4Maker::closeFile() { void MP4Recorder::closeFile() {
if (_muxer) { if (_muxer) {
asyncClose(); asyncClose();
_muxer = nullptr; _muxer = nullptr;
} }
} }
void Mp4Maker::onTrackFrame(const Frame::Ptr &frame) { void MP4Recorder::onTrackFrame(const Frame::Ptr &frame) {
GET_CONFIG(uint32_t,recordSec,Record::kFileSecond); GET_CONFIG(uint32_t,recordSec,Record::kFileSecond);
if(!_muxer || ((_createFileTicker.elapsedTime() > recordSec * 1000) && if(!_muxer || ((_createFileTicker.elapsedTime() > recordSec * 1000) &&
(!_haveVideo || (_haveVideo && frame->keyFrame()))) ){ (!_haveVideo || (_haveVideo && frame->keyFrame()))) ){
...@@ -145,7 +145,7 @@ void Mp4Maker::onTrackFrame(const Frame::Ptr &frame) { ...@@ -145,7 +145,7 @@ void Mp4Maker::onTrackFrame(const Frame::Ptr &frame) {
} }
} }
void Mp4Maker::onTrackReady(const Track::Ptr & track){ void MP4Recorder::onTrackReady(const Track::Ptr & track){
//保存所有的track,为创建MP4MuxerFile做准备 //保存所有的track,为创建MP4MuxerFile做准备
_tracks.emplace_back(track); _tracks.emplace_back(track);
if(track->getTrackType() == TrackVideo){ if(track->getTrackType() == TrackVideo){
......
...@@ -55,14 +55,14 @@ public: ...@@ -55,14 +55,14 @@ public:
string strStreamId;//流ID string strStreamId;//流ID
string strVhost;//vhost string strVhost;//vhost
}; };
class Mp4Maker : public MediaSink{ class MP4Recorder : public MediaSink{
public: public:
typedef std::shared_ptr<Mp4Maker> Ptr; typedef std::shared_ptr<MP4Recorder> Ptr;
Mp4Maker(const string &strPath, MP4Recorder(const string &strPath,
const string &strVhost , const string &strVhost ,
const string &strApp, const string &strApp,
const string &strStreamId); const string &strStreamId);
virtual ~Mp4Maker(); virtual ~MP4Recorder();
private: private:
/** /**
* 某Track输出frame,在onAllTrackReady触发后才会调用此方法 * 某Track输出frame,在onAllTrackReady触发后才会调用此方法
......
...@@ -58,15 +58,15 @@ MediaRecorder::MediaRecorder(const string &strVhost_tmp, ...@@ -58,15 +58,15 @@ MediaRecorder::MediaRecorder(const string &strVhost_tmp,
string m3u8FilePath; string m3u8FilePath;
if(enableVhost){ if(enableVhost){
m3u8FilePath = hlsPath + "/" + strVhost + "/" + strApp + "/" + strId + "/hls.m3u8"; m3u8FilePath = hlsPath + "/" + strVhost + "/" + strApp + "/" + strId + "/hls.m3u8";
_hlsMaker.reset(new HlsRecorder(m3u8FilePath,string(VHOST_KEY) + "=" + strVhost ,hlsBufSize, hlsDuration, hlsNum)); _hlsRecorder.reset(new HlsRecorder(m3u8FilePath,string(VHOST_KEY) + "=" + strVhost ,hlsBufSize, hlsDuration, hlsNum));
}else{ }else{
m3u8FilePath = hlsPath + "/" + strApp + "/" + strId + "/hls.m3u8"; m3u8FilePath = hlsPath + "/" + strApp + "/" + strId + "/hls.m3u8";
_hlsMaker.reset(new HlsRecorder(m3u8FilePath,"",hlsBufSize, hlsDuration, hlsNum)); _hlsRecorder.reset(new HlsRecorder(m3u8FilePath,"",hlsBufSize, hlsDuration, hlsNum));
} }
} }
#endif //defined(ENABLE_HLS) #endif //defined(ENABLE_HLS)
#if defined(ENABLE_MP4V2) #if defined(ENABLE_MP4RECORD)
GET_CONFIG(string,recordPath,Record::kFilePath); GET_CONFIG(string,recordPath,Record::kFilePath);
GET_CONFIG(string,recordAppName,Record::kAppName); GET_CONFIG(string,recordAppName,Record::kAppName);
...@@ -77,9 +77,9 @@ MediaRecorder::MediaRecorder(const string &strVhost_tmp, ...@@ -77,9 +77,9 @@ MediaRecorder::MediaRecorder(const string &strVhost_tmp,
} else { } else {
mp4FilePath = recordPath + "/" + recordAppName + "/" + strApp + "/" + strId + "/"; mp4FilePath = recordPath + "/" + recordAppName + "/" + strApp + "/" + strId + "/";
} }
_mp4Maker.reset(new Mp4Maker(mp4FilePath,strVhost,strApp,strId)); _mp4Recorder.reset(new MP4Recorder(mp4FilePath,strVhost,strApp,strId));
} }
#endif //defined(ENABLE_MP4V2) #endif //defined(ENABLE_MP4RECORD)
} }
MediaRecorder::~MediaRecorder() { MediaRecorder::~MediaRecorder() {
...@@ -87,28 +87,28 @@ MediaRecorder::~MediaRecorder() { ...@@ -87,28 +87,28 @@ MediaRecorder::~MediaRecorder() {
void MediaRecorder::inputFrame(const Frame::Ptr &frame) { void MediaRecorder::inputFrame(const Frame::Ptr &frame) {
#if defined(ENABLE_HLS) #if defined(ENABLE_HLS)
if (_hlsMaker) { if (_hlsRecorder) {
_hlsMaker->inputFrame(frame); _hlsRecorder->inputFrame(frame);
} }
#endif //defined(ENABLE_HLS) #endif //defined(ENABLE_HLS)
#if defined(ENABLE_MP4V2) #if defined(ENABLE_MP4RECORD)
if (_mp4Maker) { if (_mp4Recorder) {
_mp4Maker->inputFrame(frame); _mp4Recorder->inputFrame(frame);
} }
#endif //defined(ENABLE_MP4V2) #endif //defined(ENABLE_MP4RECORD)
} }
void MediaRecorder::addTrack(const Track::Ptr &track) { void MediaRecorder::addTrack(const Track::Ptr &track) {
#if defined(ENABLE_HLS) #if defined(ENABLE_HLS)
if (_hlsMaker) { if (_hlsRecorder) {
_hlsMaker->addTrack(track); _hlsRecorder->addTrack(track);
} }
#endif //defined(ENABLE_HLS) #endif //defined(ENABLE_HLS)
#if defined(ENABLE_MP4RECORD) #if defined(ENABLE_MP4RECORD)
if (_mp4Maker) { if (_mp4Recorder) {
_mp4Maker->addTrack(track); _mp4Recorder->addTrack(track);
} }
#endif //defined(ENABLE_MP4RECORD) #endif //defined(ENABLE_MP4RECORD)
} }
......
...@@ -30,7 +30,7 @@ ...@@ -30,7 +30,7 @@
#include <memory> #include <memory>
#include "Player/PlayerBase.h" #include "Player/PlayerBase.h"
#include "Common/MediaSink.h" #include "Common/MediaSink.h"
#include "Mp4Maker.h" #include "MP4Recorder.h"
#include "HlsRecorder.h" #include "HlsRecorder.h"
using namespace toolkit; using namespace toolkit;
...@@ -61,11 +61,11 @@ public: ...@@ -61,11 +61,11 @@ public:
void addTrack(const Track::Ptr & track) override; void addTrack(const Track::Ptr & track) override;
private: private:
#if defined(ENABLE_HLS) #if defined(ENABLE_HLS)
std::shared_ptr<HlsRecorder> _hlsMaker; std::shared_ptr<HlsRecorder> _hlsRecorder;
#endif //defined(ENABLE_HLS) #endif //defined(ENABLE_HLS)
#if defined(ENABLE_MP4RECORD) #if defined(ENABLE_MP4RECORD)
std::shared_ptr<Mp4Maker> _mp4Maker; std::shared_ptr<MP4Recorder> _mp4Recorder;
#endif //defined(ENABLE_MP4RECORD) #endif //defined(ENABLE_MP4RECORD)
}; };
......
...@@ -29,16 +29,22 @@ ...@@ -29,16 +29,22 @@
namespace mediakit { namespace mediakit {
void Stamp::revise(uint32_t dts, uint32_t pts, int64_t &dts_out, int64_t &pts_out) { void Stamp::revise(uint32_t dts, uint32_t pts, int64_t &dts_out, int64_t &pts_out) {
if(!pts){
//没有播放时间戳,使其赋值为解码时间戳
pts = dts;
}
//pts和dts的差值
int pts_dts_diff = pts - dts;
if(_first){ if(_first){
//记录第一次时间戳,后面好计算时间戳增量 //记录第一次时间戳,后面好计算时间戳增量
_start_dts = dts; _start_dts = dts;
_first = false; _first = false;
_ticker = std::make_shared<SmoothTicker>(); _ticker.resetTime();
} }
//pts和dts的差值 if (!dts) {
int pts_dts_diff = pts - dts; //没有解码时间戳,我们生成解码时间戳
if(_modifyStamp){ dts = _ticker.elapsedTime();
dts = _ticker->elapsedTime();
} }
//相对时间戳 //相对时间戳
...@@ -60,11 +66,6 @@ void Stamp::revise(uint32_t dts, uint32_t pts, int64_t &dts_out, int64_t &pts_ou ...@@ -60,11 +66,6 @@ void Stamp::revise(uint32_t dts, uint32_t pts, int64_t &dts_out, int64_t &pts_ou
_dts_inc = dts_out; _dts_inc = dts_out;
//////////////以下是播放时间戳的计算////////////////// //////////////以下是播放时间戳的计算//////////////////
if(!pts){
//没有播放时间戳
pts = dts;
}
if(pts_dts_diff > 200 || pts_dts_diff < -200){ if(pts_dts_diff > 200 || pts_dts_diff < -200){
//如果差值大于200毫秒,则认为由于回环导致时间戳错乱了 //如果差值大于200毫秒,则认为由于回环导致时间戳错乱了
pts_dts_diff = 0; pts_dts_diff = 0;
......
...@@ -33,17 +33,18 @@ using namespace toolkit; ...@@ -33,17 +33,18 @@ using namespace toolkit;
namespace mediakit { namespace mediakit {
//该类解决时间戳回环、回退问题
//计算相对时间戳或者产生平滑时间戳
class Stamp { class Stamp {
public: public:
Stamp(bool modifyStamp = false) {_modifyStamp = modifyStamp;}; Stamp() = default;
~Stamp() = default; ~Stamp() = default;
void revise(uint32_t dts, uint32_t pts, int64_t &dts_out, int64_t &pts_out); void revise(uint32_t dts, uint32_t pts, int64_t &dts_out, int64_t &pts_out);
private: private:
int64_t _start_dts = 0; int64_t _start_dts = 0;
int64_t _dts_inc = 0; int64_t _dts_inc = 0;
bool _first = true; bool _first = true;
bool _modifyStamp; SmoothTicker _ticker;
std::shared_ptr<SmoothTicker> _ticker;
}; };
}//namespace mediakit }//namespace mediakit
......
...@@ -42,19 +42,13 @@ TsMuxer::~TsMuxer() { ...@@ -42,19 +42,13 @@ TsMuxer::~TsMuxer() {
void TsMuxer::addTrack(const Track::Ptr &track) { void TsMuxer::addTrack(const Track::Ptr &track) {
switch (track->getCodecId()){ switch (track->getCodecId()){
case CodecH264: { case CodecH264: {
track_info info; _codec_to_trackid[track->getCodecId()].track_id = mpeg_ts_add_stream(_context, PSI_STREAM_H264, nullptr, 0);
info.track_id = mpeg_ts_add_stream(_context, PSI_STREAM_H264, nullptr, 0);
_codec_to_trackid[track->getCodecId()] = info;
} break; } break;
case CodecH265: { case CodecH265: {
track_info info; _codec_to_trackid[track->getCodecId()].track_id = mpeg_ts_add_stream(_context, PSI_STREAM_H265, nullptr, 0);
info.track_id = mpeg_ts_add_stream(_context, PSI_STREAM_H265, nullptr, 0);
_codec_to_trackid[track->getCodecId()] = info;
}break; }break;
case CodecAAC: { case CodecAAC: {
track_info info; _codec_to_trackid[track->getCodecId()].track_id = mpeg_ts_add_stream(_context, PSI_STREAM_AAC, nullptr, 0);
info.track_id = mpeg_ts_add_stream(_context, PSI_STREAM_AAC, nullptr, 0);
_codec_to_trackid[track->getCodecId()] = info;
}break; }break;
default: default:
break; break;
...@@ -73,38 +67,28 @@ void TsMuxer::inputFrame(const Frame::Ptr &frame) { ...@@ -73,38 +67,28 @@ void TsMuxer::inputFrame(const Frame::Ptr &frame) {
switch (frame->getCodecId()){ switch (frame->getCodecId()){
case CodecH265: case CodecH265:
case CodecH264: { case CodecH264: {
//这里的代码逻辑是让SPS、PPS、IDR这些时间戳相同的帧打包到一起当做一个帧处理,
Buffer::Ptr merged_frame ; if (!_frameCached.empty() && _frameCached.back()->dts() != frame->dts()) {
if(frame->configFrame()){ Frame::Ptr back = _frameCached.back();
//配置帧,缓存后直接返回,以便下次输入关键帧时使用 Buffer::Ptr merged_frame = back;
_config_frame_cache.append("\x00\x00\x00\x01",4); if(_frameCached.size() != 1){
_config_frame_cache.append(frame->data() + frame->prefixSize(),frame->size() - frame->prefixSize()); string merged;
break; _frameCached.for_each([&](const Frame::Ptr &frame){
} if(frame->prefixSize()){
merged.append(frame->data(),frame->size());
if(frame->keyFrame()){ } else{
//关键帧 merged.append("\x00\x00\x00\x01",4);
if(!_config_frame_cache.empty()){ merged.append(frame->data(),frame->size());
//有配置帧,那么配置帧合并关键帧后输入ts打包
_config_frame_cache.append("\x00\x00\x00\x01",4);
_config_frame_cache.append(frame->data() + frame->prefixSize(),frame->size() - frame->prefixSize());
merged_frame = std::make_shared<BufferString>(std::move(_config_frame_cache));
_config_frame_cache.clear();
}else{
//这是非第一个的关键帧(h265有多种关键帧)
merged_frame = frame;
} }
}else{ });
//这里是普通帧,例如B/P, merged_frame = std::make_shared<BufferString>(std::move(merged));
merged_frame = frame;
//sps、pps这些配置帧清空掉
_config_frame_cache.clear();
} }
track_info.stamp.revise(back->dts(),back->pts(),dts_out,pts_out);
//输入到ts文件
track_info.stamp.revise(frame->dts(),frame->pts(),dts_out,pts_out);
_timestamp = dts_out; _timestamp = dts_out;
mpeg_ts_write(_context, track_info.track_id, frame->keyFrame() ? 0x0001 : 0, pts_out * 90LL, dts_out * 90LL, merged_frame->data(), merged_frame->size()); mpeg_ts_write(_context, track_info.track_id, back->keyFrame() ? 0x0001 : 0, pts_out * 90LL, dts_out * 90LL, merged_frame->data(), merged_frame->size());
_frameCached.clear();
}
_frameCached.emplace_back(Frame::getCacheAbleFrame(frame));
} }
break; break;
default: { default: {
......
...@@ -38,12 +38,12 @@ using namespace toolkit; ...@@ -38,12 +38,12 @@ using namespace toolkit;
namespace mediakit { namespace mediakit {
class TsMuxer { class TsMuxer : public MediaSink {
public: public:
TsMuxer(); TsMuxer();
virtual ~TsMuxer(); virtual ~TsMuxer();
void addTrack(const Track::Ptr &track) ; void addTrack(const Track::Ptr &track) override;
void inputFrame(const Frame::Ptr &frame) ; void inputFrame(const Frame::Ptr &frame) override;
protected: protected:
virtual void onTs(const void *packet, int bytes,uint32_t timestamp,int flags) = 0; virtual void onTs(const void *packet, int bytes,uint32_t timestamp,int flags) = 0;
void resetTracks(); void resetTracks();
...@@ -60,7 +60,7 @@ private: ...@@ -60,7 +60,7 @@ private:
Stamp stamp; Stamp stamp;
}; };
unordered_map<int,track_info> _codec_to_trackid; unordered_map<int,track_info> _codec_to_trackid;
string _config_frame_cache; List<Frame::Ptr> _frameCached;
}; };
}//namespace mediakit }//namespace mediakit
......
...@@ -76,8 +76,6 @@ void FlvMuxer::start(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr & ...@@ -76,8 +76,6 @@ void FlvMuxer::start(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr &
} }
void FlvMuxer::onWriteFlvHeader(const RtmpMediaSource::Ptr &mediaSrc) { void FlvMuxer::onWriteFlvHeader(const RtmpMediaSource::Ptr &mediaSrc) {
CLEAR_ARR(_aui32FirstStamp);
//发送flv文件头 //发送flv文件头
char flv_file_header[] = "FLV\x1\x5\x0\x0\x0\x9"; // have audio and have video char flv_file_header[] = "FLV\x1\x5\x0\x0\x0\x9"; // have audio and have video
bool is_have_audio = false,is_have_video = false; bool is_have_audio = false,is_have_video = false;
...@@ -158,20 +156,9 @@ void FlvMuxer::onWriteFlvTag(uint8_t ui8Type, const Buffer::Ptr &buffer, uint32_ ...@@ -158,20 +156,9 @@ void FlvMuxer::onWriteFlvTag(uint8_t ui8Type, const Buffer::Ptr &buffer, uint32_
} }
void FlvMuxer::onWriteRtmp(const RtmpPacket::Ptr &pkt) { void FlvMuxer::onWriteRtmp(const RtmpPacket::Ptr &pkt) {
auto modifiedStamp = pkt->timeStamp; int64_t dts_out;
auto &firstStamp = _aui32FirstStamp[pkt->typeId % 2]; _stamp[pkt->typeId % 2].revise(pkt->timeStamp, 0, dts_out, dts_out);
if(!firstStamp){ onWriteFlvTag(pkt, dts_out);
firstStamp = modifiedStamp;
}
if(modifiedStamp >= firstStamp){
//计算时间戳增量
modifiedStamp -= firstStamp;
}else{
//发生回环,重新计算时间戳增量
CLEAR_ARR(_aui32FirstStamp);
modifiedStamp = 0;
}
onWriteFlvTag(pkt, modifiedStamp);
} }
void FlvMuxer::stop() { void FlvMuxer::stop() {
......
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
#include "Rtmp/Rtmp.h" #include "Rtmp/Rtmp.h"
#include "Rtmp/RtmpMediaSource.h" #include "Rtmp/RtmpMediaSource.h"
#include "Network/Socket.h" #include "Network/Socket.h"
#include "MediaFile/Stamp.h"
using namespace toolkit; using namespace toolkit;
namespace mediakit { namespace mediakit {
...@@ -52,7 +53,8 @@ private: ...@@ -52,7 +53,8 @@ private:
void onWriteFlvTag(uint8_t ui8Type, const Buffer::Ptr &buffer, uint32_t ui32TimeStamp); void onWriteFlvTag(uint8_t ui8Type, const Buffer::Ptr &buffer, uint32_t ui32TimeStamp);
private: private:
RtmpMediaSource::RingType::RingReader::Ptr _ring_reader; RtmpMediaSource::RingType::RingReader::Ptr _ring_reader;
uint32_t _aui32FirstStamp[2] = {0}; //时间戳修整器
Stamp _stamp[2];
}; };
......
...@@ -44,7 +44,7 @@ using namespace toolkit; ...@@ -44,7 +44,7 @@ using namespace toolkit;
using namespace mediakit::Client; using namespace mediakit::Client;
namespace mediakit { namespace mediakit {
//实现了rtmp播放器协议部分的功能,及数据接收功能
class RtmpPlayer:public PlayerBase, public TcpClient, public RtmpProtocol{ class RtmpPlayer:public PlayerBase, public TcpClient, public RtmpProtocol{
public: public:
typedef std::shared_ptr<RtmpPlayer> Ptr; typedef std::shared_ptr<RtmpPlayer> Ptr;
...@@ -63,11 +63,11 @@ protected: ...@@ -63,11 +63,11 @@ protected:
void onMediaData_l(const RtmpPacket::Ptr &chunkData); void onMediaData_l(const RtmpPacket::Ptr &chunkData);
void onPlayResult_l(const SockException &ex); void onPlayResult_l(const SockException &ex);
//for Tcpclient //form Tcpclient
void onRecv(const Buffer::Ptr &pBuf) override; void onRecv(const Buffer::Ptr &pBuf) override;
void onConnect(const SockException &err) override; void onConnect(const SockException &err) override;
void onErr(const SockException &ex) override; void onErr(const SockException &ex) override;
//fro RtmpProtocol //from RtmpProtocol
void onRtmpChunk(RtmpPacket &chunkData) override; void onRtmpChunk(RtmpPacket &chunkData) override;
void onStreamDry(uint32_t ui32StreamId) override; void onStreamDry(uint32_t ui32StreamId) override;
void onSendRawData(const Buffer::Ptr &buffer) override{ void onSendRawData(const Buffer::Ptr &buffer) override{
......
...@@ -532,10 +532,6 @@ void RtmpProtocol::handle_rtmp() { ...@@ -532,10 +532,6 @@ void RtmpProtocol::handle_rtmp() {
static const size_t HEADER_LENGTH[] = { 12, 8, 4, 1 }; static const size_t HEADER_LENGTH[] = { 12, 8, 4, 1 };
size_t iHeaderLen = HEADER_LENGTH[flags >> 6]; size_t iHeaderLen = HEADER_LENGTH[flags >> 6];
_iNowChunkID = flags & 0x3f; _iNowChunkID = flags & 0x3f;
if(_iNowChunkID >10){
int i=0;
i++;
}
switch (_iNowChunkID) { switch (_iNowChunkID) {
case 0: { case 0: {
//0 值表示二字节形式,并且 ID 范围 64 - 319 //0 值表示二字节形式,并且 ID 范围 64 - 319
......
...@@ -63,7 +63,7 @@ void RtmpSession::onError(const SockException& err) { ...@@ -63,7 +63,7 @@ void RtmpSession::onError(const SockException& err) {
} }
void RtmpSession::onManager() { void RtmpSession::onManager() {
GET_CONFIG(uint32_t,handshake_sec,Rtmp::kKeepAliveSecond); GET_CONFIG(uint32_t,handshake_sec,Rtmp::kHandshakeSecond);
GET_CONFIG(uint32_t,keep_alive_sec,Rtmp::kKeepAliveSecond); GET_CONFIG(uint32_t,keep_alive_sec,Rtmp::kKeepAliveSecond);
if (_ticker.createdTime() > handshake_sec * 1000) { if (_ticker.createdTime() > handshake_sec * 1000) {
...@@ -397,20 +397,20 @@ void RtmpSession::setMetaData(AMFDecoder &dec) { ...@@ -397,20 +397,20 @@ void RtmpSession::setMetaData(AMFDecoder &dec) {
void RtmpSession::onProcessCmd(AMFDecoder &dec) { void RtmpSession::onProcessCmd(AMFDecoder &dec) {
typedef void (RtmpSession::*rtmpCMDHandle)(AMFDecoder &dec); typedef void (RtmpSession::*rtmpCMDHandle)(AMFDecoder &dec);
static unordered_map<string, rtmpCMDHandle> g_mapCmd; static unordered_map<string, rtmpCMDHandle> s_cmd_functions;
static onceToken token([]() { static onceToken token([]() {
g_mapCmd.emplace("connect",&RtmpSession::onCmd_connect); s_cmd_functions.emplace("connect",&RtmpSession::onCmd_connect);
g_mapCmd.emplace("createStream",&RtmpSession::onCmd_createStream); s_cmd_functions.emplace("createStream",&RtmpSession::onCmd_createStream);
g_mapCmd.emplace("publish",&RtmpSession::onCmd_publish); s_cmd_functions.emplace("publish",&RtmpSession::onCmd_publish);
g_mapCmd.emplace("deleteStream",&RtmpSession::onCmd_deleteStream); s_cmd_functions.emplace("deleteStream",&RtmpSession::onCmd_deleteStream);
g_mapCmd.emplace("play",&RtmpSession::onCmd_play); s_cmd_functions.emplace("play",&RtmpSession::onCmd_play);
g_mapCmd.emplace("play2",&RtmpSession::onCmd_play2); s_cmd_functions.emplace("play2",&RtmpSession::onCmd_play2);
g_mapCmd.emplace("seek",&RtmpSession::onCmd_seek); s_cmd_functions.emplace("seek",&RtmpSession::onCmd_seek);
g_mapCmd.emplace("pause",&RtmpSession::onCmd_pause);}, []() {}); s_cmd_functions.emplace("pause",&RtmpSession::onCmd_pause);}, []() {});
std::string method = dec.load<std::string>(); std::string method = dec.load<std::string>();
auto it = g_mapCmd.find(method); auto it = s_cmd_functions.find(method);
if (it == g_mapCmd.end()) { if (it == s_cmd_functions.end()) {
TraceP(this) << "can not support cmd:" << method; TraceP(this) << "can not support cmd:" << method;
return; return;
} }
...@@ -445,7 +445,9 @@ void RtmpSession::onRtmpChunk(RtmpPacket &chunkData) { ...@@ -445,7 +445,9 @@ void RtmpSession::onRtmpChunk(RtmpPacket &chunkData) {
} }
GET_CONFIG(bool,rtmp_modify_stamp,Rtmp::kModifyStamp); GET_CONFIG(bool,rtmp_modify_stamp,Rtmp::kModifyStamp);
if(rtmp_modify_stamp){ if(rtmp_modify_stamp){
chunkData.timeStamp = _stampTicker[chunkData.typeId % 2].elapsedTime(); int64_t dts_out;
_stamp[chunkData.typeId % 2].revise(0, 0, dts_out, dts_out);
chunkData.timeStamp = dts_out;
} }
_pPublisherSrc->onWrite(std::make_shared<RtmpPacket>(std::move(chunkData))); _pPublisherSrc->onWrite(std::make_shared<RtmpPacket>(std::move(chunkData)));
} }
...@@ -473,20 +475,10 @@ void RtmpSession::onCmd_seek(AMFDecoder &dec) { ...@@ -473,20 +475,10 @@ void RtmpSession::onCmd_seek(AMFDecoder &dec) {
} }
void RtmpSession::onSendMedia(const RtmpPacket::Ptr &pkt) { void RtmpSession::onSendMedia(const RtmpPacket::Ptr &pkt) {
auto modifiedStamp = pkt->timeStamp; //rtmp播放器时间戳从零开始
auto &firstStamp = _aui32FirstStamp[pkt->typeId % 2]; int64_t dts_out;
if(!firstStamp){ _stamp[pkt->typeId % 2].revise(pkt->timeStamp, 0, dts_out, dts_out);
firstStamp = modifiedStamp; sendRtmp(pkt->typeId, pkt->streamId, pkt, dts_out, pkt->chunkId);
}
if(modifiedStamp >= firstStamp){
//计算时间戳增量
modifiedStamp -= firstStamp;
}else{
//发生回环,重新计算时间戳增量
CLEAR_ARR(_aui32FirstStamp);
modifiedStamp = 0;
}
sendRtmp(pkt->typeId, pkt->streamId, pkt, modifiedStamp, pkt->chunkId);
} }
......
...@@ -37,6 +37,8 @@ ...@@ -37,6 +37,8 @@
#include "Util/util.h" #include "Util/util.h"
#include "Util/TimeTicker.h" #include "Util/TimeTicker.h"
#include "Network/TcpSession.h" #include "Network/TcpSession.h"
#include "MediaFile/Stamp.h"
using namespace toolkit; using namespace toolkit;
namespace mediakit { namespace mediakit {
...@@ -88,11 +90,11 @@ private: ...@@ -88,11 +90,11 @@ private:
MediaInfo _mediaInfo; MediaInfo _mediaInfo;
double _dNowReqID = 0; double _dNowReqID = 0;
Ticker _ticker;//数据接收时间 Ticker _ticker;//数据接收时间
SmoothTicker _stampTicker[2];//时间戳生产器
RingBuffer<RtmpPacket::Ptr>::RingReader::Ptr _pRingReader; RingBuffer<RtmpPacket::Ptr>::RingReader::Ptr _pRingReader;
std::shared_ptr<RtmpMediaSource> _pPublisherSrc; std::shared_ptr<RtmpMediaSource> _pPublisherSrc;
std::weak_ptr<RtmpMediaSource> _pPlayerSrc; std::weak_ptr<RtmpMediaSource> _pPlayerSrc;
uint32_t _aui32FirstStamp[2] = {0}; //时间戳修整器
Stamp _stamp[2];
//消耗的总流量 //消耗的总流量
uint64_t _ui64TotalBytes = 0; uint64_t _ui64TotalBytes = 0;
......
...@@ -96,6 +96,17 @@ public: ...@@ -96,6 +96,17 @@ public:
int readerCount() override { int readerCount() override {
return RtmpMediaSource::readerCount() + (_muxer ? _muxer->readerCount() : 0); return RtmpMediaSource::readerCount() + (_muxer ? _muxer->readerCount() : 0);
} }
/**
* 获取track
* @return
*/
vector<Track::Ptr> getTracks(bool trackReady) const override {
if(!_demuxer){
return this->RtmpMediaSource::getTracks(trackReady);
}
return _demuxer->getTracks(trackReady);
}
private: private:
RtmpDemuxer::Ptr _demuxer; RtmpDemuxer::Ptr _demuxer;
MultiMediaSourceMuxer::Ptr _muxer; MultiMediaSourceMuxer::Ptr _muxer;
......
...@@ -31,6 +31,7 @@ ...@@ -31,6 +31,7 @@
#include <vector> #include <vector>
#include <unordered_map> #include <unordered_map>
#include <map> #include <map>
#include <stdexcept>
enum AMFType { enum AMFType {
AMF_NUMBER, AMF_NUMBER,
AMF_INTEGER, AMF_INTEGER,
......
...@@ -26,7 +26,7 @@ ...@@ -26,7 +26,7 @@
#include <list> #include <list>
#include <type_traits> #include <type_traits>
#include "RtpBroadCaster.h" #include "RtpMultiCaster.h"
#include "Util/util.h" #include "Util/util.h"
#include "Network/sockutil.h" #include "Network/sockutil.h"
#include "RtspSession.h" #include "RtspSession.h"
...@@ -81,10 +81,10 @@ void MultiCastAddressMaker::release(uint32_t iAddr){ ...@@ -81,10 +81,10 @@ void MultiCastAddressMaker::release(uint32_t iAddr){
} }
recursive_mutex RtpBroadCaster::g_mtx; recursive_mutex RtpMultiCaster::g_mtx;
unordered_map<string, weak_ptr<RtpBroadCaster> > RtpBroadCaster::g_mapBroadCaster; unordered_map<string, weak_ptr<RtpMultiCaster> > RtpMultiCaster::g_mapBroadCaster;
void RtpBroadCaster::setDetachCB(void* listener, const onDetach& cb) { void RtpMultiCaster::setDetachCB(void* listener, const onDetach& cb) {
lock_guard<recursive_mutex> lck(_mtx); lock_guard<recursive_mutex> lck(_mtx);
if(cb){ if(cb){
_mapDetach.emplace(listener,cb); _mapDetach.emplace(listener,cb);
...@@ -92,12 +92,12 @@ void RtpBroadCaster::setDetachCB(void* listener, const onDetach& cb) { ...@@ -92,12 +92,12 @@ void RtpBroadCaster::setDetachCB(void* listener, const onDetach& cb) {
_mapDetach.erase(listener); _mapDetach.erase(listener);
} }
} }
RtpBroadCaster::~RtpBroadCaster() { RtpMultiCaster::~RtpMultiCaster() {
_pReader->setReadCB(nullptr); _pReader->setReadCB(nullptr);
_pReader->setDetachCB(nullptr); _pReader->setDetachCB(nullptr);
DebugL; DebugL;
} }
RtpBroadCaster::RtpBroadCaster(const EventPoller::Ptr &poller,const string &strLocalIp,const string &strVhost,const string &strApp,const string &strStream) { RtpMultiCaster::RtpMultiCaster(const EventPoller::Ptr &poller,const string &strLocalIp,const string &strVhost,const string &strApp,const string &strStream) {
auto src = dynamic_pointer_cast<RtspMediaSource>(MediaSource::find(RTSP_SCHEMA,strVhost,strApp, strStream)); auto src = dynamic_pointer_cast<RtspMediaSource>(MediaSource::find(RTSP_SCHEMA,strVhost,strApp, strStream));
if(!src){ if(!src){
auto strErr = StrPrinter << "未找到媒体源:" << strVhost << " " << strApp << " " << strStream << endl; auto strErr = StrPrinter << "未找到媒体源:" << strVhost << " " << strApp << " " << strStream << endl;
...@@ -148,22 +148,22 @@ RtpBroadCaster::RtpBroadCaster(const EventPoller::Ptr &poller,const string &strL ...@@ -148,22 +148,22 @@ RtpBroadCaster::RtpBroadCaster(const EventPoller::Ptr &poller,const string &strL
<< strVhost << " " << strVhost << " "
<< strApp << " " << strStream; << strApp << " " << strStream;
} }
uint16_t RtpBroadCaster::getPort(TrackType trackType){ uint16_t RtpMultiCaster::getPort(TrackType trackType){
return _apUdpSock[trackType]->get_local_port(); return _apUdpSock[trackType]->get_local_port();
} }
string RtpBroadCaster::getIP(){ string RtpMultiCaster::getIP(){
return inet_ntoa(_aPeerUdpAddr[0].sin_addr); return inet_ntoa(_aPeerUdpAddr[0].sin_addr);
} }
RtpBroadCaster::Ptr RtpBroadCaster::make(const EventPoller::Ptr &poller,const string &strLocalIp,const string &strVhost,const string &strApp,const string &strStream){ RtpMultiCaster::Ptr RtpMultiCaster::make(const EventPoller::Ptr &poller,const string &strLocalIp,const string &strVhost,const string &strApp,const string &strStream){
try{ try{
auto ret = Ptr(new RtpBroadCaster(poller,strLocalIp,strVhost,strApp,strStream),[poller](RtpBroadCaster *ptr){ auto ret = Ptr(new RtpMultiCaster(poller,strLocalIp,strVhost,strApp,strStream),[poller](RtpMultiCaster *ptr){
poller->async([ptr]() { poller->async([ptr]() {
delete ptr; delete ptr;
}); });
}); });
lock_guard<recursive_mutex> lck(g_mtx); lock_guard<recursive_mutex> lck(g_mtx);
string strKey = StrPrinter << strLocalIp << " " << strVhost << " " << strApp << " " << strStream << endl; string strKey = StrPrinter << strLocalIp << " " << strVhost << " " << strApp << " " << strStream << endl;
weak_ptr<RtpBroadCaster> weakPtr = ret; weak_ptr<RtpMultiCaster> weakPtr = ret;
g_mapBroadCaster.emplace(strKey,weakPtr); g_mapBroadCaster.emplace(strKey,weakPtr);
return ret; return ret;
}catch (std::exception &ex) { }catch (std::exception &ex) {
...@@ -172,7 +172,7 @@ RtpBroadCaster::Ptr RtpBroadCaster::make(const EventPoller::Ptr &poller,const st ...@@ -172,7 +172,7 @@ RtpBroadCaster::Ptr RtpBroadCaster::make(const EventPoller::Ptr &poller,const st
} }
} }
RtpBroadCaster::Ptr RtpBroadCaster::get(const EventPoller::Ptr &poller,const string &strLocalIp,const string &strVhost,const string &strApp,const string &strStream) { RtpMultiCaster::Ptr RtpMultiCaster::get(const EventPoller::Ptr &poller,const string &strLocalIp,const string &strVhost,const string &strApp,const string &strStream) {
string strKey = StrPrinter << strLocalIp << " " << strVhost << " " << strApp << " " << strStream << endl; string strKey = StrPrinter << strLocalIp << " " << strVhost << " " << strApp << " " << strStream << endl;
lock_guard<recursive_mutex> lck(g_mtx); lock_guard<recursive_mutex> lck(g_mtx);
auto it = g_mapBroadCaster.find(strKey); auto it = g_mapBroadCaster.find(strKey);
......
...@@ -65,18 +65,18 @@ private: ...@@ -65,18 +65,18 @@ private:
recursive_mutex _mtx; recursive_mutex _mtx;
unordered_set<uint32_t> _setBadAddr; unordered_set<uint32_t> _setBadAddr;
}; };
class RtpBroadCaster { class RtpMultiCaster {
public: public:
typedef std::shared_ptr<RtpBroadCaster> Ptr; typedef std::shared_ptr<RtpMultiCaster> Ptr;
typedef function<void()> onDetach; typedef function<void()> onDetach;
virtual ~RtpBroadCaster(); virtual ~RtpMultiCaster();
static Ptr get(const EventPoller::Ptr &poller,const string &strLocalIp,const string &strVhost,const string &strApp,const string &strStream); static Ptr get(const EventPoller::Ptr &poller,const string &strLocalIp,const string &strVhost,const string &strApp,const string &strStream);
void setDetachCB(void *listener,const onDetach &cb); void setDetachCB(void *listener,const onDetach &cb);
uint16_t getPort(TrackType trackType); uint16_t getPort(TrackType trackType);
string getIP(); string getIP();
private: private:
static recursive_mutex g_mtx; static recursive_mutex g_mtx;
static unordered_map<string , weak_ptr<RtpBroadCaster> > g_mapBroadCaster; static unordered_map<string , weak_ptr<RtpMultiCaster> > g_mapBroadCaster;
static Ptr make(const EventPoller::Ptr &poller,const string &strLocalIp,const string &strVhost,const string &strApp,const string &strStream); static Ptr make(const EventPoller::Ptr &poller,const string &strLocalIp,const string &strVhost,const string &strApp,const string &strStream);
std::shared_ptr<uint32_t> _multiAddr; std::shared_ptr<uint32_t> _multiAddr;
...@@ -86,7 +86,7 @@ private: ...@@ -86,7 +86,7 @@ private:
Socket::Ptr _apUdpSock[2]; Socket::Ptr _apUdpSock[2];
struct sockaddr_in _aPeerUdpAddr[2]; struct sockaddr_in _aPeerUdpAddr[2];
RtpBroadCaster(const EventPoller::Ptr &poller,const string &strLocalIp,const string &strVhost,const string &strApp,const string &strStream); RtpMultiCaster(const EventPoller::Ptr &poller,const string &strLocalIp,const string &strVhost,const string &strApp,const string &strStream);
}; };
......
...@@ -44,29 +44,47 @@ RtpReceiver::RtpReceiver() {} ...@@ -44,29 +44,47 @@ RtpReceiver::RtpReceiver() {}
RtpReceiver::~RtpReceiver() {} RtpReceiver::~RtpReceiver() {}
bool RtpReceiver::handleOneRtp(int track_index,SdpTrack::Ptr &track, unsigned char *rtp_raw_ptr, unsigned int rtp_raw_len) { bool RtpReceiver::handleOneRtp(int track_index,SdpTrack::Ptr &track, unsigned char *rtp_raw_ptr, unsigned int rtp_raw_len) {
if(rtp_raw_len < 12){
WarnL << "rtp包太小:" << rtp_raw_len;
return false;
}
uint8_t padding = 0;
if (rtp_raw_ptr[0] & 0x40) {
//获取padding大小
padding = rtp_raw_ptr[rtp_raw_len - 1];
//移除padding flag
rtp_raw_ptr[0] &= ~0x40;
//移除padding字节
rtp_raw_len -= padding;
}
auto rtp_ptr = _rtp_pool.obtain(); auto rtp_ptr = _rtp_pool.obtain();
auto &rtp = *rtp_ptr; auto &rtp = *rtp_ptr;
auto length = rtp_raw_len + 4;
rtp.type = track->_type;
rtp.interleaved = 2 * track->_type; rtp.interleaved = 2 * track->_type;
rtp.mark = rtp_raw_ptr[1] >> 7; rtp.mark = rtp_raw_ptr[1] >> 7;
rtp.PT = rtp_raw_ptr[1] & 0x7F; rtp.PT = rtp_raw_ptr[1] & 0x7F;
//序列号
memcpy(&rtp.sequence,rtp_raw_ptr+2,2);//内存对齐 //序列号,内存对齐
memcpy(&rtp.sequence, rtp_raw_ptr + 2, 2);
rtp.sequence = ntohs(rtp.sequence); rtp.sequence = ntohs(rtp.sequence);
//时间戳
memcpy(&rtp.timeStamp, rtp_raw_ptr+4, 4);//内存对齐 //时间戳,内存对齐
memcpy(&rtp.timeStamp, rtp_raw_ptr + 4, 4);
rtp.timeStamp = ntohl(rtp.timeStamp);
if(!track->_samplerate){ if(!track->_samplerate){
//无法把时间戳转换成毫秒 //无法把时间戳转换成毫秒
return false; return false;
} }
//时间戳转换成毫秒 //时间戳转换成毫秒
rtp.timeStamp = ntohl(rtp.timeStamp) * 1000LL / track->_samplerate; rtp.timeStamp = rtp.timeStamp * 1000LL / track->_samplerate;
//ssrc
memcpy(&rtp.ssrc,rtp_raw_ptr+8,4);//内存对齐 //ssrc,内存对齐
memcpy(&rtp.ssrc, rtp_raw_ptr + 8, 4);
rtp.ssrc = ntohl(rtp.ssrc); rtp.ssrc = ntohl(rtp.ssrc);
rtp.type = track->_type;
if (track->_ssrc != rtp.ssrc) { if (track->_ssrc != rtp.ssrc) {
if (track->_ssrc == 0) { if (track->_ssrc == 0) {
...@@ -99,19 +117,19 @@ bool RtpReceiver::handleOneRtp(int track_index,SdpTrack::Ptr &track, unsigned ch ...@@ -99,19 +117,19 @@ bool RtpReceiver::handleOneRtp(int track_index,SdpTrack::Ptr &track, unsigned ch
rtp.offset += ext; rtp.offset += ext;
} }
if(length <= rtp.offset){ if(rtp_raw_len + 4 <= rtp.offset){
WarnL << "无有效负载的rtp包:" << length << "<=" << (int)rtp.offset; WarnL << "无有效负载的rtp包:" << rtp_raw_len << " <= " << (int)rtp.offset;
return false; return false;
} }
if(length > RTP_MAX_SIZE){ if(rtp_raw_len > RTP_MAX_SIZE){
WarnL << "超大的rtp包:" << length << ">" << RTP_MAX_SIZE; WarnL << "超大的rtp包:" << rtp_raw_len << " > " << RTP_MAX_SIZE;
return false; return false;
} }
//设置rtp负载长度 //设置rtp负载长度
rtp.setCapacity(length); rtp.setCapacity(rtp_raw_len + 4);
rtp.setSize(length); rtp.setSize(rtp_raw_len + 4);
uint8_t *payload_ptr = (uint8_t *)rtp.data(); uint8_t *payload_ptr = (uint8_t *)rtp.data();
payload_ptr[0] = '$'; payload_ptr[0] = '$';
payload_ptr[1] = rtp.interleaved; payload_ptr[1] = rtp.interleaved;
......
...@@ -30,6 +30,61 @@ ...@@ -30,6 +30,61 @@
namespace mediakit{ namespace mediakit{
static void getAttrSdp(const map<string, string> &attr, _StrPrinter &printer){
const map<string, string>::value_type *ptr = nullptr;
for(auto &pr : attr){
if(pr.first == "control"){
ptr = &pr;
continue;
}
if(pr.second.empty()){
printer << "a=" << pr.first << "\r\n";
}else{
printer << "a=" << pr.first << ":" << pr.second << "\r\n";
}
}
if(ptr){
printer << "a=" << ptr->first << ":" << ptr->second << "\r\n";
}
}
string SdpTrack::toString() const {
_StrPrinter _printer;
switch (_type){
case TrackTitle:{
_printer << "v=" << 0 << "\r\n";
if(!_o.empty()){
_printer << "o="<< _o << "\r\n";
}
if(!_c.empty()){
_printer << "c=" << _c << "\r\n";
}
if(!_t.empty()){
_printer << "t=" << _t << "\r\n";
}
_printer << "s=RTSP Session, streamed by the ZLMediaKit\r\n";
_printer << "i=ZLMediaKit Live Stream\r\n";
getAttrSdp(_attr,_printer);
}
break;
case TrackAudio:
case TrackVideo:{
if(_type == TrackAudio){
_printer << "m=audio 0 RTP/AVP " << _pt << "\r\n";
}else{
_printer << "m=video 0 RTP/AVP " << _pt << "\r\n";
}
if(!_b.empty()){
_printer << "b=" <<_b << "\r\n";
}
getAttrSdp(_attr,_printer);
}
break;
default:
break;
}
return _printer;
}
void SdpParser::load(const string &sdp) { void SdpParser::load(const string &sdp) {
_track_map.clear(); _track_map.clear();
string key; string key;
...@@ -164,5 +219,28 @@ vector<SdpTrack::Ptr> SdpParser::getAvailableTrack() const { ...@@ -164,5 +219,28 @@ vector<SdpTrack::Ptr> SdpParser::getAvailableTrack() const {
return ret; return ret;
} }
string SdpParser::toString() const {
string title,audio,video;
for(auto &pr : _track_map){
switch (pr.second->_type){
case TrackTitle:{
title = pr.second->toString();
}
break;
case TrackVideo:{
video = pr.second->toString();
}
break;
case TrackAudio:{
audio = pr.second->toString();
}
break;
default:
break;
}
}
return title + video + audio;
}
}//namespace mediakit }//namespace mediakit
...@@ -59,7 +59,7 @@ public: ...@@ -59,7 +59,7 @@ public:
uint32_t timeStamp; uint32_t timeStamp;
uint16_t sequence; uint16_t sequence;
uint32_t ssrc; uint32_t ssrc;
uint8_t offset; uint32_t offset;
TrackType type; TrackType type;
}; };
...@@ -90,6 +90,8 @@ public: ...@@ -90,6 +90,8 @@ public:
map<char, string> _other; map<char, string> _other;
map<string, string> _attr; map<string, string> _attr;
string toString() const;
public: public:
int _pt; int _pt;
string _codec; string _codec;
...@@ -118,6 +120,7 @@ public: ...@@ -118,6 +120,7 @@ public:
bool available() const; bool available() const;
SdpTrack::Ptr getTrack(TrackType type) const; SdpTrack::Ptr getTrack(TrackType type) const;
vector<SdpTrack::Ptr> getAvailableTrack() const; vector<SdpTrack::Ptr> getAvailableTrack() const;
string toString() const ;
private: private:
map<string, SdpTrack::Ptr> _track_map; map<string, SdpTrack::Ptr> _track_map;
}; };
......
...@@ -38,10 +38,6 @@ RtspDemuxer::RtspDemuxer(const string& sdp) { ...@@ -38,10 +38,6 @@ RtspDemuxer::RtspDemuxer(const string& sdp) {
loadSdp(SdpParser(sdp)); loadSdp(SdpParser(sdp));
} }
RtspDemuxer::RtspDemuxer(const SdpParser &attr) {
loadSdp(attr);
}
void RtspDemuxer::loadSdp(const SdpParser &attr) { void RtspDemuxer::loadSdp(const SdpParser &attr) {
auto tracks = attr.getAvailableTrack(); auto tracks = attr.getAvailableTrack();
for (auto &track : tracks){ for (auto &track : tracks){
......
...@@ -41,7 +41,6 @@ class RtspDemuxer : public Demuxer{ ...@@ -41,7 +41,6 @@ class RtspDemuxer : public Demuxer{
public: public:
typedef std::shared_ptr<RtspDemuxer> Ptr; typedef std::shared_ptr<RtspDemuxer> Ptr;
RtspDemuxer(const string &sdp); RtspDemuxer(const string &sdp);
RtspDemuxer(const SdpParser &parser);
virtual ~RtspDemuxer(){}; virtual ~RtspDemuxer(){};
/** /**
......
...@@ -39,7 +39,7 @@ RtspMuxer::RtspMuxer(const TitleSdp::Ptr &title){ ...@@ -39,7 +39,7 @@ RtspMuxer::RtspMuxer(const TitleSdp::Ptr &title){
} }
void RtspMuxer::onTrackReady(const Track::Ptr &track) { void RtspMuxer::onTrackReady(const Track::Ptr &track) {
//根据track生sdp //根据track生sdp
Sdp::Ptr sdp = track->getSdp(); Sdp::Ptr sdp = track->getSdp();
if (!sdp) { if (!sdp) {
return; return;
......
...@@ -219,14 +219,14 @@ void RtspPlayer::handleResDESCRIBE(const Parser& parser) { ...@@ -219,14 +219,14 @@ void RtspPlayer::handleResDESCRIBE(const Parser& parser) {
_strContentBase.pop_back(); _strContentBase.pop_back();
} }
SdpParser sdpParser(parser.Content());
//解析sdp //解析sdp
_sdpParser.load(parser.Content()); _aTrackInfo = sdpParser.getAvailableTrack();
_aTrackInfo = _sdpParser.getAvailableTrack();
if (_aTrackInfo.empty()) { if (_aTrackInfo.empty()) {
throw std::runtime_error("无有效的Sdp Track"); throw std::runtime_error("无有效的Sdp Track");
} }
if (!onCheckSDP(parser.Content(), _sdpParser)) { if (!onCheckSDP(sdpParser.toString())) {
throw std::runtime_error("onCheckSDP faied"); throw std::runtime_error("onCheckSDP faied");
} }
...@@ -450,7 +450,7 @@ void RtspPlayer::handleResPAUSE(const Parser& parser, bool bPause) { ...@@ -450,7 +450,7 @@ void RtspPlayer::handleResPAUSE(const Parser& parser, bool bPause) {
auto strRtpTime = FindField(strTrack.data(), "rtptime=", ";"); auto strRtpTime = FindField(strTrack.data(), "rtptime=", ";");
auto idx = getTrackIndexByControlSuffix(strControlSuffix); auto idx = getTrackIndexByControlSuffix(strControlSuffix);
if(idx != -1){ if(idx != -1){
_aiFistStamp[idx] = atoll(strRtpTime.data()) * 1000 / _aTrackInfo[idx]->_samplerate; _aiFistStamp[idx] = _aTrackInfo[idx]->_samplerate>0?atoll(strRtpTime.data()) * 1000 / _aTrackInfo[idx]->_samplerate :1;
_aiNowStamp[idx] = _aiFistStamp[idx]; _aiNowStamp[idx] = _aiFistStamp[idx];
DebugL << "rtptime(ms):" << strControlSuffix <<" " << strRtpTime; DebugL << "rtptime(ms):" << strControlSuffix <<" " << strRtpTime;
} }
......
...@@ -46,7 +46,7 @@ using namespace toolkit; ...@@ -46,7 +46,7 @@ using namespace toolkit;
namespace mediakit { namespace mediakit {
//实现了rtsp播放器协议部分的功能 //实现了rtsp播放器协议部分的功能,及数据接收功能
class RtspPlayer: public PlayerBase,public TcpClient, public RtspSplitter, public RtpReceiver { class RtspPlayer: public PlayerBase,public TcpClient, public RtspSplitter, public RtpReceiver {
public: public:
typedef std::shared_ptr<RtspPlayer> Ptr; typedef std::shared_ptr<RtspPlayer> Ptr;
...@@ -59,7 +59,7 @@ public: ...@@ -59,7 +59,7 @@ public:
float getPacketLossRate(TrackType type) const override; float getPacketLossRate(TrackType type) const override;
protected: protected:
//派生类回调函数 //派生类回调函数
virtual bool onCheckSDP(const string &strSdp, const SdpParser &parser) = 0; virtual bool onCheckSDP(const string &strSdp) = 0;
virtual void onRecvRTP(const RtpPacket::Ptr &pRtppt, const SdpTrack::Ptr &track) = 0; virtual void onRecvRTP(const RtpPacket::Ptr &pRtppt, const SdpTrack::Ptr &track) = 0;
uint32_t getProgressMilliSecond() const; uint32_t getProgressMilliSecond() const;
void seekToMilliSecond(uint32_t ms); void seekToMilliSecond(uint32_t ms);
...@@ -124,7 +124,6 @@ private: ...@@ -124,7 +124,6 @@ private:
void createUdpSockIfNecessary(int track_idx); void createUdpSockIfNecessary(int track_idx);
private: private:
string _strUrl; string _strUrl;
SdpParser _sdpParser;
vector<SdpTrack::Ptr> _aTrackInfo; vector<SdpTrack::Ptr> _aTrackInfo;
function<void(const Parser&)> _onHandshake; function<void(const Parser&)> _onHandshake;
Socket::Ptr _apRtpSock[2]; //RTP端口,trackid idx 为数组下标 Socket::Ptr _apRtpSock[2]; //RTP端口,trackid idx 为数组下标
......
...@@ -61,12 +61,12 @@ public: ...@@ -61,12 +61,12 @@ public:
}; };
private: private:
//派生类回调函数 //派生类回调函数
bool onCheckSDP(const string &sdp, const SdpParser &parser) override { bool onCheckSDP(const string &sdp) override {
_pRtspMediaSrc = dynamic_pointer_cast<RtspMediaSource>(_pMediaSrc); _pRtspMediaSrc = dynamic_pointer_cast<RtspMediaSource>(_pMediaSrc);
if(_pRtspMediaSrc){ if(_pRtspMediaSrc){
_pRtspMediaSrc->onGetSDP(sdp); _pRtspMediaSrc->onGetSDP(sdp);
} }
_parser.reset(new RtspDemuxer(parser)); _parser.reset(new RtspDemuxer(sdp));
return true; return true;
} }
void onRecvRTP(const RtpPacket::Ptr &rtp, const SdpTrack::Ptr &track) override { void onRecvRTP(const RtpPacket::Ptr &rtp, const SdpTrack::Ptr &track) override {
......
...@@ -113,7 +113,7 @@ void RtspSession::onError(const SockException& err) { ...@@ -113,7 +113,7 @@ void RtspSession::onError(const SockException& err) {
} }
void RtspSession::onManager() { void RtspSession::onManager() {
GET_CONFIG(uint32_t,handshake_sec,Rtsp::kKeepAliveSecond); GET_CONFIG(uint32_t,handshake_sec,Rtsp::kHandshakeSecond);
GET_CONFIG(uint32_t,keep_alive_sec,Rtsp::kKeepAliveSecond); GET_CONFIG(uint32_t,keep_alive_sec,Rtsp::kKeepAliveSecond);
if (_ticker.createdTime() > handshake_sec * 1000) { if (_ticker.createdTime() > handshake_sec * 1000) {
...@@ -153,24 +153,24 @@ void RtspSession::onWholeRtspPacket(Parser &parser) { ...@@ -153,24 +153,24 @@ void RtspSession::onWholeRtspPacket(Parser &parser) {
} }
typedef void (RtspSession::*rtsp_request_handler)(const Parser &parser); typedef void (RtspSession::*rtsp_request_handler)(const Parser &parser);
static unordered_map<string, rtsp_request_handler> s_handler_map; static unordered_map<string, rtsp_request_handler> s_cmd_functions;
static onceToken token( []() { static onceToken token( []() {
s_handler_map.emplace("OPTIONS",&RtspSession::handleReq_Options); s_cmd_functions.emplace("OPTIONS",&RtspSession::handleReq_Options);
s_handler_map.emplace("DESCRIBE",&RtspSession::handleReq_Describe); s_cmd_functions.emplace("DESCRIBE",&RtspSession::handleReq_Describe);
s_handler_map.emplace("ANNOUNCE",&RtspSession::handleReq_ANNOUNCE); s_cmd_functions.emplace("ANNOUNCE",&RtspSession::handleReq_ANNOUNCE);
s_handler_map.emplace("RECORD",&RtspSession::handleReq_RECORD); s_cmd_functions.emplace("RECORD",&RtspSession::handleReq_RECORD);
s_handler_map.emplace("SETUP",&RtspSession::handleReq_Setup); s_cmd_functions.emplace("SETUP",&RtspSession::handleReq_Setup);
s_handler_map.emplace("PLAY",&RtspSession::handleReq_Play); s_cmd_functions.emplace("PLAY",&RtspSession::handleReq_Play);
s_handler_map.emplace("PAUSE",&RtspSession::handleReq_Pause); s_cmd_functions.emplace("PAUSE",&RtspSession::handleReq_Pause);
s_handler_map.emplace("TEARDOWN",&RtspSession::handleReq_Teardown); s_cmd_functions.emplace("TEARDOWN",&RtspSession::handleReq_Teardown);
s_handler_map.emplace("GET",&RtspSession::handleReq_Get); s_cmd_functions.emplace("GET",&RtspSession::handleReq_Get);
s_handler_map.emplace("POST",&RtspSession::handleReq_Post); s_cmd_functions.emplace("POST",&RtspSession::handleReq_Post);
s_handler_map.emplace("SET_PARAMETER",&RtspSession::handleReq_SET_PARAMETER); s_cmd_functions.emplace("SET_PARAMETER",&RtspSession::handleReq_SET_PARAMETER);
s_handler_map.emplace("GET_PARAMETER",&RtspSession::handleReq_SET_PARAMETER); s_cmd_functions.emplace("GET_PARAMETER",&RtspSession::handleReq_SET_PARAMETER);
}, []() {}); }, []() {});
auto it = s_handler_map.find(strCmd); auto it = s_cmd_functions.find(strCmd);
if (it == s_handler_map.end()) { if (it == s_cmd_functions.end()) {
sendRtspResponse("403 Forbidden"); sendRtspResponse("403 Forbidden");
shutdown(SockException(Err_shutdown,StrPrinter << "403 Forbidden:" << strCmd)); shutdown(SockException(Err_shutdown,StrPrinter << "403 Forbidden:" << strCmd));
return; return;
...@@ -242,13 +242,13 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) { ...@@ -242,13 +242,13 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) {
throw SockException(Err_shutdown,err); throw SockException(Err_shutdown,err);
} }
SdpParser sdpParser(parser.Content());
_strSession = makeRandStr(12); _strSession = makeRandStr(12);
_strSdp = parser.Content(); _aTrackInfo = sdpParser.getAvailableTrack();
_aTrackInfo = SdpParser(_strSdp).getAvailableTrack();
_pushSrc = std::make_shared<RtspToRtmpMediaSource>(_mediaInfo._vhost,_mediaInfo._app,_mediaInfo._streamid); _pushSrc = std::make_shared<RtspToRtmpMediaSource>(_mediaInfo._vhost,_mediaInfo._app,_mediaInfo._streamid);
_pushSrc->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this())); _pushSrc->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
_pushSrc->onGetSDP(_strSdp); _pushSrc->onGetSDP(sdpParser.toString());
sendRtspResponse("200 OK"); sendRtspResponse("200 OK");
} }
...@@ -361,12 +361,11 @@ void RtspSession::onAuthSuccess() { ...@@ -361,12 +361,11 @@ void RtspSession::onAuthSuccess() {
strongSelf->shutdown(SockException(Err_shutdown,err)); strongSelf->shutdown(SockException(Err_shutdown,err));
return; return;
} }
//找到了响应的rtsp流 //找到了相应的rtsp流
strongSelf->_strSdp = rtsp_src->getSdp(); strongSelf->_aTrackInfo = SdpParser(rtsp_src->getSdp()).getAvailableTrack();
SdpParser sdpParser(strongSelf->_strSdp);
strongSelf->_aTrackInfo = sdpParser.getAvailableTrack();
if (strongSelf->_aTrackInfo.empty()) { if (strongSelf->_aTrackInfo.empty()) {
//该流无效 //该流无效
DebugL << "无trackInfo,该流无效";
strongSelf->send_StreamNotFound(); strongSelf->send_StreamNotFound();
strongSelf->shutdown(SockException(Err_shutdown,"can not find any availabe track in sdp")); strongSelf->shutdown(SockException(Err_shutdown,"can not find any availabe track in sdp"));
return; return;
...@@ -383,7 +382,7 @@ void RtspSession::onAuthSuccess() { ...@@ -383,7 +382,7 @@ void RtspSession::onAuthSuccess() {
{"Content-Base",strongSelf->_strContentBase + "/", {"Content-Base",strongSelf->_strContentBase + "/",
"x-Accept-Retransmit","our-retransmit", "x-Accept-Retransmit","our-retransmit",
"x-Accept-Dynamic-Rate","1" "x-Accept-Dynamic-Rate","1"
},strongSelf->_strSdp); },rtsp_src->getSdp());
}); });
} }
void RtspSession::onAuthFailed(const string &realm,const string &why,bool close) { void RtspSession::onAuthFailed(const string &realm,const string &why,bool close) {
...@@ -675,14 +674,14 @@ void RtspSession::handleReq_Setup(const Parser &parser) { ...@@ -675,14 +674,14 @@ void RtspSession::handleReq_Setup(const Parser &parser) {
} }
break; break;
case Rtsp::RTP_MULTICAST: { case Rtsp::RTP_MULTICAST: {
if(!_pBrdcaster){ if(!_multicaster){
_pBrdcaster = RtpBroadCaster::get(getPoller(),get_local_ip(),_mediaInfo._vhost, _mediaInfo._app, _mediaInfo._streamid); _multicaster = RtpMultiCaster::get(getPoller(),get_local_ip(),_mediaInfo._vhost, _mediaInfo._app, _mediaInfo._streamid);
if (!_pBrdcaster) { if (!_multicaster) {
send_NotAcceptable(); send_NotAcceptable();
throw SockException(Err_shutdown, "can not get a available udp multicast socket"); throw SockException(Err_shutdown, "can not get a available udp multicast socket");
} }
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this()); weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
_pBrdcaster->setDetachCB(this, [weakSelf]() { _multicaster->setDetachCB(this, [weakSelf]() {
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(!strongSelf) { if(!strongSelf) {
return; return;
...@@ -690,7 +689,7 @@ void RtspSession::handleReq_Setup(const Parser &parser) { ...@@ -690,7 +689,7 @@ void RtspSession::handleReq_Setup(const Parser &parser) {
strongSelf->safeShutdown(SockException(Err_shutdown,"ring buffer detached")); strongSelf->safeShutdown(SockException(Err_shutdown,"ring buffer detached"));
}); });
} }
int iSrvPort = _pBrdcaster->getPort(trackRef->_type); int iSrvPort = _multicaster->getPort(trackRef->_type);
//我们用trackIdx区分rtp和rtcp包 //我们用trackIdx区分rtp和rtcp包
//由于组播udp端口是共享的,而rtcp端口为组播udp端口+1,所以rtcp端口需要改成共享端口 //由于组播udp端口是共享的,而rtcp端口为组播udp端口+1,所以rtcp端口需要改成共享端口
auto pSockRtcp = UDPServer::Instance().getSock(getPoller(),get_local_ip().data(),2*trackIdx + 1,iSrvPort + 1); auto pSockRtcp = UDPServer::Instance().getSock(getPoller(),get_local_ip().data(),2*trackIdx + 1,iSrvPort + 1);
...@@ -704,7 +703,7 @@ void RtspSession::handleReq_Setup(const Parser &parser) { ...@@ -704,7 +703,7 @@ void RtspSession::handleReq_Setup(const Parser &parser) {
sendRtspResponse("200 OK", sendRtspResponse("200 OK",
{"Transport",StrPrinter << "RTP/AVP;multicast;" {"Transport",StrPrinter << "RTP/AVP;multicast;"
<< "destination=" << _pBrdcaster->getIP() << ";" << "destination=" << _multicaster->getIP() << ";"
<< "source=" << get_local_ip() << ";" << "source=" << get_local_ip() << ";"
<< "port=" << iSrvPort << "-" << pSockRtcp->get_local_port() << ";" << "port=" << iSrvPort << "-" << pSockRtcp->get_local_port() << ";"
<< "ttl=" << udpTTL << ";" << "ttl=" << udpTTL << ";"
...@@ -918,6 +917,12 @@ inline void RtspSession::send_NotAcceptable() { ...@@ -918,6 +917,12 @@ inline void RtspSession::send_NotAcceptable() {
void RtspSession::onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) { void RtspSession::onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) {
GET_CONFIG(bool,modify_stamp,Rtsp::kModifyStamp);
if(modify_stamp){
int64_t dts_out;
_stamp[trackidx].revise(0, 0, dts_out, dts_out);
rtppt->timeStamp = dts_out;
}
_pushSrc->onWrite(rtppt, false); _pushSrc->onWrite(rtppt, false);
} }
inline void RtspSession::onRcvPeerUdpData(int intervaled, const Buffer::Ptr &pBuf, const struct sockaddr& addr) { inline void RtspSession::onRcvPeerUdpData(int intervaled, const Buffer::Ptr &pBuf, const struct sockaddr& addr) {
......
...@@ -36,7 +36,7 @@ ...@@ -36,7 +36,7 @@
#include "Common/config.h" #include "Common/config.h"
#include "Network/TcpSession.h" #include "Network/TcpSession.h"
#include "Player/PlayerBase.h" #include "Player/PlayerBase.h"
#include "RtpBroadCaster.h" #include "RtpMultiCaster.h"
#include "RtspMediaSource.h" #include "RtspMediaSource.h"
#include "RtspSplitter.h" #include "RtspSplitter.h"
#include "RtpReceiver.h" #include "RtpReceiver.h"
...@@ -76,6 +76,7 @@ public: ...@@ -76,6 +76,7 @@ public:
RtspSession(const Socket::Ptr &pSock); RtspSession(const Socket::Ptr &pSock);
virtual ~RtspSession(); virtual ~RtspSession();
////TcpSession override////
void onRecv(const Buffer::Ptr &pBuf) override; void onRecv(const Buffer::Ptr &pBuf) override;
void onError(const SockException &err) override; void onError(const SockException &err) override;
void onManager() override; void onManager() override;
...@@ -119,60 +120,100 @@ protected: ...@@ -119,60 +120,100 @@ protected:
*/ */
virtual void onRtcpPacket(int iTrackidx, SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen); virtual void onRtcpPacket(int iTrackidx, SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen);
private: private:
void handleReq_Options(const Parser &parser); //处理options方法 //处理options方法,获取服务器能力
void handleReq_Describe(const Parser &parser); //处理describe方法 void handleReq_Options(const Parser &parser);
void handleReq_ANNOUNCE(const Parser &parser); //处理options方法 //处理describe方法,请求服务器rtsp sdp信息
void handleReq_RECORD(const Parser &parser); //处理options方法 void handleReq_Describe(const Parser &parser);
void handleReq_Setup(const Parser &parser); //处理setup方法 //处理ANNOUNCE方法,请求推流,附带sdp
void handleReq_Play(const Parser &parser); //处理play方法 void handleReq_ANNOUNCE(const Parser &parser);
void handleReq_Pause(const Parser &parser); //处理pause方法 //处理record方法,开始推流
void handleReq_Teardown(const Parser &parser); //处理teardown方法 void handleReq_RECORD(const Parser &parser);
void handleReq_Get(const Parser &parser); //处理Get方法 //处理setup方法,播放和推流协商rtp传输方式用
void handleReq_Post(const Parser &parser); //处理Post方法 void handleReq_Setup(const Parser &parser);
void handleReq_SET_PARAMETER(const Parser &parser); //处理SET_PARAMETER方法 //处理play方法,开始或恢复播放
void handleReq_Play(const Parser &parser);
void inline send_StreamNotFound(); //rtsp资源未找到 //处理pause方法,暂停播放
void inline send_UnsupportedTransport(); //不支持的传输模式 void handleReq_Pause(const Parser &parser);
void inline send_SessionNotFound(); //会话id错误 //处理teardown方法,结束播放
void inline send_NotAcceptable(); //rtsp同时播放数限制 void handleReq_Teardown(const Parser &parser);
//处理Get方法,rtp over http才用到
void handleReq_Get(const Parser &parser);
//处理Post方法,rtp over http才用到
void handleReq_Post(const Parser &parser);
//处理SET_PARAMETER、GET_PARAMETER方法,一般用于心跳
void handleReq_SET_PARAMETER(const Parser &parser);
//rtsp资源未找到
void inline send_StreamNotFound();
//不支持的传输模式
void inline send_UnsupportedTransport();
//会话id错误
void inline send_SessionNotFound();
//一般rtsp服务器打开端口失败时触发
void inline send_NotAcceptable();
//ssrc转字符串
inline string printSSRC(uint32_t ui32Ssrc); inline string printSSRC(uint32_t ui32Ssrc);
//获取track下标
inline int getTrackIndexByTrackType(TrackType type); inline int getTrackIndexByTrackType(TrackType type);
inline int getTrackIndexByControlSuffix(const string &controlSuffix); inline int getTrackIndexByControlSuffix(const string &controlSuffix);
inline int getTrackIndexByInterleaved(int interleaved); inline int getTrackIndexByInterleaved(int interleaved);
//一般用于接收udp打洞包,也用于rtsp推流
inline void onRcvPeerUdpData(int intervaled, const Buffer::Ptr &pBuf, const struct sockaddr &addr); inline void onRcvPeerUdpData(int intervaled, const Buffer::Ptr &pBuf, const struct sockaddr &addr);
//配合onRcvPeerUdpData使用
inline void startListenPeerUdpData(int iTrackIdx); inline void startListenPeerUdpData(int iTrackIdx);
//认证相关 ////rtsp专有认证相关////
//认证成功
void onAuthSuccess(); void onAuthSuccess();
//认证失败
void onAuthFailed(const string &realm,const string &why,bool close = true); void onAuthFailed(const string &realm,const string &why,bool close = true);
//开始走rtsp专有认证流程
void onAuthUser(const string &realm,const string &authorization); void onAuthUser(const string &realm,const string &authorization);
//校验base64方式的认证加密
void onAuthBasic(const string &realm,const string &strBase64); void onAuthBasic(const string &realm,const string &strBase64);
//校验md5方式的认证加密
void onAuthDigest(const string &realm,const string &strMd5); void onAuthDigest(const string &realm,const string &strMd5);
//发送rtp给客户端
void sendRtpPacket(const RtpPacket::Ptr &pkt); void sendRtpPacket(const RtpPacket::Ptr &pkt);
//回复客户端
bool sendRtspResponse(const string &res_code,const std::initializer_list<string> &header, const string &sdp = "" , const char *protocol = "RTSP/1.0"); bool sendRtspResponse(const string &res_code,const std::initializer_list<string> &header, const string &sdp = "" , const char *protocol = "RTSP/1.0");
bool sendRtspResponse(const string &res_code,const StrCaseMap &header = StrCaseMap(), const string &sdp = "",const char *protocol = "RTSP/1.0"); bool sendRtspResponse(const string &res_code,const StrCaseMap &header = StrCaseMap(), const string &sdp = "",const char *protocol = "RTSP/1.0");
//服务器发送rtcp
void sendSenderReport(bool overTcp,int iTrackIndex); void sendSenderReport(bool overTcp,int iTrackIndex);
private: private:
//用于判断客户端是否超时
Ticker _ticker; Ticker _ticker;
//收到的seq,回复时一致
int _iCseq = 0; int _iCseq = 0;
//ContentBase
string _strContentBase; string _strContentBase;
string _strSdp; //Session号
string _strSession; string _strSession;
//是否第一次播放,第一次播放需要鉴权,第二次播放属于暂停恢复
bool _bFirstPlay = true; bool _bFirstPlay = true;
//url解析后保存的相关信息
MediaInfo _mediaInfo; MediaInfo _mediaInfo;
//rtsp播放器绑定的直播源
std::weak_ptr<RtspMediaSource> _pMediaSrc; std::weak_ptr<RtspMediaSource> _pMediaSrc;
//直播源读取器
RingBuffer<RtpPacket::Ptr>::RingReader::Ptr _pRtpReader; RingBuffer<RtpPacket::Ptr>::RingReader::Ptr _pRtpReader;
//推流或拉流客户端采用的rtp传输方式
Rtsp::eRtpType _rtpType = Rtsp::RTP_Invalid; Rtsp::eRtpType _rtpType = Rtsp::RTP_Invalid;
//sdp里面有效的track,包含音频或视频
vector<SdpTrack::Ptr> _aTrackInfo; vector<SdpTrack::Ptr> _aTrackInfo;
////////RTP over udp////////
//RTP over udp //RTP端口,trackid idx 为数组下标
Socket::Ptr _apRtpSock[2]; //RTP端口,trackid idx 为数组下标 Socket::Ptr _apRtpSock[2];
Socket::Ptr _apRtcpSock[2];//RTCP端口,trackid idx 为数组下标 //RTCP端口,trackid idx 为数组下标
Socket::Ptr _apRtcpSock[2];
//标记是否收到播放的udp打洞包,收到播放的udp打洞包后才能知道其外网udp端口号
unordered_set<int> _udpSockConnected; unordered_set<int> _udpSockConnected;
//RTP over udp_multicast ////////RTP over udp_multicast////////
RtpBroadCaster::Ptr _pBrdcaster; //共享的rtp组播对象
RtpMultiCaster::Ptr _multicaster;
//登录认证 //登录认证
string _strNonce; string _strNonce;
...@@ -184,13 +225,16 @@ private: ...@@ -184,13 +225,16 @@ private:
//一次发送 get 一次发送post,需要通过x-sessioncookie关联起来 //一次发送 get 一次发送post,需要通过x-sessioncookie关联起来
string _http_x_sessioncookie; string _http_x_sessioncookie;
function<void(const Buffer::Ptr &pBuf)> _onRecv; function<void(const Buffer::Ptr &pBuf)> _onRecv;
//是否开始发送rtp
bool _enableSendRtp; bool _enableSendRtp;
//rtsp推流相关 //rtsp推流相关
RtspToRtmpMediaSource::Ptr _pushSrc; RtspToRtmpMediaSource::Ptr _pushSrc;
//rtcp统计,trackid idx 为数组下标
RtcpCounter _aRtcpCnt[2]; //rtcp统计,trackid idx 为数组下标 RtcpCounter _aRtcpCnt[2];
Ticker _aRtcpTicker[2]; //rtcp发送时间,trackid idx 为数组下标 //rtcp发送时间,trackid idx 为数组下标
Ticker _aRtcpTicker[2];
//时间戳修整器
Stamp _stamp[2];
}; };
/** /**
......
...@@ -88,6 +88,17 @@ public: ...@@ -88,6 +88,17 @@ public:
int readerCount() override { int readerCount() override {
return RtspMediaSource::readerCount() + (_muxer ? _muxer->readerCount() : 0); return RtspMediaSource::readerCount() + (_muxer ? _muxer->readerCount() : 0);
} }
/**
* 获取track
* @return
*/
vector<Track::Ptr> getTracks(bool trackReady) const override {
if(!_demuxer){
return this->RtspMediaSource::getTracks(trackReady);
}
return _demuxer->getTracks(trackReady);
}
private: private:
RtspDemuxer::Ptr _demuxer; RtspDemuxer::Ptr _demuxer;
MultiMediaSourceMuxer::Ptr _muxer; MultiMediaSourceMuxer::Ptr _muxer;
......
#include <stdlib.h>
#include <memory.h>
#if !defined(_WIN32)
#include <dirent.h>
#endif //!defined(_WIN32)
#include <set>
#include "Util/CMD.h"
#include "Util/util.h"
#include "Util/logger.h"
#include "Util/File.h"
#include "Util/uv_errno.h"
using namespace std;
using namespace toolkit;
class CMD_main : public CMD {
public:
CMD_main() {
_parser.reset(new OptionParser(nullptr));
(*_parser) << Option('r',/*该选项简称,如果是\x00则说明无简称*/
"rm",/*该选项全称,每个选项必须有全称;不得为null或空字符串*/
Option::ArgNone,/*该选项后面必须跟值*/
nullptr,/*该选项默认值*/
false,/*该选项是否必须赋值,如果没有默认值且为ArgRequired时用户必须提供该参数否则将抛异常*/
"是否删除或添加bom,默认添加bom头",/*该选项说明文字*/
nullptr);
(*_parser) << Option('f',/*该选项简称,如果是\x00则说明无简称*/
"filter",/*该选项全称,每个选项必须有全称;不得为null或空字符串*/
Option::ArgRequired,/*该选项后面必须跟值*/
"c,cpp,cxx,c,h,hpp",/*该选项默认值*/
true,/*该选项是否必须赋值,如果没有默认值且为ArgRequired时用户必须提供该参数否则将抛异常*/
"文件后缀过滤器",/*该选项说明文字*/
nullptr);
(*_parser) << Option('i',/*该选项简称,如果是\x00则说明无简称*/
"in",/*该选项全称,每个选项必须有全称;不得为null或空字符串*/
Option::ArgRequired,/*该选项后面必须跟值*/
nullptr,/*该选项默认值*/
true,/*该选项是否必须赋值,如果没有默认值且为ArgRequired时用户必须提供该参数否则将抛异常*/
"文件夹或文件",/*该选项说明文字*/
nullptr);
}
virtual ~CMD_main() {}
virtual const char *description() const {
return "添加或删除bom";
}
};
void get_file_path(const char *path, const char *file_name, char *file_path) {
strcpy(file_path, path);
if (file_path[strlen(file_path) - 1] != '/') {
strcat(file_path, "/");
}
strcat(file_path, file_name);
}
template <typename FUNC>
void for_each_file(const char *path, FUNC &&func){
DIR *dir;
dirent *dir_info;
char file_path[PATH_MAX];
if (File::is_file(path)) {
func(path);
return;
}
if (File::is_dir(path)) {
if ((dir = opendir(path)) == NULL) {
closedir(dir);
return;
}
while ((dir_info = readdir(dir)) != NULL) {
if (File::is_special_dir(dir_info->d_name)) {
continue;
}
get_file_path(path, dir_info->d_name, file_path);
for_each_file(file_path,std::forward<FUNC>(func));
}
closedir(dir);
return;
}
}
static const char s_bom[] = "\xEF\xBB\xBF";
void add_or_rm_bom(const char *file,bool rm_bom){
auto file_str = File::loadFile(file);
if(rm_bom){
file_str.erase(0, sizeof(s_bom) - 1);
}else{
file_str.insert(0,s_bom,sizeof(s_bom) - 1);
}
File::saveFile(file_str,file);
}
void process_file(const char *file,bool rm_bom){
std::shared_ptr<FILE> fp(fopen(file, "rb+"), [](FILE *fp) {
if (fp) {
fclose(fp);
}
});
if (!fp) {
WarnL << "打开文件失败:" << file << " " << get_uv_errmsg();
return;
}
bool have_bom = rm_bom;
char buf[sizeof(s_bom) - 1] = {0};
if (sizeof(buf) == fread(buf,1,sizeof(buf),fp.get())) {
have_bom = (memcmp(s_bom, buf, sizeof(s_bom) - 1) == 0);
}
if (have_bom == !rm_bom) {
// DebugL << "无需" << (rm_bom ? "删除" : "添加") << "bom:" << file;
return;
}
fp = nullptr;
add_or_rm_bom(file,rm_bom);
InfoL << (rm_bom ? "删除" : "添加") << "bom:" << file;
}
int main(int argc, char *argv[]) {
CMD_main cmd_main;
try {
cmd_main.operator()(argc, argv);
} catch (std::exception &ex) {
cout << ex.what() << endl;
return -1;
}
bool rm_bom = cmd_main.hasKey("rm");
string path = cmd_main["in"];
string filter = cmd_main["filter"];
auto vec = split(filter,",");
set<string> filter_set;
for(auto ext : vec){
filter_set.emplace(ext);
}
bool no_filter = filter_set.find("*") != filter_set.end();
//设置日志
Logger::Instance().add(std::make_shared<ConsoleChannel>());
for_each_file(path.data(),[&](const char *path){
if(!no_filter){
//开启了过滤器
auto pos = strstr(path,".");
if(pos == nullptr){
//没有后缀
return;
}
auto ext = pos + 1;
if(filter_set.find(ext) == filter_set.end()){
//后缀不匹配
return;
}
}
//该文件匹配
process_file(path,rm_bom);
});
return 0;
}
...@@ -39,42 +39,6 @@ using namespace std; ...@@ -39,42 +39,6 @@ using namespace std;
using namespace toolkit; using namespace toolkit;
using namespace mediakit; using namespace mediakit;
#ifdef WIN32
std::string Utf8ToGbk(std::string src_str){
int len = MultiByteToWideChar(CP_UTF8, 0, src_str.c_str(), -1, NULL, 0);
wchar_t* wszGBK = new wchar_t[len + 1];
memset(wszGBK, 0, len * 2 + 2);
MultiByteToWideChar(CP_UTF8, 0, src_str.c_str(), -1, wszGBK, len);
len = WideCharToMultiByte(CP_ACP, 0, wszGBK, -1, NULL, 0, NULL, NULL);
char* szGBK = new char[len + 1];
memset(szGBK, 0, len + 1);
WideCharToMultiByte(CP_ACP, 0, wszGBK, -1, szGBK, len, NULL, NULL);
string strTemp(szGBK);
if (wszGBK) delete[] wszGBK;
if (szGBK) delete[] szGBK;
return strTemp;
}
class log4Channel : public LogChannel {
public:
log4Channel(const string &name = "log4Channel", LogLevel level = LTrace) :LogChannel(name, level)
{
}
~log4Channel() {}
void write(const Logger &logger, const LogContextPtr &logContext) override
{
if (_level > logContext->_level) {
return;
}
printf("%s %s\n", logContext->_function, Utf8ToGbk(logContext->str()).c_str());
}
};
#else
typedef ConsoleChannel log4Channel;
#endif
#ifdef WIN32 #ifdef WIN32
#include <TCHAR.h> #include <TCHAR.h>
...@@ -106,7 +70,7 @@ int main(int argc, char *argv[]) { ...@@ -106,7 +70,7 @@ int main(int argc, char *argv[]) {
//设置退出信号处理函数 //设置退出信号处理函数
signal(SIGINT, [](int) { SDLDisplayerHelper::Instance().shutdown(); }); signal(SIGINT, [](int) { SDLDisplayerHelper::Instance().shutdown(); });
//设置日志 //设置日志
Logger::Instance().add(std::make_shared<log4Channel>()); Logger::Instance().add(std::make_shared<ConsoleChannel>());
Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>()); Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>());
if (argc != 3) { if (argc != 3) {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论