Commit 4b34e58d by ziyue

移除MultiMuxerPrivate对象,修复rtp发送时无sps pps帧的问题

parent 902c25ae
...@@ -18,12 +18,46 @@ namespace toolkit { ...@@ -18,12 +18,46 @@ namespace toolkit {
namespace mediakit { namespace mediakit {
///////////////////////////////MultiMuxerPrivate////////////////////////////////// static std::shared_ptr<MediaSinkInterface> makeRecorder(MediaSource &sender, const vector<Track::Ptr> &tracks, Recorder::type type, const string &custom_path, size_t max_second){
auto recorder = Recorder::createRecorder(type, sender.getVhost(), sender.getApp(), sender.getId(), custom_path, max_second);
for (auto &track : tracks) {
recorder->addTrack(track);
}
return recorder;
}
MultiMuxerPrivate::~MultiMuxerPrivate() {} static string getTrackInfoStr(const TrackSource *track_src){
MultiMuxerPrivate::MultiMuxerPrivate(const string &vhost, const string &app, const string &stream, float dur_sec, _StrPrinter codec_info;
bool enable_rtsp, bool enable_rtmp, bool enable_hls, bool enable_mp4) { auto tracks = track_src->getTracks(true);
_stream_url = vhost + " " + app + " " + stream; for (auto &track : tracks) {
auto codec_type = track->getTrackType();
codec_info << track->getCodecName();
switch (codec_type) {
case TrackAudio : {
auto audio_track = dynamic_pointer_cast<AudioTrack>(track);
codec_info << "["
<< audio_track->getAudioSampleRate() << "/"
<< audio_track->getAudioChannel() << "/"
<< audio_track->getAudioSampleBit() << "] ";
break;
}
case TrackVideo : {
auto video_track = dynamic_pointer_cast<VideoTrack>(track);
codec_info << "["
<< video_track->getVideoWidth() << "/"
<< video_track->getVideoHeight() << "/"
<< round(video_track->getVideoFps()) << "] ";
break;
}
default:
break;
}
}
return std::move(codec_info);
}
MultiMediaSourceMuxer::MultiMediaSourceMuxer(const string &vhost, const string &app, const string &stream, float dur_sec,
bool enable_rtsp, bool enable_rtmp, bool enable_hls, bool enable_mp4) {
if (enable_rtmp) { if (enable_rtmp) {
_rtmp = std::make_shared<RtmpMediaSourceMuxer>(vhost, app, stream, std::make_shared<TitleMeta>(dur_sec)); _rtmp = std::make_shared<RtmpMediaSourceMuxer>(vhost, app, stream, std::make_shared<TitleMeta>(dur_sec));
} }
...@@ -46,85 +80,79 @@ MultiMuxerPrivate::MultiMuxerPrivate(const string &vhost, const string &app, con ...@@ -46,85 +80,79 @@ MultiMuxerPrivate::MultiMuxerPrivate(const string &vhost, const string &app, con
#endif #endif
} }
void MultiMuxerPrivate::resetTracks() { void MultiMediaSourceMuxer::setMediaListener(const std::weak_ptr<MediaSourceEvent> &listener) {
setDelegate(listener);
auto self = shared_from_this();
//拦截事件
if (_rtmp) { if (_rtmp) {
_rtmp->resetTracks(); _rtmp->setListener(self);
} }
if (_rtsp) { if (_rtsp) {
_rtsp->resetTracks(); _rtsp->setListener(self);
} }
if (_ts) { if (_ts) {
_ts->resetTracks(); _ts->setListener(self);
} }
#if defined(ENABLE_MP4) #if defined(ENABLE_MP4)
if (_fmp4) { if (_fmp4) {
_fmp4->resetTracks(); _fmp4->setListener(self);
} }
#endif #endif
//拷贝智能指针,目的是为了防止跨线程调用设置录像相关api导致的线程竞争问题
auto hls = _hls; auto hls = _hls;
if (hls) { if (hls) {
hls->resetTracks(); hls->setListener(self);
} }
}
auto mp4 = _mp4; void MultiMediaSourceMuxer::setTrackListener(const std::weak_ptr<Listener> &listener) {
if (mp4) { _track_listener = listener;
mp4->resetTracks();
}
} }
void MultiMuxerPrivate::setMediaListener(const std::weak_ptr<MediaSourceEvent> &listener) { int MultiMediaSourceMuxer::totalReaderCount() const {
_listener = listener; auto hls = _hls;
auto ret = (_rtsp ? _rtsp->readerCount() : 0) +
(_rtmp ? _rtmp->readerCount() : 0) +
(_ts ? _ts->readerCount() : 0) +
#if defined(ENABLE_MP4)
(_fmp4 ? _fmp4->readerCount() : 0) +
#endif
(hls ? hls->readerCount() : 0);
#if defined(ENABLE_RTPPROXY)
return ret + (int)_rtp_sender.size();
#else
return ret;
#endif
}
void MultiMediaSourceMuxer::setTimeStamp(uint32_t stamp) {
if (_rtmp) { if (_rtmp) {
_rtmp->setListener(listener); _rtmp->setTimeStamp(stamp);
} }
if (_rtsp) { if (_rtsp) {
_rtsp->setListener(listener); _rtsp->setTimeStamp(stamp);
}
if (_ts) {
_ts->setListener(listener);
}
#if defined(ENABLE_MP4)
if (_fmp4) {
_fmp4->setListener(listener);
}
#endif
auto hls = _hls;
if (hls) {
hls->setListener(listener);
} }
} }
int MultiMuxerPrivate::totalReaderCount() const { int MultiMediaSourceMuxer::totalReaderCount(MediaSource &sender) {
auto hls = _hls; auto listener = getDelegate();
return (_rtsp ? _rtsp->readerCount() : 0) + if (!listener) {
(_rtmp ? _rtmp->readerCount() : 0) + return totalReaderCount();
(_ts ? _ts->readerCount() : 0) +
#if defined(ENABLE_MP4)
(_fmp4 ? _fmp4->readerCount() : 0) +
#endif
(hls ? hls->readerCount() : 0);
}
static std::shared_ptr<MediaSinkInterface> makeRecorder(MediaSource &sender, const vector<Track::Ptr> &tracks, Recorder::type type, const string &custom_path, size_t max_second){
auto recorder = Recorder::createRecorder(type, sender.getVhost(), sender.getApp(), sender.getId(), custom_path, max_second);
for (auto &track : tracks) {
recorder->addTrack(track);
} }
return recorder; return listener->totalReaderCount(sender);
} }
//此函数可能跨线程调用 //此函数可能跨线程调用
bool MultiMuxerPrivate::setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path, size_t max_second){ bool MultiMediaSourceMuxer::setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path, size_t max_second) {
switch (type) { switch (type) {
case Recorder::type_hls : { case Recorder::type_hls : {
if (start && !_hls) { if (start && !_hls) {
//开始录制 //开始录制
auto hls = dynamic_pointer_cast<HlsRecorder>(makeRecorder(sender, getTracks(true), type, custom_path, max_second)); auto hls = dynamic_pointer_cast<HlsRecorder>(makeRecorder(sender, MediaSink::getTracks(), type, custom_path, max_second));
if (hls) { if (hls) {
//设置HlsMediaSource的事件监听器 //设置HlsMediaSource的事件监听器
hls->setListener(_listener); hls->setListener(shared_from_this());
} }
_hls = hls; _hls = hls;
} else if (!start && _hls) { } else if (!start && _hls) {
...@@ -136,7 +164,7 @@ bool MultiMuxerPrivate::setupRecord(MediaSource &sender, Recorder::type type, bo ...@@ -136,7 +164,7 @@ bool MultiMuxerPrivate::setupRecord(MediaSource &sender, Recorder::type type, bo
case Recorder::type_mp4 : { case Recorder::type_mp4 : {
if (start && !_mp4) { if (start && !_mp4) {
//开始录制 //开始录制
_mp4 = makeRecorder(sender, getTracks(true), type, custom_path, max_second); _mp4 = makeRecorder(sender, MediaSink::getTracks(), type, custom_path, max_second);
} else if (!start && _mp4) { } else if (!start && _mp4) {
//停止录制 //停止录制
_mp4 = nullptr; _mp4 = nullptr;
...@@ -148,7 +176,7 @@ bool MultiMuxerPrivate::setupRecord(MediaSource &sender, Recorder::type type, bo ...@@ -148,7 +176,7 @@ bool MultiMuxerPrivate::setupRecord(MediaSource &sender, Recorder::type type, bo
} }
//此函数可能跨线程调用 //此函数可能跨线程调用
bool MultiMuxerPrivate::isRecording(MediaSource &sender, Recorder::type type){ bool MultiMediaSourceMuxer::isRecording(MediaSource &sender, Recorder::type type) {
switch (type){ switch (type){
case Recorder::type_hls : case Recorder::type_hls :
return _hls ? true : false; return _hls ? true : false;
...@@ -159,20 +187,57 @@ bool MultiMuxerPrivate::isRecording(MediaSource &sender, Recorder::type type){ ...@@ -159,20 +187,57 @@ bool MultiMuxerPrivate::isRecording(MediaSource &sender, Recorder::type type){
} }
} }
void MultiMuxerPrivate::setTimeStamp(uint32_t stamp) { void MultiMediaSourceMuxer::startSendRtp(MediaSource &, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function<void(uint16_t local_port, const SockException &ex)> &cb){
if (_rtmp) { #if defined(ENABLE_RTPPROXY)
_rtmp->setTimeStamp(stamp); RtpSender::Ptr rtp_sender = std::make_shared<RtpSender>(atoi(ssrc.data()));
weak_ptr<MultiMediaSourceMuxer> weak_self = shared_from_this();
rtp_sender->startSend(dst_url, dst_port, is_udp, src_port, [weak_self, rtp_sender, cb, ssrc](uint16_t local_port, const SockException &ex) {
cb(local_port, ex);
auto strong_self = weak_self.lock();
if (!strong_self || ex) {
return;
}
for (auto &track : strong_self->MediaSink::getTracks(false)) {
rtp_sender->addTrack(track);
}
rtp_sender->addTrackCompleted();
lock_guard<mutex> lck(strong_self->_rtp_sender_mtx);
strong_self->_rtp_sender[ssrc] = rtp_sender;
});
#else
cb(0, SockException(Err_other, "该功能未启用,编译时请打开ENABLE_RTPPROXY宏"));
#endif//ENABLE_RTPPROXY
}
bool MultiMediaSourceMuxer::stopSendRtp(MediaSource &sender, const string &ssrc) {
#if defined(ENABLE_RTPPROXY)
if (&sender != MediaSource::NullMediaSource) {
onceToken token(nullptr, [&]() {
//关闭rtp推流,可能触发无人观看事件
MediaSourceEventInterceptor::onReaderChanged(sender, totalReaderCount());
});
} }
if (_rtsp) { if (ssrc.empty()) {
_rtsp->setTimeStamp(stamp); //关闭全部
lock_guard<mutex> lck(_rtp_sender_mtx);
auto size = _rtp_sender.size();
_rtp_sender.clear();
return size;
} }
//关闭特定的
lock_guard<mutex> lck(_rtp_sender_mtx);
return _rtp_sender.erase(ssrc);
#else
return false;
#endif//ENABLE_RTPPROXY
} }
void MultiMuxerPrivate::setTrackListener(Listener *listener) { void MultiMediaSourceMuxer::onTrackReady(const Track::Ptr &track) {
_track_listener = listener; if (CodecL16 == track->getCodecId()) {
} WarnL << "L16音频格式目前只支持RTSP协议推流拉流!!!";
return;
}
void MultiMuxerPrivate::onTrackReady(const Track::Ptr &track) {
if (_rtmp) { if (_rtmp) {
_rtmp->addTrack(track); _rtmp->addTrack(track);
} }
...@@ -199,212 +264,62 @@ void MultiMuxerPrivate::onTrackReady(const Track::Ptr &track) { ...@@ -199,212 +264,62 @@ void MultiMuxerPrivate::onTrackReady(const Track::Ptr &track) {
} }
} }
bool MultiMuxerPrivate::isEnabled(){ void MultiMediaSourceMuxer::onAllTrackReady() {
auto hls = _hls; setMediaListener(getDelegate());
return (_rtmp ? _rtmp->isEnabled() : false) ||
(_rtsp ? _rtsp->isEnabled() : false) ||
(_ts ? _ts->isEnabled() : false) ||
#if defined(ENABLE_MP4)
(_fmp4 ? _fmp4->isEnabled() : false) ||
#endif
(hls ? hls->isEnabled() : false) || _mp4;
}
void MultiMuxerPrivate::onTrackFrame(const Frame::Ptr &frame) {
if (_rtmp) { if (_rtmp) {
_rtmp->inputFrame(frame); _rtmp->onAllTrackReady();
} }
if (_rtsp) { if (_rtsp) {
_rtsp->inputFrame(frame); _rtsp->onAllTrackReady();
}
if (_ts) {
_ts->inputFrame(frame);
} }
#if defined(ENABLE_MP4) #if defined(ENABLE_MP4)
if (_fmp4) { if (_fmp4) {
_fmp4->inputFrame(frame); _fmp4->onAllTrackReady();
} }
#endif #endif
auto listener = _track_listener.lock();
//拷贝智能指针,目的是为了防止跨线程调用设置录像相关api导致的线程竞争问题 if (listener) {
//此处使用智能指针拷贝来确保线程安全,比互斥锁性能更优 listener->onAllTrackReady();
auto hls = _hls;
if (hls) {
hls->inputFrame(frame);
}
auto mp4 = _mp4;
if (mp4) {
mp4->inputFrame(frame);
} }
InfoL << "stream: " << _stream_url << " , codec info: " << getTrackInfoStr(this);
} }
static string getTrackInfoStr(const TrackSource *track_src){ void MultiMediaSourceMuxer::resetTracks() {
_StrPrinter codec_info; MediaSink::resetTracks();
auto tracks = track_src->getTracks(true);
for (auto &track : tracks) {
auto codec_type = track->getTrackType();
codec_info << track->getCodecName();
switch (codec_type) {
case TrackAudio : {
auto audio_track = dynamic_pointer_cast<AudioTrack>(track);
codec_info << "["
<< audio_track->getAudioSampleRate() << "/"
<< audio_track->getAudioChannel() << "/"
<< audio_track->getAudioSampleBit() << "] ";
break;
}
case TrackVideo : {
auto video_track = dynamic_pointer_cast<VideoTrack>(track);
codec_info << "["
<< video_track->getVideoWidth() << "/"
<< video_track->getVideoHeight() << "/"
<< round(video_track->getVideoFps()) << "] ";
break;
}
default:
break;
}
}
return std::move(codec_info);
}
void MultiMuxerPrivate::onAllTrackReady() {
if (_rtmp) { if (_rtmp) {
_rtmp->onAllTrackReady(); _rtmp->resetTracks();
} }
if (_rtsp) { if (_rtsp) {
_rtsp->onAllTrackReady(); _rtsp->resetTracks();
}
if (_ts) {
_ts->resetTracks();
} }
#if defined(ENABLE_MP4) #if defined(ENABLE_MP4)
if (_fmp4) { if (_fmp4) {
_fmp4->onAllTrackReady(); _fmp4->resetTracks();
}
#endif
if (_track_listener) {
_track_listener->onAllTrackReady();
} }
InfoL << "stream: " << _stream_url << " , codec info: " << getTrackInfoStr(this);
}
///////////////////////////////MultiMediaSourceMuxer//////////////////////////////////
MultiMediaSourceMuxer::~MultiMediaSourceMuxer() {}
MultiMediaSourceMuxer::MultiMediaSourceMuxer(const string &vhost, const string &app, const string &stream, float dur_sec,
bool enable_rtsp, bool enable_rtmp, bool enable_hls, bool enable_mp4) {
_muxer.reset(new MultiMuxerPrivate(vhost, app, stream, dur_sec, enable_rtsp, enable_rtmp, enable_hls, enable_mp4));
_muxer->setTrackListener(this);
}
void MultiMediaSourceMuxer::setMediaListener(const std::weak_ptr<MediaSourceEvent> &listener) {
setDelegate(listener);
//拦截事件
_muxer->setMediaListener(shared_from_this());
}
void MultiMediaSourceMuxer::setTrackListener(const std::weak_ptr<MultiMuxerPrivate::Listener> &listener) {
_track_listener = listener;
}
int MultiMediaSourceMuxer::totalReaderCount() const {
#if defined(ENABLE_RTPPROXY)
return _muxer->totalReaderCount() + (int)_rtp_sender.size();
#else
return _muxer->totalReaderCount();
#endif #endif
}
void MultiMediaSourceMuxer::setTimeStamp(uint32_t stamp) {
_muxer->setTimeStamp(stamp);
}
vector<Track::Ptr> MultiMediaSourceMuxer::getTracks(MediaSource &sender, bool trackReady) const {
return _muxer->getTracks(trackReady);
}
int MultiMediaSourceMuxer::totalReaderCount(MediaSource &sender) {
auto listener = getDelegate();
if (!listener) {
return totalReaderCount();
}
return listener->totalReaderCount(sender);
}
bool MultiMediaSourceMuxer::setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path, size_t max_second) {
return _muxer->setupRecord(sender, type, start, custom_path, max_second);
}
bool MultiMediaSourceMuxer::isRecording(MediaSource &sender, Recorder::type type) {
return _muxer->isRecording(sender,type);
}
void MultiMediaSourceMuxer::startSendRtp(MediaSource &, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function<void(uint16_t local_port, const SockException &ex)> &cb){
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
RtpSender::Ptr rtp_sender = std::make_shared<RtpSender>(atoi(ssrc.data()));
weak_ptr<MultiMediaSourceMuxer> weak_self = shared_from_this();
rtp_sender->startSend(dst_url, dst_port, is_udp, src_port, [weak_self, rtp_sender, cb, ssrc](uint16_t local_port, const SockException &ex) {
cb(local_port, ex);
auto strong_self = weak_self.lock();
if (!strong_self || ex) {
return;
}
for (auto &track : strong_self->_muxer->getTracks(false)) {
rtp_sender->addTrack(track);
}
rtp_sender->addTrackCompleted();
lock_guard<mutex> lck(strong_self->_rtp_sender_mtx);
strong_self->_rtp_sender[ssrc] = rtp_sender;
});
#else
cb(0, SockException(Err_other, "该功能未启用,编译时请打开ENABLE_RTPPROXY宏"));
#endif//ENABLE_RTPPROXY
}
bool MultiMediaSourceMuxer::stopSendRtp(MediaSource &sender, const string &ssrc) {
#if defined(ENABLE_RTPPROXY)
if (&sender != MediaSource::NullMediaSource) {
onceToken token(nullptr, [&]() {
//关闭rtp推流,可能触发无人观看事件
MediaSourceEventInterceptor::onReaderChanged(sender, totalReaderCount());
});
}
if (ssrc.empty()) {
//关闭全部
lock_guard<mutex> lck(_rtp_sender_mtx);
auto size = _rtp_sender.size();
_rtp_sender.clear();
return size;
}
//关闭特定的
lock_guard<mutex> lck(_rtp_sender_mtx); lock_guard<mutex> lck(_rtp_sender_mtx);
return _rtp_sender.erase(ssrc); for (auto &pr : _rtp_sender) {
#else pr.second->resetTracks();
return false;
#endif//ENABLE_RTPPROXY
}
void MultiMediaSourceMuxer::addTrack(const Track::Ptr &track) {
if (CodecL16 == track->getCodecId()) {
WarnL << "L16音频格式目前只支持RTSP协议推流拉流!!!";
return;
} }
_muxer->addTrack(track); #endif
}
void MultiMediaSourceMuxer::addTrackCompleted() {
_muxer->addTrackCompleted();
}
void MultiMediaSourceMuxer::onAllTrackReady(){ //拷贝智能指针,目的是为了防止跨线程调用设置录像相关api导致的线程竞争问题
_muxer->setMediaListener(shared_from_this()); auto hls = _hls;
auto listener = _track_listener.lock(); if (hls) {
if(listener){ hls->resetTracks();
listener->onAllTrackReady();
} }
}
void MultiMediaSourceMuxer::resetTracks() { auto mp4 = _mp4;
_muxer->resetTracks(); if (mp4) {
mp4->resetTracks();
}
} }
//该类实现frame级别的时间戳覆盖 //该类实现frame级别的时间戳覆盖
...@@ -459,14 +374,40 @@ private: ...@@ -459,14 +374,40 @@ private:
Frame::Ptr _frame; Frame::Ptr _frame;
}; };
void MultiMediaSourceMuxer::inputFrame(const Frame::Ptr &frame_in) { void MultiMediaSourceMuxer::onTrackFrame(const Frame::Ptr &frame_in) {
GET_CONFIG(bool, modify_stamp, General::kModifyStamp); GET_CONFIG(bool, modify_stamp, General::kModifyStamp);
auto frame = frame_in; auto frame = frame_in;
if (modify_stamp) { if (modify_stamp) {
//开启了时间戳覆盖 //开启了时间戳覆盖
frame = std::make_shared<FrameModifyStamp>(frame, _stamp[frame->getTrackType()]); frame = std::make_shared<FrameModifyStamp>(frame, _stamp[frame->getTrackType()]);
} }
_muxer->inputFrame(frame);
if (_rtmp) {
_rtmp->inputFrame(frame);
}
if (_rtsp) {
_rtsp->inputFrame(frame);
}
if (_ts) {
_ts->inputFrame(frame);
}
//拷贝智能指针,目的是为了防止跨线程调用设置录像相关api导致的线程竞争问题
//此处使用智能指针拷贝来确保线程安全,比互斥锁性能更优
auto hls = _hls;
if (hls) {
hls->inputFrame(frame);
}
auto mp4 = _mp4;
if (mp4) {
mp4->inputFrame(frame);
}
#if defined(ENABLE_MP4)
if (_fmp4) {
_fmp4->inputFrame(frame);
}
#endif
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
lock_guard<mutex> lck(_rtp_sender_mtx); lock_guard<mutex> lck(_rtp_sender_mtx);
...@@ -474,7 +415,6 @@ void MultiMediaSourceMuxer::inputFrame(const Frame::Ptr &frame_in) { ...@@ -474,7 +415,6 @@ void MultiMediaSourceMuxer::inputFrame(const Frame::Ptr &frame_in) {
pr.second->inputFrame(frame); pr.second->inputFrame(frame);
} }
#endif //ENABLE_RTPPROXY #endif //ENABLE_RTPPROXY
} }
bool MultiMediaSourceMuxer::isEnabled(){ bool MultiMediaSourceMuxer::isEnabled(){
...@@ -482,10 +422,19 @@ bool MultiMediaSourceMuxer::isEnabled(){ ...@@ -482,10 +422,19 @@ bool MultiMediaSourceMuxer::isEnabled(){
if (!_is_enable || _last_check.elapsedTime() > stream_none_reader_delay_ms) { if (!_is_enable || _last_check.elapsedTime() > stream_none_reader_delay_ms) {
//无人观看时,每次检查是否真的无人观看 //无人观看时,每次检查是否真的无人观看
//有人观看时,则延迟一定时间检查一遍是否无人观看了(节省性能) //有人观看时,则延迟一定时间检查一遍是否无人观看了(节省性能)
auto hls = _hls;
auto flag = (_rtmp ? _rtmp->isEnabled() : false) ||
(_rtsp ? _rtsp->isEnabled() : false) ||
(_ts ? _ts->isEnabled() : false) ||
#if defined(ENABLE_MP4)
(_fmp4 ? _fmp4->isEnabled() : false) ||
#endif
(hls ? hls->isEnabled() : false) || _mp4;
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
_is_enable = (_muxer->isEnabled() || _rtp_sender.size()); _is_enable = flag || _rtp_sender.size();
#else #else
_is_enable = _muxer->isEnabled(); _is_enable = flag;
#endif //ENABLE_RTPPROXY #endif //ENABLE_RTPPROXY
if (_is_enable) { if (_is_enable) {
//无人观看时,不刷新计时器,因为无人观看时每次都会检查一遍,所以刷新计数器无意义且浪费cpu //无人观看时,不刷新计时器,因为无人观看时每次都会检查一遍,所以刷新计数器无意义且浪费cpu
...@@ -495,5 +444,4 @@ bool MultiMediaSourceMuxer::isEnabled(){ ...@@ -495,5 +444,4 @@ bool MultiMediaSourceMuxer::isEnabled(){
return _is_enable; return _is_enable;
} }
}//namespace mediakit
}//namespace mediakit \ No newline at end of file
...@@ -23,10 +23,10 @@ ...@@ -23,10 +23,10 @@
namespace mediakit{ namespace mediakit{
class MultiMuxerPrivate : public MediaSink, public std::enable_shared_from_this<MultiMuxerPrivate>{ class MultiMediaSourceMuxer : public MediaSourceEventInterceptor, public MediaSink, public std::enable_shared_from_this<MultiMediaSourceMuxer>{
public: public:
friend class MultiMediaSourceMuxer; typedef std::shared_ptr<MultiMediaSourceMuxer> Ptr;
typedef std::shared_ptr<MultiMuxerPrivate> Ptr;
class Listener{ class Listener{
public: public:
Listener() = default; Listener() = default;
...@@ -34,43 +34,7 @@ public: ...@@ -34,43 +34,7 @@ public:
virtual void onAllTrackReady() = 0; virtual void onAllTrackReady() = 0;
}; };
~MultiMuxerPrivate() override; ~MultiMediaSourceMuxer() override = default;
private:
MultiMuxerPrivate(const string &vhost,const string &app, const string &stream,float dur_sec,
bool enable_rtsp, bool enable_rtmp, bool enable_hls, bool enable_mp4);
void resetTracks() override;
void setMediaListener(const std::weak_ptr<MediaSourceEvent> &listener);
int totalReaderCount() const;
void setTimeStamp(uint32_t stamp);
void setTrackListener(Listener *listener);
bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path, size_t max_second);
bool isRecording(MediaSource &sender, Recorder::type type);
bool isEnabled();
void onTrackReady(const Track::Ptr & track) override;
void onTrackFrame(const Frame::Ptr &frame) override;
void onAllTrackReady() override;
private:
string _stream_url;
Listener *_track_listener = nullptr;
RtmpMediaSourceMuxer::Ptr _rtmp;
RtspMediaSourceMuxer::Ptr _rtsp;
HlsRecorder::Ptr _hls;
MediaSinkInterface::Ptr _mp4;
TSMediaSourceMuxer::Ptr _ts;
#if defined(ENABLE_MP4)
FMP4MediaSourceMuxer::Ptr _fmp4;
#endif
std::weak_ptr<MediaSourceEvent> _listener;
};
class MultiMediaSourceMuxer : public MediaSourceEventInterceptor, public MediaSinkInterface, public MultiMuxerPrivate::Listener, public std::enable_shared_from_this<MultiMediaSourceMuxer>{
public:
typedef MultiMuxerPrivate::Listener Listener;
typedef std::shared_ptr<MultiMediaSourceMuxer> Ptr;
~MultiMediaSourceMuxer() override;
MultiMediaSourceMuxer(const string &vhost, const string &app, const string &stream, float dur_sec = 0.0, MultiMediaSourceMuxer(const string &vhost, const string &app, const string &stream, float dur_sec = 0.0,
bool enable_rtsp = true, bool enable_rtmp = true, bool enable_hls = true, bool enable_mp4 = false); bool enable_rtsp = true, bool enable_rtmp = true, bool enable_hls = true, bool enable_mp4 = false);
...@@ -84,7 +48,7 @@ public: ...@@ -84,7 +48,7 @@ public:
* 随着Track就绪事件监听器 * 随着Track就绪事件监听器
* @param listener 事件监听器 * @param listener 事件监听器
*/ */
void setTrackListener(const std::weak_ptr<MultiMuxerPrivate::Listener> &listener); void setTrackListener(const std::weak_ptr<Listener> &listener);
/** /**
* 返回总的消费者个数 * 返回总的消费者个数
...@@ -105,13 +69,6 @@ public: ...@@ -105,13 +69,6 @@ public:
/////////////////////////////////MediaSourceEvent override///////////////////////////////// /////////////////////////////////MediaSourceEvent override/////////////////////////////////
/** /**
* 获取所有Track
* @param trackReady 是否筛选过滤未就绪的track
* @return 所有Track
*/
vector<Track::Ptr> getTracks(MediaSource &sender, bool trackReady = true) const override;
/**
* 观看总人数 * 观看总人数
* @param sender 事件发送者 * @param sender 事件发送者
* @return 观看总人数 * @return 观看总人数
...@@ -150,48 +107,52 @@ public: ...@@ -150,48 +107,52 @@ public:
*/ */
bool stopSendRtp(MediaSource &sender, const string &ssrc) override; bool stopSendRtp(MediaSource &sender, const string &ssrc) override;
/////////////////////////////////MediaSinkInterface override/////////////////////////////////
/**
* 添加track,内部会调用Track的clone方法
* 只会克隆sps pps这些信息 ,而不会克隆Delegate相关关系
* @param track 添加音频或视频轨道
*/
void addTrack(const Track::Ptr &track) override;
/**
* 添加track完毕
*/
void addTrackCompleted() override;
/** /**
* 重置track * 重置track
*/ */
void resetTracks() override; void resetTracks() override;
/** protected:
* 写入帧数据 /////////////////////////////////MediaSink override/////////////////////////////////
* @param frame 帧
*/
void inputFrame(const Frame::Ptr &frame) override;
/////////////////////////////////MultiMuxerPrivate::Listener override///////////////////////////////// /**
* 某track已经准备好,其ready()状态返回true,
* 此时代表可以获取其例如sps pps等相关信息了
* @param track
*/
void onTrackReady(const Track::Ptr & track) override;
/** /**
* 所有track全部就绪 * 所有Track已经准备好,
*/ */
void onAllTrackReady() override; void onAllTrackReady() override;
/**
* 某Track输出frame,在onAllTrackReady触发后才会调用此方法
* @param frame
*/
void onTrackFrame(const Frame::Ptr &frame) override;
private: private:
bool _is_enable = false; bool _is_enable = false;
string _stream_url;
Ticker _last_check; Ticker _last_check;
Stamp _stamp[2]; Stamp _stamp[2];
MultiMuxerPrivate::Ptr _muxer; std::weak_ptr<Listener> _track_listener;
std::weak_ptr<MultiMuxerPrivate::Listener> _track_listener;
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
mutex _rtp_sender_mtx; mutex _rtp_sender_mtx;
unordered_map<string, RtpSender::Ptr> _rtp_sender; unordered_map<string, RtpSender::Ptr> _rtp_sender;
#endif //ENABLE_RTPPROXY #endif //ENABLE_RTPPROXY
#if defined(ENABLE_MP4)
FMP4MediaSourceMuxer::Ptr _fmp4;
#endif
RtmpMediaSourceMuxer::Ptr _rtmp;
RtspMediaSourceMuxer::Ptr _rtsp;
TSMediaSourceMuxer::Ptr _ts;
MediaSinkInterface::Ptr _mp4;
HlsRecorder::Ptr _hls;
//对象个数统计 //对象个数统计
ObjectStatistic<MultiMediaSourceMuxer> _statistic; ObjectStatistic<MultiMediaSourceMuxer> _statistic;
}; };
......
...@@ -127,7 +127,7 @@ public: ...@@ -127,7 +127,7 @@ public:
if (_recreate_metadata) { if (_recreate_metadata) {
//更新metadata //更新metadata
for (auto &track : _muxer->getTracks(*this)) { for (auto &track : _muxer->MediaSink::getTracks()) {
Metadata::addTrack(_metadata, track); Metadata::addTrack(_metadata, track);
} }
RtmpMediaSource::updateMetaData(_metadata); RtmpMediaSource::updateMetaData(_metadata);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论