Commit ac1abb34 by ziyue

Add media stream flush mechanism: #1996

parent 80eef693
......@@ -420,9 +420,7 @@ FFmpegDecoder::FFmpegDecoder(const Track::Ptr &track, int thread_num) {
FFmpegDecoder::~FFmpegDecoder() {
stopThread(true);
if (_do_merger) {
_merger.inputFrame(nullptr, [&](uint64_t dts, uint64_t pts, const Buffer::Ptr &buffer, bool have_idr) {
decodeFrame(buffer->data(), buffer->size(), dts, pts, false);
});
_merger.flush();
}
flush();
}
......@@ -452,7 +450,7 @@ const AVCodecContext *FFmpegDecoder::getContext() const {
bool FFmpegDecoder::inputFrame_l(const Frame::Ptr &frame, bool live, bool enable_merge) {
if (_do_merger && enable_merge) {
return _merger.inputFrame(frame, [&](uint64_t dts, uint64_t pts, const Buffer::Ptr &buffer, bool have_idr) {
return _merger.inputFrame(frame, [this, live](uint64_t dts, uint64_t pts, const Buffer::Ptr &buffer, bool have_idr) {
decodeFrame(buffer->data(), buffer->size(), dts, pts, live);
});
}
......
......@@ -359,9 +359,9 @@ public:
virtual ~PacketCache() = default;
void inputPacket(uint64_t stamp, bool is_video, std::shared_ptr<packet> pkt, bool key_pos) {
bool flush = flushImmediatelyWhenCloseMerge();
if (!flush && _policy.isFlushAble(is_video, key_pos, stamp, _cache->size())) {
flushAll();
bool flag = flushImmediatelyWhenCloseMerge();
if (!flag && _policy.isFlushAble(is_video, key_pos, stamp, _cache->size())) {
flush();
}
//append the data at the end
......@@ -370,19 +370,12 @@ public:
_key_pos = key_pos;
}
if (flush) {
flushAll();
if (flag) {
flush();
}
}
virtual void clearCache() {
_cache->clear();
}
virtual void onFlush(std::shared_ptr<packet_list>, bool key_pos) = 0;
private:
void flushAll() {
void flush() {
if (_cache->empty()) {
return;
}
......@@ -391,6 +384,13 @@ private:
_key_pos = false;
}
virtual void clearCache() {
_cache->clear();
}
virtual void onFlush(std::shared_ptr<packet_list>, bool key_pos) = 0;
private:
bool flushImmediatelyWhenCloseMerge() {
//For most protocols, flush the cache immediately when merge-writing is disabled; this saves one frame of latency. RTP is the exception,
//because RTP packets are small and a single RtpPacket does not carry a complete video frame, so when merge-writing is disabled,
......
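For orientation, the renamed cache-flush path above can be summarized with a minimal sketch; types and the flush policy are simplified placeholders, not the project's real template:

```cpp
// Sketch of the PacketCache flush path after the rename (flushAll() -> flush()).
// Hypothetical, simplified types; the real policy checks live in the project headers.
#include <cstdint>
#include <list>
#include <memory>

template <typename Packet>
class PacketCacheSketch {
public:
    using PacketList = std::list<std::shared_ptr<Packet>>;

    virtual ~PacketCacheSketch() = default;

    void inputPacket(uint64_t stamp, bool is_video, std::shared_ptr<Packet> pkt, bool key_pos) {
        // When merge-writing is disabled, flush right after appending to save one frame of latency.
        bool flag = flushImmediatelyWhenCloseMerge();
        if (!flag && isFlushAble(is_video, key_pos, stamp, _cache->size())) {
            flush();
        }
        // append the packet at the end of the cache
        _cache->emplace_back(std::move(pkt));
        if (key_pos) {
            _key_pos = key_pos;
        }
        if (flag) {
            flush();
        }
    }

    // No longer private, so owners (e.g. media source destructors) can drain the cache themselves.
    void flush() {
        if (_cache->empty()) {
            return;
        }
        onFlush(std::move(_cache), _key_pos);
        _cache = std::make_shared<PacketList>();
        _key_pos = false;
    }

    virtual void onFlush(std::shared_ptr<PacketList> list, bool key_pos) = 0;

private:
    // Placeholder policy hooks; the real class consults the merge-write configuration.
    bool isFlushAble(bool, bool, uint64_t, size_t) const { return false; }
    bool flushImmediatelyWhenCloseMerge() const { return false; }

    std::shared_ptr<PacketList> _cache = std::make_shared<PacketList>();
    bool _key_pos = false;
};
```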
......@@ -155,7 +155,7 @@ void FrameMerger::doMerge(BufferLikeString &merged, const Frame::Ptr &frame) con
}
}
bool FrameMerger::inputFrame(const Frame::Ptr &frame, const onOutput &cb, BufferLikeString *buffer) {
bool FrameMerger::inputFrame(const Frame::Ptr &frame, onOutput cb, BufferLikeString *buffer) {
if (willFlush(frame)) {
Frame::Ptr back = _frame_cache.back();
Buffer::Ptr merged_frame = back;
......@@ -190,6 +190,7 @@ bool FrameMerger::inputFrame(const Frame::Ptr &frame, const onOutput &cb, Buffer
if (frame->decodeAble()) {
_have_decode_able_frame = true;
}
_cb = std::move(cb);
_frame_cache.emplace_back(Frame::getCacheAbleFrame(frame));
return true;
}
......@@ -203,4 +204,11 @@ void FrameMerger::clear() {
_have_decode_able_frame = false;
}
void FrameMerger::flush() {
if (_cb) {
inputFrame(nullptr, std::move(_cb), nullptr);
}
clear();
}
}//namespace mediakit
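The key change in FrameMerger is that inputFrame() now takes the callback by value and remembers it in _cb, so flush() can replay it later without the caller supplying it again. A standalone sketch of that pattern (simplified, hypothetical types; the real class merges H.264/H.265 NAL units):

```cpp
// Stored-callback flush pattern, reduced to its essentials (hypothetical types).
#include <cstdint>
#include <functional>
#include <string>
#include <vector>

class MergerSketch {
public:
    using onOutput = std::function<void(uint64_t dts, const std::string &merged)>;

    // Passing nullptr as the frame means "emit whatever is cached".
    bool inputFrame(const std::string *frame, onOutput cb) {
        if (!frame) {
            if (!_cache.empty() && cb) {
                std::string merged;
                for (auto &part : _cache) {
                    merged += part;
                }
                cb(_last_dts, merged);
            }
            _cache.clear();
            return true;
        }
        _cb = std::move(cb);          // remember the most recent callback
        _cache.push_back(*frame);
        return true;
    }

    // New in this commit: reuse the remembered callback to drain the cache.
    void flush() {
        if (_cb) {
            inputFrame(nullptr, std::move(_cb));
        }
        clear();
    }

    void clear() {
        _cache.clear();
        _cb = nullptr;
    }

private:
    onOutput _cb;
    std::vector<std::string> _cache;
    uint64_t _last_dts = 0;
};
```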
......@@ -271,6 +271,11 @@ public:
* Write frame data
*/
virtual bool inputFrame(const Frame::Ptr &frame) = 0;
/**
* Flush all cached frames to the output
*/
virtual void flush() {};
};
/**
......@@ -542,8 +547,13 @@ public:
FrameMerger(int type);
~FrameMerger() = default;
/**
* Flush the output buffer. Note that this invokes the onOutput callback previously passed to FrameMerger::inputFrame,
* so make sure anything captured by that callback is still valid at this point
*/
void flush();
void clear();
bool inputFrame(const Frame::Ptr &frame, const onOutput &cb, toolkit::BufferLikeString *buffer = nullptr);
bool inputFrame(const Frame::Ptr &frame, onOutput cb, toolkit::BufferLikeString *buffer = nullptr);
private:
bool willFlush(const Frame::Ptr &frame) const;
......@@ -552,6 +562,7 @@ private:
private:
int _type;
bool _have_decode_able_frame = false;
onOutput _cb;
toolkit::List<Frame::Ptr> _frame_cache;
};
......
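A hedged usage sketch of the new API follows, assuming ZLMediaKit's Extension/Frame.h is on the include path; writeSample() is a hypothetical consumer of merged access units, not project code:

```cpp
// Illustrative caller of FrameMerger with the new flush() (sketch, not project code).
#include "Extension/Frame.h"   // assumed location of FrameMerger and Frame

using namespace mediakit;

// Hypothetical sink for merged access units (declaration only, for illustration).
void writeSample(uint64_t dts, uint64_t pts, const toolkit::Buffer::Ptr &buffer, bool have_idr);

void feedVideo(FrameMerger &merger, const Frame::Ptr &frame) {
    merger.inputFrame(frame, [](uint64_t dts, uint64_t pts, const toolkit::Buffer::Ptr &buffer, bool have_idr) {
        writeSample(dts, pts, buffer, have_idr);   // consume one merged access unit
    });
}

void onStreamFinished(FrameMerger &merger) {
    // Drain whatever is still cached. This re-invokes the callback remembered from the
    // most recent inputFrame() call, so anything that callback captured must still be alive.
    merger.flush();
}
```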
......@@ -124,26 +124,32 @@ void H264RtmpEncoder::makeConfigPacket(){
}
}
void H264RtmpEncoder::flush() {
inputFrame(nullptr);
}
bool H264RtmpEncoder::inputFrame(const Frame::Ptr &frame) {
auto data = frame->data() + frame->prefixSize();
auto len = frame->size() - frame->prefixSize();
auto type = H264_TYPE(data[0]);
switch (type) {
case H264Frame::NAL_SPS: {
if (!_got_config_frame) {
_sps = string(data, len);
makeConfigPacket();
if (frame) {
auto data = frame->data() + frame->prefixSize();
auto len = frame->size() - frame->prefixSize();
auto type = H264_TYPE(data[0]);
switch (type) {
case H264Frame::NAL_SPS: {
if (!_got_config_frame) {
_sps = string(data, len);
makeConfigPacket();
}
break;
}
break;
}
case H264Frame::NAL_PPS: {
if (!_got_config_frame) {
_pps = string(data, len);
makeConfigPacket();
case H264Frame::NAL_PPS: {
if (!_got_config_frame) {
_pps = string(data, len);
makeConfigPacket();
}
break;
}
break;
default: break;
}
default : break;
}
if (!_rtmp_packet) {
......
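H264RtmpEncoder::flush() is implemented as inputFrame(nullptr): a null frame skips the NAL parsing and lets the tail of inputFrame() emit whatever packet is still pending. A reduced sketch of that idiom (hypothetical, simplified types; the real encoder parses NAL types and builds RtmpPackets):

```cpp
// "Flush by feeding nullptr" idiom used by the RTMP encoders (illustrative sketch).
#include <memory>
#include <string>

class RtmpEncoderSketch {
public:
    void flush() { inputFrame(nullptr); }          // no new data, just emit what is pending

    bool inputFrame(const std::string *frame) {
        if (frame) {
            // Real code: inspect the NAL type, cache SPS/PPS, append to the pending packet.
            if (!_pending) {
                _pending = std::make_shared<std::string>();
            }
            _pending->append(*frame);
        }
        // The pending packet is normally emitted when a new timestamp arrives (not shown);
        // after this commit, a null frame from flush() also triggers the emission.
        if (!frame && _pending) {
            emit(*_pending);
            _pending.reset();
        }
        return true;
    }

private:
    void emit(const std::string & /*packet*/) { /* hand the packet to the RTMP ring */ }
    std::shared_ptr<std::string> _pending;
};
```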
......@@ -62,7 +62,7 @@ public:
* @param track
*/
H264RtmpEncoder(const Track::Ptr &track);
~H264RtmpEncoder() {}
~H264RtmpEncoder() = default;
/**
* Input an H.264 frame; SPS/PPS may be omitted
......@@ -71,6 +71,11 @@ public:
bool inputFrame(const Frame::Ptr &frame) override;
/**
* Flush all cached frames to the output
*/
void flush() override;
/**
* Generate the config packet
*/
void makeConfigPacket() override;
......
......@@ -291,6 +291,14 @@ bool H264RtpEncoder::inputFrame(const Frame::Ptr &frame) {
return true;
}
void H264RtpEncoder::flush() {
if (_last_frame) {
// The mark bit is set to true only when the timestamp has changed
inputFrame_l(_last_frame, true);
_last_frame = nullptr;
}
}
bool H264RtpEncoder::inputFrame_l(const Frame::Ptr &frame, bool is_mark){
if (frame->keyFrame()) {
//Ensure every key frame is preceded by SPS and PPS
......
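H264RtpEncoder holds the most recent frame in _last_frame so the RTP marker bit can be decided once the next timestamp is known; the new flush() releases that held frame with the marker set instead of letting it sit in the cache. A minimal sketch of the idea (hypothetical types, not the real packetizer):

```cpp
// Deferred-frame / marker-bit sketch (simplified, illustrative only).
#include <cstdint>
#include <memory>
#include <string>

struct FakeFrame {
    uint64_t pts = 0;
    std::string data;
};

class RtpEncoderSketch {
public:
    bool inputFrame(const std::shared_ptr<FakeFrame> &frame) {
        if (_last_frame) {
            // The marker bit is set only when the timestamp changes, i.e. the held
            // frame was the last slice of its access unit.
            bool mark = frame->pts != _last_frame->pts;
            sendRtp(*_last_frame, mark);
        }
        _last_frame = frame;               // hold the new frame until the next one arrives
        return true;
    }

    // New in this commit: drain the held frame at end of stream.
    void flush() {
        if (_last_frame) {
            sendRtp(*_last_frame, true);   // end of stream, so the marker bit is set
            _last_frame = nullptr;
        }
    }

private:
    void sendRtp(const FakeFrame & /*frame*/, bool /*mark*/) { /* packetize and send */ }
    std::shared_ptr<FakeFrame> _last_frame;
};
```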
......@@ -85,6 +85,11 @@ public:
*/
bool inputFrame(const Frame::Ptr &frame) override;
/**
* Flush all cached frames to the output
*/
void flush() override;
private:
void insertConfigFrame(uint64_t pts);
bool inputFrame_l(const Frame::Ptr &frame, bool is_mark);
......
......@@ -138,33 +138,39 @@ void H265RtmpEncoder::makeConfigPacket(){
}
}
void H265RtmpEncoder::flush() {
inputFrame(nullptr);
}
bool H265RtmpEncoder::inputFrame(const Frame::Ptr &frame) {
auto data = frame->data() + frame->prefixSize();
auto len = frame->size() - frame->prefixSize();
auto type = H265_TYPE(data[0]);
switch (type) {
case H265Frame::NAL_SPS: {
if (!_got_config_frame) {
_sps = string(data, len);
makeConfigPacket();
if (frame) {
auto data = frame->data() + frame->prefixSize();
auto len = frame->size() - frame->prefixSize();
auto type = H265_TYPE(data[0]);
switch (type) {
case H265Frame::NAL_SPS: {
if (!_got_config_frame) {
_sps = string(data, len);
makeConfigPacket();
}
break;
}
break;
}
case H265Frame::NAL_PPS: {
if (!_got_config_frame) {
_pps = string(data, len);
makeConfigPacket();
case H265Frame::NAL_PPS: {
if (!_got_config_frame) {
_pps = string(data, len);
makeConfigPacket();
}
break;
}
break;
}
case H265Frame::NAL_VPS: {
if (!_got_config_frame) {
_vps = string(data, len);
makeConfigPacket();
case H265Frame::NAL_VPS: {
if (!_got_config_frame) {
_vps = string(data, len);
makeConfigPacket();
}
break;
}
break;
default: break;
}
default: break;
}
if (!_rtmp_packet) {
......
......@@ -60,7 +60,7 @@ public:
* @param track
*/
H265RtmpEncoder(const Track::Ptr &track);
~H265RtmpEncoder() {}
~H265RtmpEncoder() = default;
/**
* Input an H.265 frame; SPS/PPS may be omitted
......@@ -69,6 +69,11 @@ public:
bool inputFrame(const Frame::Ptr &frame) override;
/**
* Flush all cached frames to the output
*/
void flush() override;
/**
* Generate the config packet
*/
void makeConfigPacket() override;
......
......@@ -31,7 +31,7 @@ public:
};
//FMP4 live source
class FMP4MediaSource : public MediaSource, public toolkit::RingDelegate<FMP4Packet::Ptr>, private PacketCache<FMP4Packet>{
class FMP4MediaSource final : public MediaSource, public toolkit::RingDelegate<FMP4Packet::Ptr>, private PacketCache<FMP4Packet>{
public:
using Ptr = std::shared_ptr<FMP4MediaSource>;
using RingDataType = std::shared_ptr<toolkit::List<FMP4Packet::Ptr> >;
......@@ -42,7 +42,7 @@ public:
const std::string &stream_id,
int ring_size = FMP4_GOP_SIZE) : MediaSource(FMP4_SCHEMA, vhost, app, stream_id), _ring_size(ring_size) {}
~FMP4MediaSource() override = default;
~FMP4MediaSource() override { flush(); }
/**
* Get the media source's ring buffer
......
......@@ -18,8 +18,8 @@
namespace mediakit {
class FMP4MediaSourceMuxer : public MP4MuxerMemory, public MediaSourceEventInterceptor,
public std::enable_shared_from_this<FMP4MediaSourceMuxer> {
class FMP4MediaSourceMuxer final : public MP4MuxerMemory, public MediaSourceEventInterceptor,
public std::enable_shared_from_this<FMP4MediaSourceMuxer> {
public:
using Ptr = std::shared_ptr<FMP4MediaSourceMuxer>;
......@@ -29,7 +29,7 @@ public:
_media_src = std::make_shared<FMP4MediaSource>(vhost, app, stream_id);
}
~FMP4MediaSourceMuxer() override = default;
~FMP4MediaSourceMuxer() override { MP4MuxerMemory::flush(); };
void setListener(const std::weak_ptr<MediaSourceEvent> &listener){
setDelegate(listener);
......
......@@ -393,6 +393,9 @@ void HlsPlayerImp::onShutdown(const SockException &ex) {
break;
}
}
if (_decoder) {
_decoder->flush();
}
PlayerImp<HlsPlayer, PlayerBase>::onShutdown(ex);
}
......
......@@ -60,6 +60,9 @@ void TsPlayerImp::onShutdown(const SockException &ex) {
break;
}
}
if (_decoder) {
_decoder->flush();
}
PlayerImp<TsPlayer, PlayerBase>::onShutdown(ex);
}
......
......@@ -16,7 +16,7 @@
namespace mediakit {
class HlsRecorder : public MediaSourceEventInterceptor, public MpegMuxer, public std::enable_shared_from_this<HlsRecorder> {
class HlsRecorder final : public MediaSourceEventInterceptor, public MpegMuxer, public std::enable_shared_from_this<HlsRecorder> {
public:
using Ptr = std::shared_ptr<HlsRecorder>;
......@@ -30,7 +30,7 @@ public:
_hls->clearCache();
}
~HlsRecorder() = default;
~HlsRecorder() { MpegMuxer::flush(); };
void setMediaSource(const std::string &vhost, const std::string &app, const std::string &stream_id) {
_hls->setMediaSource(vhost, app, stream_id);
......
......@@ -79,6 +79,10 @@ void MP4MuxerInterface::resetTracks() {
_codec_to_trackid.clear();
}
void MP4MuxerInterface::flush() {
_frame_merger.flush();
}
bool MP4MuxerInterface::inputFrame(const Frame::Ptr &frame) {
auto it = _codec_to_trackid.find(frame->getCodecId());
if (it == _codec_to_trackid.end()) {
......@@ -98,12 +102,12 @@ bool MP4MuxerInterface::inputFrame(const Frame::Ptr &frame) {
//MP4 file timestamps must start from 0
auto &track_info = it->second;
int64_t dts_out, pts_out;
switch (frame->getCodecId()) {
case CodecH264:
case CodecH265: {
//The logic here packs frames with the same timestamp (SPS, PPS, IDR) together and treats them as a single frame,
_frame_merger.inputFrame(frame, [&](uint64_t dts, uint64_t pts, const Buffer::Ptr &buffer, bool have_idr) {
_frame_merger.inputFrame(frame, [this, &track_info](uint64_t dts, uint64_t pts, const Buffer::Ptr &buffer, bool have_idr) {
int64_t dts_out, pts_out;
track_info.stamp.revise(dts, pts, dts_out, pts_out);
mp4_writer_write(_mov_writter.get(),
track_info.track_id,
......@@ -117,6 +121,7 @@ bool MP4MuxerInterface::inputFrame(const Frame::Ptr &frame) {
}
default: {
int64_t dts_out, pts_out;
track_info.stamp.revise(frame->dts(), frame->pts(), dts_out, pts_out);
mp4_writer_write(_mov_writter.get(),
track_info.track_id,
......
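Note that the lambda captures in MP4Muxer (and in MpegMuxer below) change from a blanket [&] to an explicit list such as [this, &track_info]. Now that FrameMerger can store the callback and replay it from flush(), spelling out exactly what the lambda captures makes its lifetime requirements visible. A self-contained illustration of the difference (generic example, not project code):

```cpp
// Why explicit capture lists matter once a callback can outlive the call that created it.
#include <cstdint>
#include <functional>
#include <iostream>

struct StoringMerger {
    std::function<void(uint64_t)> cb;       // kept alive, like FrameMerger::_cb
    uint64_t pending = 0;

    void input(uint64_t dts, std::function<void(uint64_t)> f) {
        cb = std::move(f);
        pending = dts;
    }

    void flush() {                          // may run much later than input()
        if (cb) {
            cb(pending);
        }
        cb = nullptr;
    }
};

struct Muxer {
    StoringMerger merger;
    uint64_t written = 0;

    void inputFrame(uint64_t dts) {
        // [&] would silently capture any local by reference; capturing only `this`
        // documents that the callback depends on the Muxer staying alive, nothing else.
        merger.input(dts, [this](uint64_t flushed_dts) {
            written = flushed_dts;
        });
    }
};

int main() {
    Muxer muxer;
    muxer.inputFrame(42);
    muxer.merger.flush();                   // safe: only `this` was captured
    std::cout << "last written dts: " << muxer.written << "\n";
    return 0;
}
```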
......@@ -44,6 +44,11 @@ public:
void resetTracks() override;
/**
* Flush all cached frames to the output
*/
void flush() override;
/**
* Whether video is present
*/
bool haveVideo() const;
......
......@@ -33,6 +33,7 @@ MP4Recorder::MP4Recorder(const string &path, const string &vhost, const string &
}
MP4Recorder::~MP4Recorder() {
flush();
closeFile();
}
......@@ -96,6 +97,12 @@ void MP4Recorder::closeFile() {
}
}
void MP4Recorder::flush() {
if (_muxer) {
_muxer->flush();
}
}
bool MP4Recorder::inputFrame(const Frame::Ptr &frame) {
if (!(_have_video && frame->getTrackType() == TrackAudio)) {
//If there is video and the input is audio, the slicing logic should be skipped
......
......@@ -24,7 +24,7 @@
namespace mediakit {
#ifdef ENABLE_MP4
class MP4Recorder : public MediaSinkInterface {
class MP4Recorder final : public MediaSinkInterface {
public:
using Ptr = std::shared_ptr<MP4Recorder>;
......@@ -42,6 +42,11 @@ public:
bool inputFrame(const Frame::Ptr &frame) override;
/**
* Flush all cached frames to the output
*/
void flush() override;
/**
* Add a track that is in the ready state
*/
bool addTrack(const Track::Ptr & track) override;
......
......@@ -63,7 +63,7 @@ bool MpegMuxer::inputFrame(const Frame::Ptr &frame) {
case CodecH264:
case CodecH265: {
//The logic here packs frames with the same timestamp (SPS, PPS, IDR) together and treats them as a single frame,
return _frame_merger.inputFrame(frame,[&](uint64_t dts, uint64_t pts, const Buffer::Ptr &buffer, bool have_idr) {
return _frame_merger.inputFrame(frame,[this, track_id](uint64_t dts, uint64_t pts, const Buffer::Ptr &buffer, bool have_idr) {
_key_pos = have_idr;
//Use the video timestamp as the TS timestamp
_timestamp = dts;
......@@ -153,6 +153,10 @@ void MpegMuxer::releaseContext() {
_frame_merger.clear();
}
void MpegMuxer::flush() {
_frame_merger.flush();
}
}//mediakit
#endif
\ No newline at end of file
......@@ -45,6 +45,11 @@ public:
*/
bool inputFrame(const Frame::Ptr &frame) override;
/**
* Flush all cached frames to the output
*/
void flush() override;
protected:
/**
* Output callback for TS/PS data
......
......@@ -60,7 +60,7 @@ public:
MediaSource(RTMP_SCHEMA, vhost, app, stream_id), _ring_size(ring_size) {
}
~RtmpMediaSource() override{}
~RtmpMediaSource() override { flush(); }
/**
* Get the media source's ring buffer
......
......@@ -26,9 +26,9 @@
namespace mediakit {
class RtmpMediaSourceImp: public RtmpMediaSource, private TrackListener, public MultiMediaSourceMuxer::Listener {
class RtmpMediaSourceImp final : public RtmpMediaSource, private TrackListener, public MultiMediaSourceMuxer::Listener {
public:
typedef std::shared_ptr<RtmpMediaSourceImp> Ptr;
using Ptr = std::shared_ptr<RtmpMediaSourceImp>;
/**
* Constructor
......@@ -42,7 +42,7 @@ public:
_demuxer->setTrackListener(this);
}
~RtmpMediaSourceImp() = default;
~RtmpMediaSourceImp() override = default;
/**
* Set metadata
......
......@@ -16,8 +16,8 @@
namespace mediakit {
class RtmpMediaSourceMuxer : public RtmpMuxer, public MediaSourceEventInterceptor,
public std::enable_shared_from_this<RtmpMediaSourceMuxer> {
class RtmpMediaSourceMuxer final : public RtmpMuxer, public MediaSourceEventInterceptor,
public std::enable_shared_from_this<RtmpMediaSourceMuxer> {
public:
typedef std::shared_ptr<RtmpMediaSourceMuxer> Ptr;
......@@ -29,7 +29,7 @@ public:
getRtmpRing()->setDelegate(_media_src);
}
~RtmpMediaSourceMuxer() override{}
~RtmpMediaSourceMuxer() override { RtmpMuxer::flush(); }
void setListener(const std::weak_ptr<MediaSourceEvent> &listener){
setDelegate(listener);
......
......@@ -43,6 +43,14 @@ bool RtmpMuxer::inputFrame(const Frame::Ptr &frame) {
return encoder ? encoder->inputFrame(frame) : false;
}
void RtmpMuxer::flush() {
for (auto &encoder : _encoder) {
if (encoder) {
encoder->flush();
}
}
}
void RtmpMuxer::makeConfigPacket(){
for(auto &encoder : _encoder){
if(encoder){
......
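At the muxer level, flush() simply fans out to every per-track encoder, and RtspMuxer below does the same. A compact sketch of this fan-out (hypothetical interface names, simplified types):

```cpp
// Fan-out flush over per-track encoders (illustrative sketch).
#include <array>
#include <memory>

struct EncoderSketch {
    virtual ~EncoderSketch() = default;
    virtual void flush() = 0;              // drain any frame the encoder is still holding
};

class MuxerSketch {
public:
    void flush() {
        for (auto &encoder : _encoder) {
            if (encoder) {                 // slots for tracks that were never added stay null
                encoder->flush();
            }
        }
    }

private:
    std::array<std::shared_ptr<EncoderSketch>, 2> _encoder;   // e.g. one audio, one video slot
};
```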
......@@ -52,6 +52,11 @@ public:
bool inputFrame(const Frame::Ptr &frame) override;
/**
* Flush all cached frames to the output
*/
void flush() override;
/**
* Reset all tracks
*/
void resetTracks() override ;
......
......@@ -65,6 +65,10 @@ DecoderImp::Ptr DecoderImp::createDecoder(Type type, MediaSinkInterface *sink){
return DecoderImp::Ptr(new DecoderImp(decoder, sink));
}
void DecoderImp::flush() {
_merger.flush();
}
ssize_t DecoderImp::input(const uint8_t *data, size_t bytes){
return _decoder->input(data, bytes);
}
......@@ -219,10 +223,7 @@ void DecoderImp::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d
default:
// Hikvision PS streams may contain packets with codecid 0xBD
if (codecid != 0 && codecid != 0xBD) {
if (_last_unsported_print.elapsedTime() / 1000 > 5) {
_last_unsported_print.resetTime();
WarnL << "unsupported codec type:" << getCodecName(codecid) << " " << (int) codecid;
}
WarnL << "unsupported codec type:" << getCodecName(codecid) << " " << (int) codecid;
}
break;
}
......
......@@ -39,16 +39,14 @@ protected:
class DecoderImp{
public:
typedef enum {
decoder_ts = 0,
decoder_ps
}Type;
typedef enum { decoder_ts = 0, decoder_ps } Type;
typedef std::shared_ptr<DecoderImp> Ptr;
~DecoderImp() = default;
static Ptr createDecoder(Type type, MediaSinkInterface *sink);
ssize_t input(const uint8_t *data, size_t bytes);
void flush();
protected:
void onTrack(const Track::Ptr &track);
......@@ -63,7 +61,6 @@ private:
Decoder::Ptr _decoder;
MediaSinkInterface *_sink;
FrameMerger _merger{FrameMerger::none};
toolkit::Ticker _last_unsported_print;
Track::Ptr _tracks[TrackMax];
};
......
......@@ -59,12 +59,16 @@ GB28181Process::GB28181Process(const MediaInfo &media_info, MediaSinkInterface *
_interface = sink;
}
GB28181Process::~GB28181Process() {}
void GB28181Process::onRtpSorted(RtpPacket::Ptr rtp) {
_rtp_decoder[rtp->getHeader()->pt]->inputRtp(rtp, false);
}
void GB28181Process::flush() {
if (_decoder) {
_decoder->flush();
}
}
bool GB28181Process::inputRtp(bool, const char *data, size_t data_len) {
GET_CONFIG(uint32_t, h264_pt, RtpProxy::kH264PT);
GET_CONFIG(uint32_t, h265_pt, RtpProxy::kH265PT);
......
......@@ -26,7 +26,7 @@ class GB28181Process : public ProcessInterface {
public:
typedef std::shared_ptr<GB28181Process> Ptr;
GB28181Process(const MediaInfo &media_info, MediaSinkInterface *sink);
~GB28181Process() override;
~GB28181Process() override = default;
/**
* Input RTP
......@@ -36,6 +36,11 @@ public:
*/
bool inputRtp(bool, const char *data, size_t data_len) override;
/**
* Flush all caches to the output
*/
void flush() override;
protected:
void onRtpSorted(RtpPacket::Ptr rtp);
......
......@@ -31,6 +31,11 @@ public:
* @return whether parsing succeeded
*/
virtual bool inputRtp(bool is_udp, const char *data, size_t data_len) = 0;
/**
* Flush all caches to the output
*/
virtual void flush() {}
};
}//namespace mediakit
......
......@@ -19,37 +19,49 @@ namespace mediakit{
RtpCache::RtpCache(onFlushed cb) {
_cb = std::move(cb);
}
bool RtpCache::firstKeyReady(bool in) {
if(_first_key){
if (_first_key) {
return _first_key;
}
_first_key = in;
return _first_key;
}
void RtpCache::onFlush(std::shared_ptr<List<Buffer::Ptr> > rtp_list, bool) {
void RtpCache::onFlush(std::shared_ptr<List<Buffer::Ptr>> rtp_list, bool) {
_cb(std::move(rtp_list));
}
void RtpCache::input(uint64_t stamp, Buffer::Ptr buffer,bool is_key ) {
void RtpCache::input(uint64_t stamp, Buffer::Ptr buffer, bool is_key) {
inputPacket(stamp, true, std::move(buffer), is_key);
}
void RtpCachePS::onRTP(Buffer::Ptr buffer,bool is_key) {
if(!firstKeyReady(is_key)){
void RtpCachePS::flush() {
PSEncoderImp::flush();
RtpCache::flush();
}
void RtpCachePS::onRTP(Buffer::Ptr buffer, bool is_key) {
if (!firstKeyReady(is_key)) {
return;
}
auto rtp = std::static_pointer_cast<RtpPacket>(buffer);
auto stamp = rtp->getStampMS();
input(stamp, std::move(buffer),is_key);
input(stamp, std::move(buffer), is_key);
}
void RtpCacheRaw::flush() {
RawEncoderImp::flush();
RtpCache::flush();
}
void RtpCacheRaw::onRTP(Buffer::Ptr buffer,bool is_key) {
if(!firstKeyReady(is_key)){
void RtpCacheRaw::onRTP(Buffer::Ptr buffer, bool is_key) {
if (!firstKeyReady(is_key)) {
return;
}
auto rtp = std::static_pointer_cast<RtpPacket>(buffer);
auto stamp = rtp->getStampMS();
input(stamp, std::move(buffer),is_key);
input(stamp, std::move(buffer), is_key);
}
}//namespace mediakit
......
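RtpCachePS and RtpCacheRaw inherit a flush() from both of their bases (the encoder side and the now-protected PacketCache side), so their overrides call each base explicitly with a qualified name. A self-contained sketch of that shape (hypothetical class names):

```cpp
// Qualified base-class flush calls in a dual-inheritance cache (illustrative sketch).
#include <iostream>

struct EncoderBase {
    virtual ~EncoderBase() = default;
    virtual void flush() { std::cout << "encoder: emit pending frames\n"; }
};

struct CacheBase {
    virtual ~CacheBase() = default;
    void flush() { std::cout << "cache: deliver buffered RTP packets\n"; }
};

struct RtpCacheSketch : public EncoderBase, public CacheBase {
    // Without this override, calling flush() through the derived class would be ambiguous;
    // the override drains the encoder first, then the packet cache, mirroring RtpCachePS::flush().
    void flush() override {
        EncoderBase::flush();
        CacheBase::flush();
    }
};

int main() {
    RtpCacheSketch cache;
    cache.flush();
    return 0;
}
```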
......@@ -19,7 +19,7 @@
namespace mediakit{
class RtpCache : private PacketCache<toolkit::Buffer> {
class RtpCache : protected PacketCache<toolkit::Buffer> {
public:
using onFlushed = std::function<void(std::shared_ptr<toolkit::List<toolkit::Buffer::Ptr> >)>;
RtpCache(onFlushed cb);
......@@ -33,30 +33,33 @@ protected:
void input(uint64_t stamp, toolkit::Buffer::Ptr buffer,bool is_key = false);
bool firstKeyReady(bool in);
protected:
void onFlush(std::shared_ptr<toolkit::List<toolkit::Buffer::Ptr> > rtp_list, bool) override;
private:
onFlushed _cb;
bool _first_key = false;
onFlushed _cb;
};
class RtpCachePS : public RtpCache, public PSEncoderImp{
class RtpCachePS : public RtpCache, public PSEncoderImp {
public:
RtpCachePS(onFlushed cb, uint32_t ssrc, uint8_t payload_type = 96) : RtpCache(std::move(cb)), PSEncoderImp(ssrc, payload_type) {};
~RtpCachePS() override = default;
void flush() override;
protected:
void onRTP(toolkit::Buffer::Ptr rtp,bool is_key = false) override;
void onRTP(toolkit::Buffer::Ptr rtp, bool is_key = false) override;
};
class RtpCacheRaw : public RtpCache, public RawEncoderImp{
class RtpCacheRaw : public RtpCache, public RawEncoderImp {
public:
RtpCacheRaw(onFlushed cb, uint32_t ssrc, uint8_t payload_type = 96, bool sendAudio = true) : RtpCache(std::move(cb)), RawEncoderImp(ssrc, payload_type,sendAudio) {};
RtpCacheRaw(onFlushed cb, uint32_t ssrc, uint8_t payload_type = 96, bool send_audio = true) : RtpCache(std::move(cb)), RawEncoderImp(ssrc, payload_type, send_audio) {};
~RtpCacheRaw() override = default;
void flush() override;
protected:
void onRTP(toolkit::Buffer::Ptr rtp,bool is_key = false) override;
void onRTP(toolkit::Buffer::Ptr rtp, bool is_key = false) override;
};
}//namespace mediakit
......
......@@ -50,6 +50,9 @@ RtpProcess::RtpProcess(const string &stream_id) {
}
RtpProcess::~RtpProcess() {
if (_process) {
_process->flush();
}
uint64_t duration = (_last_frame_time.createdTime() - _last_frame_time.elapsedTime()) / 1000;
WarnP(this) << "RTP pusher("
<< _media_info.shortUrl()
......
......@@ -25,6 +25,10 @@ RtpSender::RtpSender(EventPoller::Ptr poller) {
_socket_rtp = Socket::createSocket(_poller, false);
}
RtpSender::~RtpSender() {
flush();
}
void RtpSender::startSend(const MediaSourceEvent::SendRtpArgs &args, const function<void(uint16_t local_port, const SockException &ex)> &cb){
_args = args;
if (!_interface) {
......@@ -231,6 +235,12 @@ void RtpSender::resetTracks(){
_interface->resetTracks();
}
void RtpSender::flush() {
if (_interface) {
_interface->flush();
}
}
//This function is executed on another thread
bool RtpSender::inputFrame(const Frame::Ptr &frame) {
//Do the real work only after the connection succeeds (saves CPU)
......
......@@ -18,12 +18,12 @@
namespace mediakit{
//RTP sender client; supports sending the GB28181 protocol
class RtpSender : public MediaSinkInterface, public std::enable_shared_from_this<RtpSender>{
class RtpSender final : public MediaSinkInterface, public std::enable_shared_from_this<RtpSender>{
public:
typedef std::shared_ptr<RtpSender> Ptr;
RtpSender(toolkit::EventPoller::Ptr poller = nullptr);
~RtpSender() override = default;
~RtpSender() override;
/**
* Start sending PS-RTP packets
......@@ -38,6 +38,11 @@ public:
bool inputFrame(const Frame::Ptr &frame) override;
/**
* Flush the cached frames to the output
*/
void flush() override;
/**
* Add a track; internally this calls the Track's clone method
* Only information such as SPS/PPS is cloned; Delegate relationships are not cloned
* @param track
......
......@@ -56,7 +56,7 @@ public:
int ring_size = RTP_GOP_SIZE) :
MediaSource(RTSP_SCHEMA, vhost, app, stream_id), _ring_size(ring_size) {}
~RtspMediaSource() override{}
~RtspMediaSource() override { flush(); }
/**
* Get the media source's ring buffer
......
......@@ -17,9 +17,9 @@
#include "Common/MultiMediaSourceMuxer.h"
namespace mediakit {
class RtspMediaSourceImp : public RtspMediaSource, private TrackListener, public MultiMediaSourceMuxer::Listener {
class RtspMediaSourceImp final : public RtspMediaSource, private TrackListener, public MultiMediaSourceMuxer::Listener {
public:
typedef std::shared_ptr<RtspMediaSourceImp> Ptr;
using Ptr = std::shared_ptr<RtspMediaSourceImp>;
/**
* Constructor
......@@ -33,7 +33,7 @@ public:
_demuxer->setTrackListener(this);
}
~RtspMediaSourceImp() = default;
~RtspMediaSourceImp() override = default;
/**
* Set the SDP
......
......@@ -16,8 +16,8 @@
namespace mediakit {
class RtspMediaSourceMuxer : public RtspMuxer, public MediaSourceEventInterceptor,
public std::enable_shared_from_this<RtspMediaSourceMuxer> {
class RtspMediaSourceMuxer final : public RtspMuxer, public MediaSourceEventInterceptor,
public std::enable_shared_from_this<RtspMediaSourceMuxer> {
public:
typedef std::shared_ptr<RtspMediaSourceMuxer> Ptr;
......@@ -29,7 +29,7 @@ public:
getRtpRing()->setDelegate(_media_src);
}
~RtspMediaSourceMuxer() override{}
~RtspMediaSourceMuxer() override { RtspMuxer::flush(); }
void setListener(const std::weak_ptr<MediaSourceEvent> &listener){
setDelegate(listener);
......
......@@ -86,6 +86,14 @@ bool RtspMuxer::inputFrame(const Frame::Ptr &frame) {
return encoder ? encoder->inputFrame(frame) : false;
}
void RtspMuxer::flush() {
for (auto &encoder : _encoder) {
if (encoder) {
encoder->flush();
}
}
}
string RtspMuxer::getSdp() {
return _sdp;
}
......
......@@ -74,6 +74,11 @@ public:
bool inputFrame(const Frame::Ptr &frame) override;
/**
* Flush all cached frames to the output
*/
void flush() override;
/**
* Reset all tracks
*/
void resetTracks() override ;
......
......@@ -31,7 +31,7 @@ public:
};
//TS live source
class TSMediaSource : public MediaSource, public toolkit::RingDelegate<TSPacket::Ptr>, private PacketCache<TSPacket>{
class TSMediaSource final : public MediaSource, public toolkit::RingDelegate<TSPacket::Ptr>, private PacketCache<TSPacket>{
public:
using Ptr = std::shared_ptr<TSMediaSource>;
using RingDataType = std::shared_ptr<toolkit::List<TSPacket::Ptr> >;
......@@ -42,7 +42,7 @@ public:
const std::string &stream_id,
int ring_size = TS_GOP_SIZE) : MediaSource(TS_SCHEMA, vhost, app, stream_id), _ring_size(ring_size) {}
~TSMediaSource() override = default;
~TSMediaSource() override { flush(); }
/**
* Get the media source's ring buffer
......
......@@ -16,8 +16,8 @@
namespace mediakit {
class TSMediaSourceMuxer : public MpegMuxer, public MediaSourceEventInterceptor,
public std::enable_shared_from_this<TSMediaSourceMuxer> {
class TSMediaSourceMuxer final : public MpegMuxer, public MediaSourceEventInterceptor,
public std::enable_shared_from_this<TSMediaSourceMuxer> {
public:
using Ptr = std::shared_ptr<TSMediaSourceMuxer>;
......@@ -27,7 +27,7 @@ public:
_media_src = std::make_shared<TSMediaSource>(vhost, app, stream_id);
}
~TSMediaSourceMuxer() override = default;
~TSMediaSourceMuxer() override { MpegMuxer::flush(); };
void setListener(const std::weak_ptr<MediaSourceEvent> &listener){
setDelegate(listener);
......
......@@ -108,6 +108,9 @@ void SrtTransportImp::onSRTData(DataPacket::Ptr pkt) {
}
void SrtTransportImp::onShutdown(const SockException &ex) {
if (_decoder) {
_decoder->flush();
}
SrtTransport::onShutdown(ex);
}
......