Commit f28b732a by baiyfcu Committed by GitHub

Merge pull request #6 from zlmediakit/master

update
parents a3c8cb11 96aa687f
Subproject commit 57e7c83d5667b1e06fb8f5ca73dbe3f04a9fc67f Subproject commit 0b406073125080ab8edd13ee7c14e573e54baa35
...@@ -6,12 +6,12 @@ ...@@ -6,12 +6,12 @@
cmake_minimum_required(VERSION 3.4.1) cmake_minimum_required(VERSION 3.4.1)
#设置生成的so动态库最后输出的路径 #设置生成的so动态库最后输出的路径
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_SOURCE_DIR}/libs_export/${ANDROID_ABI}) set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/libs_export/${ANDROID_ABI})
set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_SOURCE_DIR}/libs_export/${ANDROID_ABI}) set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/libs_export/${ANDROID_ABI})
#设置工程源码根目录 #设置工程源码根目录
set(ZLMediaKit_Root ${CMAKE_SOURCE_DIR}/../../../../../) set(ZLMediaKit_Root ${CMAKE_CURRENT_SOURCE_DIR}/../../../../../)
set(JNI_Root ${CMAKE_SOURCE_DIR}) set(JNI_Root ${CMAKE_CURRENT_SOURCE_DIR})
set(ToolKit_Root ${ZLMediaKit_Root}/3rdpart/ZLToolKit/src) set(ToolKit_Root ${ZLMediaKit_Root}/3rdpart/ZLToolKit/src)
set(MediaKit_Root ${ZLMediaKit_Root}/src) set(MediaKit_Root ${ZLMediaKit_Root}/src)
......
...@@ -168,7 +168,7 @@ static void initEvent() { ...@@ -168,7 +168,7 @@ static void initEvent() {
<< args._vhost << " " << args._app << " " << args._vhost << " " << args._app << " "
<< args._streamid << " " << args._streamid << " "
<< args._param_strs; << args._param_strs;
invoker("");//鉴权成功 invoker("",true,true,false);//鉴权成功
//invoker("this is auth failed message");//鉴权失败 //invoker("this is auth failed message");//鉴权失败
}); });
......
...@@ -3,7 +3,7 @@ cmake_minimum_required(VERSION 3.1.3) ...@@ -3,7 +3,7 @@ cmake_minimum_required(VERSION 3.1.3)
#使能c++11 #使能c++11
set(CMAKE_CXX_STANDARD 11) set(CMAKE_CXX_STANDARD 11)
#加载自定义模块 #加载自定义模块
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${PROJECT_SOURCE_DIR}/cmake") set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
#set(CMAKE_BUILD_TYPE "Release") #set(CMAKE_BUILD_TYPE "Release")
...@@ -16,7 +16,7 @@ else() ...@@ -16,7 +16,7 @@ else()
endif() endif()
#设置bin和lib库目录 #设置bin和lib库目录
set(RELEASE_DIR ${CMAKE_SOURCE_DIR}/release) set(RELEASE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/release)
if (CMAKE_SYSTEM_NAME MATCHES "Linux") if (CMAKE_SYSTEM_NAME MATCHES "Linux")
SET(LIBRARY_OUTPUT_PATH ${RELEASE_DIR}/linux/${BuildType}) SET(LIBRARY_OUTPUT_PATH ${RELEASE_DIR}/linux/${BuildType})
SET(EXECUTABLE_OUTPUT_PATH ${RELEASE_DIR}/linux/${BuildType}) SET(EXECUTABLE_OUTPUT_PATH ${RELEASE_DIR}/linux/${BuildType})
...@@ -72,7 +72,7 @@ endif() ...@@ -72,7 +72,7 @@ endif()
if(ENABLE_MP4RECORD) if(ENABLE_MP4RECORD)
message(STATUS "ENABLE_MP4RECORD defined") message(STATUS "ENABLE_MP4RECORD defined")
add_definitions(-DENABLE_MP4RECORD) add_definitions(-DENABLE_MP4RECORD)
set(MediaServer_Root ${CMAKE_SOURCE_DIR}/3rdpart/media-server) set(MediaServer_Root ${CMAKE_CURRENT_SOURCE_DIR}/3rdpart/media-server)
list(APPEND LINK_LIB_LIST mov flv) list(APPEND LINK_LIB_LIST mov flv)
endif() endif()
#查找openssl是否安装 #查找openssl是否安装
......
...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
- 编码格式与框架代码解耦,方便自由简洁的添加支持其他编码格式 - 编码格式与框架代码解耦,方便自由简洁的添加支持其他编码格式
- 代码经过大量的稳定性、性能测试,可满足商用服务器项目。 - 代码经过大量的稳定性、性能测试,可满足商用服务器项目。
- 支持linux、macos、ios、android、windows平台 - 支持linux、macos、ios、android、windows平台
- 支持画面秒开(GOP缓存)、极低延时(1秒内) - 支持画面秒开(GOP缓存)、极低延时([500毫秒内,最低可达100毫秒](https://github.com/zlmediakit/ZLMediaKit/wiki/%E5%BB%B6%E6%97%B6%E6%B5%8B%E8%AF%95))
- **支持websocket-flv直播** - **支持websocket-flv直播**
- [ZLMediaKit高并发实现原理](https://github.com/xiongziliang/ZLMediaKit/wiki/ZLMediaKit%E9%AB%98%E5%B9%B6%E5%8F%91%E5%AE%9E%E7%8E%B0%E5%8E%9F%E7%90%86) - [ZLMediaKit高并发实现原理](https://github.com/xiongziliang/ZLMediaKit/wiki/ZLMediaKit%E9%AB%98%E5%B9%B6%E5%8F%91%E5%AE%9E%E7%8E%B0%E5%8E%9F%E7%90%86)
......
...@@ -28,6 +28,8 @@ maxStreamWaitMS=5000 ...@@ -28,6 +28,8 @@ maxStreamWaitMS=5000
#某个流无人观看时,触发hook.on_stream_none_reader事件的最大等待时间,单位毫秒 #某个流无人观看时,触发hook.on_stream_none_reader事件的最大等待时间,单位毫秒
#在配合hook.on_stream_none_reader事件时,可以做到无人观看自动停止拉流或停止接收推流 #在配合hook.on_stream_none_reader事件时,可以做到无人观看自动停止拉流或停止接收推流
streamNoneReaderDelayMS=5000 streamNoneReaderDelayMS=5000
#是否开启低延时模式,该模式下禁用MSG_MORE,启用TCP_NODEALY,延时将降低,但数据发送性能将降低
ultraLowDelay=1
[hls] [hls]
#hls写文件的buf大小,调整参数可以提高文件io性能 #hls写文件的buf大小,调整参数可以提高文件io性能
...@@ -101,10 +103,11 @@ udpTTL=64 ...@@ -101,10 +103,11 @@ udpTTL=64
[record] [record]
#mp4录制或mp4点播的应用名,通过限制应用名,可以防止随意点播 #mp4录制或mp4点播的应用名,通过限制应用名,可以防止随意点播
#点播的文件必须放置在此文件夹下
appName=record appName=record
#mp4录制写文件缓存,单位BYTE,调整参数可以提高文件io性能 #mp4录制写文件缓存,单位BYTE,调整参数可以提高文件io性能
fileBufSize=65536 fileBufSize=65536
#mp4录制保存路径 #mp4录制保存、mp4点播根路径
filePath=/Users/xzl/git/ZLMediaKit/release/mac/Release/httpRoot filePath=/Users/xzl/git/ZLMediaKit/release/mac/Release/httpRoot
#mp4录制切片时间,单位秒 #mp4录制切片时间,单位秒
fileSecond=3600 fileSecond=3600
......
...@@ -581,7 +581,12 @@ void installWebApi() { ...@@ -581,7 +581,12 @@ void installWebApi() {
////////////以下是注册的Hook API//////////// ////////////以下是注册的Hook API////////////
API_REGIST(hook,on_publish,{ API_REGIST(hook,on_publish,{
//开始推流事件 //开始推流事件
throw SuccessException(); //转换成rtsp或rtmp
val["enableRtxp"] = true;
//转换hls
val["enableHls"] = true;
//不录制mp4
val["enableMP4"] = false;
}); });
API_REGIST(hook,on_play,{ API_REGIST(hook,on_play,{
......
...@@ -195,7 +195,7 @@ void installWebHook(){ ...@@ -195,7 +195,7 @@ void installWebHook(){
NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastMediaPublish,[](BroadcastMediaPublishArgs){ NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastMediaPublish,[](BroadcastMediaPublishArgs){
if(!hook_enable || args._param_strs == hook_adminparams || hook_publish.empty() || sender.get_peer_ip() == "127.0.0.1"){ if(!hook_enable || args._param_strs == hook_adminparams || hook_publish.empty() || sender.get_peer_ip() == "127.0.0.1"){
invoker(""); invoker("",true, true,false);
return; return;
} }
//异步执行该hook api,防止阻塞NoticeCenter //异步执行该hook api,防止阻塞NoticeCenter
...@@ -205,7 +205,31 @@ void installWebHook(){ ...@@ -205,7 +205,31 @@ void installWebHook(){
body["id"] = sender.getIdentifier(); body["id"] = sender.getIdentifier();
//执行hook //执行hook
do_http_hook(hook_publish,body,[invoker](const Value &obj,const string &err){ do_http_hook(hook_publish,body,[invoker](const Value &obj,const string &err){
invoker(err); if(err.empty()){
//推流鉴权成功
bool enableRtxp = true;
bool enableHls = true;
bool enableMP4 = false;
//兼容用户不传递enableRtxp、enableHls、enableMP4参数
if(obj.isMember("enableRtxp")){
enableRtxp = obj["enableRtxp"].asBool();
}
if(obj.isMember("enableHls")){
enableHls = obj["enableHls"].asBool();
}
if(obj.isMember("enableMP4")){
enableMP4 = obj["enableMP4"].asBool();
}
invoker(err,enableRtxp,enableHls,enableMP4);
}else{
//推流鉴权失败
invoker(err,false, false, false);
}
}); });
}); });
......
...@@ -76,11 +76,13 @@ const string kFlowThreshold = GENERAL_FIELD"flowThreshold"; ...@@ -76,11 +76,13 @@ const string kFlowThreshold = GENERAL_FIELD"flowThreshold";
const string kStreamNoneReaderDelayMS = GENERAL_FIELD"streamNoneReaderDelayMS"; const string kStreamNoneReaderDelayMS = GENERAL_FIELD"streamNoneReaderDelayMS";
const string kMaxStreamWaitTimeMS = GENERAL_FIELD"maxStreamWaitMS"; const string kMaxStreamWaitTimeMS = GENERAL_FIELD"maxStreamWaitMS";
const string kEnableVhost = GENERAL_FIELD"enableVhost"; const string kEnableVhost = GENERAL_FIELD"enableVhost";
const string kUltraLowDelay = GENERAL_FIELD"ultraLowDelay";
onceToken token([](){ onceToken token([](){
mINI::Instance()[kFlowThreshold] = 1024; mINI::Instance()[kFlowThreshold] = 1024;
mINI::Instance()[kStreamNoneReaderDelayMS] = 5 * 1000; mINI::Instance()[kStreamNoneReaderDelayMS] = 5 * 1000;
mINI::Instance()[kMaxStreamWaitTimeMS] = 5 * 1000; mINI::Instance()[kMaxStreamWaitTimeMS] = 5 * 1000;
mINI::Instance()[kEnableVhost] = 1; mINI::Instance()[kEnableVhost] = 1;
mINI::Instance()[kUltraLowDelay] = 1;
},nullptr); },nullptr);
}//namespace General }//namespace General
......
...@@ -92,13 +92,20 @@ extern const string kBroadcastOnGetRtspRealm; ...@@ -92,13 +92,20 @@ extern const string kBroadcastOnGetRtspRealm;
extern const string kBroadcastOnRtspAuth; extern const string kBroadcastOnRtspAuth;
#define BroadcastOnRtspAuthArgs const MediaInfo &args,const string &realm,const string &user_name,const bool &must_no_encrypt,const RtspSession::onAuth &invoker,TcpSession &sender #define BroadcastOnRtspAuthArgs const MediaInfo &args,const string &realm,const string &user_name,const bool &must_no_encrypt,const RtspSession::onAuth &invoker,TcpSession &sender
//鉴权结果回调对象 //推流鉴权结果回调对象
//如果errMessage为空则代表鉴权成功 //如果errMessage为空则代表鉴权成功
typedef std::function<void(const string &errMessage)> AuthInvoker; //enableHls: 是否允许转换hls
//enableMP4: 是否运行MP4录制
//enableRtxp: rtmp推流时是否运行转rtsp;rtsp推流时,是否允许转rtmp
typedef std::function<void(const string &errMessage,bool enableRtxp,bool enableHls,bool enableMP4)> PublishAuthInvoker;
//收到rtsp/rtmp推流事件广播,通过该事件控制推流鉴权 //收到rtsp/rtmp推流事件广播,通过该事件控制推流鉴权
extern const string kBroadcastMediaPublish; extern const string kBroadcastMediaPublish;
#define BroadcastMediaPublishArgs const MediaInfo &args,const Broadcast::AuthInvoker &invoker,TcpSession &sender #define BroadcastMediaPublishArgs const MediaInfo &args,const Broadcast::PublishAuthInvoker &invoker,TcpSession &sender
//播放鉴权结果回调对象
//如果errMessage为空则代表鉴权成功
typedef std::function<void(const string &errMessage)> AuthInvoker;
//播放rtsp/rtmp/http-flv事件广播,通过该事件控制播放鉴权 //播放rtsp/rtmp/http-flv事件广播,通过该事件控制播放鉴权
extern const string kBroadcastMediaPlayed; extern const string kBroadcastMediaPlayed;
...@@ -168,6 +175,8 @@ extern const string kStreamNoneReaderDelayMS; ...@@ -168,6 +175,8 @@ extern const string kStreamNoneReaderDelayMS;
extern const string kMaxStreamWaitTimeMS; extern const string kMaxStreamWaitTimeMS;
//是否启动虚拟主机 //是否启动虚拟主机
extern const string kEnableVhost; extern const string kEnableVhost;
//超低延时模式,默认打开,打开后会降低延时但是转发性能会稍差
extern const string kUltraLowDelay;
}//namespace General }//namespace General
......
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
#include "H264Rtp.h" #include "H264Rtp.h"
#include "AACRtp.h" #include "AACRtp.h"
#include "H265Rtp.h" #include "H265Rtp.h"
#include "Common/Parser.h"
namespace mediakit{ namespace mediakit{
...@@ -76,26 +77,14 @@ Track::Ptr Factory::getTrackBySdp(const SdpTrack::Ptr &track) { ...@@ -76,26 +77,14 @@ Track::Ptr Factory::getTrackBySdp(const SdpTrack::Ptr &track) {
if (strcasecmp(track->_codec.data(), "h265") == 0) { if (strcasecmp(track->_codec.data(), "h265") == 0) {
//a=fmtp:96 sprop-sps=QgEBAWAAAAMAsAAAAwAAAwBdoAKAgC0WNrkky/AIAAADAAgAAAMBlQg=; sprop-pps=RAHA8vA8kAA= //a=fmtp:96 sprop-sps=QgEBAWAAAAMAsAAAAwAAAwBdoAKAgC0WNrkky/AIAAADAAgAAAMBlQg=; sprop-pps=RAHA8vA8kAA=
int pt, id; auto map = Parser::parseArgs(track->_fmtp," ","=");
char sprop_vps[128] = {0},sprop_sps[128] = {0},sprop_pps[128] = {0}; for(auto &pr : map){
if (5 == sscanf(track->_fmtp.data(), "%d profile-id=%d; sprop-sps=%127[^;]; sprop-pps=%127[^;]; sprop-vps=%127[^;]", &pt, &id, sprop_sps,sprop_pps, sprop_vps)) { trim(pr.second," ;");
auto vps = decodeBase64(sprop_vps);
auto sps = decodeBase64(sprop_sps);
auto pps = decodeBase64(sprop_pps);
return std::make_shared<H265Track>(vps,sps,pps,0,0,0);
} }
if (4 == sscanf(track->_fmtp.data(), "%d sprop-vps=%127[^;]; sprop-sps=%127[^;]; sprop-pps=%127[^;]", &pt, sprop_vps,sprop_sps, sprop_pps)) { auto vps = decodeBase64(map["sprop-vps"]);
auto vps = decodeBase64(sprop_vps); auto sps = decodeBase64(map["sprop-sps"]);
auto sps = decodeBase64(sprop_sps); auto pps = decodeBase64(map["sprop-pps"]);
auto pps = decodeBase64(sprop_pps); return std::make_shared<H265Track>(vps,sps,pps,0,0,0);
return std::make_shared<H265Track>(vps,sps,pps,0,0,0);
}
if (3 == sscanf(track->_fmtp.data(), "%d sprop-sps=%127[^;]; sprop-pps=%127[^;]", &pt,sprop_sps, sprop_pps)) {
auto sps = decodeBase64(sprop_sps);
auto pps = decodeBase64(sprop_pps);
return std::make_shared<H265Track>("",sps,pps,0,0,0);
}
return std::make_shared<H265Track>();
} }
......
...@@ -238,38 +238,51 @@ void H264RtpEncoder::inputFrame(const Frame::Ptr &frame) { ...@@ -238,38 +238,51 @@ void H264RtpEncoder::inputFrame(const Frame::Ptr &frame) {
auto pcData = frame->data() + frame->prefixSize(); auto pcData = frame->data() + frame->prefixSize();
auto uiStamp = frame->stamp(); auto uiStamp = frame->stamp();
auto iLen = frame->size() - frame->prefixSize(); auto iLen = frame->size() - frame->prefixSize();
unsigned char naluType = H264_TYPE(pcData[0]); //获取NALU的5bit 帧类型 //获取NALU的5bit 帧类型
unsigned char naluType = H264_TYPE(pcData[0]);
uiStamp %= cycleMS; uiStamp %= cycleMS;
int iSize = _ui32MtuSize - 2; int iSize = _ui32MtuSize - 2;
if (iLen > iSize) { //超过MTU //超过MTU则按照FU-A模式打包
const unsigned char s_e_r_Start = 0x80; if (iLen > iSize) {
const unsigned char s_e_r_Mid = 0x00; //最高位bit为forbidden_zero_bit,
const unsigned char s_e_r_End = 0x40; //后面2bit为nal_ref_idc(帧重要程度),00:可以丢,11:不能丢
//获取帧头数据,1byte //末尾5bit为nalu type,固定为28(FU-A)
unsigned char nal_ref_idc = *((unsigned char *) pcData) & 0x60; //获取NALU的2bit 帧重要程度 00 可以丢 11不能丢 unsigned char f_nri_flags = (*((unsigned char *) pcData) & 0x60) | 28;
//nal_ref_idc = 0x60; unsigned char s_e_r_flags;
//组装FU-A帧头数据 2byte
unsigned char f_nri_type = nal_ref_idc + 28;//F为0 1bit,nri上面获取到2bit,28为FU-A分片类型5bit
unsigned char s_e_r_type = naluType;
bool bFirst = true; bool bFirst = true;
bool mark = false; bool mark = false;
int nOffset = 1; int nOffset = 1;
while (!mark) { while (!mark) {
if (iLen < nOffset + iSize) { //是否拆分结束 if (iLen < nOffset + iSize) {
//已经拆分结束
iSize = iLen - nOffset; iSize = iLen - nOffset;
mark = true; mark = true;
s_e_r_type = s_e_r_End + naluType; //FU-A end
} else if (bFirst) { s_e_r_flags = (1 << 6) | naluType;
s_e_r_type = s_e_r_Start + naluType; } else if (bFirst) {
//FU-A start
s_e_r_flags = (1 << 7) | naluType;
} else { } else {
s_e_r_type = s_e_r_Mid + naluType; //FU-A mid
s_e_r_flags = naluType;
}
{
//传入nullptr先不做payload的内存拷贝
auto rtp = makeRtp(getTrackType(), nullptr, iSize + 2, mark, uiStamp);
//rtp payload 负载部分
uint8_t *payload = (uint8_t*)rtp->data() + rtp->offset;
//FU-A 第1个字节
payload[0] = f_nri_flags;
//FU-A 第2个字节
payload[1] = s_e_r_flags;
//H264 数据
memcpy(payload + 2, (unsigned char *) pcData + nOffset, iSize);
//输入到rtp环形缓存
RtpCodec::inputRtp(rtp,bFirst && naluType == H264Frame::NAL_IDR);
} }
memcpy(_aucSectionBuf, &f_nri_type, 1);
memcpy(_aucSectionBuf + 1, &s_e_r_type, 1);
memcpy(_aucSectionBuf + 2, (unsigned char *) pcData + nOffset, iSize);
nOffset += iSize; nOffset += iSize;
makeH264Rtp(naluType,_aucSectionBuf, iSize + 2, mark,bFirst, uiStamp);
bFirst = false; bFirst = false;
} }
} else { } else {
......
...@@ -97,8 +97,6 @@ public: ...@@ -97,8 +97,6 @@ public:
void inputFrame(const Frame::Ptr &frame) override; void inputFrame(const Frame::Ptr &frame) override;
private: private:
void makeH264Rtp(int nal_type,const void *pData, unsigned int uiLen, bool bMark, bool first_packet, uint32_t uiStamp); void makeH264Rtp(int nal_type,const void *pData, unsigned int uiLen, bool bMark, bool first_packet, uint32_t uiStamp);
private:
unsigned char _aucSectionBuf[1600];
}; };
}//namespace mediakit{ }//namespace mediakit{
......
...@@ -267,8 +267,10 @@ public: ...@@ -267,8 +267,10 @@ public:
* @param frame 数据帧 * @param frame 数据帧
*/ */
void inputFrame(const Frame::Ptr &frame) override{ void inputFrame(const Frame::Ptr &frame) override{
bool first_frame = true; int type = H265_TYPE(*((uint8_t *)frame->data() + frame->prefixSize()));
splitH264(frame->data() + frame->prefixSize(), if(type == H265Frame::NAL_VPS){
bool first_frame = true;
splitH264(frame->data() + frame->prefixSize(),
frame->size() - frame->prefixSize(), frame->size() - frame->prefixSize(),
[&](const char *ptr, int len){ [&](const char *ptr, int len){
if(first_frame){ if(first_frame){
...@@ -286,6 +288,9 @@ public: ...@@ -286,6 +288,9 @@ public:
inputFrame_l(sub_frame); inputFrame_l(sub_frame);
} }
}); });
}else{
inputFrame_l(frame);
}
} }
private: private:
......
...@@ -177,9 +177,10 @@ void H265RtpEncoder::inputFrame(const Frame::Ptr &frame) { ...@@ -177,9 +177,10 @@ void H265RtpEncoder::inputFrame(const Frame::Ptr &frame) {
uiStamp %= cycleMS; uiStamp %= cycleMS;
int maxSize = _ui32MtuSize - 3; int maxSize = _ui32MtuSize - 3;
if (iLen > maxSize) { //超过MTU //超过MTU,按照FU方式打包
if (iLen > maxSize) {
//获取帧头数据,1byte //获取帧头数据,1byte
unsigned char s_e_type; unsigned char s_e_flags;
bool bFirst = true; bool bFirst = true;
bool mark = false; bool mark = false;
int nOffset = 2; int nOffset = 2;
...@@ -187,20 +188,34 @@ void H265RtpEncoder::inputFrame(const Frame::Ptr &frame) { ...@@ -187,20 +188,34 @@ void H265RtpEncoder::inputFrame(const Frame::Ptr &frame) {
if (iLen < nOffset + maxSize) { //是否拆分结束 if (iLen < nOffset + maxSize) { //是否拆分结束
maxSize = iLen - nOffset; maxSize = iLen - nOffset;
mark = true; mark = true;
s_e_type = 1 << 6 | naluType; //FU end
s_e_flags = (1 << 6) | naluType;
} else if (bFirst) { } else if (bFirst) {
s_e_type = 1 << 7 | naluType; //FU start
s_e_flags = (1 << 7) | naluType;
} else { } else {
s_e_type = naluType; //FU mid
s_e_flags = naluType;
}
{
//传入nullptr先不做payload的内存拷贝
auto rtp = makeRtp(getTrackType(), nullptr, maxSize + 3, mark, uiStamp);
//rtp payload 负载部分
uint8_t *payload = (uint8_t*)rtp->data() + rtp->offset;
//FU 第1个字节,表明为FU
payload[0] = 49 << 1;
//FU 第2个字节貌似固定为1
payload[1] = 1;
//FU 第3个字节
payload[2] = s_e_flags;
//H265 数据
memcpy(payload + 3,pcData + nOffset, maxSize);
//输入到rtp环形缓存
RtpCodec::inputRtp(rtp,bFirst && H265Frame::isKeyFrame(naluType));
} }
//FU type
_aucSectionBuf[0] = 49 << 1;
_aucSectionBuf[1] = 1;
_aucSectionBuf[2] = s_e_type;
memcpy(_aucSectionBuf + 3, pcData + nOffset, maxSize);
nOffset += maxSize; nOffset += maxSize;
makeH265Rtp(naluType,_aucSectionBuf, maxSize + 3, mark,bFirst, uiStamp);
bFirst = false; bFirst = false;
} }
} else { } else {
......
...@@ -98,8 +98,6 @@ public: ...@@ -98,8 +98,6 @@ public:
void inputFrame(const Frame::Ptr &frame) override; void inputFrame(const Frame::Ptr &frame) override;
private: private:
void makeH265Rtp(int nal_type,const void *pData, unsigned int uiLen, bool bMark, bool first_packet,uint32_t uiStamp); void makeH265Rtp(int nal_type,const void *pData, unsigned int uiLen, bool bMark, bool first_packet,uint32_t uiStamp);
private:
unsigned char _aucSectionBuf[1600];
}; };
}//namespace mediakit{ }//namespace mediakit{
......
...@@ -48,7 +48,6 @@ using namespace toolkit; ...@@ -48,7 +48,6 @@ using namespace toolkit;
namespace mediakit { namespace mediakit {
static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE;
static int kHlsCookieSecond = 10 * 60; static int kHlsCookieSecond = 10 * 60;
static const string kCookieName = "ZL_COOKIE"; static const string kCookieName = "ZL_COOKIE";
static const string kCookiePathKey = "kCookiePathKey"; static const string kCookiePathKey = "kCookiePathKey";
...@@ -283,10 +282,9 @@ inline bool HttpSession::checkLiveFlvStream(const function<void()> &cb){ ...@@ -283,10 +282,9 @@ inline bool HttpSession::checkLiveFlvStream(const function<void()> &cb){
cb(); cb();
} }
//开始发送rtmp负载 //http-flv直播牺牲延时提升发送性能
//关闭tcp_nodelay ,优化性能 setSocketFlags();
SockUtil::setNoDelay(_sock->rawFD(),false);
(*this) << SocketFlags(kSockFlags);
try{ try{
start(getPoller(),rtmp_src); start(getPoller(),rtmp_src);
}catch (std::exception &ex){ }catch (std::exception &ex){
...@@ -393,26 +391,30 @@ inline void HttpSession::canAccessPath(const string &path_in,bool is_dir,const f ...@@ -393,26 +391,30 @@ inline void HttpSession::canAccessPath(const string &path_in,bool is_dir,const f
auto uid = getClientUid(); auto uid = getClientUid();
//先根据http头中的cookie字段获取cookie //先根据http头中的cookie字段获取cookie
HttpServerCookie::Ptr cookie = HttpCookieManager::Instance().getCookie(kCookieName, _parser.getValues()); HttpServerCookie::Ptr cookie = HttpCookieManager::Instance().getCookie(kCookieName, _parser.getValues());
//如果不是从http头中找到的cookie,我们让http客户端设置下cookie
bool cookie_from_header = true;
if(!cookie){ if(!cookie){
//客户端请求中无cookie,再根据该用户的用户id获取cookie //客户端请求中无cookie,再根据该用户的用户id获取cookie
cookie = HttpCookieManager::Instance().getCookieByUid(kCookieName, uid); cookie = HttpCookieManager::Instance().getCookieByUid(kCookieName, uid);
cookie_from_header = false;
} }
if(cookie){ if(cookie){
//找到了cookie,对cookie上锁先 //找到了cookie,对cookie上锁先
auto lck = cookie->getLock(); auto lck = cookie->getLock();
auto accessErr = (*cookie)[kAccessErrKey]; auto accessErr = (*cookie)[kAccessErrKey].get<string>();
if(path.find((*cookie)[kCookiePathKey]) == 0){ auto cookiePath = (*cookie)[kCookiePathKey].get<string>();
if(path.find(cookiePath) == 0){
//上次cookie是限定本目录 //上次cookie是限定本目录
if(accessErr.empty()){ if(accessErr.empty()){
//上次鉴权成功 //上次鉴权成功
callback("", nullptr); callback("", cookie_from_header ? nullptr : cookie);
return; return;
} }
//上次鉴权失败,如果url发生变更,那么也重新鉴权 //上次鉴权失败,但是如果url参数发生变更,那么也重新鉴权下
if (_parser.Params().empty() || _parser.Params() == cookie->getUid()) { if (_parser.Params().empty() || _parser.Params() == cookie->getUid()) {
//url参数未变,那么判断无权限访问 //url参数未变,或者本来就没有url参数,那么判断本次请求为重复请求,无访问权限
callback(accessErr.empty() ? "无权限访问该目录" : accessErr.get<string>(), nullptr); callback(accessErr, cookie_from_header ? nullptr : cookie);
return; return;
} }
} }
...@@ -593,12 +595,10 @@ inline void HttpSession::Handle_Req_GET(int64_t &content_len) { ...@@ -593,12 +595,10 @@ inline void HttpSession::Handle_Req_GET(int64_t &content_len) {
//分节下载返回Content-Range头 //分节下载返回Content-Range头
httpHeader.emplace("Content-Range",StrPrinter<<"bytes " << iRangeStart << "-" << iRangeEnd << "/" << tFileStat.st_size<< endl); httpHeader.emplace("Content-Range",StrPrinter<<"bytes " << iRangeStart << "-" << iRangeEnd << "/" << tFileStat.st_size<< endl);
} }
auto Origin = parser["Origin"];
if(!Origin.empty()){
httpHeader["Access-Control-Allow-Origin"] = Origin;
httpHeader["Access-Control-Allow-Credentials"] = "true";
}
if(cookie){
httpHeader["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get<string>());
}
//先回复HTTP头部分 //先回复HTTP头部分
sendResponse(pcHttpResult,httpHeader,""); sendResponse(pcHttpResult,httpHeader,"");
...@@ -657,10 +657,9 @@ inline void HttpSession::Handle_Req_GET(int64_t &content_len) { ...@@ -657,10 +657,9 @@ inline void HttpSession::Handle_Req_GET(int64_t &content_len) {
} }
return false; return false;
}; };
//关闭tcp_nodelay ,优化性能
SockUtil::setNoDelay(_sock->rawFD(),false); //文件下载提升发送性能
//设置MSG_MORE,优化性能 setSocketFlags();
(*this) << SocketFlags(kSockFlags);
onFlush(); onFlush();
_sock->setOnFlush(onFlush); _sock->setOnFlush(onFlush);
...@@ -950,6 +949,15 @@ inline void HttpSession::sendNotFound(bool bClose) { ...@@ -950,6 +949,15 @@ inline void HttpSession::sendNotFound(bool bClose) {
sendResponse("404 Not Found", makeHttpHeader(bClose, notFound.size()), notFound); sendResponse("404 Not Found", makeHttpHeader(bClose, notFound.size()), notFound);
} }
void HttpSession::setSocketFlags(){
GET_CONFIG(bool,ultraLowDelay,General::kUltraLowDelay);
if(!ultraLowDelay) {
//推流模式下,关闭TCP_NODELAY会增加推流端的延时,但是服务器性能将提高
SockUtil::setNoDelay(_sock->rawFD(), false);
//播放模式下,开启MSG_MORE会增加延时,但是能提高发送性能
(*this) << SocketFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
}
}
void HttpSession::onWrite(const Buffer::Ptr &buffer) { void HttpSession::onWrite(const Buffer::Ptr &buffer) {
_ticker.resetTime(); _ticker.resetTime();
......
...@@ -142,6 +142,9 @@ private: ...@@ -142,6 +142,9 @@ private:
* @return * @return
*/ */
inline string getClientUid(); inline string getClientUid();
//设置socket标志
void setSocketFlags();
private: private:
string _origin; string _origin;
Parser _parser; Parser _parser;
......
...@@ -49,7 +49,7 @@ void Stamp::revise(uint32_t dts, uint32_t pts, int64_t &dts_out, int64_t &pts_ou ...@@ -49,7 +49,7 @@ void Stamp::revise(uint32_t dts, uint32_t pts, int64_t &dts_out, int64_t &pts_ou
//相对时间戳 //相对时间戳
dts_out = dts - _start_dts; dts_out = dts - _start_dts;
if(dts_out < _dts_inc){ if(dts_out < _dts_inc && !_playback){
//本次相对时间戳竟然小于上次? //本次相对时间戳竟然小于上次?
if(dts_out < 0 || _dts_inc - dts_out > 0xFFFF){ if(dts_out < 0 || _dts_inc - dts_out > 0xFFFF){
//时间戳回环,保证下次相对时间戳与本次相对合理增长 //时间戳回环,保证下次相对时间戳与本次相对合理增长
...@@ -77,4 +77,8 @@ void Stamp::revise(uint32_t dts, uint32_t pts, int64_t &dts_out, int64_t &pts_ou ...@@ -77,4 +77,8 @@ void Stamp::revise(uint32_t dts, uint32_t pts, int64_t &dts_out, int64_t &pts_ou
} }
} }
void Stamp::setPlayBack(bool playback) {
_playback = playback;
}
}//namespace mediakit }//namespace mediakit
\ No newline at end of file
...@@ -39,8 +39,23 @@ class Stamp { ...@@ -39,8 +39,23 @@ class Stamp {
public: public:
Stamp() = default; Stamp() = default;
~Stamp() = default; ~Stamp() = default;
/**
* 设置回放模式,回放模式时间戳可以回退
* @param playback 是否为回放模式
*/
void setPlayBack(bool playback = true);
/**
* 修正时间戳
* @param dts 输入dts,如果为0则根据系统时间戳生成
* @param pts 输入pts,如果为0则等于dts
* @param dts_out 输出dts
* @param pts_out 输出pts
*/
void revise(uint32_t dts, uint32_t pts, int64_t &dts_out, int64_t &pts_out); void revise(uint32_t dts, uint32_t pts, int64_t &dts_out, int64_t &pts_out);
private: private:
bool _playback = false;
int64_t _start_dts = 0; int64_t _start_dts = 0;
int64_t _dts_inc = 0; int64_t _dts_inc = 0;
bool _first = true; bool _first = true;
......
...@@ -33,8 +33,6 @@ using namespace mediakit::Client; ...@@ -33,8 +33,6 @@ using namespace mediakit::Client;
namespace mediakit { namespace mediakit {
static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE;
RtmpPusher::RtmpPusher(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr &src) : TcpClient(poller){ RtmpPusher::RtmpPusher(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr &src) : TcpClient(poller){
_pMediaSrc=src; _pMediaSrc=src;
} }
...@@ -229,10 +227,19 @@ inline void RtmpPusher::send_metaData(){ ...@@ -229,10 +227,19 @@ inline void RtmpPusher::send_metaData(){
} }
}); });
onPublishResult(SockException(Err_success,"success")); onPublishResult(SockException(Err_success,"success"));
//提高发送性能 //提升发送性能
(*this) << SocketFlags(kSockFlags); setSocketFlags();
SockUtil::setNoDelay(_sock->rawFD(),false);
} }
void RtmpPusher::setSocketFlags(){
GET_CONFIG(bool,ultraLowDelay,General::kUltraLowDelay);
if(!ultraLowDelay) {
//提高发送性能
(*this) << SocketFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
SockUtil::setNoDelay(_sock->rawFD(), false);
}
}
void RtmpPusher::onCmd_result(AMFDecoder &dec){ void RtmpPusher::onCmd_result(AMFDecoder &dec){
auto iReqId = dec.load<int>(); auto iReqId = dec.load<int>();
lock_guard<recursive_mutex> lck(_mtxOnResultCB); lock_guard<recursive_mutex> lck(_mtxOnResultCB);
......
...@@ -84,7 +84,7 @@ private: ...@@ -84,7 +84,7 @@ private:
inline void send_createStream(); inline void send_createStream();
inline void send_publish(); inline void send_publish();
inline void send_metaData(); inline void send_metaData();
void setSocketFlags();
private: private:
string _strApp; string _strApp;
string _strStream; string _strStream;
......
...@@ -31,8 +31,6 @@ ...@@ -31,8 +31,6 @@
namespace mediakit { namespace mediakit {
static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE;
RtmpSession::RtmpSession(const Socket::Ptr &pSock) : TcpSession(pSock) { RtmpSession::RtmpSession(const Socket::Ptr &pSock) : TcpSession(pSock) {
DebugP(this); DebugP(this);
GET_CONFIG(uint32_t,keep_alive_sec,Rtmp::kKeepAliveSecond); GET_CONFIG(uint32_t,keep_alive_sec,Rtmp::kKeepAliveSecond);
...@@ -142,10 +140,10 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { ...@@ -142,10 +140,10 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
} }
})); }));
dec.load<AMFValue>();/* NULL */ dec.load<AMFValue>();/* NULL */
_mediaInfo.parse(_strTcUrl + "/" + dec.load<std::string>()); _mediaInfo.parse(_strTcUrl + "/" + getStreamId(dec.load<std::string>()));
_mediaInfo._schema = RTMP_SCHEMA; _mediaInfo._schema = RTMP_SCHEMA;
auto onRes = [this,pToken](const string &err){ auto onRes = [this,pToken](const string &err,bool enableRtxp,bool enableHls,bool enableMP4){
auto src = dynamic_pointer_cast<RtmpMediaSource>(MediaSource::find(RTMP_SCHEMA, auto src = dynamic_pointer_cast<RtmpMediaSource>(MediaSource::find(RTMP_SCHEMA,
_mediaInfo._vhost, _mediaInfo._vhost,
_mediaInfo._app, _mediaInfo._app,
...@@ -169,21 +167,25 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { ...@@ -169,21 +167,25 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
} }
_pPublisherSrc.reset(new RtmpToRtspMediaSource(_mediaInfo._vhost,_mediaInfo._app,_mediaInfo._streamid)); _pPublisherSrc.reset(new RtmpToRtspMediaSource(_mediaInfo._vhost,_mediaInfo._app,_mediaInfo._streamid));
_pPublisherSrc->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this())); _pPublisherSrc->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
//设置转协议
_pPublisherSrc->setProtocolTranslation(enableRtxp,enableHls,enableMP4);
//如果是rtmp推流客户端,那么加大TCP接收缓存,这样能提升接收性能 //如果是rtmp推流客户端,那么加大TCP接收缓存,这样能提升接收性能
_sock->setReadBuffer(std::make_shared<BufferRaw>(256 * 1024)); _sock->setReadBuffer(std::make_shared<BufferRaw>(256 * 1024));
setSocketFlags();
}; };
Broadcast::AuthInvoker invoker = [weakSelf,onRes,pToken](const string &err){ Broadcast::PublishAuthInvoker invoker = [weakSelf,onRes,pToken](const string &err,bool enableRtxp,bool enableHls,bool enableMP4){
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(!strongSelf){ if(!strongSelf){
return; return;
} }
strongSelf->async([weakSelf,onRes,err,pToken](){ strongSelf->async([weakSelf,onRes,err,pToken,enableRtxp,enableHls,enableMP4](){
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(!strongSelf){ if(!strongSelf){
return; return;
} }
onRes(err); onRes(err,enableRtxp,enableHls,enableMP4);
}); });
}; };
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish,
...@@ -192,7 +194,7 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { ...@@ -192,7 +194,7 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
*this); *this);
if(!flag){ if(!flag){
//该事件无人监听,默认鉴权成功 //该事件无人监听,默认鉴权成功
onRes(""); onRes("",true,true,false);
} }
} }
...@@ -272,7 +274,6 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr ...@@ -272,7 +274,6 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr
_pRingReader = src->getRing()->attach(getPoller()); _pRingReader = src->getRing()->attach(getPoller());
weak_ptr<RtmpSession> weakSelf = dynamic_pointer_cast<RtmpSession>(shared_from_this()); weak_ptr<RtmpSession> weakSelf = dynamic_pointer_cast<RtmpSession>(shared_from_this());
SockUtil::setNoDelay(_sock->rawFD(), false);
_pRingReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt) { _pRingReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt) {
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if (!strongSelf) { if (!strongSelf) {
...@@ -291,10 +292,8 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr ...@@ -291,10 +292,8 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr
if (src->readerCount() == 1) { if (src->readerCount() == 1) {
src->seekTo(0); src->seekTo(0);
} }
//提高服务器发送性能
//提高发送性能 setSocketFlags();
(*this) << SocketFlags(kSockFlags);
SockUtil::setNoDelay(_sock->rawFD(),false);
} }
void RtmpSession::doPlayResponse(const string &err,const std::function<void(bool)> &cb){ void RtmpSession::doPlayResponse(const string &err,const std::function<void(bool)> &cb){
...@@ -348,9 +347,41 @@ void RtmpSession::doPlay(AMFDecoder &dec){ ...@@ -348,9 +347,41 @@ void RtmpSession::doPlay(AMFDecoder &dec){
void RtmpSession::onCmd_play2(AMFDecoder &dec) { void RtmpSession::onCmd_play2(AMFDecoder &dec) {
doPlay(dec); doPlay(dec);
} }
string RtmpSession::getStreamId(const string &str){
string stream_id;
string params;
auto pos = str.find('?');
if(pos != string::npos){
//有url参数
stream_id = str.substr(0,pos);
//获取url参数
params = str.substr(pos + 1);
}else{
//没有url参数
stream_id = str;
}
pos = stream_id.find(":");
if(pos != string::npos){
//vlc和ffplay在播放 rtmp://127.0.0.1/record/0.mp4时,
//传过来的url会是rtmp://127.0.0.1/record/mp4:0,
//我们在这里还原成0.mp4
stream_id = stream_id.substr(pos + 1) + "." + stream_id.substr(0,pos);
}
if(params.empty()){
//没有url参数
return stream_id;
}
//有url参数
return stream_id + '?' + params;
}
void RtmpSession::onCmd_play(AMFDecoder &dec) { void RtmpSession::onCmd_play(AMFDecoder &dec) {
dec.load<AMFValue>();/* NULL */ dec.load<AMFValue>();/* NULL */
_mediaInfo.parse(_strTcUrl + "/" + dec.load<std::string>()); _mediaInfo.parse(_strTcUrl + "/" + getStreamId(dec.load<std::string>()));
_mediaInfo._schema = RTMP_SCHEMA; _mediaInfo._schema = RTMP_SCHEMA;
doPlay(dec); doPlay(dec);
} }
...@@ -464,6 +495,8 @@ void RtmpSession::onCmd_seek(AMFDecoder &dec) { ...@@ -464,6 +495,8 @@ void RtmpSession::onCmd_seek(AMFDecoder &dec) {
InfoP(this) << "rtmp seekTo(ms):" << milliSeconds; InfoP(this) << "rtmp seekTo(ms):" << milliSeconds;
auto stongSrc = _pPlayerSrc.lock(); auto stongSrc = _pPlayerSrc.lock();
if (stongSrc) { if (stongSrc) {
_stamp[0].setPlayBack();
_stamp[1].setPlayBack();
stongSrc->seekTo(milliSeconds); stongSrc->seekTo(milliSeconds);
} }
AMFValue status(AMF_OBJECT); AMFValue status(AMF_OBJECT);
...@@ -500,4 +533,13 @@ void RtmpSession::onNoneReader(MediaSource &sender) { ...@@ -500,4 +533,13 @@ void RtmpSession::onNoneReader(MediaSource &sender) {
MediaSourceEvent::onNoneReader(sender); MediaSourceEvent::onNoneReader(sender);
} }
void RtmpSession::setSocketFlags(){
GET_CONFIG(bool,ultraLowDelay,General::kUltraLowDelay);
if(!ultraLowDelay) {
//推流模式下,关闭TCP_NODELAY会增加推流端的延时,但是服务器性能将提高
SockUtil::setNoDelay(_sock->rawFD(), false);
//播放模式下,开启MSG_MORE会增加延时,但是能提高发送性能
(*this) << SocketFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
}
}
} /* namespace mediakit */ } /* namespace mediakit */
...@@ -85,13 +85,15 @@ private: ...@@ -85,13 +85,15 @@ private:
bool close(MediaSource &sender,bool force) override ; bool close(MediaSource &sender,bool force) override ;
void onNoneReader(MediaSource &sender) override; void onNoneReader(MediaSource &sender) override;
void setSocketFlags();
string getStreamId(const string &str);
private: private:
std::string _strTcUrl; std::string _strTcUrl;
MediaInfo _mediaInfo; MediaInfo _mediaInfo;
double _dNowReqID = 0; double _dNowReqID = 0;
Ticker _ticker;//数据接收时间 Ticker _ticker;//数据接收时间
RingBuffer<RtmpPacket::Ptr>::RingReader::Ptr _pRingReader; RingBuffer<RtmpPacket::Ptr>::RingReader::Ptr _pRingReader;
std::shared_ptr<RtmpMediaSource> _pPublisherSrc; std::shared_ptr<RtmpToRtspMediaSource> _pPublisherSrc;
std::weak_ptr<RtmpMediaSource> _pPlayerSrc; std::weak_ptr<RtmpMediaSource> _pPlayerSrc;
//时间戳修整器 //时间戳修整器
Stamp _stamp[2]; Stamp _stamp[2];
......
...@@ -52,11 +52,7 @@ public: ...@@ -52,11 +52,7 @@ public:
RtmpToRtspMediaSource(const string &vhost, RtmpToRtspMediaSource(const string &vhost,
const string &app, const string &app,
const string &id, const string &id,
bool bEnableHls = true,
bool bEnableMp4 = false,
int ringSize = 0) : RtmpMediaSource(vhost, app, id,ringSize){ int ringSize = 0) : RtmpMediaSource(vhost, app, id,ringSize){
_bEnableHls = bEnableHls;
_bEnableMp4 = bEnableMp4;
_demuxer = std::make_shared<RtmpDemuxer>(); _demuxer = std::make_shared<RtmpDemuxer>();
} }
virtual ~RtmpToRtspMediaSource(){} virtual ~RtmpToRtspMediaSource(){}
...@@ -66,17 +62,17 @@ public: ...@@ -66,17 +62,17 @@ public:
RtmpMediaSource::onGetMetaData(metadata); RtmpMediaSource::onGetMetaData(metadata);
} }
void onWrite(const RtmpPacket::Ptr &pkt,bool key_pos) override { void onWrite(const RtmpPacket::Ptr &pkt,bool key_pos = true) override {
_demuxer->inputRtmp(pkt); _demuxer->inputRtmp(pkt);
if(!_muxer && _demuxer->isInited(2000)){ if(!_muxer && _demuxer->isInited(2000)){
_muxer = std::make_shared<MultiMediaSourceMuxer>(getVhost(), _muxer = std::make_shared<MultiMediaSourceMuxer>(getVhost(),
getApp(), getApp(),
getId(), getId(),
_demuxer->getDuration(), _demuxer->getDuration(),
true,//转rtsp _enableRtsp,
false,//不重复生成rtmp false,//不重复生成rtmp
_bEnableHls, _enableHls,
_bEnableMp4); _enableMP4);
for (auto &track : _demuxer->getTracks(false)){ for (auto &track : _demuxer->getTracks(false)){
_muxer->addTrack(track); _muxer->addTrack(track);
track->addDelegate(_muxer); track->addDelegate(_muxer);
...@@ -107,11 +103,25 @@ public: ...@@ -107,11 +103,25 @@ public:
} }
return _demuxer->getTracks(trackReady); return _demuxer->getTracks(trackReady);
} }
/**
* 设置协议转换
* @param enableRtsp 是否转换成rtsp
* @param enableHls 是否转换成hls
* @param enableMP4 是否mp4录制
*/
void setProtocolTranslation(bool enableRtsp,bool enableHls,bool enableMP4){
// DebugL << enableRtsp << " " << enableHls << " " << enableMP4;
_enableRtsp = enableRtsp;
_enableHls = enableHls;
_enableMP4 = enableMP4;
}
private: private:
RtmpDemuxer::Ptr _demuxer; RtmpDemuxer::Ptr _demuxer;
MultiMediaSourceMuxer::Ptr _muxer; MultiMediaSourceMuxer::Ptr _muxer;
bool _bEnableHls; bool _enableHls = true;
bool _bEnableMp4; bool _enableMP4 = false;
bool _enableRtsp = true;
}; };
} /* namespace mediakit */ } /* namespace mediakit */
......
...@@ -49,8 +49,11 @@ RtpPacket::Ptr RtpInfo::makeRtp(TrackType type, const void* data, unsigned int l ...@@ -49,8 +49,11 @@ RtpPacket::Ptr RtpInfo::makeRtp(TrackType type, const void* data, unsigned int l
memcpy(&pucRtp[8], &ts, 4); memcpy(&pucRtp[8], &ts, 4);
//ssrc //ssrc
memcpy(&pucRtp[12], &sc, 4); memcpy(&pucRtp[12], &sc, 4);
//playload
memcpy(&pucRtp[16], data, len); if(data){
//playload
memcpy(&pucRtp[16], data, len);
}
rtppkt->PT = _ui8PlayloadType; rtppkt->PT = _ui8PlayloadType;
rtppkt->interleaved = _ui8Interleaved; rtppkt->interleaved = _ui8Interleaved;
......
...@@ -11,8 +11,6 @@ using namespace mediakit::Client; ...@@ -11,8 +11,6 @@ using namespace mediakit::Client;
namespace mediakit { namespace mediakit {
static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE;
RtspPusher::RtspPusher(const EventPoller::Ptr &poller,const RtspMediaSource::Ptr &src) : TcpClient(poller){ RtspPusher::RtspPusher(const EventPoller::Ptr &poller,const RtspMediaSource::Ptr &src) : TcpClient(poller){
_pMediaSrc = src; _pMediaSrc = src;
} }
...@@ -392,13 +390,21 @@ void RtspPusher::sendRecord() { ...@@ -392,13 +390,21 @@ void RtspPusher::sendRecord() {
},getPoller())); },getPoller()));
} }
onPublishResult(SockException(Err_success,"success")); onPublishResult(SockException(Err_success,"success"));
//提高发送性能 //提升发送性能
(*this) << SocketFlags(kSockFlags); setSocketFlags();
SockUtil::setNoDelay(_sock->rawFD(),false);
}; };
sendRtspRequest("RECORD",_strContentBase,{"Range","npt=0.000-"}); sendRtspRequest("RECORD",_strContentBase,{"Range","npt=0.000-"});
} }
void RtspPusher::setSocketFlags(){
GET_CONFIG(bool,ultraLowDelay,General::kUltraLowDelay);
if(!ultraLowDelay) {
//提高发送性能
(*this) << SocketFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
SockUtil::setNoDelay(_sock->rawFD(), false);
}
}
void RtspPusher::sendRtspRequest(const string &cmd, const string &url, const std::initializer_list<string> &header,const string &sdp ) { void RtspPusher::sendRtspRequest(const string &cmd, const string &url, const std::initializer_list<string> &header,const string &sdp ) {
string key; string key;
StrCaseMap header_map; StrCaseMap header_map;
......
...@@ -67,6 +67,7 @@ private: ...@@ -67,6 +67,7 @@ private:
void sendRtspRequest(const string &cmd, const string &url ,const std::initializer_list<string> &header,const string &sdp = ""); void sendRtspRequest(const string &cmd, const string &url ,const std::initializer_list<string> &header,const string &sdp = "");
void createUdpSockIfNecessary(int track_idx); void createUdpSockIfNecessary(int track_idx);
void setSocketFlags();
private: private:
//rtsp鉴权相关 //rtsp鉴权相关
string _rtspMd5Nonce; string _rtspMd5Nonce;
......
...@@ -71,7 +71,6 @@ namespace mediakit { ...@@ -71,7 +71,6 @@ namespace mediakit {
static unordered_map<string, weak_ptr<RtspSession> > g_mapGetter; static unordered_map<string, weak_ptr<RtspSession> > g_mapGetter;
//对g_mapGetter上锁保护 //对g_mapGetter上锁保护
static recursive_mutex g_mtxGetter; static recursive_mutex g_mtxGetter;
static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE;
RtspSession::RtspSession(const Socket::Ptr &pSock) : TcpSession(pSock) { RtspSession::RtspSession(const Socket::Ptr &pSock) : TcpSession(pSock) {
DebugP(this); DebugP(this);
...@@ -143,6 +142,12 @@ void RtspSession::onRecv(const Buffer::Ptr &pBuf) { ...@@ -143,6 +142,12 @@ void RtspSession::onRecv(const Buffer::Ptr &pBuf) {
} }
} }
//字符串是否以xx结尾
static inline bool end_of(const string &str, const string &substr){
auto pos = str.rfind(substr);
return pos != string::npos && pos == str.size() - substr.size();
};
void RtspSession::onWholeRtspPacket(Parser &parser) { void RtspSession::onWholeRtspPacket(Parser &parser) {
string strCmd = parser.Method(); //提取出请求命令字 string strCmd = parser.Method(); //提取出请求命令字
_iCseq = atoi(parser["CSeq"].data()); _iCseq = atoi(parser["CSeq"].data());
...@@ -242,6 +247,13 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) { ...@@ -242,6 +247,13 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) {
throw SockException(Err_shutdown,err); throw SockException(Err_shutdown,err);
} }
auto full_url = parser.FullUrl();
if(end_of(full_url,".sdp")){
//去除.sdp后缀,防止EasyDarwin推流器强制添加.sdp后缀
full_url = full_url.substr(0,full_url.length() - 4);
_mediaInfo.parse(full_url);
}
SdpParser sdpParser(parser.Content()); SdpParser sdpParser(parser.Content());
_strSession = makeRandStr(12); _strSession = makeRandStr(12);
_aTrackInfo = sdpParser.getAvailableTrack(); _aTrackInfo = sdpParser.getAvailableTrack();
...@@ -249,7 +261,8 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) { ...@@ -249,7 +261,8 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) {
_pushSrc = std::make_shared<RtspToRtmpMediaSource>(_mediaInfo._vhost,_mediaInfo._app,_mediaInfo._streamid); _pushSrc = std::make_shared<RtspToRtmpMediaSource>(_mediaInfo._vhost,_mediaInfo._app,_mediaInfo._streamid);
_pushSrc->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this())); _pushSrc->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
_pushSrc->onGetSDP(sdpParser.toString()); _pushSrc->onGetSDP(sdpParser.toString());
sendRtspResponse("200 OK");
sendRtspResponse("200 OK",{"Content-Base",_strContentBase + "/"});
} }
void RtspSession::handleReq_RECORD(const Parser &parser){ void RtspSession::handleReq_RECORD(const Parser &parser){
...@@ -257,7 +270,7 @@ void RtspSession::handleReq_RECORD(const Parser &parser){ ...@@ -257,7 +270,7 @@ void RtspSession::handleReq_RECORD(const Parser &parser){
send_SessionNotFound(); send_SessionNotFound();
throw SockException(Err_shutdown,_aTrackInfo.empty() ? "can not find any availabe track when record" : "session not found when record"); throw SockException(Err_shutdown,_aTrackInfo.empty() ? "can not find any availabe track when record" : "session not found when record");
} }
auto onRes = [this](const string &err){ auto onRes = [this](const string &err,bool enableRtxp,bool enableHls,bool enableMP4){
bool authSuccess = err.empty(); bool authSuccess = err.empty();
if(!authSuccess){ if(!authSuccess){
sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err); sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err);
...@@ -265,6 +278,9 @@ void RtspSession::handleReq_RECORD(const Parser &parser){ ...@@ -265,6 +278,9 @@ void RtspSession::handleReq_RECORD(const Parser &parser){
return; return;
} }
//设置转协议
_pushSrc->setProtocolTranslation(enableRtxp,enableHls,enableMP4);
_StrPrinter rtp_info; _StrPrinter rtp_info;
for(auto &track : _aTrackInfo){ for(auto &track : _aTrackInfo){
if (track->_inited == false) { if (track->_inited == false) {
...@@ -277,26 +293,25 @@ void RtspSession::handleReq_RECORD(const Parser &parser){ ...@@ -277,26 +293,25 @@ void RtspSession::handleReq_RECORD(const Parser &parser){
rtp_info.pop_back(); rtp_info.pop_back();
sendRtspResponse("200 OK", {"RTP-Info",rtp_info}); sendRtspResponse("200 OK", {"RTP-Info",rtp_info});
SockUtil::setNoDelay(_sock->rawFD(),false);
if(_rtpType == Rtsp::RTP_TCP){ if(_rtpType == Rtsp::RTP_TCP){
//如果是rtsp推流服务器,并且是TCP推流,那么加大TCP接收缓存,这样能提升接收性能 //如果是rtsp推流服务器,并且是TCP推流,那么加大TCP接收缓存,这样能提升接收性能
_sock->setReadBuffer(std::make_shared<BufferRaw>(256 * 1024)); _sock->setReadBuffer(std::make_shared<BufferRaw>(256 * 1024));
setSocketFlags();
} }
(*this) << SocketFlags(kSockFlags);
}; };
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this()); weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
Broadcast::AuthInvoker invoker = [weakSelf,onRes](const string &err){ Broadcast::PublishAuthInvoker invoker = [weakSelf,onRes](const string &err,bool enableRtxp,bool enableHls,bool enableMP4){
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(!strongSelf){ if(!strongSelf){
return; return;
} }
strongSelf->async([weakSelf,onRes,err](){ strongSelf->async([weakSelf,onRes,err,enableRtxp,enableHls,enableMP4](){
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(!strongSelf){ if(!strongSelf){
return; return;
} }
onRes(err); onRes(err,enableRtxp,enableHls,enableMP4);
}); });
}; };
...@@ -304,7 +319,7 @@ void RtspSession::handleReq_RECORD(const Parser &parser){ ...@@ -304,7 +319,7 @@ void RtspSession::handleReq_RECORD(const Parser &parser){
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish,_mediaInfo,invoker,*this); auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish,_mediaInfo,invoker,*this);
if(!flag){ if(!flag){
//该事件无人监听,默认不鉴权 //该事件无人监听,默认不鉴权
onRes(""); onRes("",true,true,false);
} }
} }
...@@ -571,7 +586,7 @@ inline void RtspSession::send_SessionNotFound() { ...@@ -571,7 +586,7 @@ inline void RtspSession::send_SessionNotFound() {
void RtspSession::handleReq_Setup(const Parser &parser) { void RtspSession::handleReq_Setup(const Parser &parser) {
//处理setup命令,该函数可能进入多次 //处理setup命令,该函数可能进入多次
auto controlSuffix = split(parser.Url(),"/").back();// parser.FullUrl().substr(_strContentBase.size()); auto controlSuffix = split(parser.FullUrl(),"/").back();// parser.FullUrl().substr(_strContentBase.size());
if(controlSuffix.front() == '/'){ if(controlSuffix.front() == '/'){
controlSuffix = controlSuffix.substr(1); controlSuffix = controlSuffix.substr(1);
} }
...@@ -780,10 +795,7 @@ void RtspSession::handleReq_Play(const Parser &parser) { ...@@ -780,10 +795,7 @@ void RtspSession::handleReq_Play(const Parser &parser) {
}); });
_enableSendRtp = true; _enableSendRtp = true;
setSocketFlags();
//提高发送性能
SockUtil::setNoDelay(_sock->rawFD(),false);
(*this) << SocketFlags(kSockFlags);
if (!_pRtpReader && _rtpType != Rtsp::RTP_MULTICAST) { if (!_pRtpReader && _rtpType != Rtsp::RTP_MULTICAST) {
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this()); weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
...@@ -1230,6 +1242,16 @@ void RtspSession::sendSenderReport(bool overTcp,int iTrackIndex) { ...@@ -1230,6 +1242,16 @@ void RtspSession::sendSenderReport(bool overTcp,int iTrackIndex) {
} }
} }
void RtspSession::setSocketFlags(){
GET_CONFIG(bool,ultraLowDelay,General::kUltraLowDelay);
if(!ultraLowDelay) {
//推流模式下,关闭TCP_NODELAY会增加推流端的延时,但是服务器性能将提高
SockUtil::setNoDelay(_sock->rawFD(), false);
//播放模式下,开启MSG_MORE会增加延时,但是能提高发送性能
(*this) << SocketFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
}
}
} }
/* namespace mediakit */ /* namespace mediakit */
...@@ -183,6 +183,8 @@ private: ...@@ -183,6 +183,8 @@ private:
bool sendRtspResponse(const string &res_code,const StrCaseMap &header = StrCaseMap(), const string &sdp = "",const char *protocol = "RTSP/1.0"); bool sendRtspResponse(const string &res_code,const StrCaseMap &header = StrCaseMap(), const string &sdp = "",const char *protocol = "RTSP/1.0");
//服务器发送rtcp //服务器发送rtcp
void sendSenderReport(bool overTcp,int iTrackIndex); void sendSenderReport(bool overTcp,int iTrackIndex);
//设置socket标志
void setSocketFlags();
private: private:
//用于判断客户端是否超时 //用于判断客户端是否超时
Ticker _ticker; Ticker _ticker;
......
...@@ -43,11 +43,7 @@ public: ...@@ -43,11 +43,7 @@ public:
RtspToRtmpMediaSource(const string &vhost, RtspToRtmpMediaSource(const string &vhost,
const string &app, const string &app,
const string &id, const string &id,
bool bEnableHls = true,
bool bEnableMp4 = false,
int ringSize = 0) : RtspMediaSource(vhost, app, id,ringSize) { int ringSize = 0) : RtspMediaSource(vhost, app, id,ringSize) {
_bEnableHls = bEnableHls;
_bEnableMp4 = bEnableMp4;
} }
virtual ~RtspToRtmpMediaSource() {} virtual ~RtspToRtmpMediaSource() {}
...@@ -66,9 +62,9 @@ public: ...@@ -66,9 +62,9 @@ public:
getId(), getId(),
_demuxer->getDuration(), _demuxer->getDuration(),
false,//不重复生成rtsp false,//不重复生成rtsp
true,//转rtmp _enableRtmp,
_bEnableHls, _enableHls,
_bEnableMp4); _enableMP4);
for (auto &track : _demuxer->getTracks(false)) { for (auto &track : _demuxer->getTracks(false)) {
_muxer->addTrack(track); _muxer->addTrack(track);
track->addDelegate(_muxer); track->addDelegate(_muxer);
...@@ -99,11 +95,25 @@ public: ...@@ -99,11 +95,25 @@ public:
} }
return _demuxer->getTracks(trackReady); return _demuxer->getTracks(trackReady);
} }
/**
* 设置协议转换
* @param enableRtmp 是否转换成rtmp
* @param enableHls 是否转换成hls
* @param enableMP4 是否mp4录制
*/
void setProtocolTranslation(bool enableRtmp,bool enableHls,bool enableMP4){
// DebugL << enableRtmp << " " << enableHls << " " << enableMP4;
_enableRtmp = enableRtmp;
_enableHls = enableHls;
_enableMP4 = enableMP4;
}
private: private:
RtspDemuxer::Ptr _demuxer; RtspDemuxer::Ptr _demuxer;
MultiMediaSourceMuxer::Ptr _muxer; MultiMediaSourceMuxer::Ptr _muxer;
bool _bEnableHls; bool _enableHls = true;
bool _bEnableMp4; bool _enableMP4 = false;
bool _enableRtmp = true;
}; };
} /* namespace mediakit */ } /* namespace mediakit */
......
...@@ -40,6 +40,40 @@ using namespace toolkit; ...@@ -40,6 +40,40 @@ using namespace toolkit;
using namespace mediakit; using namespace mediakit;
/**
* 合并一些时间戳相同的frame
*/
class FrameMerger {
public:
FrameMerger() = default;
virtual ~FrameMerger() = default;
void inputFrame(const Frame::Ptr &frame,const function<void(uint32_t dts,uint32_t pts,const Buffer::Ptr &buffer)> &cb){
if (!_frameCached.empty() && _frameCached.back()->dts() != frame->dts()) {
Frame::Ptr back = _frameCached.back();
Buffer::Ptr merged_frame = back;
if(_frameCached.size() != 1){
string merged;
_frameCached.for_each([&](const Frame::Ptr &frame){
if(frame->prefixSize()){
merged.append(frame->data(),frame->size());
} else{
merged.append("\x00\x00\x00\x01",4);
merged.append(frame->data(),frame->size());
}
});
merged_frame = std::make_shared<BufferString>(std::move(merged));
}
cb(back->dts(),back->pts(),merged_frame);
_frameCached.clear();
}
_frameCached.emplace_back(Frame::getCacheAbleFrame(frame));
}
private:
List<Frame::Ptr> _frameCached;
};
#ifdef WIN32 #ifdef WIN32
#include <TCHAR.h> #include <TCHAR.h>
...@@ -101,18 +135,26 @@ int main(int argc, char *argv[]) { ...@@ -101,18 +135,26 @@ int main(int argc, char *argv[]) {
SDLDisplayerHelper::Instance().doTask([frame,storage]() { SDLDisplayerHelper::Instance().doTask([frame,storage]() {
auto &decoder = (*storage)["decoder"]; auto &decoder = (*storage)["decoder"];
auto &displayer = (*storage)["displayer"]; auto &displayer = (*storage)["displayer"];
auto &merger = (*storage)["merger"];
if(!decoder){ if(!decoder){
decoder.set<H264Decoder>(); decoder.set<H264Decoder>();
} }
if(!displayer){ if(!displayer){
displayer.set<YuvDisplayer>(); displayer.set<YuvDisplayer>();
} }
if(!merger){
merger.set<FrameMerger>();
};
merger.get<FrameMerger>().inputFrame(frame,[&](uint32_t dts,uint32_t pts,const Buffer::Ptr &buffer){
AVFrame *pFrame = nullptr;
bool flag = decoder.get<H264Decoder>().inputVideo((unsigned char *) buffer->data(), buffer->size(), dts, &pFrame);
if (flag) {
displayer.get<YuvDisplayer>().displayYUV(pFrame);
}
});
AVFrame *pFrame = nullptr;
bool flag = decoder.get<H264Decoder>().inputVideo((unsigned char *) frame->data(), frame->size(), frame->stamp(), &pFrame);
if (flag) {
displayer.get<YuvDisplayer>().displayYUV(pFrame);
}
return true; return true;
}); });
})); }));
......
...@@ -157,7 +157,7 @@ void initEventListener() { ...@@ -157,7 +157,7 @@ void initEventListener() {
NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastMediaPublish, [](BroadcastMediaPublishArgs) { NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastMediaPublish, [](BroadcastMediaPublishArgs) {
DebugL << "推流鉴权:" << args._schema << " " << args._vhost << " " << args._app << " " << args._streamid << " " DebugL << "推流鉴权:" << args._schema << " " << args._vhost << " " << args._app << " " << args._streamid << " "
<< args._param_strs; << args._param_strs;
invoker("");//鉴权成功 invoker("", true, true, false);//鉴权成功
//invoker("this is auth failed message");//鉴权失败 //invoker("this is auth failed message");//鉴权失败
}); });
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论