Commit 7805558d by baiyfcu Committed by GitHub

Merge pull request #13 from xiongziliang/master

update
parents 40afa204 003cd583
ZLToolKit @ 987683f1
Subproject commit 681be205ef164db08effd83f925bb750eb1fe149
Subproject commit 987683f1045613098e2bcd534bc90a13d16df8a4
media-server @ 24519a59
Subproject commit 97cf5e47a5af1ff3d4d187f3ebffd9254595df75
Subproject commit 24519a594c2c634b21fbe09fad28d54c4eba0885
......@@ -20,7 +20,7 @@
- RTSP player and pusher.
- RTP Transport : `rtp over udp` `rtp over tcp` `rtp over http` `rtp udp multicast` .
- Basic/Digest/Url Authentication.
- H265/H264/AAC codec.
- H264/H265/AAC/G711 codec.
- Recorded as mp4.
- Vod of mp4.
......@@ -28,7 +28,7 @@
- RTMP server,support player and pusher.
- RTMP player and pusher.
- Support HTTP-FLV player.
- H264/AAC codec.
- H264/H265/AAC/G711 codec.
- Recorded as flv or mp4.
- Vod of mp4.
- support [RTMP-H265](https://github.com/ksvc/FFmpeg/wiki)
......
......@@ -25,11 +25,14 @@ typedef void *mk_media;
* @param app 应用名,推荐为live
* @param stream 流id,例如camera
* @param duration 时长(单位秒),直播则为0
* @param rtsp_enabled 是否启用rtsp协议
* @param rtmp_enabled 是否启用rtmp协议
* @param hls_enabled 是否生成hls
* @param mp4_enabled 是否生成mp4
* @return 对象指针
*/
API_EXPORT mk_media API_CALL mk_media_create(const char *vhost, const char *app, const char *stream, float duration, int hls_enabled, int mp4_enabled);
API_EXPORT mk_media API_CALL mk_media_create(const char *vhost, const char *app, const char *stream, float duration,
int rtsp_enabled, int rtmp_enabled, int hls_enabled, int mp4_enabled);
/**
* 销毁媒体源
......@@ -38,42 +41,24 @@ API_EXPORT mk_media API_CALL mk_media_create(const char *vhost, const char *app,
API_EXPORT void API_CALL mk_media_release(mk_media ctx);
/**
* 添加h264视频轨道
* 添加视频轨道
* @param ctx 对象指针
* @param track_id 0:CodecH264/1:CodecH265
* @param width 视频宽度
* @param height 视频高度
* @param fps 视频fps
*/
API_EXPORT void API_CALL mk_media_init_h264(mk_media ctx, int width, int height, int fps);
API_EXPORT void API_CALL mk_media_init_video(mk_media ctx, int track_id, int width, int height, int fps);
/**
* 添加h265视频轨道
* 添加音频轨道
* @param ctx 对象指针
* @param width 视频宽度
* @param height 视频高度
* @param fps 视频fps
*/
API_EXPORT void API_CALL mk_media_init_h265(mk_media ctx, int width, int height, int fps);
/**
* 添加aac音频轨道
* @param ctx 对象指针
* @param channel 通道数
* @param sample_bit 采样位数,只支持16
* @param sample_rate 采样率
* @param profile aac编码profile,在不输入adts头时用于生产adts头
*/
API_EXPORT void API_CALL mk_media_init_aac(mk_media ctx, int channel, int sample_bit, int sample_rate, int profile);
/**
* 添加g711音频轨道
* @param ctx 对象指针
* @param au 1.G711A 2.G711U
* @param track_id 2:CodecAAC/3:CodecG711A/4:CodecG711U
* @param channel 通道数
* @param sample_bit 采样位数,只支持16
* @param sample_rate 采样率
*/
API_EXPORT void API_CALL mk_media_init_g711(mk_media ctx, int au, int sample_bit, int sample_rate);
API_EXPORT void API_CALL mk_media_init_audio(mk_media ctx, int track_id, int sample_rate, int channels, int sample_bit);
/**
* 初始化h264/h265/aac完毕后调用此函数,
......@@ -104,16 +89,6 @@ API_EXPORT void API_CALL mk_media_input_h264(mk_media ctx, void *data, int len,
API_EXPORT void API_CALL mk_media_input_h265(mk_media ctx, void *data, int len, uint32_t dts, uint32_t pts);
/**
* 输入单帧AAC音频
* @param ctx 对象指针
* @param data 单帧AAC数据
* @param len 单帧AAC数据字节数
* @param dts 时间戳,毫秒
* @param with_adts_header data中是否包含7个字节的adts头
*/
API_EXPORT void API_CALL mk_media_input_aac(mk_media ctx, void *data, int len, uint32_t dts, int with_adts_header);
/**
* 输入单帧AAC音频(单独指定adts头)
* @param ctx 对象指针
* @param data 不包含adts头的单帧AAC数据
......@@ -121,7 +96,7 @@ API_EXPORT void API_CALL mk_media_input_aac(mk_media ctx, void *data, int len, u
* @param dts 时间戳,毫秒
* @param adts adts头
*/
API_EXPORT void API_CALL mk_media_input_aac1(mk_media ctx, void *data, int len, uint32_t dts, void *adts);
API_EXPORT void API_CALL mk_media_input_aac(mk_media ctx, void *data, int len, uint32_t dts, void *adts);
/**
* 输入单帧G711音频
......
......@@ -31,7 +31,7 @@ typedef void(API_CALL *on_mk_play_event)(void *user_data,int err_code,const char
* 收到音视频数据回调
* @param user_data 用户数据指针
* @param track_type 0:视频,1:音频
* @param codec_id 0:H264,1:H265,2:AAC
* @param codec_id 0:H264,1:H265,2:AAC 3.G711A 4.G711U
* @param data 数据指针
* @param len 数据长度
* @param dts 解码时间戳,单位毫秒
......@@ -98,13 +98,15 @@ API_EXPORT void API_CALL mk_player_set_on_shutdown(mk_player ctx, on_mk_play_eve
/**
* 设置音视频数据回调函数
* 该接口只能在播放成功事件触发后才能调用
* 该接口在播放成功事件触发后才有效
* @param ctx 播放器指针
* @param cb 回调函数指针,不得为null
* @param user_data 用户数据指针
*/
API_EXPORT void API_CALL mk_player_set_on_data(mk_player ctx, on_mk_play_data cb, void *user_data);
///////////////////////////获取音视频相关信息接口在播放成功回调触发后才有效///////////////////////////////
/**
* 获取视频codec_id -1:不存在 0:H264,1:H265,2:AAC 3.G711A 4.G711U
* @param ctx 播放器指针
......
......@@ -96,9 +96,11 @@ API_EXPORT int API_CALL mk_media_total_reader_count(mk_media ctx){
return (*obj)->getChannel()->totalReaderCount();
}
API_EXPORT mk_media API_CALL mk_media_create(const char *vhost, const char *app, const char *stream, float duration, int hls_enabled, int mp4_enabled) {
API_EXPORT mk_media API_CALL mk_media_create(const char *vhost, const char *app, const char *stream, float duration,
int rtsp_enabled, int rtmp_enabled, int hls_enabled, int mp4_enabled) {
assert(vhost && app && stream);
MediaHelper::Ptr *obj(new MediaHelper::Ptr(new MediaHelper(vhost, app, stream, duration, true, true, hls_enabled, mp4_enabled)));
MediaHelper::Ptr *obj(new MediaHelper::Ptr(new MediaHelper(vhost, app, stream, duration,
rtsp_enabled, rtmp_enabled, hls_enabled, mp4_enabled)));
(*obj)->attachEvent();
return (mk_media) obj;
}
......@@ -109,48 +111,25 @@ API_EXPORT void API_CALL mk_media_release(mk_media ctx) {
delete obj;
}
API_EXPORT void API_CALL mk_media_init_h264(mk_media ctx, int width, int height, int frameRate) {
API_EXPORT void API_CALL mk_media_init_video(mk_media ctx, int track_id, int width, int height, int fps){
assert(ctx);
MediaHelper::Ptr *obj = (MediaHelper::Ptr *) ctx;
VideoInfo info;
info.iFrameRate = frameRate;
info.codecId = (CodecId)track_id;
info.iFrameRate = fps;
info.iWidth = width;
info.iHeight = height;
(*obj)->getChannel()->initVideo(info);
}
API_EXPORT void API_CALL mk_media_init_h265(mk_media ctx, int width, int height, int frameRate) {
assert(ctx);
MediaHelper::Ptr *obj = (MediaHelper::Ptr *) ctx;
VideoInfo info;
info.iFrameRate = frameRate;
info.iWidth = width;
info.iHeight = height;
(*obj)->getChannel()->initH265Video(info);
}
API_EXPORT void API_CALL mk_media_init_aac(mk_media ctx, int channel, int sample_bit, int sample_rate, int profile) {
API_EXPORT void API_CALL mk_media_init_audio(mk_media ctx, int track_id, int sample_rate, int channels, int sample_bit){
assert(ctx);
MediaHelper::Ptr *obj = (MediaHelper::Ptr *) ctx;
AudioInfo info;
info.codecId = (CodecId)track_id;
info.iSampleRate = sample_rate;
info.iChannel = channel;
info.iChannel = channels;
info.iSampleBit = sample_bit;
info.iProfile = profile;
(*obj)->getChannel()->initAudio(info);
}
API_EXPORT void API_CALL mk_media_init_g711(mk_media ctx, int au, int sample_bit, int sample_rate)
{
assert(ctx);
MediaHelper::Ptr* obj = (MediaHelper::Ptr*) ctx;
AudioInfo info;
info.iSampleRate = sample_rate;
info.iChannel = 1;
info.iSampleBit = sample_bit;
info.iProfile = 0;
info.codecId = (CodecId)au;
(*obj)->getChannel()->initAudio(info);
}
......@@ -172,24 +151,14 @@ API_EXPORT void API_CALL mk_media_input_h265(mk_media ctx, void *data, int len,
(*obj)->getChannel()->inputH265((char *) data, len, dts, pts);
}
API_EXPORT void API_CALL mk_media_input_aac(mk_media ctx, void *data, int len, uint32_t dts, int with_adts_header) {
assert(ctx && data && len > 0);
MediaHelper::Ptr *obj = (MediaHelper::Ptr *) ctx;
(*obj)->getChannel()->inputAAC((char *) data, len, dts, with_adts_header);
}
API_EXPORT void API_CALL mk_media_input_aac1(mk_media ctx, void *data, int len, uint32_t dts, void *adts) {
API_EXPORT void API_CALL mk_media_input_aac(mk_media ctx, void *data, int len, uint32_t dts, void *adts) {
assert(ctx && data && len > 0 && adts);
MediaHelper::Ptr *obj = (MediaHelper::Ptr *) ctx;
(*obj)->getChannel()->inputAAC((char *) data, len, dts, (char *) adts);
}
API_EXPORT void API_CALL mk_media_input_g711(mk_media ctx, void* data, int len, uint32_t dts)
{
API_EXPORT void API_CALL mk_media_input_g711(mk_media ctx, void* data, int len, uint32_t dts){
assert(ctx && data && len > 0);
MediaHelper::Ptr* obj = (MediaHelper::Ptr*) ctx;
(*obj)->getChannel()->inputG711((char*)data, len, dts);
}
......@@ -101,9 +101,7 @@ API_EXPORT void API_CALL mk_player_set_on_data(mk_player ctx, on_mk_play_data cb
});
}
API_EXPORT int API_CALL mk_player_video_codecId(mk_player ctx)
{
API_EXPORT int API_CALL mk_player_video_codecId(mk_player ctx){
assert(ctx);
MediaPlayer::Ptr& player = *((MediaPlayer::Ptr*)ctx);
auto track = dynamic_pointer_cast<VideoTrack>(player->getTrack(TrackVideo));
......@@ -131,9 +129,7 @@ API_EXPORT int API_CALL mk_player_video_fps(mk_player ctx) {
return track ? track->getVideoFps() : 0;
}
API_EXPORT int API_CALL mk_player_audio_codecId(mk_player ctx)
{
API_EXPORT int API_CALL mk_player_audio_codecId(mk_player ctx){
assert(ctx);
MediaPlayer::Ptr& player = *((MediaPlayer::Ptr*)ctx);
auto track = dynamic_pointer_cast<AudioTrack>(player->getTrack(TrackAudio));
......
......@@ -42,6 +42,9 @@ publishToRtxp=1
publishToHls=1
#是否默认推流时mp4录像,hook接口(on_publish)中可以覆盖该设置
publishToMP4=0
#合并写缓存大小(单位毫秒),合并写指服务器缓存一定的数据后才会一次性写入socket,这样能提高性能,但是会提高延时
#在开启低延时模式后,该参数不起作用
mergeWriteMS=300
[hls]
#hls写文件的buf大小,调整参数可以提高文件io性能
......
......@@ -38,11 +38,12 @@ using namespace mediakit;
namespace API {
typedef enum {
InvalidArgs = -300,
SqlFailed = -200,
AuthFailed = -100,
OtherFailed = -1,
Success = 0
Exception = -400,//代码抛异常
InvalidArgs = -300,//参数不合法
SqlFailed = -200,//sql执行失败
AuthFailed = -100,//鉴权失败
OtherFailed = -1,//业务代码执行失败,
Success = 0//执行成功
} ApiErr;
#define API_FIELD "api."
......@@ -154,7 +155,8 @@ static inline void addHttpListener(){
HttpSession::KeyValue headerOut;
auto allArgs = getAllArgs(parser);
HttpSession::KeyValue &headerIn = parser.getValues();
headerOut["Content-Type"] = "application/json; charset=utf-8";
GET_CONFIG(string,charSet,Http::kCharSet);
headerOut["Content-Type"] = StrPrinter << "application/json; charset=" << charSet;
if(api_debug){
auto newInvoker = [invoker,parser,allArgs](const string &codeOut,
const HttpSession::KeyValue &headerOut,
......@@ -207,7 +209,7 @@ static inline void addHttpListener(){
}
#endif// ENABLE_MYSQL
catch (std::exception &ex) {
val["code"] = API::OtherFailed;
val["code"] = API::Exception;
val["msg"] = ex.what();
invoker("200 OK", headerOut, val.toStyledString());
}
......@@ -447,9 +449,11 @@ void installWebApi() {
bool flag = src->close(allArgs["force"].as<bool>());
val["result"] = flag ? 0 : -1;
val["msg"] = flag ? "success" : "close failed";
val["code"] = API::OtherFailed;
}else{
val["result"] = -2;
val["msg"] = "can not find the stream";
val["code"] = API::OtherFailed;
}
});
......@@ -725,21 +729,27 @@ void installWebApi() {
api_regist1("/index/api/startRecord",[](API_ARGS1){
CHECK_SECRET();
CHECK_ARGS("type","vhost","app","stream");
val["result"] = Recorder::startRecord((Recorder::type) allArgs["type"].as<int>(),
auto result = Recorder::startRecord((Recorder::type) allArgs["type"].as<int>(),
allArgs["vhost"],
allArgs["app"],
allArgs["stream"],
allArgs["customized_path"]);
val["result"] = result;
val["code"] = result ? API::Success : API::OtherFailed;
val["msg"] = result ? "success" : "start record failed";
});
// 停止录制hls或MP4
api_regist1("/index/api/stopRecord",[](API_ARGS1){
CHECK_SECRET();
CHECK_ARGS("type","vhost","app","stream");
val["result"] = Recorder::stopRecord((Recorder::type) allArgs["type"].as<int>(),
auto result = Recorder::stopRecord((Recorder::type) allArgs["type"].as<int>(),
allArgs["vhost"],
allArgs["app"],
allArgs["stream"]);
val["result"] = result;
val["code"] = result ? API::Success : API::OtherFailed;
val["msg"] = result ? "success" : "stop record failed";
});
// 获取hls或MP4录制状态
......@@ -802,12 +812,10 @@ void installWebApi() {
api_regist1("/index/hook/on_play",[](API_ARGS1){
//开始播放事件
throw SuccessException();
});
api_regist1("/index/hook/on_flow_report",[](API_ARGS1){
//流量统计hook api
throw SuccessException();
});
api_regist1("/index/hook/on_rtsp_realm",[](API_ARGS1){
......@@ -827,7 +835,6 @@ void installWebApi() {
api_regist1("/index/hook/on_stream_changed",[](API_ARGS1){
//媒体注册或反注册事件
throw SuccessException();
});
......@@ -890,12 +897,10 @@ void installWebApi() {
api_regist1("/index/hook/on_record_mp4",[](API_ARGS1){
//录制mp4分片完毕事件
throw SuccessException();
});
api_regist1("/index/hook/on_shell_login",[](API_ARGS1){
//shell登录调试事件
throw SuccessException();
});
api_regist1("/index/hook/on_stream_none_reader",[](API_ARGS1){
......@@ -931,7 +936,6 @@ void installWebApi() {
api_regist1("/index/hook/on_server_started",[](API_ARGS1){
//服务器重启报告
throw SuccessException();
});
......
......@@ -8,13 +8,9 @@
* may be found in the AUTHORS file in the root of the source tree.
*/
#include <stdio.h>
#include <stdio.h>
#include "Device.h"
#include "Util/logger.h"
#include "Util/util.h"
#include "Util/base64.h"
#include "Util/TimeTicker.h"
#include "Extension/AAC.h"
#include "Extension/G711.h"
#include "Extension/H264.h"
......@@ -24,15 +20,15 @@ using namespace toolkit;
namespace mediakit {
DevChannel::DevChannel(const string &strVhost,
const string &strApp,
const string &strId,
float fDuration,
bool bEanbleRtsp,
bool bEanbleRtmp,
bool bEanbleHls,
bool bEnableMp4) :
MultiMediaSourceMuxer(strVhost, strApp, strId, fDuration, bEanbleRtsp, bEanbleRtmp, bEanbleHls, bEnableMp4) {}
DevChannel::DevChannel(const string &vhost,
const string &app,
const string &stream_id,
float duration,
bool enable_rtsp,
bool enable_rtmp,
bool enable_hls,
bool enable_mp4) :
MultiMediaSourceMuxer(vhost, app, stream_id, duration, enable_rtsp, enable_rtmp, enable_hls, enable_mp4) {}
DevChannel::~DevChannel() {}
......@@ -68,14 +64,14 @@ void DevChannel::inputPCM(char* pcData, int iDataLen, uint32_t uiStamp) {
if (_pAacEnc) {
unsigned char *pucOut;
int iRet = _pAacEnc->inputData(pcData, iDataLen, &pucOut);
if (iRet > 0) {
inputAAC((char *) pucOut, iRet, uiStamp);
if (iRet > 7) {
inputAAC((char *) pucOut + 7, iRet - 7, uiStamp, (char *)pucOut);
}
}
}
#endif //ENABLE_FAAC
void DevChannel::inputH264(const char* pcData, int iDataLen, uint32_t dts,uint32_t pts) {
void DevChannel::inputH264(const char *data, int len, uint32_t dts, uint32_t pts) {
if(dts == 0){
dts = (uint32_t)_aTicker[0].elapsedTime();
}
......@@ -83,24 +79,27 @@ void DevChannel::inputH264(const char* pcData, int iDataLen, uint32_t dts,uint32
pts = dts;
}
int prefixeSize;
if (memcmp("\x00\x00\x00\x01", pcData, 4) == 0) {
if (memcmp("\x00\x00\x00\x01", data, 4) == 0) {
prefixeSize = 4;
} else if (memcmp("\x00\x00\x01", pcData, 3) == 0) {
} else if (memcmp("\x00\x00\x01", data, 3) == 0) {
prefixeSize = 3;
} else {
prefixeSize = 0;
}
//由于rtmp/hls/mp4需要缓存时间戳相同的帧,
//所以使用FrameNoCacheAble类型的帧反而会在转换成FrameCacheAble时多次内存拷贝
//在此处只拷贝一次,性能开销更低
H264Frame::Ptr frame = std::make_shared<H264Frame>();
frame->_dts = dts;
frame->_pts = pts;
frame->_buffer.assign("\x00\x00\x00\x01",4);
frame->_buffer.append(pcData + prefixeSize, iDataLen - prefixeSize);
frame->_buffer.append(data + prefixeSize, len - prefixeSize);
frame->_prefix_size = 4;
inputFrame(frame);
}
void DevChannel::inputH265(const char* pcData, int iDataLen, uint32_t dts,uint32_t pts) {
void DevChannel::inputH265(const char *data, int len, uint32_t dts, uint32_t pts) {
if(dts == 0){
dts = (uint32_t)_aTicker[0].elapsedTime();
}
......@@ -108,98 +107,81 @@ void DevChannel::inputH265(const char* pcData, int iDataLen, uint32_t dts,uint32
pts = dts;
}
int prefixeSize;
if (memcmp("\x00\x00\x00\x01", pcData, 4) == 0) {
if (memcmp("\x00\x00\x00\x01", data, 4) == 0) {
prefixeSize = 4;
} else if (memcmp("\x00\x00\x01", pcData, 3) == 0) {
} else if (memcmp("\x00\x00\x01", data, 3) == 0) {
prefixeSize = 3;
} else {
prefixeSize = 0;
}
//由于rtmp/hls/mp4需要缓存时间戳相同的帧,
//所以使用FrameNoCacheAble类型的帧反而会在转换成FrameCacheAble时多次内存拷贝
//在此处只拷贝一次,性能开销更低
H265Frame::Ptr frame = std::make_shared<H265Frame>();
frame->_dts = dts;
frame->_pts = pts;
frame->_buffer.assign("\x00\x00\x00\x01",4);
frame->_buffer.append(pcData + prefixeSize, iDataLen - prefixeSize);
frame->_buffer.append(data + prefixeSize, len - prefixeSize);
frame->_prefix_size = 4;
inputFrame(frame);
}
void DevChannel::inputAAC(const char* pcData, int iDataLen, uint32_t uiStamp,bool withAdtsHeader) {
if(withAdtsHeader){
inputAAC(pcData+7,iDataLen-7,uiStamp,pcData);
} else if(_audio) {
inputAAC(pcData,iDataLen,uiStamp,(char *)_adtsHeader);
}
}
class AACFrameCacheAble : public AACFrameNoCacheAble{
public:
template <typename ... ARGS>
AACFrameCacheAble(ARGS && ...args) : AACFrameNoCacheAble(std::forward<ARGS>(args)...){};
virtual ~AACFrameCacheAble() {
delete [] _ptr;
};
void DevChannel::inputAAC(const char *pcDataWithoutAdts,int iDataLen, uint32_t uiStamp,const char *pcAdtsHeader){
if(uiStamp == 0){
uiStamp = (uint32_t)_aTicker[1].elapsedTime();
}
if(pcAdtsHeader + 7 == pcDataWithoutAdts){
inputFrame(std::make_shared<AACFrameNoCacheAble>((char *)pcDataWithoutAdts - 7,iDataLen + 7,uiStamp,0,7));
} else {
char *dataWithAdts = new char[iDataLen + 7];
memcpy(dataWithAdts,pcAdtsHeader,7);
memcpy(dataWithAdts + 7 , pcDataWithoutAdts , iDataLen);
inputFrame(std::make_shared<AACFrameNoCacheAble>(dataWithAdts,iDataLen + 7,uiStamp,0,7));
delete [] dataWithAdts;
bool cacheAble() const override {
return true;
}
}
};
void DevChannel::inputG711(const char* pcData, int iDataLen, uint32_t uiStamp)
{
if (uiStamp == 0) {
uiStamp = (uint32_t)_aTicker[1].elapsedTime();
void DevChannel::inputAAC(const char *data_without_adts, int len, uint32_t dts, const char *adts_header){
if(dts == 0){
dts = (uint32_t)_aTicker[1].elapsedTime();
}
if(adts_header){
if(adts_header + 7 == data_without_adts){
//adts头和帧在一起
inputFrame(std::make_shared<AACFrameNoCacheAble>((char *)data_without_adts - 7, len + 7, dts, 0, 7));
}else{
//adts头和帧不在一起
char *dataWithAdts = new char[len + 7];
memcpy(dataWithAdts, adts_header, 7);
memcpy(dataWithAdts + 7 , data_without_adts , len);
inputFrame(std::make_shared<AACFrameCacheAble>(dataWithAdts, len + 7, dts, 0, 7));
}
}
inputFrame(std::make_shared<G711FrameNoCacheAble>(_audio->codecId, (char*)pcData, iDataLen, uiStamp, 0));
}
void DevChannel::initVideo(const VideoInfo& info) {
_video = std::make_shared<VideoInfo>(info);
addTrack(std::make_shared<H264Track>());
//Feed a single G711 audio frame into the muxer.
//@param data frame payload (raw G711 bytes)
//@param len payload length in bytes
//@param dts timestamp in milliseconds; 0 means auto-generate from the internal ticker
void DevChannel::inputG711(const char *data, int len, uint32_t dts){
if (dts == 0) {
//No timestamp supplied: derive a smooth one from the audio ticker (_aTicker[1] is the audio channel)
dts = (uint32_t)_aTicker[1].elapsedTime();
}
//NOTE(review): assumes initAudio() was called before this so _audio is non-null — verify callers
inputFrame(std::make_shared<G711FrameNoCacheAble>(_audio->codecId, (char*)data, len, dts, 0));
}
void DevChannel::initH265Video(const VideoInfo &info){
void DevChannel::initVideo(const VideoInfo &info) {
_video = std::make_shared<VideoInfo>(info);
addTrack(std::make_shared<H265Track>());
switch (info.codecId){
case CodecH265 : addTrack(std::make_shared<H265Track>()); break;
case CodecH264 : addTrack(std::make_shared<H264Track>()); break;
default: WarnL << "不支持该类型的视频编码类型:" << info.codecId; break;
}
}
void DevChannel::initAudio(const AudioInfo& info) {
_audio = std::make_shared<AudioInfo>(info);
if (info.codecId == CodecAAC)
{
addTrack(std::make_shared<AACTrack>());
AACFrame adtsHeader;
adtsHeader.syncword = 0x0FFF;
adtsHeader.id = 0;
adtsHeader.layer = 0;
adtsHeader.protection_absent = 1;
adtsHeader.profile = info.iProfile;//audioObjectType - 1;
int i = 0;
for (auto rate : samplingFrequencyTable) {
if (rate == info.iSampleRate) {
adtsHeader.sf_index = i;
};
++i;
}
adtsHeader.private_bit = 0;
adtsHeader.channel_configuration = info.iChannel;
adtsHeader.original = 0;
adtsHeader.home = 0;
adtsHeader.copyright_identification_bit = 0;
adtsHeader.copyright_identification_start = 0;
adtsHeader.aac_frame_length = 7;
adtsHeader.adts_buffer_fullness = 2047;
adtsHeader.no_raw_data_blocks_in_frame = 0;
writeAdtsHeader(adtsHeader, _adtsHeader);
}
else if (info.codecId == CodecG711A || info.codecId == CodecG711U)
{
addTrack(std::make_shared<G711Track>(info.codecId, info.iSampleBit, info.iSampleRate));
//Create and register the audio Track that matches info.codecId.
//Supported codecs: AAC, G711A, G711U; anything else is logged and ignored.
//@param info audio parameters (codec id, sample rate, channel count, sample bits)
void DevChannel::initAudio(const AudioInfo &info) {
_audio = std::make_shared<AudioInfo>(info);
switch (info.codecId) {
case CodecAAC : addTrack(std::make_shared<AACTrack>()); break;
case CodecG711A :
//G711A and G711U share the same track type; codecId distinguishes them
case CodecG711U : addTrack(std::make_shared<G711Track>(info.codecId, info.iSampleRate, info.iChannel, info.iSampleBit)); break;
default: WarnL << "不支持该类型的音频编码类型:" << info.codecId; break;
}
}
......
......@@ -30,105 +30,91 @@ using namespace toolkit;
#include "Codec/H264Encoder.h"
#endif //ENABLE_X264
namespace mediakit {
class VideoInfo {
public:
CodecId codecId = CodecH264;
int iWidth;
int iHeight;
float iFrameRate;
};
class AudioInfo {
public:
CodecId codecId;
CodecId codecId = CodecAAC;
int iChannel;
int iSampleBit;
int iSampleRate;
int iProfile;
};
/**
* 该类已经废弃,保留只为兼容旧代码,请直接使用MultiMediaSourceMuxer类!
* MultiMediaSourceMuxer类的包装,方便初学者使用
*/
class DevChannel : public MultiMediaSourceMuxer{
public:
typedef std::shared_ptr<DevChannel> Ptr;
//fDuration<=0为直播,否则为点播
DevChannel(const string &strVhost,
const string &strApp,
const string &strId,
float fDuration = 0,
bool bEanbleRtsp = true,
bool bEanbleRtmp = true,
bool bEanbleHls = true,
bool bEnableMp4 = false);
DevChannel(const string &vhost,
const string &app,
const string &stream_id,
float duration = 0,
bool enable_rtsp = true,
bool enable_rtmp = true,
bool enable_hls = true,
bool enable_mp4 = false);
virtual ~DevChannel();
/**
* 初始化h264视频Track
* 相当于MultiMediaSourceMuxer::addTrack(H264Track::Ptr );
* @param info
* 初始化视频Track
* 相当于MultiMediaSourceMuxer::addTrack(VideoTrack::Ptr );
* @param info 视频相关信息
*/
void initVideo(const VideoInfo &info);
/**
* 初始化h265视频Track
* @param info
*/
void initH265Video(const VideoInfo &info);
/**
* 初始化aac音频Track
* 相当于MultiMediaSourceMuxer::addTrack(AACTrack::Ptr );
* @param info
* 初始化音频Track
* 相当于MultiMediaSourceMuxer::addTrack(AudioTrack::Ptr );
* @param info 音频相关信息
*/
void initAudio(const AudioInfo &info);
/**
* 输入264帧
* @param pcData 264单帧数据指针
* @param iDataLen 数据指针长度
* @param data 264单帧数据指针
* @param len 数据指针长度
* @param dts 解码时间戳,单位毫秒;等于0时内部会自动生成时间戳
* @param pts 播放时间戳,单位毫秒;等于0时内部会赋值为dts
*/
void inputH264(const char *pcData, int iDataLen, uint32_t dts,uint32_t pts = 0);
void inputH264(const char *data, int len, uint32_t dts, uint32_t pts = 0);
/**
* 输入265帧
* @param pcData 265单帧数据指针
* @param iDataLen 数据指针长度
* @param data 265单帧数据指针
* @param len 数据指针长度
* @param dts 解码时间戳,单位毫秒;等于0时内部会自动生成时间戳
* @param pts 播放时间戳,单位毫秒;等于0时内部会赋值为dts
*/
void inputH265(const char *pcData, int iDataLen, uint32_t dts,uint32_t pts = 0);
void inputH265(const char *data, int len, uint32_t dts, uint32_t pts = 0);
/**
* 输入可能带adts头的aac帧
* @param pcDataWithAdts 可能带adts头的aac帧
* @param iDataLen 帧数据长度
* @param uiStamp 时间戳,单位毫秒,等于0时内部会自动生成时间戳
* @param withAdtsHeader 是否带adts头
* 输入aac帧
* @param data_without_adts 不带adts头的aac帧
* @param len 帧数据长度
* @param dts 时间戳,单位毫秒
* @param adts_header adts头
*/
void inputAAC(const char *pcDataWithAdts, int iDataLen, uint32_t uiStamp, bool withAdtsHeader = true);
/**
* 输入不带adts头的aac帧
* @param pcDataWithoutAdts 不带adts头的aac帧
* @param iDataLen 帧数据长度
* @param uiStamp 时间戳,单位毫秒
* @param pcAdtsHeader adts头
*/
void inputAAC(const char *pcDataWithoutAdts,int iDataLen, uint32_t uiStamp,const char *pcAdtsHeader);
void inputAAC(const char *data_without_adts, int len, uint32_t dts, const char *adts_header);
/**
* G711音频帧
* @param pcData 音频帧
* @param iDataLen 帧数据长度
* @param uiStamp 时间戳,单位毫秒
* @param data 音频帧
* @param len 帧数据长度
* @param dts 时间戳,单位毫秒
*/
void inputG711(const char* pcData, int iDataLen, uint32_t uiStamp);
void inputG711(const char* data, int len, uint32_t dts);
#ifdef ENABLE_X264
/**
* 输入yuv420p视频帧,内部会完成编码并调用inputH264方法
......@@ -151,6 +137,7 @@ public:
#endif //ENABLE_FAAC
private:
#ifdef ENABLE_X264
std::shared_ptr<H264Encoder> _pH264Enc;
#endif //ENABLE_X264
......@@ -160,9 +147,7 @@ private:
#endif //ENABLE_FAAC
std::shared_ptr<VideoInfo> _video;
std::shared_ptr<AudioInfo> _audio;
SmoothTicker _aTicker[2];
uint8_t _adtsHeader[7];
};
} /* namespace mediakit */
......
......@@ -34,23 +34,16 @@ void MediaSink::addTrack(const Track::Ptr &track_in) {
track->addDelegate(std::make_shared<FrameWriterInterfaceHelper>([this](const Frame::Ptr &frame) {
if (_allTrackReady) {
onTrackFrame(frame);
return;
}
else
{
if (frame->getTrackType() == TrackVideo)
{
checkTrackIfReady(nullptr);
if (_allTrackReady) {
onTrackFrame(frame);
}
else
{
ErrorL << " 还有track未准备好,丢帧 codecName: " << frame->getCodecName();
}
}else
ErrorL << " 还有track未准备好,丢帧 codecName: " << frame->getCodecName();
//还有track未准备好,如果是视频的话,如果直接丢帧可能导致丢失I帧
checkTrackIfReady(nullptr);
if (_allTrackReady) {
//运行至这里说明Track状态由未就绪切换为已就绪状态,那么这帧就不应该丢弃
onTrackFrame(frame);
} else if(frame->keyFrame()){
WarnL << "some track is unready,drop key frame of: " << frame->getCodecName();
}
}));
}
......@@ -70,8 +63,8 @@ void MediaSink::inputFrame(const Frame::Ptr &frame) {
if (it == _track_map.end()) {
return;
}
it->second->inputFrame(frame);
checkTrackIfReady(it->second);
it->second->inputFrame(frame);
}
void MediaSink::checkTrackIfReady_l(const Track::Ptr &track){
......@@ -140,7 +133,7 @@ void MediaSink::emitAllTrackReady() {
//移除未准备好的Track
for (auto it = _track_map.begin(); it != _track_map.end();) {
if (!it->second->ready()) {
WarnL << "该track长时间未被初始化,已忽略:" << it->second->getCodecName();
WarnL << "track not ready for a long time, ignored: " << it->second->getCodecName();
it = _track_map.erase(it);
continue;
}
......
......@@ -450,4 +450,50 @@ MediaSource::Ptr MediaSource::createFromMP4(const string &schema, const string &
#endif //ENABLE_MP4
}
//Default (low-latency) flush policy, used when merge-write is disabled.
//Returns true when the packet cache should be written to the socket now.
//@param is_audio true for an audio cache, false for a video cache
//@param last_stamp timestamp (ms) of the last cached packet
//@param new_stamp timestamp (ms) of the incoming packet
//@param cache_size number of packets currently buffered
static bool isFlushAble_default(bool is_audio, uint32_t last_stamp, uint32_t new_stamp, int cache_size) {
    //A timestamp rollback (likely caused by a seek) forces an immediate flush
    bool stamp_went_back = new_stamp < last_stamp;
    if (stamp_went_back) {
        return true;
    }
    if (is_audio) {
        //Audio: flush once more than 100ms of data or more than 10 packets are buffered
        return new_stamp > last_stamp + 100 || cache_size > 10;
    }
    //Video: flush when the timestamp changes, or at 1024 packets as a hard cap
    return new_stamp != last_stamp || cache_size >= 1024;
}
//Merge-write flush policy: batch packets until the merge window elapses.
//Returns true when the packet cache should be written to the socket now.
//@param is_audio true for an audio cache, false for a video cache
//@param last_stamp timestamp (ms) of the last cached packet
//@param new_stamp timestamp (ms) of the incoming packet
//@param cache_size number of packets currently buffered
//@param merge_ms merge-write window in milliseconds
static bool isFlushAble_merge(bool is_audio, uint32_t last_stamp, uint32_t new_stamp, int cache_size, int merge_ms) {
    //Flush on a timestamp rollback (likely a seek) or when the merge window has elapsed
    if (new_stamp < last_stamp || new_stamp > last_stamp + merge_ms) {
        return true;
    }
    if (is_audio) {
        //Audio: cap the buffer at 20 packets
        return cache_size > 20;
    }
    //Video: cap at 1024 packets — guards against streams with abnormal timestamps
    //blowing up memory, and sendmsg() typically sends at most 1024 packets anyway
    return cache_size >= 1024;
}
//Decide whether the buffered packets should be flushed to the socket now.
//Dispatches to the default (low-latency) policy or the merge-write policy
//depending on the global configuration.
//@param last_stamp timestamp (ms) of the last cached packet
//@param new_stamp timestamp (ms) of the incoming packet
//@param cache_size number of packets currently buffered
bool FlushPolicy::isFlushAble(uint32_t last_stamp, uint32_t new_stamp, int cache_size) {
GET_CONFIG(bool,ultraLowDelay, General::kUltraLowDelay);
GET_CONFIG(int,mergeWriteMS, General::kMergeWriteMS);
if(ultraLowDelay || mergeWriteMS <= 0){
//Merge-write disabled (ultra-low-delay mode on, or threshold <= 0): use the default policy
return isFlushAble_default(_is_audio, last_stamp, new_stamp, cache_size);
}
//Otherwise batch packets until the merge-write window (mergeWriteMS) elapses
return isFlushAble_merge(_is_audio, last_stamp, new_stamp, cache_size,mergeWriteMS);
}
} /* namespace mediakit */
\ No newline at end of file
......@@ -21,6 +21,9 @@
#include "Util/logger.h"
#include "Util/TimeTicker.h"
#include "Util/NoticeCenter.h"
#include "Util/List.h"
#include "Rtsp/Rtsp.h"
#include "Rtmp/Rtmp.h"
#include "Extension/Track.h"
#include "Record/Recorder.h"
......@@ -153,6 +156,114 @@ private:
static recursive_mutex g_mtxMediaSrc;
};
///Cache flush policy: decides when buffered rtp/rtmp packets are written out.
class FlushPolicy {
public:
//@param is_audio true when this policy guards an audio cache, false for video
FlushPolicy(bool is_audio) {
_is_audio = is_audio;
};
~FlushPolicy() = default;

//Extract the millisecond timestamp from an rtp packet
uint32_t getStamp(const RtpPacket::Ptr &packet) {
return packet->timeStamp;
}

//Extract the millisecond timestamp from an rtmp packet
uint32_t getStamp(const RtmpPacket::Ptr &packet) {
return packet->timeStamp;
}

//Returns true when the cache should be flushed; implementation lives in the cpp file
bool isFlushAble(uint32_t last_stamp, uint32_t new_stamp, int cache_size);
private:
bool _is_audio;
};
/// Merge-write cache template for video packets.
/// \tparam packet packet type (e.g. RtpPacket / RtmpPacket)
/// \tparam policy flush policy deciding when the cache is written out
/// \tparam packet_list container type used to buffer the packets
template<typename packet, typename policy = FlushPolicy, typename packet_list = List<std::shared_ptr<packet> > >
class VideoPacketCache {
public:
    //FIX: FlushPolicy's constructor argument is `is_audio` (see FlushPolicy),
    //so a VIDEO cache must pass false; passing true made video streams use the
    //audio flush thresholds (flush every 100ms / 10 packets) instead of
    //flushing on timestamp change.
    VideoPacketCache() : _policy(false) {
        _cache = std::make_shared<packet_list>();
    }

    virtual ~VideoPacketCache() = default;

    /**
     * Buffer one video packet; the pending batch is flushed first whenever the
     * policy says so (timestamp change/rollback or cache-size cap).
     * @param rtp the packet to buffer
     * @param key_pos true if this packet marks a key-frame position; remembered until the next flush
     */
    void inputVideo(const std::shared_ptr<packet> &rtp, bool key_pos) {
        auto new_stamp = _policy.getStamp(rtp);
        if (_policy.isFlushAble(_last_stamp, new_stamp, _cache->size())) {
            flushAll();
        }
        //append the packet to the tail of the cache
        _cache->emplace_back(rtp);
        _last_stamp = new_stamp;
        if (key_pos) {
            _key_pos = key_pos;
        }
    }

    //Subclass callback: receives the flushed batch and whether it contains a key-frame position
    virtual void onFlushVideo(std::shared_ptr<packet_list> &, bool key_pos) = 0;

private:
    //Hand the whole batch to the subclass and start a fresh cache
    void flushAll() {
        if (_cache->empty()) {
            return;
        }
        onFlushVideo(_cache, _key_pos);
        _cache = std::make_shared<packet_list>();
        _key_pos = false;
    }

private:
    policy _policy;
    std::shared_ptr<packet_list> _cache;
    uint32_t _last_stamp = 0;
    bool _key_pos = false;
};
/// Merge-write cache template for audio packets.
/// \tparam packet packet type (e.g. RtpPacket / RtmpPacket)
/// \tparam policy flush policy deciding when the cache is written out
/// \tparam packet_list container type used to buffer the packets
template<typename packet, typename policy = FlushPolicy, typename packet_list = List<std::shared_ptr<packet> > >
class AudioPacketCache {
public:
    //FIX: FlushPolicy's constructor argument is `is_audio` (see FlushPolicy),
    //so an AUDIO cache must pass true; passing false made audio streams use the
    //video policy, which flushes on every timestamp change and so defeats
    //merge-writing entirely.
    AudioPacketCache() : _policy(true) {
        _cache = std::make_shared<packet_list>();
    }

    virtual ~AudioPacketCache() = default;

    /**
     * Buffer one audio packet; the pending batch is flushed first whenever the
     * policy says so (timestamp rollback, elapsed window or cache-size cap).
     * @param rtp the packet to buffer
     */
    void inputAudio(const std::shared_ptr<packet> &rtp) {
        auto new_stamp = _policy.getStamp(rtp);
        if (_policy.isFlushAble(_last_stamp, new_stamp, _cache->size())) {
            flushAll();
        }
        //append the packet to the tail of the cache
        _cache->emplace_back(rtp);
        _last_stamp = new_stamp;
    }

    //Subclass callback: receives the flushed batch
    virtual void onFlushAudio(std::shared_ptr<packet_list> &) = 0;

private:
    //Hand the whole batch to the subclass and start a fresh cache
    void flushAll() {
        if (_cache->empty()) {
            return;
        }
        onFlushAudio(_cache);
        _cache = std::make_shared<packet_list>();
    }

private:
    policy _policy;
    std::shared_ptr<packet_list> _cache;
    uint32_t _last_stamp = 0;
};
} /* namespace mediakit */
......
......@@ -67,6 +67,7 @@ const string kResetWhenRePlay = GENERAL_FIELD"resetWhenRePlay";
const string kPublishToRtxp = GENERAL_FIELD"publishToRtxp";
const string kPublishToHls = GENERAL_FIELD"publishToHls";
const string kPublishToMP4 = GENERAL_FIELD"publishToMP4";
const string kMergeWriteMS = GENERAL_FIELD"mergeWriteMS";
onceToken token([](){
mINI::Instance()[kFlowThreshold] = 1024;
......@@ -79,6 +80,7 @@ onceToken token([](){
mINI::Instance()[kPublishToRtxp] = 1;
mINI::Instance()[kPublishToHls] = 1;
mINI::Instance()[kPublishToMP4] = 0;
mINI::Instance()[kMergeWriteMS] = 300;
},nullptr);
}//namespace General
......@@ -286,6 +288,8 @@ const string kTimeoutMS = "protocol_timeout_ms";
const string kMediaTimeoutMS = "media_timeout_ms";
const string kBeatIntervalMS = "beat_interval_ms";
const string kMaxAnalysisMS = "max_analysis_ms";
const string kBenchmarkMode = "benchmark_mode";
}
} // namespace mediakit
......
......@@ -173,6 +173,9 @@ extern const string kPublishToRtxp ;
extern const string kPublishToHls ;
//是否默认推流时mp4录像,hook接口(on_publish)中可以覆盖该设置
extern const string kPublishToMP4 ;
//合并写缓存大小(单位毫秒),合并写指服务器缓存一定的数据后才会一次性写入socket,这样能提高性能,但是会提高延时
//在开启低延时模式后,该参数不起作用
extern const string kMergeWriteMS ;
}//namespace General
......@@ -315,6 +318,8 @@ extern const string kMediaTimeoutMS;
extern const string kBeatIntervalMS;
//Track编码格式探测最大时间,单位毫秒,默认2000
extern const string kMaxAnalysisMS;
//是否为性能测试模式,性能测试模式开启后不会解析rtp或rtmp包
extern const string kBenchmarkMode;
}
} // namespace mediakit
......
......@@ -98,10 +98,9 @@ void getAACInfo(const AACFrame &adts,int &iSampleRate,int &iChannel){
iChannel = adts.channel_configuration;
}
Sdp::Ptr AACTrack::getSdp() {
if(!ready()){
WarnL << "AAC Track未准备好";
WarnL << getCodecName() << " Track未准备好";
return nullptr;
}
return std::make_shared<AACSdp>(getAacCfg(),getAudioSampleRate());
......
......@@ -102,10 +102,12 @@ void AACRtmpEncoder::inputFrame(const Frame::Ptr &frame) {
RtmpPacket::Ptr rtmpPkt = ResourcePoolHelper<RtmpPacket>::obtainObj();
rtmpPkt->strBuf.clear();
//////////header
//header
uint8_t is_config = false;
rtmpPkt->strBuf.push_back(_ui8AudioFlags);
rtmpPkt->strBuf.push_back(_audio_flv_flags);
rtmpPkt->strBuf.push_back(!is_config);
//aac data
rtmpPkt->strBuf.append(frame->data() + frame->prefixSize(), frame->size() - frame->prefixSize());
rtmpPkt->bodySize = rtmpPkt->strBuf.size();
......@@ -115,45 +117,18 @@ void AACRtmpEncoder::inputFrame(const Frame::Ptr &frame) {
rtmpPkt->typeId = MSG_AUDIO;
RtmpCodec::inputRtmp(rtmpPkt, false);
}
}
void AACRtmpEncoder::makeAudioConfigPkt() {
makeAdtsHeader(_aac_cfg,*_adts);
int iSampleRate , iChannel , iSampleBit = 16;
getAACInfo(*_adts,iSampleRate,iChannel);
uint8_t flvStereoOrMono = (iChannel > 1);
uint8_t flvSampleRate;
switch (iSampleRate) {
case 48000:
case 44100:
flvSampleRate = 3;
break;
case 24000:
case 22050:
flvSampleRate = 2;
break;
case 12000:
case 11025:
flvSampleRate = 1;
break;
default:
flvSampleRate = 0;
break;
}
uint8_t flvSampleBit = iSampleBit == 16;
uint8_t flvAudioType = FLV_CODEC_AAC;
_ui8AudioFlags = (flvAudioType << 4) | (flvSampleRate << 2) | (flvSampleBit << 1) | flvStereoOrMono;
_audio_flv_flags = getAudioRtmpFlags(std::make_shared<AACTrack>(_aac_cfg));
RtmpPacket::Ptr rtmpPkt = ResourcePoolHelper<RtmpPacket>::obtainObj();
rtmpPkt->strBuf.clear();
//////////header
//header
uint8_t is_config = true;
rtmpPkt->strBuf.push_back(_ui8AudioFlags);
rtmpPkt->strBuf.push_back(_audio_flv_flags);
rtmpPkt->strBuf.push_back(!is_config);
//aac config
rtmpPkt->strBuf.append(_aac_cfg);
rtmpPkt->bodySize = rtmpPkt->strBuf.size();
......
......@@ -79,7 +79,7 @@ public:
private:
void makeAudioConfigPkt();
private:
uint8_t _ui8AudioFlags;
uint8_t _audio_flv_flags;
AACTrack::Ptr _track;
};
......
......@@ -48,11 +48,11 @@ Track::Ptr Factory::getTrackBySdp(const SdpTrack::Ptr &track) {
}
if (strcasecmp(track->_codec.data(), "PCMA") == 0) {
return std::make_shared<G711Track>(CodecG711A);
return std::make_shared<G711Track>(CodecG711A, track->_samplerate, track->_channel, 16);
}
if (strcasecmp(track->_codec.data(), "PCMU") == 0) {
return std::make_shared<G711Track>(CodecG711U);
return std::make_shared<G711Track>(CodecG711U, track->_samplerate, track->_channel, 16);
}
if (strcasecmp(track->_codec.data(), "h264") == 0) {
......@@ -84,34 +84,18 @@ Track::Ptr Factory::getTrackBySdp(const SdpTrack::Ptr &track) {
return std::make_shared<H265Track>(vps,sps,pps,0,0,0);
}
//可以根据传统的payload type 获取编码类型以及采样率等信息
CodecId codec_id = RtpPayload::getCodecId(track->_pt);
switch (codec_id){
case CodecG711A :
case CodecG711U : return std::make_shared<G711Track>(codec_id, track->_samplerate, track->_channel, 16);
default : break;
}
WarnL << "暂不支持该sdp:" << track->getName();
return nullptr;
}
Track::Ptr Factory::getTrackByCodecId(CodecId codecId) {
switch (codecId){
case CodecH264:{
return std::make_shared<H264Track>();
}
case CodecH265:{
return std::make_shared<H265Track>();
}
case CodecAAC:{
return std::make_shared<AACTrack>();
}
case CodecG711A: {
return std::make_shared<G711Track>(CodecG711A);
}
case CodecG711U: {
return std::make_shared<G711Track>(CodecG711U);
}
default:
WarnL << "暂不支持该CodecId:" << codecId;
return nullptr;
}
}
RtpCodec::Ptr Factory::getRtpEncoderBySdp(const Sdp::Ptr &sdp) {
GET_CONFIG(uint32_t,audio_mtu,Rtp::kAudioMtuSize);
GET_CONFIG(uint32_t,video_mtu,Rtp::kVideoMtuSize);
......@@ -135,59 +119,29 @@ RtpCodec::Ptr Factory::getRtpEncoderBySdp(const Sdp::Ptr &sdp) {
auto interleaved = sdp->getTrackType() * 2;
auto codec_id = sdp->getCodecId();
switch (codec_id){
case CodecH264:
return std::make_shared<H264RtpEncoder>(ssrc,mtu,sample_rate,pt,interleaved);
case CodecH265:
return std::make_shared<H265RtpEncoder>(ssrc,mtu,sample_rate,pt,interleaved);
case CodecAAC:
return std::make_shared<AACRtpEncoder>(ssrc,mtu,sample_rate,pt,interleaved);
case CodecG711A:
case CodecG711U:
return std::make_shared<G711RtpEncoder>(ssrc, mtu, sample_rate, pt, interleaved);
default:
WarnL << "暂不支持该CodecId:" << codec_id;
return nullptr;
case CodecH264 : return std::make_shared<H264RtpEncoder>(ssrc,mtu,sample_rate,pt,interleaved);
case CodecH265 : return std::make_shared<H265RtpEncoder>(ssrc,mtu,sample_rate,pt,interleaved);
case CodecAAC : return std::make_shared<AACRtpEncoder>(ssrc,mtu,sample_rate,pt,interleaved);
case CodecG711A :
case CodecG711U : return std::make_shared<G711RtpEncoder>(ssrc, mtu, sample_rate, pt, interleaved);
default : WarnL << "暂不支持该CodecId:" << codec_id; return nullptr;
}
}
RtpCodec::Ptr Factory::getRtpDecoderByTrack(const Track::Ptr &track) {
switch (track->getCodecId()){
case CodecH264:
return std::make_shared<H264RtpDecoder>();
case CodecH265:
return std::make_shared<H265RtpDecoder>();
case CodecAAC:
return std::make_shared<AACRtpDecoder>(track->clone());
case CodecG711A:
case CodecG711U:
return std::make_shared<G711RtpDecoder>(track->clone());
default:
WarnL << "暂不支持该CodecId:" << track->getCodecName();
return nullptr;
case CodecH264 : return std::make_shared<H264RtpDecoder>();
case CodecH265 : return std::make_shared<H265RtpDecoder>();
case CodecAAC : return std::make_shared<AACRtpDecoder>(track->clone());
case CodecG711A :
case CodecG711U : return std::make_shared<G711RtpDecoder>(track->clone());
default : WarnL << "暂不支持该CodecId:" << track->getCodecName(); return nullptr;
}
}
/////////////////////////////rtmp相关///////////////////////////////////////////
Track::Ptr Factory::getVideoTrackByAmf(const AMFValue &amf) {
CodecId codecId = getCodecIdByAmf(amf);
if(codecId == CodecInvalid){
return nullptr;
}
return getTrackByCodecId(codecId);
}
mediakit::Track::Ptr Factory::getAudioTrackByAmf(const AMFValue& amf)
{
CodecId codecId = getAudioCodecIdByAmf(amf);
if (codecId == CodecInvalid) {
return nullptr;
}
return getTrackByCodecId(codecId);
}
CodecId Factory::getCodecIdByAmf(const AMFValue &val){
static CodecId getVideoCodecIdByAmf(const AMFValue &val){
if (val.type() == AMF_STRING){
auto str = val.as_string();
if(str == "avc1"){
......@@ -209,20 +163,34 @@ CodecId Factory::getCodecIdByAmf(const AMFValue &val){
case FLV_CODEC_H264: return CodecH264;
case FLV_CODEC_AAC: return CodecAAC;
case FLV_CODEC_H265: return CodecH265;
default:
WarnL << "暂不支持该Amf:" << type_id;
return CodecInvalid;
default : WarnL << "暂不支持该Amf:" << type_id; return CodecInvalid;
}
}else{
WarnL << "Metadata不存在相应的Track";
}
return CodecInvalid;
}
CodecId Factory::getAudioCodecIdByAmf(const AMFValue& val)
{
Track::Ptr getTrackByCodecId(CodecId codecId, int sample_rate = 0, int channels = 0, int sample_bit = 0) {
switch (codecId){
case CodecH264 : return std::make_shared<H264Track>();
case CodecH265 : return std::make_shared<H265Track>();
case CodecAAC : return std::make_shared<AACTrack>();
case CodecG711A :
case CodecG711U : return (sample_rate && channels && sample_bit) ? std::make_shared<G711Track>(codecId, sample_rate, channels, sample_bit) : nullptr;
default : WarnL << "暂不支持该CodecId:" << codecId; return nullptr;
}
}
Track::Ptr Factory::getVideoTrackByAmf(const AMFValue &amf) {
CodecId codecId = getVideoCodecIdByAmf(amf);
if(codecId == CodecInvalid){
return nullptr;
}
return getTrackByCodecId(codecId);
}
static CodecId getAudioCodecIdByAmf(const AMFValue &val) {
if (val.type() == AMF_STRING) {
auto str = val.as_string();
if (str == "mp4a") {
......@@ -235,41 +203,38 @@ CodecId Factory::getAudioCodecIdByAmf(const AMFValue& val)
if (val.type() != AMF_NULL) {
auto type_id = val.as_integer();
switch (type_id) {
case FLV_CODEC_AAC: return CodecAAC;
case FLV_CODEC_G711A: return CodecG711A;
case FLV_CODEC_G711U: return CodecG711U;
default:
WarnL << "暂不支持该Amf:" << type_id;
return CodecInvalid;
case FLV_CODEC_AAC : return CodecAAC;
case FLV_CODEC_G711A : return CodecG711A;
case FLV_CODEC_G711U : return CodecG711U;
default : WarnL << "暂不支持该Amf:" << type_id; return CodecInvalid;
}
}
else {
WarnL << "Metadata不存在相应的Track";
}
return CodecInvalid;
}
Track::Ptr Factory::getAudioTrackByAmf(const AMFValue& amf, int sample_rate, int channels, int sample_bit){
CodecId codecId = getAudioCodecIdByAmf(amf);
if (codecId == CodecInvalid) {
return nullptr;
}
return getTrackByCodecId(codecId, sample_rate, channels, sample_bit);
}
RtmpCodec::Ptr Factory::getRtmpCodecByTrack(const Track::Ptr &track) {
switch (track->getCodecId()){
case CodecH264:
return std::make_shared<H264RtmpEncoder>(track);
case CodecAAC:
return std::make_shared<AACRtmpEncoder>(track);
case CodecH265:
return std::make_shared<H265RtmpEncoder>(track);
case CodecG711A:
case CodecG711U:
return std::make_shared<G711RtmpEncoder>(track);
default:
WarnL << "暂不支持该CodecId:" << track->getCodecName();
return nullptr;
case CodecH264 : return std::make_shared<H264RtmpEncoder>(track);
case CodecAAC : return std::make_shared<AACRtmpEncoder>(track);
case CodecH265 : return std::make_shared<H265RtmpEncoder>(track);
case CodecG711A :
case CodecG711U : return std::make_shared<G711RtmpEncoder>(track);
default : WarnL << "暂不支持该CodecId:" << track->getCodecName(); return nullptr;
}
}
AMFValue Factory::getAmfByCodecId(CodecId codecId) {
switch (codecId){
//此处用string标明rtmp编码类型目的是为了兼容某些android系统
case CodecAAC: return AMFValue("mp4a");
case CodecH264: return AMFValue("avc1");
case CodecH265: return AMFValue(FLV_CODEC_H265);
......@@ -279,6 +244,5 @@ AMFValue Factory::getAmfByCodecId(CodecId codecId) {
}
}
}//namespace mediakit
......@@ -24,14 +24,6 @@ namespace mediakit{
class Factory {
public:
/**
* 根据CodecId获取Track,该Track的ready()状态一般都为false
* @param codecId 编解码器id
* @return
*/
static Track::Ptr getTrackByCodecId(CodecId codecId);
////////////////////////////////rtsp相关//////////////////////////////////
/**
* 根据sdp生成Track对象
......@@ -41,14 +33,11 @@ public:
/**
* 根据sdp生成rtp编码器
* @param sdp sdp对象
* @return
*/
static RtpCodec::Ptr getRtpEncoderBySdp(const Sdp::Ptr &sdp);
/**
* 根据Track生成Rtp解包器
* @param track
* @return
*/
static RtpCodec::Ptr getRtpDecoderByTrack(const Track::Ptr &track);
......@@ -58,43 +47,23 @@ public:
/**
* 根据amf对象获取视频相应的Track
* @param amf rtmp metadata中的videocodecid的值
* @return
*/
static Track::Ptr getVideoTrackByAmf(const AMFValue &amf);
/**
* 根据amf对象获取音频相应的Track
* @param amf rtmp metadata中的audiocodecid的值
* @return
*/
static Track::Ptr getAudioTrackByAmf(const AMFValue& amf);
/**
* 根据amf对象获取相应的CodecId
* @param val rtmp metadata中的videocodecid或audiocodecid的值
* @return
*/
static CodecId getCodecIdByAmf(const AMFValue &val);
/**
* 根据amf对象获取音频相应的CodecId
* @param val rtmp metadata中的audiocodecid的值
* @return
*/
static CodecId getAudioCodecIdByAmf(const AMFValue& val);
static Track::Ptr getAudioTrackByAmf(const AMFValue& amf, int sample_rate, int channels, int sample_bit);
/**
* 根据Track获取Rtmp的编解码器
* @param track 媒体描述对象
* @return
*/
static RtmpCodec::Ptr getRtmpCodecByTrack(const Track::Ptr &track);
/**
* 根据codecId获取rtmp的codec描述
* @param codecId
* @return
*/
static AMFValue getAmfByCodecId(CodecId codecId);
};
......
......@@ -12,13 +12,12 @@
namespace mediakit{
Sdp::Ptr G711Track::getSdp() {
if(!ready()){
WarnL << getCodecName() << " Track未准备好";
return nullptr;
}
return std::make_shared<G711Sdp>(getCodecId(), getAudioSampleRate(), getCodecId() == CodecG711A ? 8 : 0, getAudioSampleBit());
return std::make_shared<G711Sdp>(getCodecId(), getAudioSampleRate(), getAudioChannel());
}
}//namespace mediakit
......
......@@ -16,40 +16,27 @@
namespace mediakit{
class G711Frame;
unsigned const samplingFrequencyTableG711[16] = { 96000, 88200,
64000, 48000,
44100, 32000,
24000, 22050,
16000, 12000,
11025, 8000,
7350, 0, 0, 0 };
void makeAdtsHeader(const string &strAudioCfg,G711Frame &adts);
void writeAdtsHeader(const G711Frame &adts, uint8_t *pcAdts) ;
string makeG711AdtsConfig(const uint8_t *pcAdts);
void getAACInfo(const G711Frame &adts,int &iSampleRate,int &iChannel);
/**
* aac帧,包含adts头
* G711帧
*/
class G711Frame : public Frame {
public:
typedef std::shared_ptr<G711Frame> Ptr;
char *data() const override{
return (char *)buffer;
return (char *)buffer.data();
}
uint32_t size() const override {
return frameLength;
return buffer.size();
}
uint32_t dts() const override {
return timeStamp;
}
uint32_t prefixSize() const override{
return iPrefixSize;
return 0;
}
TrackType getTrackType() const override{
......@@ -69,17 +56,15 @@ public:
}
public:
CodecId _codecId = CodecG711A;
unsigned int frameLength; // 一个帧的长度包括 raw data block
unsigned char buffer[2 * 1024 + 7];
string buffer;
uint32_t timeStamp;
uint32_t iPrefixSize = 0;
} ;
class G711FrameNoCacheAble : public FrameNoCacheAble {
public:
typedef std::shared_ptr<G711FrameNoCacheAble> Ptr;
G711FrameNoCacheAble(CodecId codecId, char *ptr,uint32_t size,uint32_t dts,int prefixeSize = 7){
G711FrameNoCacheAble(CodecId codecId, char *ptr,uint32_t size,uint32_t dts,int prefixeSize = 0){
_codecId = codecId;
_ptr = ptr;
_size = size;
......@@ -105,91 +90,61 @@ public:
private:
CodecId _codecId;
} ;
};
/**
* g711音频通道
* G711音频通道
*/
class G711Track : public AudioTrack{
public:
typedef std::shared_ptr<G711Track> Ptr;
/**
* 延后获取adts头信息
* 在随后的inputFrame中获取adts头信息
*/
G711Track(){}
/**
* G711A G711U
*/
G711Track(CodecId codecId, int sampleBit = 16, int sampleRate = 8000){
G711Track(CodecId codecId,int sample_rate, int channels, int sample_bit){
_codecid = codecId;
_sampleBit = sampleBit;
_sampleRate = sampleRate;
onReady();
_sample_rate = sample_rate;
_channels = channels;
_sample_bit = sample_bit;
}
/**
* 返回编码类型
* @return
*/
CodecId getCodecId() const override{
return _codecid;
}
/**
* 在获取aac_cfg前是无效的Track
* @return
* 是否已经初始化
*/
bool ready() override {
return true;
}
/**
* 返回音频采样率
* @return
*/
* 返回音频采样率
*/
int getAudioSampleRate() const override{
return _sampleRate;
return _sample_rate;
}
/**
* 返回音频采样位数,一般为16或8
* @return
*/
int getAudioSampleBit() const override{
return _sampleBit;
return _sample_bit;
}
/**
* 返回音频通道数
* @return
*/
int getAudioChannel() const override{
return _channel;
return _channels;
}
/**
* 输入数据帧,并获取aac_cfg
* @param frame 数据帧
*/
void inputFrame(const Frame::Ptr &frame) override{
AudioTrack::inputFrame(frame);
}
private:
/**
*
*/
void onReady(){
/*
if(_cfg.size() < 2){
return;
}
G711Frame aacFrame;
makeAdtsHeader(_cfg,aacFrame);
getAACInfo(aacFrame,_sampleRate,_channel);*/
}
Track::Ptr clone() override {
return std::make_shared<std::remove_reference<decltype(*this)>::type >(*this);
}
......@@ -197,34 +152,31 @@ private:
//生成sdp
Sdp::Ptr getSdp() override ;
private:
string _cfg;
CodecId _codecid = CodecG711A;
int _sampleRate = 8000;
int _sampleBit = 16;
int _channel = 1;
CodecId _codecid;
int _sample_rate;
int _channels;
int _sample_bit;
};
/**
* aac类型SDP
*/
* G711类型SDP
*/
class G711Sdp : public Sdp {
public:
/**
*
* @param aac_codecId G711A G711U
* G711采样率固定为8000
* @param codecId G711A G711U
* @param sample_rate 音频采样率
* @param playload_type rtp playload type 默认0为G711U, 8为G711A
* @param playload_type rtp playload
* @param bitrate 比特率
*/
G711Sdp(CodecId codecId,
int sample_rate,
int playload_type = 0,
int bitrate = 128) : Sdp(sample_rate,playload_type), _codecId(codecId){
int sample_rate,
int channels,
int playload_type = 98,
int bitrate = 128) : Sdp(sample_rate,playload_type), _codecId(codecId){
_printer << "m=audio 0 RTP/AVP " << playload_type << "\r\n";
//_printer << "b=AS:" << bitrate << "\r\n";
_printer << "a=rtpmap:" << playload_type << (codecId == CodecG711A ? " PCMA/" : " PCMU/") << sample_rate << "\r\n";
_printer << "a=rtpmap:" << playload_type << (codecId == CodecG711A ? " PCMA/" : " PCMU/") << sample_rate << "/" << channels << "\r\n";
_printer << "a=control:trackID=" << getTrackType() << "\r\n";
}
......@@ -235,12 +187,14 @@ public:
TrackType getTrackType() const override {
return TrackAudio;
}
CodecId getCodecId() const override {
return _codecId;
}
private:
_StrPrinter _printer;
CodecId _codecId = CodecG711A;
CodecId _codecId;
};
}//namespace mediakit
......
......@@ -12,57 +12,52 @@
namespace mediakit{
G711RtmpDecoder::G711RtmpDecoder() {
_adts = obtainFrame();
G711RtmpDecoder::G711RtmpDecoder(CodecId codecId) {
_frame = obtainFrame();
_codecId = codecId;
}
G711Frame::Ptr G711RtmpDecoder::obtainFrame() {
//从缓存池重新申请对象,防止覆盖已经写入环形缓存的对象
auto frame = ResourcePoolHelper<G711Frame>::obtainObj();
frame->frameLength = 0;
frame->iPrefixSize = 0;
frame->buffer.clear();
frame->_codecId = _codecId;
return frame;
}
bool G711RtmpDecoder::inputRtmp(const RtmpPacket::Ptr &pkt, bool key_pos) {
onGetG711(pkt->strBuf.data() + 2, pkt->strBuf.size() - 2, pkt->timeStamp);
bool G711RtmpDecoder::inputRtmp(const RtmpPacket::Ptr &pkt, bool) {
//拷贝G711负载
_frame->buffer.assign(pkt->strBuf.data() + 1, pkt->strBuf.size() - 1);
_frame->timeStamp = pkt->timeStamp;
//写入环形缓存
RtmpCodec::inputFrame(_frame);
_frame = obtainFrame();
return false;
}
void G711RtmpDecoder::onGetG711(const char* pcData, int iLen, uint32_t ui32TimeStamp) {
if(iLen + 7 > sizeof(_adts->buffer)){
WarnL << "Illegal adts data, exceeding the length limit.";
return;
}
//拷贝aac负载
memcpy(_adts->buffer, pcData, iLen);
_adts->frameLength = iLen;
_adts->timeStamp = ui32TimeStamp;
//写入环形缓存
RtmpCodec::inputFrame(_adts);
_adts = obtainFrame();
}
/////////////////////////////////////////////////////////////////////////////////////
G711RtmpEncoder::G711RtmpEncoder(const Track::Ptr &track) {
_track = dynamic_pointer_cast<G711Track>(track);
G711RtmpEncoder::G711RtmpEncoder(const Track::Ptr &track) : G711RtmpDecoder(track->getCodecId()) {
_audio_flv_flags = getAudioRtmpFlags(track);
}
void G711RtmpEncoder::inputFrame(const Frame::Ptr& frame) {
void G711RtmpEncoder::inputFrame(const Frame::Ptr &frame) {
if(!_audio_flv_flags){
return;
}
RtmpPacket::Ptr rtmpPkt = ResourcePoolHelper<RtmpPacket>::obtainObj();
rtmpPkt->strBuf.clear();
rtmpPkt->strBuf.append(frame->data() + frame->prefixSize(), frame->size() - frame->prefixSize());
//header
rtmpPkt->strBuf.push_back(_audio_flv_flags);
//g711 data
rtmpPkt->strBuf.append(frame->data() + frame->prefixSize(), frame->size() - frame->prefixSize());
rtmpPkt->bodySize = rtmpPkt->strBuf.size();
rtmpPkt->chunkId = CHUNK_AUDIO;
rtmpPkt->streamId = STREAM_MEDIA;
rtmpPkt->timeStamp = frame->dts();
rtmpPkt->typeId = MSG_AUDIO;
RtmpCodec::inputRtmp(rtmpPkt, false);
}
}//namespace mediakit
\ No newline at end of file
......@@ -17,13 +17,13 @@
namespace mediakit{
/**
* G711 Rtmp转adts
* G711 Rtmp转G711 Frame
*/
class G711RtmpDecoder : public RtmpCodec , public ResourcePoolHelper<G711Frame> {
public:
typedef std::shared_ptr<G711RtmpDecoder> Ptr;
G711RtmpDecoder();
G711RtmpDecoder(CodecId codecId);
~G711RtmpDecoder() {}
/**
......@@ -37,48 +37,32 @@ public:
return TrackAudio;
}
void setCodecId(CodecId codecId)
{
_codecid = codecId;
}
CodecId getCodecId() const override{
return _codecid;
return _codecId;
}
protected:
void onGetG711(const char* pcData, int iLen, uint32_t ui32TimeStamp);
private:
G711Frame::Ptr obtainFrame();
protected:
G711Frame::Ptr _adts;
CodecId _codecid = CodecInvalid;
private:
G711Frame::Ptr _frame;
CodecId _codecId;
};
/**
* aac adts转Rtmp
* G711 RTMP打包
*/
class G711RtmpEncoder : public G711RtmpDecoder , public ResourcePoolHelper<RtmpPacket> {
public:
typedef std::shared_ptr<G711RtmpEncoder> Ptr;
/**
* 构造函数,track可以为空,此时则在inputFrame时输入adts头
* 如果track不为空且包含adts头相关信息,
* 那么inputFrame时可以不输入adts头
* @param track
*/
G711RtmpEncoder(const Track::Ptr &track);
~G711RtmpEncoder() {}
/**
* 输入aac 数据,可以不带adts头
* @param frame aac数据
* 输入G711 数据
*/
void inputFrame(const Frame::Ptr &frame) override;
private:
G711Track::Ptr _track;
uint8_t _audio_flv_flags = 0;
};
}//namespace mediakit
......
......@@ -12,16 +12,63 @@
namespace mediakit{
G711RtpDecoder::G711RtpDecoder(const Track::Ptr &track){
_codecid = track->getCodecId();
_frame = obtainFrame();
}
G711Frame::Ptr G711RtpDecoder::obtainFrame() {
//从缓存池重新申请对象,防止覆盖已经写入环形缓存的对象
auto frame = ResourcePoolHelper<G711Frame>::obtainObj();
frame->buffer.clear();
frame->_codecId = _codecid;
frame->timeStamp = 0;
return frame;
}
bool G711RtpDecoder::inputRtp(const RtpPacket::Ptr &rtppack, bool) {
// 获取rtp数据长度
int length = rtppack->size() - rtppack->offset;
// 获取rtp数据
const char *rtp_packet_buf = rtppack->data() + rtppack->offset;
if (rtppack->timeStamp != _frame->timeStamp) {
//时间戳变更,清空上一帧
onGetG711(_frame);
}
//追加数据
_frame->buffer.append(rtp_packet_buf, length);
//赋值时间戳
_frame->timeStamp = rtppack->timeStamp;
if (rtppack->mark || _frame->buffer.size() > 10 * 1024) {
//标记为mark时,或者内存快溢出时,我们认为这是该帧最后一个包
onGetG711(_frame);
}
return false;
}
void G711RtpDecoder::onGetG711(const G711Frame::Ptr &frame) {
if(!frame->buffer.empty()){
//写入环形缓存
RtpCodec::inputFrame(frame);
_frame = obtainFrame();
}
}
/////////////////////////////////////////////////////////////////////////////////////
G711RtpEncoder::G711RtpEncoder(uint32_t ui32Ssrc,
uint32_t ui32MtuSize,
uint32_t ui32SampleRate,
uint8_t ui8PlayloadType,
uint8_t ui8Interleaved) :
uint32_t ui32MtuSize,
uint32_t ui32SampleRate,
uint8_t ui8PlayloadType,
uint8_t ui8Interleaved) :
RtpInfo(ui32Ssrc,
ui32MtuSize,
ui32SampleRate,
ui8PlayloadType,
ui8Interleaved){
ui8Interleaved) {
}
void G711RtpEncoder::inputFrame(const Frame::Ptr &frame) {
......@@ -45,56 +92,9 @@ void G711RtpEncoder::inputFrame(const Frame::Ptr &frame) {
}
void G711RtpEncoder::makeG711Rtp(const void *data, unsigned int len, bool mark, uint32_t uiStamp) {
RtpCodec::inputRtp(makeRtp(getTrackType(),data,len,mark,uiStamp), false);
RtpCodec::inputRtp(makeRtp(getTrackType(), data, len, mark, uiStamp), false);
}
/////////////////////////////////////////////////////////////////////////////////////
G711RtpDecoder::G711RtpDecoder(const Track::Ptr &track){
auto g711Track = dynamic_pointer_cast<G711Track>(track);
_codecid = g711Track->getCodecId();
if(!g711Track || !g711Track->ready()){
WarnL << "该g711 track无效!";
}else{
}
_adts = obtainFrame();
}
G711RtpDecoder::G711RtpDecoder() {
_adts = obtainFrame();
}
G711Frame::Ptr G711RtpDecoder::obtainFrame() {
//从缓存池重新申请对象,防止覆盖已经写入环形缓存的对象
auto frame = ResourcePoolHelper<G711Frame>::obtainObj();
frame->frameLength = 0;
frame->iPrefixSize = 0;
return frame;
}
bool G711RtpDecoder::inputRtp(const RtpPacket::Ptr &rtppack, bool key_pos) {
// 获取rtp数据长度
int length = rtppack->size() - rtppack->offset;
// 获取rtp数据
const uint8_t *rtp_packet_buf = (uint8_t *)rtppack->data() + rtppack->offset;
_adts->frameLength = length;
memcpy(_adts->buffer, rtp_packet_buf, length);
_adts->_codecId = _codecid;
if (rtppack->mark == true) {
_adts->timeStamp = rtppack->timeStamp;
onGetG711(_adts);
}
return false;
}
void G711RtpDecoder::onGetG711(const G711Frame::Ptr &frame) {
//写入环形缓存
RtpCodec::inputFrame(frame);
_adts = obtainFrame();
}
}//namespace mediakit
......
......@@ -10,12 +10,12 @@
#ifndef ZLMEDIAKIT_G711RTPCODEC_H
#define ZLMEDIAKIT_G711RTPCODEC_H
#include "Rtsp/RtpCodec.h"
#include "Extension/G711.h"
namespace mediakit{
/**
* G711 rtp转adts
* rtp转G711
*/
class G711RtpDecoder : public RtpCodec , public ResourcePoolHelper<G711Frame> {
public:
......@@ -34,20 +34,23 @@ public:
TrackType getTrackType() const override{
return TrackAudio;
}
CodecId getCodecId() const override{
return _codecid;
}
protected:
G711RtpDecoder();
G711RtpDecoder() {}
private:
void onGetG711(const G711Frame::Ptr &frame);
G711Frame::Ptr obtainFrame();
private:
G711Frame::Ptr _adts;
CodecId _codecid = CodecInvalid;
G711Frame::Ptr _frame;
CodecId _codecid;
};
/**
* g711 转rtp类
*/
......@@ -63,10 +66,10 @@ public:
* @param ui8Interleaved rtsp interleaved 值
*/
G711RtpEncoder(uint32_t ui32Ssrc,
uint32_t ui32MtuSize,
uint32_t ui32SampleRate,
uint8_t ui8PlayloadType = 0,
uint8_t ui8Interleaved = TrackAudio * 2);
uint32_t ui32MtuSize,
uint32_t ui32SampleRate,
uint8_t ui8PlayloadType = 0,
uint8_t ui8Interleaved = TrackAudio * 2);
~G711RtpEncoder() {}
/**
......@@ -75,8 +78,6 @@ public:
void inputFrame(const Frame::Ptr &frame) override;
private:
void makeG711Rtp(const void *pData, unsigned int uiLen, bool bMark, uint32_t uiStamp);
private:
unsigned char _aucSectionBuf[1600];
};
}//namespace mediakit
......
......@@ -62,7 +62,7 @@ void splitH264(const char *ptr, int len, const std::function<void(const char *,
Sdp::Ptr H264Track::getSdp() {
if(!ready()){
WarnL << "H264 Track未准备好";
WarnL << getCodecName() << " Track未准备好";
return nullptr;
}
return std::make_shared<H264Sdp>(getSps(),getPps());
......
......@@ -52,7 +52,7 @@ bool getHEVCInfo(const string &strVps, const string &strSps, int &iVideoWidth, i
Sdp::Ptr H265Track::getSdp() {
if(!ready()){
WarnL << "H265 Track未准备好";
WarnL << getCodecName() << " Track未准备好";
return nullptr;
}
return std::make_shared<H265Sdp>(getVps(),getSps(),getPps());
......
......@@ -615,20 +615,29 @@ void HttpSession::setSocketFlags(){
}
}
void HttpSession::onWrite(const Buffer::Ptr &buffer) {
void HttpSession::onWrite(const Buffer::Ptr &buffer, bool flush) {
if(flush){
//需要flush那么一次刷新缓存
HttpSession::setSendFlushFlag(true);
}
_ticker.resetTime();
if(!_flv_over_websocket){
_ui64TotalBytes += buffer->size();
send(buffer);
return;
}else{
WebSocketHeader header;
header._fin = true;
header._reserved = 0;
header._opcode = WebSocketHeader::BINARY;
header._mask_flag = false;
WebSocketSplitter::encode(header,buffer);
}
WebSocketHeader header;
header._fin = true;
header._reserved = 0;
header._opcode = WebSocketHeader::BINARY;
header._mask_flag = false;
WebSocketSplitter::encode(header,buffer);
if(flush){
//本次刷新缓存后,下次不用刷新缓存
HttpSession::setSendFlushFlag(false);
}
}
void HttpSession::onWebSocketEncodeData(const Buffer::Ptr &buffer){
......
......@@ -49,7 +49,7 @@ public:
static string urlDecode(const string &str);
protected:
//FlvMuxer override
void onWrite(const Buffer::Ptr &data) override ;
void onWrite(const Buffer::Ptr &data, bool flush) override ;
void onDetach() override;
std::shared_ptr<FlvMuxer> getSharedPtr() override;
......
......@@ -142,7 +142,9 @@ struct Context{
BufferRaw::Ptr buffer;
};
Frame::Ptr MP4Demuxer::readFrame(bool &keyFrame) {
Frame::Ptr MP4Demuxer::readFrame(bool &keyFrame, bool &eof) {
keyFrame = false;
eof = false;
static mov_reader_onread mov_reader_onread = [](void *param, uint32_t track_id, const void *buffer, size_t bytes, int64_t pts, int64_t dts, int flags) {
Context *ctx = (Context *) param;
ctx->pts = pts;
......@@ -162,17 +164,21 @@ Frame::Ptr MP4Demuxer::readFrame(bool &keyFrame) {
Context ctx = {this, 0};
auto ret = mov_reader_read2(_mov_reader.get(), mov_onalloc, mov_reader_onread, &ctx);
switch (ret) {
case 0 :
case 0 : {
eof = true;
return nullptr;
}
case 1 : {
keyFrame = ctx.flags & MOV_AV_FLAG_KEYFREAME;
return makeFrame(ctx.track_id, ctx.buffer, ctx.pts, ctx.dts);
}
default:
default : {
eof = true;
WarnL << "读取mp4文件数据失败:" << ret;
return nullptr;
}
}
}
......
......@@ -22,7 +22,7 @@ public:
MP4Demuxer(const char *file);
~MP4Demuxer() override;
int64_t seekTo(int64_t stamp_ms);
Frame::Ptr readFrame(bool &keyFrame);
Frame::Ptr readFrame(bool &keyFrame, bool &eof);
vector<Track::Ptr> getTracks(bool trackReady) const override ;
uint64_t getDurationMS() const;
private:
......
......@@ -48,11 +48,10 @@ MP4Reader::MP4Reader(const string &strVhost,const string &strApp, const string &
bool MP4Reader::readSample() {
bool keyFrame = false;
bool eof = false;
while (true) {
auto frame = _demuxer->readFrame(keyFrame);
while (!eof) {
auto frame = _demuxer->readFrame(keyFrame, eof);
if (!frame) {
eof = true;
break;
continue;
}
_mediaMuxer->inputFrame(frame);
if (frame->dts() > getCurrentStamp()) {
......@@ -122,11 +121,12 @@ bool MP4Reader::seekTo(uint32_t ui32Stamp){
}
//搜索到下一帧关键帧
bool keyFrame = false;
while (true) {
auto frame = _demuxer->readFrame(keyFrame);
bool eof = false;
while (!eof) {
auto frame = _demuxer->readFrame(keyFrame, eof);
if(!frame){
//文件读完了都未找到下一帧关键帧
return false;
continue;
}
if(keyFrame || frame->keyFrame() || frame->configFrame()){
//定位到key帧
......@@ -136,6 +136,7 @@ bool MP4Reader::seekTo(uint32_t ui32Stamp){
return true;
}
}
return false;
}
bool MP4Reader::close(MediaSource &sender,bool force){
......
......@@ -50,12 +50,17 @@ void FlvMuxer::start(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr &
}
strongSelf->onDetach();
});
_ring_reader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt){
_ring_reader->setReadCB([weakSelf](const RtmpMediaSource::RingDataType &pkt){
auto strongSelf = weakSelf.lock();
if(!strongSelf){
return;
}
strongSelf->onWriteRtmp(pkt);
int i = 0;
int size = pkt->size();
pkt->for_each([&](const RtmpPacket::Ptr &rtmp){
strongSelf->onWriteRtmp(rtmp, ++i == size);
});
});
}
......@@ -84,11 +89,11 @@ void FlvMuxer::onWriteFlvHeader(const RtmpMediaSource::Ptr &mediaSrc) {
}
//flv header
onWrite(std::make_shared<BufferRaw>(flv_file_header, sizeof(flv_file_header) - 1));
onWrite(std::make_shared<BufferRaw>(flv_file_header, sizeof(flv_file_header) - 1), false);
auto size = htonl(0);
//PreviousTagSize0 Always 0
onWrite(std::make_shared<BufferRaw>((char *)&size,4));
onWrite(std::make_shared<BufferRaw>((char *)&size,4), false);
auto &metadata = mediaSrc->getMetaData();
......@@ -97,12 +102,12 @@ void FlvMuxer::onWriteFlvHeader(const RtmpMediaSource::Ptr &mediaSrc) {
//其实metadata没什么用,有些推流器不产生metadata
AMFEncoder invoke;
invoke << "onMetaData" << metadata;
onWriteFlvTag(MSG_DATA, std::make_shared<BufferString>(invoke.data()), 0);
onWriteFlvTag(MSG_DATA, std::make_shared<BufferString>(invoke.data()), 0, false);
}
//config frame
mediaSrc->getConfigFrame([&](const RtmpPacket::Ptr &pkt){
onWriteRtmp(pkt);
onWriteRtmp(pkt, true);
});
}
......@@ -125,29 +130,29 @@ public:
#pragma pack(pop)
#endif // defined(_WIN32)
void FlvMuxer::onWriteFlvTag(const RtmpPacket::Ptr &pkt, uint32_t ui32TimeStamp) {
onWriteFlvTag(pkt->typeId,pkt,ui32TimeStamp);
void FlvMuxer::onWriteFlvTag(const RtmpPacket::Ptr &pkt, uint32_t ui32TimeStamp , bool flush) {
onWriteFlvTag(pkt->typeId,pkt,ui32TimeStamp, flush);
}
void FlvMuxer::onWriteFlvTag(uint8_t ui8Type, const Buffer::Ptr &buffer, uint32_t ui32TimeStamp) {
void FlvMuxer::onWriteFlvTag(uint8_t ui8Type, const Buffer::Ptr &buffer, uint32_t ui32TimeStamp, bool flush) {
RtmpTagHeader header;
header.type = ui8Type;
set_be24(header.data_size, buffer->size());
header.timestamp_ex = (uint8_t) ((ui32TimeStamp >> 24) & 0xff);
set_be24(header.timestamp,ui32TimeStamp & 0xFFFFFF);
//tag header
onWrite(std::make_shared<BufferRaw>((char *)&header, sizeof(header)));
onWrite(std::make_shared<BufferRaw>((char *)&header, sizeof(header)), false);
//tag data
onWrite(buffer);
onWrite(buffer, false);
auto size = htonl((buffer->size() + sizeof(header)));
//PreviousTagSize
onWrite(std::make_shared<BufferRaw>((char *)&size,4));
onWrite(std::make_shared<BufferRaw>((char *)&size,4), flush);
}
void FlvMuxer::onWriteRtmp(const RtmpPacket::Ptr &pkt) {
void FlvMuxer::onWriteRtmp(const RtmpPacket::Ptr &pkt,bool flush) {
int64_t dts_out;
_stamp[pkt->typeId % 2].revise(pkt->timeStamp, 0, dts_out, dts_out);
onWriteFlvTag(pkt, dts_out);
onWriteFlvTag(pkt, dts_out,flush);
}
void FlvMuxer::stop() {
......@@ -187,7 +192,7 @@ void FlvRecorder::startRecord(const EventPoller::Ptr &poller,const RtmpMediaSour
start(poller,media);
}
void FlvRecorder::onWrite(const Buffer::Ptr &data) {
void FlvRecorder::onWrite(const Buffer::Ptr &data, bool flush) {
lock_guard<recursive_mutex> lck(_file_mtx);
if(_file){
fwrite(data->data(),data->size(),1,_file.get());
......
......@@ -27,14 +27,14 @@ public:
void stop();
protected:
void start(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr &media);
virtual void onWrite(const Buffer::Ptr &data) = 0;
virtual void onWrite(const Buffer::Ptr &data, bool flush) = 0;
virtual void onDetach() = 0;
virtual std::shared_ptr<FlvMuxer> getSharedPtr() = 0;
private:
void onWriteFlvHeader(const RtmpMediaSource::Ptr &media);
void onWriteRtmp(const RtmpPacket::Ptr &pkt);
void onWriteFlvTag(const RtmpPacket::Ptr &pkt, uint32_t ui32TimeStamp);
void onWriteFlvTag(uint8_t ui8Type, const Buffer::Ptr &buffer, uint32_t ui32TimeStamp);
void onWriteRtmp(const RtmpPacket::Ptr &pkt,bool flush);
void onWriteFlvTag(const RtmpPacket::Ptr &pkt, uint32_t ui32TimeStamp, bool flush);
void onWriteFlvTag(uint8_t ui8Type, const Buffer::Ptr &buffer, uint32_t ui32TimeStamp, bool flush);
private:
RtmpMediaSource::RingType::RingReader::Ptr _ring_reader;
//时间戳修整器
......@@ -50,7 +50,7 @@ public:
void startRecord(const EventPoller::Ptr &poller,const string &vhost,const string &app,const string &stream,const string &file_path);
void startRecord(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr &media,const string &file_path);
private:
virtual void onWrite(const Buffer::Ptr &data) override ;
virtual void onWrite(const Buffer::Ptr &data, bool flush) override ;
virtual void onDetach() override;
virtual std::shared_ptr<FlvMuxer> getSharedPtr() override;
private:
......
......@@ -36,11 +36,68 @@ AudioMeta::AudioMeta(const AudioTrack::Ptr &audio,int datarate){
_metadata.set("audiosamplesize", audio->getAudioSampleBit());
}
if(audio->getAudioChannel() > 0){
_metadata.set("audiochannels", audio->getAudioChannel());
_metadata.set("stereo", audio->getAudioChannel() > 1);
}
_codecId = audio->getCodecId();
_metadata.set("audiocodecid", Factory::getAmfByCodecId(_codecId));
}
uint8_t getAudioRtmpFlags(const Track::Ptr &track){
switch (track->getTrackType()){
case TrackAudio : {
auto audioTrack = dynamic_pointer_cast<AudioTrack>(track);
if (!audioTrack) {
WarnL << "获取AudioTrack失败";
return 0;
}
auto iSampleRate = audioTrack->getAudioSampleRate();
auto iChannel = audioTrack->getAudioChannel();
auto iSampleBit = audioTrack->getAudioSampleBit();
uint8_t flvAudioType ;
switch (track->getCodecId()){
case CodecG711A : flvAudioType = FLV_CODEC_G711A; break;
case CodecG711U : flvAudioType = FLV_CODEC_G711U; break;
case CodecAAC : {
flvAudioType = FLV_CODEC_AAC;
//aac不通过flags获取音频相关信息
iSampleRate = 44100;
iSampleBit = 16;
iChannel = 2;
break;
}
default: WarnL << "该编码格式不支持转换为RTMP: " << track->getCodecName(); return 0;
}
uint8_t flvSampleRate;
switch (iSampleRate) {
case 44100:
flvSampleRate = 3;
break;
case 22050:
flvSampleRate = 2;
break;
case 11025:
flvSampleRate = 1;
break;
case 16000: // nellymoser only
case 8000: // nellymoser only
case 5512: // not MP3
flvSampleRate = 0;
break;
default:
WarnL << "FLV does not support sample rate " << iSampleRate << " ,choose from (44100, 22050, 11025)";
return 0;
}
uint8_t flvStereoOrMono = (iChannel > 1);
uint8_t flvSampleBit = iSampleBit == 16;
return (flvAudioType << 4) | (flvSampleRate << 2) | (flvSampleBit << 1) | flvStereoOrMono;
}
default : return 0;
}
}
}//namespace mediakit
\ No newline at end of file
......@@ -161,27 +161,26 @@ public:
strBuf = std::move(that.strBuf);
}
bool isVideoKeyFrame() const {
return typeId == MSG_VIDEO && (uint8_t) strBuf[0] >> 4 == FLV_KEY_FRAME
&& (uint8_t) strBuf[1] == 1;
return typeId == MSG_VIDEO && (uint8_t) strBuf[0] >> 4 == FLV_KEY_FRAME && (uint8_t) strBuf[1] == 1;
}
bool isCfgFrame() const {
return (typeId == MSG_VIDEO || typeId == MSG_AUDIO)
&& (uint8_t) strBuf[1] == 0;
switch (typeId){
case MSG_VIDEO : return strBuf[1] == 0;
case MSG_AUDIO : {
switch (getMediaType()){
case FLV_CODEC_AAC : return strBuf[1] == 0;
default : return false;
}
}
default : return false;
}
}
int getMediaType() const {
switch (typeId) {
case MSG_VIDEO: {
return (uint8_t) strBuf[0] & 0x0F;
}
break;
case MSG_AUDIO: {
return (uint8_t) strBuf[0] >> 4;
}
break;
default:
break;
case MSG_VIDEO : return (uint8_t) strBuf[0] & 0x0F;
case MSG_AUDIO : return (uint8_t) strBuf[0] >> 4;
default : return 0;
}
return 0;
}
int getAudioSampleRate() const {
if (typeId != MSG_AUDIO) {
......@@ -209,8 +208,6 @@ public:
}
};
/**
* rtmp metadata基类,用于描述rtmp格式信息
*/
......@@ -316,6 +313,8 @@ private:
CodecId _codecId;
};
//根据音频track获取flags
uint8_t getAudioRtmpFlags(const Track::Ptr &track);
}//namespace mediakit
......
......@@ -15,14 +15,55 @@ namespace mediakit {
void RtmpDemuxer::loadMetaData(const AMFValue &val){
try {
makeVideoTrack(val["videocodecid"]);
makeAudioTrack(val["audiocodecid"]);
int audiosamplerate = 0;
int audiochannels = 0;
int audiosamplesize = 0;
const AMFValue *audiocodecid = nullptr;
const AMFValue *videocodecid = nullptr;
val.object_for_each([&](const string &key, const AMFValue &val) {
if (key == "duration") {
_fDuration = val.as_number();
return;
}
if(key == "audiosamplerate"){
audiosamplerate = val.as_integer();
return;
}
if(key == "audiosamplesize"){
audiosamplesize = val.as_integer();
return;
}
if(key == "stereo"){
audiochannels = val.as_boolean() ? 2 : 1;
return;
}
if(key == "videocodecid"){
//找到视频
videocodecid = &val;
return;
}
if(key == "audiocodecid"){
//找到音频
audiocodecid = &val;
return;
}
});
if(videocodecid){
//有视频
makeVideoTrack(*videocodecid);
}
if(audiocodecid){
//有音频
makeAudioTrack(*audiocodecid, audiosamplerate, audiochannels, audiosamplesize);
}
}catch (std::exception &ex){
WarnL << ex.what();
}
......@@ -46,7 +87,7 @@ bool RtmpDemuxer::inputRtmp(const RtmpPacket::Ptr &pkt) {
if(!_tryedGetAudioTrack) {
_tryedGetAudioTrack = true;
auto codec = AMFValue(pkt->getMediaType());
makeAudioTrack(codec);
makeAudioTrack(codec, pkt->getAudioSampleRate(), pkt->getAudioChannel(), pkt->getAudioSampleBit());
}
if(_audioRtmpDecoder){
_audioRtmpDecoder->inputRtmp(pkt, false);
......@@ -69,6 +110,7 @@ void RtmpDemuxer::makeVideoTrack(const AMFValue &videoCodec) {
//设置rtmp解码器代理,生成的frame写入该Track
_videoRtmpDecoder->addDelegate(_videoTrack);
onAddTrack(_videoTrack);
_tryedGetVideoTrack = true;
} else {
//找不到相应的rtmp解码器,该track无效
_videoTrack.reset();
......@@ -76,9 +118,9 @@ void RtmpDemuxer::makeVideoTrack(const AMFValue &videoCodec) {
}
}
void RtmpDemuxer::makeAudioTrack(const AMFValue &audioCodec) {
void RtmpDemuxer::makeAudioTrack(const AMFValue &audioCodec,int sample_rate, int channels, int sample_bit) {
//生成Track对象
_audioTrack = dynamic_pointer_cast<AudioTrack>(Factory::getAudioTrackByAmf(audioCodec));
_audioTrack = dynamic_pointer_cast<AudioTrack>(Factory::getAudioTrackByAmf(audioCodec, sample_rate, channels, sample_bit));
if (_audioTrack) {
//生成rtmpCodec对象以便解码rtmp
_audioRtmpDecoder = Factory::getRtmpCodecByTrack(_audioTrack);
......@@ -86,6 +128,7 @@ void RtmpDemuxer::makeAudioTrack(const AMFValue &audioCodec) {
//设置rtmp解码器代理,生成的frame写入该Track
_audioRtmpDecoder->addDelegate(_audioTrack);
onAddTrack(_audioTrack);
_tryedGetAudioTrack = true;
} else {
//找不到相应的rtmp解码器,该track无效
_audioTrack.reset();
......
......@@ -40,7 +40,7 @@ public:
bool inputRtmp(const RtmpPacket::Ptr &pkt);
private:
void makeVideoTrack(const AMFValue &val);
void makeAudioTrack(const AMFValue &val);
void makeAudioTrack(const AMFValue &val, int sample_rate, int channels, int sample_bit);
private:
bool _tryedGetVideoTrack = false;
bool _tryedGetAudioTrack = false;
......
......@@ -33,6 +33,9 @@ using namespace toolkit;
#define RTMP_GOP_SIZE 512
namespace mediakit {
typedef VideoPacketCache<RtmpPacket> RtmpVideoCache;
typedef AudioPacketCache<RtmpPacket> RtmpAudioCache;
/**
* rtmp媒体源的数据抽象
* rtmp有关键的三要素,分别是metadata、config帧,普通帧
......@@ -40,10 +43,11 @@ namespace mediakit {
* 只要生成了这三要素,那么要实现rtmp推流、rtmp服务器就很简单了
* rtmp推拉流协议中,先传递metadata,然后传递config帧,然后一直传递普通帧
*/
class RtmpMediaSource : public MediaSource, public RingDelegate<RtmpPacket::Ptr> {
class RtmpMediaSource : public MediaSource, public RingDelegate<RtmpPacket::Ptr>, public RtmpVideoCache, public RtmpAudioCache{
public:
typedef std::shared_ptr<RtmpMediaSource> Ptr;
typedef RingBuffer<RtmpPacket::Ptr> RingType;
typedef std::shared_ptr<List<RtmpPacket::Ptr> > RingDataType;
typedef RingBuffer<RingDataType> RingType;
/**
* 构造函数
......@@ -122,6 +126,9 @@ public:
return;
}
//保存当前时间戳
_track_stamps_map[pkt->typeId] = pkt->timeStamp;
if (!_ring) {
weak_ptr<RtmpMediaSource> weakSelf = dynamic_pointer_cast<RtmpMediaSource>(shared_from_this());
auto lam = [weakSelf](const EventPoller::Ptr &, int size, bool) {
......@@ -142,9 +149,12 @@ public:
regist();
}
}
_track_stamps_map[pkt->typeId] = pkt->timeStamp;
//不存在视频,为了减少缓存延时,那么关闭GOP缓存
_ring->write(pkt, _have_video ? pkt->isVideoKeyFrame() : true);
if(pkt->typeId == MSG_VIDEO){
RtmpVideoCache::inputVideo(pkt, key);
}else{
RtmpAudioCache::inputAudio(pkt);
}
}
/**
......@@ -163,6 +173,25 @@ public:
}
private:
/**
* 批量flush时间戳相同的视频rtmp包时触发该函数
* @param rtmp_list 时间戳相同的rtmp包列表
* @param key_pos 是否包含关键帧
*/
void onFlushVideo(std::shared_ptr<List<RtmpPacket::Ptr> > &rtmp_list, bool key_pos) override {
_ring->write(rtmp_list, key_pos);
}
/**
* 批量flush一定数量的音频rtmp包时触发该函数
* @param rtmp_list rtmp包列表
*/
void onFlushAudio(std::shared_ptr<List<RtmpPacket::Ptr> > &rtmp_list) override{
//只有音频的话,就不存在gop缓存的意义
_ring->write(rtmp_list, !_have_video);
}
/**
* 每次增减消费者都会触发该函数
*/
......@@ -177,7 +206,7 @@ private:
bool _have_video = false;
mutable recursive_mutex _mtx;
AMFValue _metadata;
RingBuffer<RtmpPacket::Ptr>::Ptr _ring;
RingType::Ptr _ring;
unordered_map<int, uint32_t> _track_stamps_map;
unordered_map<int, RtmpPacket::Ptr> _config_frame_map;
};
......
......@@ -118,6 +118,8 @@ void RtmpPlayer::onPlayResult_l(const SockException &ex , bool handshakeComplete
//开始播放阶段
_pPlayTimer.reset();
onPlayResult(ex);
//是否为性能测试模式
_benchmark_mode = (*this)[Client::kBenchmarkMode].as<int>();
} else if (ex) {
//播放成功后异常断开回调
onShutdown(ex);
......@@ -146,6 +148,11 @@ void RtmpPlayer::onConnect(const SockException &err){
}
void RtmpPlayer::onRecv(const Buffer::Ptr &pBuf){
try {
if(_benchmark_mode && !_pPlayTimer){
//在性能测试模式下,如果rtmp握手完毕后,不再解析rtmp包
_mediaTicker.resetTime();
return;
}
onParseRtmp(pBuf->data(), pBuf->size());
} catch (exception &e) {
SockException ex(Err_other, e.what());
......
......@@ -102,6 +102,8 @@ private:
uint32_t _aiNowStamp[2] = { 0, 0 };
Ticker _aNowStampTicker[2];
bool _metadata_got = false;
//是否为性能测试模式
bool _benchmark_mode = false;
};
} /* namespace mediakit */
......
......@@ -200,12 +200,21 @@ inline void RtmpPusher::send_metaData(){
_pRtmpReader = src->getRing()->attach(getPoller());
weak_ptr<RtmpPusher> weakSelf = dynamic_pointer_cast<RtmpPusher>(shared_from_this());
_pRtmpReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt){
_pRtmpReader->setReadCB([weakSelf](const RtmpMediaSource::RingDataType &pkt){
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
}
strongSelf->sendRtmp(pkt->typeId, strongSelf->_ui32StreamId, pkt, pkt->timeStamp, pkt->chunkId);
int i = 0;
int size = pkt->size();
strongSelf->setSendFlushFlag(false);
pkt->for_each([&](const RtmpPacket::Ptr &rtmp){
if(++i == size){
strongSelf->setSendFlushFlag(true);
}
strongSelf->sendRtmp(rtmp->typeId, strongSelf->_ui32StreamId, rtmp, rtmp->timeStamp, rtmp->chunkId);
});
});
_pRtmpReader->setDetachCB([weakSelf](){
auto strongSelf = weakSelf.lock();
......
......@@ -193,7 +193,6 @@ void RtmpSession::onCmd_deleteStream(AMFDecoder &dec) {
throw std::runtime_error(StrPrinter << "Stop publishing" << endl);
}
void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr &src){
bool authSuccess = err.empty();
bool ok = (src.operator bool() && authSuccess);
......@@ -272,12 +271,23 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr
_pRingReader = src->getRing()->attach(getPoller());
weak_ptr<RtmpSession> weakSelf = dynamic_pointer_cast<RtmpSession>(shared_from_this());
_pRingReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt) {
_pRingReader->setReadCB([weakSelf](const RtmpMediaSource::RingDataType &pkt) {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
return;
}
strongSelf->onSendMedia(pkt);
if(strongSelf->_paused){
return;
}
int i = 0;
int size = pkt->size();
strongSelf->setSendFlushFlag(false);
pkt->for_each([&](const RtmpPacket::Ptr &rtmp){
if(++i == size){
strongSelf->setSendFlushFlag(true);
}
strongSelf->onSendMedia(rtmp);
});
});
_pRingReader->setDetachCB([weakSelf]() {
auto strongSelf = weakSelf.lock();
......@@ -393,24 +403,9 @@ void RtmpSession::onCmd_pause(AMFDecoder &dec) {
status.set("code", paused ? "NetStream.Pause.Notify" : "NetStream.Unpause.Notify");
status.set("description", paused ? "Paused stream." : "Unpaused stream.");
sendReply("onStatus", nullptr, status);
//streamBegin
sendUserControl(paused ? CONTROL_STREAM_EOF : CONTROL_STREAM_BEGIN,
STREAM_MEDIA);
if (!_pRingReader) {
throw std::runtime_error("Rtmp not started yet!");
}
if (paused) {
_pRingReader->setReadCB(nullptr);
} else {
weak_ptr<RtmpSession> weakSelf = dynamic_pointer_cast<RtmpSession>(shared_from_this());
_pRingReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt) {
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
}
strongSelf->onSendMedia(pkt);
});
}
//streamBegin
sendUserControl(paused ? CONTROL_STREAM_EOF : CONTROL_STREAM_BEGIN, STREAM_MEDIA);
_paused = paused;
}
void RtmpSession::setMetaData(AMFDecoder &dec) {
......
......@@ -80,13 +80,14 @@ private:
double _dNowReqID = 0;
bool _set_meta_data = false;
Ticker _ticker;//数据接收时间
RingBuffer<RtmpPacket::Ptr>::RingReader::Ptr _pRingReader;
RtmpMediaSource::RingType::RingReader::Ptr _pRingReader;
std::shared_ptr<RtmpMediaSourceImp> _pPublisherSrc;
std::weak_ptr<RtmpMediaSource> _pPlayerSrc;
//时间戳修整器
Stamp _stamp[2];
//消耗的总流量
uint64_t _ui64TotalBytes = 0;
bool _paused = false;
};
......
......@@ -204,10 +204,10 @@ void RtpProcess::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d
pts /= 90;
dts /= 90;
_stamps[codecid].revise(dts,pts,dts,pts,false);
_dts = dts;
switch (codecid) {
case STREAM_VIDEO_H264: {
_dts = dts;
if (!_codecid_video) {
//获取到视频
_codecid_video = codecid;
......@@ -232,6 +232,7 @@ void RtpProcess::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d
}
case STREAM_VIDEO_H265: {
_dts = dts;
if (!_codecid_video) {
//获取到视频
_codecid_video = codecid;
......@@ -254,6 +255,7 @@ void RtpProcess::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d
}
case STREAM_AUDIO_AAC: {
_dts = dts;
if (!_codecid_audio) {
//获取到音频
_codecid_audio = codecid;
......
......@@ -16,7 +16,7 @@ namespace mediakit{
int RtpPayload::getClockRate(int pt){
switch (pt){
#define SWITCH_CASE(name, type, value, clock_rate, channel) case value : return clock_rate;
#define SWITCH_CASE(name, type, value, clock_rate, channel, codec_id) case value : return clock_rate;
RTP_PT_MAP(SWITCH_CASE)
#undef SWITCH_CASE
default: return 90000;
......@@ -25,7 +25,7 @@ int RtpPayload::getClockRate(int pt){
TrackType RtpPayload::getTrackType(int pt){
switch (pt){
#define SWITCH_CASE(name, type, value, clock_rate, channel) case value : return type;
#define SWITCH_CASE(name, type, value, clock_rate, channel, codec_id) case value : return type;
RTP_PT_MAP(SWITCH_CASE)
#undef SWITCH_CASE
default: return TrackInvalid;
......@@ -34,7 +34,7 @@ TrackType RtpPayload::getTrackType(int pt){
int RtpPayload::getAudioChannel(int pt){
switch (pt){
#define SWITCH_CASE(name, type, value, clock_rate, channel) case value : return channel;
#define SWITCH_CASE(name, type, value, clock_rate, channel, codec_id) case value : return channel;
RTP_PT_MAP(SWITCH_CASE)
#undef SWITCH_CASE
default: return 1;
......@@ -43,13 +43,22 @@ int RtpPayload::getAudioChannel(int pt){
const char * RtpPayload::getName(int pt){
switch (pt){
#define SWITCH_CASE(name, type, value, clock_rate, channel) case value : return #name;
#define SWITCH_CASE(name, type, value, clock_rate, channel, codec_id) case value : return #name;
RTP_PT_MAP(SWITCH_CASE)
#undef SWITCH_CASE
default: return "unknown payload type";
}
}
CodecId RtpPayload::getCodecId(int pt) {
switch (pt) {
#define SWITCH_CASE(name, type, value, clock_rate, channel, codec_id) case value : return codec_id;
RTP_PT_MAP(SWITCH_CASE)
#undef SWITCH_CASE
default : return CodecInvalid;
}
}
static void getAttrSdp(const map<string, string> &attr, _StrPrinter &printer){
const map<string, string>::value_type *ptr = nullptr;
for(auto &pr : attr){
......@@ -70,7 +79,7 @@ static void getAttrSdp(const map<string, string> &attr, _StrPrinter &printer){
string SdpTrack::getName() const{
switch (_pt){
#define SWITCH_CASE(name, type, value, clock_rate, channel) case value : return #name;
#define SWITCH_CASE(name, type, value, clock_rate, channel, codec_id) case value : return #name;
RTP_PT_MAP(SWITCH_CASE)
#undef SWITCH_CASE
default: return _codec;
......@@ -172,6 +181,7 @@ void SdpParser::load(const string &sdp) {
if (4 == sscanf(opt_val.data(), " %15[^ ] %d %15[^ ] %d", type, &port, rtp, &pt)) {
track->_pt = pt;
track->_samplerate = RtpPayload::getClockRate(pt) ;
track->_channel = RtpPayload::getAudioChannel(pt);
track->_type = toTrackType(type);
track->_m = opt_val;
track->_port = port;
......@@ -214,9 +224,14 @@ void SdpParser::load(const string &sdp) {
it = track._attr.find("rtpmap");
if(it != track._attr.end()){
auto rtpmap = it->second;
int pt, samplerate;
int pt, samplerate, channel;
char codec[16] = {0};
if (3 == sscanf(rtpmap.data(), "%d %15[^/]/%d", &pt, codec, &samplerate)) {
if (4 == sscanf(rtpmap.data(), "%d %15[^/]/%d/%d", &pt, codec, &samplerate, &channel)) {
track._pt = pt;
track._codec = codec;
track._samplerate = samplerate;
track._channel = channel;
}else if (3 == sscanf(rtpmap.data(), "%d %15[^/]/%d", &pt, codec, &samplerate)) {
track._pt = pt;
track._codec = codec;
track._samplerate = samplerate;
......
......@@ -34,33 +34,33 @@ typedef enum {
} eRtpType;
#define RTP_PT_MAP(XX) \
XX(PCMU, TrackAudio, 0, 8000, 1) \
XX(GSM, TrackAudio , 3, 8000, 1) \
XX(G723, TrackAudio, 4, 8000, 1) \
XX(DVI4_8000, TrackAudio, 5, 8000, 1) \
XX(DVI4_16000, TrackAudio, 6, 16000, 1) \
XX(LPC, TrackAudio, 7, 8000, 1) \
XX(PCMA, TrackAudio, 8, 8000, 1) \
XX(G722, TrackAudio, 9, 8000, 1) \
XX(L16_Stereo, TrackAudio, 10, 44100, 2) \
XX(L16_Mono, TrackAudio, 11, 44100, 1) \
XX(QCELP, TrackAudio, 12, 8000, 1) \
XX(CN, TrackAudio, 13, 8000, 1) \
XX(MPA, TrackAudio, 14, 90000, 1) \
XX(G728, TrackAudio, 15, 8000, 1) \
XX(DVI4_11025, TrackAudio, 16, 11025, 1) \
XX(DVI4_22050, TrackAudio, 17, 22050, 1) \
XX(G729, TrackAudio, 18, 8000, 1) \
XX(CelB, TrackVideo, 25, 90000, 1) \
XX(JPEG, TrackVideo, 26, 90000, 1) \
XX(nv, TrackVideo, 28, 90000, 1) \
XX(H261, TrackVideo, 31, 90000, 1) \
XX(MPV, TrackVideo, 32, 90000, 1) \
XX(MP2T, TrackVideo, 33, 90000, 1) \
XX(H263, TrackVideo, 34, 90000, 1) \
XX(PCMU, TrackAudio, 0, 8000, 1, CodecG711U) \
XX(GSM, TrackAudio , 3, 8000, 1, CodecInvalid) \
XX(G723, TrackAudio, 4, 8000, 1, CodecInvalid) \
XX(DVI4_8000, TrackAudio, 5, 8000, 1, CodecInvalid) \
XX(DVI4_16000, TrackAudio, 6, 16000, 1, CodecInvalid) \
XX(LPC, TrackAudio, 7, 8000, 1, CodecInvalid) \
XX(PCMA, TrackAudio, 8, 8000, 1, CodecG711A) \
XX(G722, TrackAudio, 9, 8000, 1, CodecInvalid) \
XX(L16_Stereo, TrackAudio, 10, 44100, 2, CodecInvalid) \
XX(L16_Mono, TrackAudio, 11, 44100, 1, CodecInvalid) \
XX(QCELP, TrackAudio, 12, 8000, 1, CodecInvalid) \
XX(CN, TrackAudio, 13, 8000, 1, CodecInvalid) \
XX(MPA, TrackAudio, 14, 90000, 1, CodecInvalid) \
XX(G728, TrackAudio, 15, 8000, 1, CodecInvalid) \
XX(DVI4_11025, TrackAudio, 16, 11025, 1, CodecInvalid) \
XX(DVI4_22050, TrackAudio, 17, 22050, 1, CodecInvalid) \
XX(G729, TrackAudio, 18, 8000, 1, CodecInvalid) \
XX(CelB, TrackVideo, 25, 90000, 1, CodecInvalid) \
XX(JPEG, TrackVideo, 26, 90000, 1, CodecInvalid) \
XX(nv, TrackVideo, 28, 90000, 1, CodecInvalid) \
XX(H261, TrackVideo, 31, 90000, 1, CodecInvalid) \
XX(MPV, TrackVideo, 32, 90000, 1, CodecInvalid) \
XX(MP2T, TrackVideo, 33, 90000, 1, CodecInvalid) \
XX(H263, TrackVideo, 34, 90000, 1, CodecInvalid) \
typedef enum {
#define ENUM_DEF(name, type, value, clock_rate, channel) PT_ ## name = value,
#define ENUM_DEF(name, type, value, clock_rate, channel, codec_id) PT_ ## name = value,
RTP_PT_MAP(ENUM_DEF)
#undef ENUM_DEF
PT_MAX = 128
......@@ -88,6 +88,7 @@ public:
static TrackType getTrackType(int pt);
static int getAudioChannel(int pt);
static const char *getName(int pt);
static CodecId getCodecId(int pt);
private:
RtpPayload() = delete;
~RtpPayload() = delete;
......@@ -128,6 +129,7 @@ public:
int _pt;
string _codec;
int _samplerate;
int _channel;
string _fmtp;
string _control;
string _control_surffix;
......
......@@ -30,94 +30,10 @@ using namespace toolkit;
#define RTP_GOP_SIZE 512
namespace mediakit {
class RtpVideoCache {
public:
RtpVideoCache() {
_cache = std::make_shared<List<RtpPacket::Ptr> >();
}
virtual ~RtpVideoCache() = default;
void inputVideoRtp(const RtpPacket::Ptr &rtp, bool key_pos) {
if (_last_rtp_stamp != rtp->timeStamp) {
//时间戳发生变化了
flushAll();
} else if (_cache->size() > RTP_GOP_SIZE) {
//这个逻辑用于避免时间戳异常的流导致的内存暴增问题
flushAll();
}
//追加数据到最后
_cache->emplace_back(rtp);
_last_rtp_stamp = rtp->timeStamp;
if (key_pos) {
_key_pos = key_pos;
}
}
virtual void onFlushVideoRtp(std::shared_ptr<List<RtpPacket::Ptr> > &, bool key_pos) = 0;
typedef VideoPacketCache<RtpPacket> RtpVideoCache;
typedef AudioPacketCache<RtpPacket> RtpAudioCache;
private:
void flushAll() {
if (_cache->empty()) {
return;
}
onFlushVideoRtp(_cache, _key_pos);
_cache = std::make_shared<List<RtpPacket::Ptr> >();
_key_pos = false;
}
private:
std::shared_ptr<List<RtpPacket::Ptr> > _cache;
uint32_t _last_rtp_stamp = 0;
bool _key_pos = false;
};
class RtpAudioCache {
public:
RtpAudioCache() {
_cache = std::make_shared<List<RtpPacket::Ptr> >();
}
virtual ~RtpAudioCache() = default;
void inputAudioRtp(const RtpPacket::Ptr &rtp) {
if (rtp->timeStamp > _last_rtp_stamp + 100) {
//累积了100ms的音频数据
flushAll();
} else if (_cache->size() > 10) {
//或者audio rtp缓存超过10个
flushAll();
}
//追加数据到最后
_cache->emplace_back(rtp);
_last_rtp_stamp = rtp->timeStamp;
}
virtual void onFlushAudioRtp(std::shared_ptr<List<RtpPacket::Ptr> > &) = 0;
private:
void flushAll() {
if (_cache->empty()) {
return;
}
onFlushAudioRtp(_cache);
_cache = std::make_shared<List<RtpPacket::Ptr> >();
}
private:
std::shared_ptr<List<RtpPacket::Ptr> > _cache;
uint32_t _last_rtp_stamp = 0;
};
/**
/**
* rtsp媒体源的数据抽象
* rtsp有关键的两要素,分别是sdp、rtp包
* 只要生成了这两要素,那么要实现rtsp推流、rtsp服务器就很简单了
......@@ -261,9 +177,9 @@ public:
}
if(rtp->type == TrackVideo){
RtpVideoCache::inputVideoRtp(rtp, keyPos);
RtpVideoCache::inputVideo(rtp, keyPos);
}else{
RtpAudioCache::inputAudioRtp(rtp);
RtpAudioCache::inputAudio(rtp);
}
}
......@@ -274,7 +190,7 @@ private:
* @param rtp_list 时间戳相同的rtp包列表
* @param key_pos 是否包含关键帧
*/
void onFlushVideoRtp(std::shared_ptr<List<RtpPacket::Ptr> > &rtp_list, bool key_pos) override {
void onFlushVideo(std::shared_ptr<List<RtpPacket::Ptr> > &rtp_list, bool key_pos) override {
_ring->write(rtp_list, key_pos);
}
......@@ -282,7 +198,7 @@ private:
* 批量flush一定数量的音频rtp包时触发该函数
* @param rtp_list rtp包列表
*/
void onFlushAudioRtp(std::shared_ptr<List<RtpPacket::Ptr> > &rtp_list) override{
void onFlushAudio(std::shared_ptr<List<RtpPacket::Ptr> > &rtp_list) override{
//只有音频的话,就不存在gop缓存的意义
_ring->write(rtp_list, !_have_video);
}
......
......@@ -112,6 +112,11 @@ void RtspPlayer::onConnect(const SockException &err){
}
void RtspPlayer::onRecv(const Buffer::Ptr& pBuf) {
if(_benchmark_mode && !_pPlayTimer){
//在性能测试模式下,如果rtsp握手完毕后,不再解析rtp包
_rtpTicker.resetTime();
return;
}
input(pBuf->data(),pBuf->size());
}
void RtspPlayer::onErr(const SockException &ex) {
......@@ -750,6 +755,8 @@ void RtspPlayer::onPlayResult_l(const SockException &ex , bool handshakeComplete
//开始播放阶段
_pPlayTimer.reset();
onPlayResult(ex);
//是否为性能测试模式
_benchmark_mode = (*this)[Client::kBenchmarkMode].as<int>();
} else if (ex) {
//播放成功后异常断开回调
onShutdown(ex);
......
......@@ -139,6 +139,8 @@ private:
//是否为rtsp点播
bool _is_play_back;
//是否为性能测试模式
bool _benchmark_mode = false;
};
} /* namespace mediakit */
......
......@@ -55,6 +55,7 @@ int main(int argc, char *argv[]) {
player->setOnShutdown([&](const SockException &ex) {
--alivePlayerCnt;
});
(*player)[kBenchmarkMode] = true;
(*player)[kRtpType] = atoi(argv[4]);
player->play(argv[3]);
playerList.push_back(player);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论