Commit 3830792c by xiongziliang

整理MediaSource派生类

修复转协议或录制时忽然Track的问题
parent 806747a9
......@@ -33,8 +33,14 @@
class MultiMediaSourceMuxer : public MediaSink , public std::enable_shared_from_this<MultiMediaSourceMuxer>{
public:
typedef std::shared_ptr<MultiMediaSourceMuxer> Ptr;
class Listener{
public:
Listener() = default;
virtual ~Listener() = default;
virtual void onAllTrackReady() = 0;
};
typedef std::shared_ptr<MultiMediaSourceMuxer> Ptr;
MultiMediaSourceMuxer(const string &vhost,
const string &strApp,
const string &strId,
......@@ -100,6 +106,9 @@ public:
}
}
void setTrackListener(Listener *listener){
_listener = listener;
}
protected:
/**
* 添加音视频媒体
......@@ -139,10 +148,15 @@ protected:
_rtsp->setTrackSource(shared_from_this());
_rtsp->onAllTrackReady();
}
if(_listener){
_listener->onAllTrackReady();
}
}
private:
RtmpMediaSourceMuxer::Ptr _rtmp;
RtspMediaSourceMuxer::Ptr _rtsp;
Listener *_listener;
};
......
......@@ -110,4 +110,15 @@ vector<Track::Ptr> Demuxer::getTracks(bool trackReady) const {
float Demuxer::getDuration() const {
return _fDuration;
}
void Demuxer::onAddTrack(const Track::Ptr &track){
if(_listener){
_listener->onAddTrack(track);
}
}
void Demuxer::setTrackListener(Demuxer::Listener *listener) {
_listener = listener;
}
} /* namespace mediakit */
......@@ -241,6 +241,13 @@ protected:
class Demuxer : public PlayerBase{
public:
class Listener{
public:
Listener() = default;
virtual ~Listener() = default;
virtual void onAddTrack(const Track::Ptr &track) = 0;
};
Demuxer(){};
virtual ~Demuxer(){};
......@@ -267,7 +274,15 @@ public:
* @return 节目总时长,单位秒
*/
float getDuration() const override;
/**
* 设置track监听器
*/
void setTrackListener(Listener *listener);
protected:
void onAddTrack(const Track::Ptr &track);
protected:
Listener *_listener;
AudioTrack::Ptr _audioTrack;
VideoTrack::Ptr _videoTrack;
Ticker _ticker;
......
......@@ -220,14 +220,12 @@ private:
//保存NoticeCenter的强引用,防止在MediaSourceWatcher单例释放前释放NoticeCenter单例
_notice_center = NoticeCenter::Instance().shared_from_this();
_notice_center->addListener(this,Broadcast::kBroadcastMediaChanged,[this](BroadcastMediaChangedArgs){
if(bRegist){
onRegist(sender);
}else{
onUnRegist(sender);
if(!bRegist){
removeRecorder(sender);
}
});
_notice_center->addListener(this,Broadcast::kBroadcastMediaResetTracks,[this](BroadcastMediaResetTracksArgs){
onRegist(sender);
addRecorder(sender);
});
}
......@@ -236,7 +234,7 @@ private:
_notice_center->delListener(this,Broadcast::kBroadcastMediaResetTracks);
}
void onRegist(MediaSource &sender){
void addRecorder(MediaSource &sender){
auto key = getRecorderKey(sender.getVhost(),sender.getApp(),sender.getId());
lock_guard<decltype(_recorder_mtx)> lck(_recorder_mtx);
auto it = _recorder_map.find(key);
......@@ -247,14 +245,14 @@ private:
if(!it->second->isRecording() || it->second->getSchema() == sender.getSchema()){
// 绑定的协议一致或者并未正在录制则替换tracks
auto tracks = sender.getTracks(true);
auto tracks = sender.getTracks(needTrackReady());
if (!tracks.empty()) {
it->second->attachTracks(std::move(tracks),sender.getSchema());
}
}
}
void onUnRegist(MediaSource &sender){
void removeRecorder(MediaSource &sender){
auto key = getRecorderKey(sender.getVhost(),sender.getApp(),sender.getId());
lock_guard<decltype(_recorder_mtx)> lck(_recorder_mtx);
auto it = _recorder_map.find(key);
......@@ -284,7 +282,7 @@ private:
vector<Track::Ptr> findTracks(const string &vhost, const string &app, const string &stream_id,string &schema) {
auto src = MediaSource::find(RTMP_SCHEMA, vhost, app, stream_id);
if (src) {
auto ret = src->getTracks(true);
auto ret = src->getTracks(needTrackReady());
if (!ret.empty()) {
schema = RTMP_SCHEMA;
return std::move(ret);
......@@ -294,7 +292,7 @@ private:
src = MediaSource::find(RTSP_SCHEMA, vhost, app, stream_id);
if (src) {
schema = RTSP_SCHEMA;
return src->getTracks(true);
return src->getTracks(needTrackReady());
}
return vector<Track::Ptr>();
}
......@@ -320,6 +318,20 @@ private:
}
return ret;
}
/**
* 有些录制类型不需要track就绪即可录制
*/
bool needTrackReady(){
switch (type){
case Recorder::type_hls:
return false;
case Recorder::type_mp4:
return true;
default:
return true;
}
}
private:
recursive_mutex _recorder_mtx;
NoticeCenter::Ptr _notice_center;
......
......@@ -29,23 +29,6 @@
namespace mediakit {
RtmpDemuxer::RtmpDemuxer(const AMFValue &val) {
try {
makeVideoTrack(val["videocodecid"]);
makeAudioTrack(val["audiocodecid"]);
val.object_for_each([&](const string &key, const AMFValue &val) {
if (key == "duration") {
_fDuration = val.as_number();
return;
}
});
}catch (std::exception &ex){
WarnL << ex.what();
}
}
bool RtmpDemuxer::inputRtmp(const RtmpPacket::Ptr &pkt) {
switch (pkt->typeId) {
case MSG_VIDEO: {
......@@ -86,6 +69,7 @@ void RtmpDemuxer::makeVideoTrack(const AMFValue &videoCodec) {
if (_videoRtmpDecoder) {
//设置rtmp解码器代理,生成的frame写入该Track
_videoRtmpDecoder->addDelegate(_videoTrack);
onAddTrack(_videoTrack);
} else {
//找不到相应的rtmp解码器,该track无效
_videoTrack.reset();
......@@ -102,6 +86,7 @@ void RtmpDemuxer::makeAudioTrack(const AMFValue &audioCodec) {
if (_audioRtmpDecoder) {
//设置rtmp解码器代理,生成的frame写入该Track
_audioRtmpDecoder->addDelegate(_audioTrack);
onAddTrack(_audioTrack);
} else {
//找不到相应的rtmp解码器,该track无效
_audioTrack.reset();
......
......@@ -47,12 +47,6 @@ public:
* 等效于RtmpDemuxer(AMFValue(AMF_NULL))
*/
RtmpDemuxer(){}
/**
* 构造rtmp解复用器
* @param val rtmp的metadata,可以传入null类型,
* 这样就会在inputRtmp时异步探测媒体编码格式
*/
RtmpDemuxer(const AMFValue &val);
virtual ~RtmpDemuxer(){};
/**
......
......@@ -122,9 +122,9 @@ public:
/**
* 输入rtmp包
* @param pkt rtmp包
* @param isKey 是否为关键帧
* @param key 是否为关键帧
*/
void onWrite(const RtmpPacket::Ptr &pkt, bool isKey = true) override {
void onWrite(const RtmpPacket::Ptr &pkt, bool key = true) override {
lock_guard<recursive_mutex> lock(_mtx);
if (pkt->isCfgFrame()) {
_config_frame_map[pkt->typeId] = pkt;
......
......@@ -39,56 +39,40 @@
#include "RtmpMediaSource.h"
#include "RtmpDemuxer.h"
#include "Common/MultiMediaSourceMuxer.h"
using namespace std;
using namespace toolkit;
namespace mediakit {
class RtmpToRtspMediaSource: public RtmpMediaSource {
class RtmpMediaSourceImp: public RtmpMediaSource, public Demuxer::Listener , public MultiMediaSourceMuxer::Listener {
public:
typedef std::shared_ptr<RtmpToRtspMediaSource> Ptr;
typedef std::shared_ptr<RtmpMediaSourceImp> Ptr;
RtmpToRtspMediaSource(const string &vhost,
const string &app,
const string &id,
int ringSize = 0) :
RtmpMediaSource(vhost, app, id,ringSize){
/**
* 构造函数
* @param vhost 虚拟主机
* @param app 应用名
* @param id 流id
* @param ringSize 环形缓存大小
*/
RtmpMediaSourceImp(const string &vhost, const string &app, const string &id, int ringSize = 0) : RtmpMediaSource(vhost, app, id, ringSize) {
_demuxer = std::make_shared<RtmpDemuxer>();
_demuxer->setTrackListener(this);
}
virtual ~RtmpToRtspMediaSource(){}
void setMetaData(const AMFValue &metadata) override {
if(!_demuxer){
//在未调用onWrite前,设置Metadata能触发生成RtmpDemuxer
_demuxer = std::make_shared<RtmpDemuxer>(metadata);
}
RtmpMediaSource::setMetaData(metadata);
}
~RtmpMediaSourceImp() = default;
/**
* 输入rtmp并解析
*/
void onWrite(const RtmpPacket::Ptr &pkt,bool key_pos = true) override {
if(!_demuxer){
//尚未获取Metadata,那么不管有没有Metadata,都生成RtmpDemuxer
_demuxer = std::make_shared<RtmpDemuxer>();
}
_demuxer->inputRtmp(pkt);
if(!_muxer && _demuxer->isInited(2000)){
_muxer = std::make_shared<MultiMediaSourceMuxer>(getVhost(),
getApp(),
getId(),
_demuxer->getDuration(),
_enableRtsp,
false,//不重复生成rtmp
_enableHls,
_enableMP4);
for (auto &track : _demuxer->getTracks(false)){
_muxer->addTrack(track);
track->addDelegate(_muxer);
}
_muxer->setListener(getListener());
}
RtmpMediaSource::onWrite(pkt,key_pos);
}
/**
* 设置监听器
* @param listener
*/
void setListener(const std::weak_ptr<MediaSourceEvent> &listener) override {
RtmpMediaSource::setListener(listener);
if(_muxer){
......@@ -96,41 +80,46 @@ public:
}
}
/**
* 播放器总数
*/
int readerCount() override {
return RtmpMediaSource::readerCount() + (_muxer ? _muxer->readerCount() : 0);
}
/**
* 获取track
* @return
*/
vector<Track::Ptr> getTracks(bool trackReady) const override {
if(!_demuxer){
return this->RtmpMediaSource::getTracks(trackReady);
}
return _demuxer->getTracks(trackReady);
}
/**
* 设置协议转换
* @param enableRtsp 是否转换成rtsp
* @param enableHls 是否转换成hls
* @param enableMP4 是否mp4录制
*/
void setProtocolTranslation(bool enableRtsp,bool enableHls,bool enableMP4){
// DebugL << enableRtsp << " " << enableHls << " " << enableMP4;
_enableRtsp = enableRtsp;
_enableHls = enableHls;
_enableMP4 = enableMP4;
void setProtocolTranslation(bool enableRtsp, bool enableHls, bool enableMP4) {
//不重复生成rtmp
_muxer = std::make_shared<MultiMediaSourceMuxer>(getVhost(), getApp(), getId(), 0, enableRtsp, false, enableHls, enableMP4);
_muxer->setListener(getListener());
_muxer->setTrackListener(this);
}
/**
* _demuxer触发的添加Track事件
*/
void onAddTrack(const Track::Ptr &track) override {
if(_muxer){
_muxer->addTrack(track);
track->addDelegate(_muxer);
}
}
/**
* _muxer触发的所有Track就绪的事件
*/
void onAllTrackReady() override{
setTrackSource(_muxer);
}
private:
RtmpDemuxer::Ptr _demuxer;
MultiMediaSourceMuxer::Ptr _muxer;
bool _enableHls = true;
bool _enableMP4 = false;
bool _enableRtsp = true;
};
} /* namespace mediakit */
#endif /* SRC_RTMP_RTMPTORTSPMEDIASOURCE_H_ */
......@@ -67,7 +67,6 @@ private:
if(_pRtmpMediaSrc){
_pRtmpMediaSrc->setMetaData(val);
}
_delegate.reset(new RtmpDemuxer(val));
return true;
}
void onMediaData(const RtmpPacket::Ptr &chunkData) override {
......
......@@ -169,7 +169,7 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
shutdown(SockException(Err_shutdown,errMsg));
return;
}
_pPublisherSrc.reset(new RtmpToRtspMediaSource(_mediaInfo._vhost,_mediaInfo._app,_mediaInfo._streamid));
_pPublisherSrc.reset(new RtmpMediaSourceImp(_mediaInfo._vhost,_mediaInfo._app,_mediaInfo._streamid));
_pPublisherSrc->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
//设置转协议
_pPublisherSrc->setProtocolTranslation(enableRtxp,enableHls,enableMP4);
......
......@@ -33,7 +33,7 @@
#include "utils.h"
#include "Common/config.h"
#include "RtmpProtocol.h"
#include "RtmpToRtspMediaSource.h"
#include "RtmpMediaSourceImp.h"
#include "Util/util.h"
#include "Util/TimeTicker.h"
#include "Network/TcpSession.h"
......@@ -94,7 +94,7 @@ private:
double _dNowReqID = 0;
Ticker _ticker;//数据接收时间
RingBuffer<RtmpPacket::Ptr>::RingReader::Ptr _pRingReader;
std::shared_ptr<RtmpToRtspMediaSource> _pPublisherSrc;
std::shared_ptr<RtmpMediaSourceImp> _pPublisherSrc;
std::weak_ptr<RtmpMediaSource> _pPlayerSrc;
//时间戳修整器
Stamp _stamp[2];
......
......@@ -89,6 +89,7 @@ void RtspDemuxer::makeAudioTrack(const SdpTrack::Ptr &audio) {
if(_audioRtpDecoder){
//设置rtp解码器代理,生成的frame写入该Track
_audioRtpDecoder->addDelegate(_audioTrack);
onAddTrack(_audioTrack);
} else{
//找不到相应的rtp解码器,该track无效
_audioTrack.reset();
......@@ -105,6 +106,7 @@ void RtspDemuxer::makeVideoTrack(const SdpTrack::Ptr &video) {
if(_videoRtpDecoder){
//设置rtp解码器代理,生成的frame写入该Track
_videoRtpDecoder->addDelegate(_videoTrack);
onAddTrack(_videoTrack);
}else{
//找不到相应的rtp解码器,该track无效
_videoTrack.reset();
......
......@@ -31,70 +31,58 @@
#include "RtspMediaSource.h"
#include "RtspDemuxer.h"
#include "Common/MultiMediaSourceMuxer.h"
using namespace toolkit;
namespace mediakit {
class RtspToRtmpMediaSource : public RtspMediaSource {
class RtspMediaSourceImp : public RtspMediaSource, public Demuxer::Listener , public MultiMediaSourceMuxer::Listener {
public:
typedef std::shared_ptr<RtspToRtmpMediaSource> Ptr;
RtspToRtmpMediaSource(const string &vhost,
const string &app,
const string &id,
int ringSize = 0)
: RtspMediaSource(vhost, app, id,ringSize) {
}
typedef std::shared_ptr<RtspMediaSourceImp> Ptr;
virtual ~RtspToRtmpMediaSource() {}
/**
* 构造函数
* @param vhost 虚拟主机
* @param app 应用名
* @param id 流id
* @param ringSize 环形缓存大小
*/
RtspMediaSourceImp(const string &vhost, const string &app, const string &id, int ringSize = 0) : RtspMediaSource(vhost, app, id,ringSize) {}
~RtspMediaSourceImp() = default;
virtual void setSdp(const string &strSdp) override {
/**
* 设置sdp
*/
void setSdp(const string &strSdp) override {
_demuxer = std::make_shared<RtspDemuxer>(strSdp);
_demuxer->setTrackListener(this);
RtspMediaSource::setSdp(strSdp);
}
virtual void onWrite(const RtpPacket::Ptr &rtp, bool bKeyPos) override {
/**
* 输入rtp并解析
*/
void onWrite(const RtpPacket::Ptr &rtp, bool key_pos) override {
if (_demuxer) {
bKeyPos = _demuxer->inputRtp(rtp);
if (!_muxer && _demuxer->isInited(2000)) {
_muxer = std::make_shared<MultiMediaSourceMuxer>(getVhost(),
getApp(),
getId(),
_demuxer->getDuration(),
false,//不重复生成rtsp
_enableRtmp,
_enableHls,
_enableMP4);
for (auto &track : _demuxer->getTracks(false)) {
_muxer->addTrack(track);
track->addDelegate(_muxer);
}
_muxer->setListener(getListener());
}
key_pos = _demuxer->inputRtp(rtp);
}
RtspMediaSource::onWrite(rtp, bKeyPos);
RtspMediaSource::onWrite(rtp, key_pos);
}
/**
* 设置监听器
* @param listener
*/
void setListener(const std::weak_ptr<MediaSourceEvent> &listener) override {
RtspMediaSource::setListener(listener);
if(_muxer){
_muxer->setListener(listener);
}
}
int readerCount() override {
return RtspMediaSource::readerCount() + (_muxer ? _muxer->readerCount() : 0);
}
/**
* 获取track
* @return
*/
vector<Track::Ptr> getTracks(bool trackReady) const override {
if(!_demuxer){
return this->RtspMediaSource::getTracks(trackReady);
}
return _demuxer->getTracks(trackReady);
* 播放器总数
*/
int readerCount() override {
return RtspMediaSource::readerCount() + (_muxer ? _muxer->readerCount() : 0);
}
/**
......@@ -104,19 +92,32 @@ public:
* @param enableMP4 是否mp4录制
*/
void setProtocolTranslation(bool enableRtmp,bool enableHls,bool enableMP4){
// DebugL << enableRtmp << " " << enableHls << " " << enableMP4;
_enableRtmp = enableRtmp;
_enableHls = enableHls;
_enableMP4 = enableMP4;
//不重复生成rtsp
_muxer = std::make_shared<MultiMediaSourceMuxer>(getVhost(), getApp(), getId(), 0, false, enableRtmp, enableHls, enableMP4);
_muxer->setListener(getListener());
_muxer->setTrackListener(this);
}
/**
* _demuxer触发的添加Track事件
*/
void onAddTrack(const Track::Ptr &track) override {
if(_muxer){
_muxer->addTrack(track);
track->addDelegate(_muxer);
}
}
/**
* _muxer触发的所有Track就绪的事件
*/
void onAllTrackReady() override{
setTrackSource(_muxer);
}
private:
RtspDemuxer::Ptr _demuxer;
MultiMediaSourceMuxer::Ptr _muxer;
bool _enableHls = true;
bool _enableMP4 = false;
bool _enableRtmp = true;
};
} /* namespace mediakit */
#endif /* SRC_RTSP_RTSPTORTMPMEDIASOURCE_H_ */
......@@ -263,7 +263,7 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) {
_strSession = makeRandStr(12);
_aTrackInfo = sdpParser.getAvailableTrack();
_pushSrc = std::make_shared<RtspToRtmpMediaSource>(_mediaInfo._vhost,_mediaInfo._app,_mediaInfo._streamid);
_pushSrc = std::make_shared<RtspMediaSourceImp>(_mediaInfo._vhost,_mediaInfo._app,_mediaInfo._streamid);
_pushSrc->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
_pushSrc->setSdp(sdpParser.toString());
......
......@@ -40,7 +40,7 @@
#include "RtspMediaSource.h"
#include "RtspSplitter.h"
#include "RtpReceiver.h"
#include "RtspToRtmpMediaSource.h"
#include "RtspMediaSourceImp.h"
#include "Common/Stamp.h"
using namespace std;
......@@ -231,7 +231,7 @@ private:
//是否开始发送rtp
bool _enableSendRtp;
//rtsp推流相关
RtspToRtmpMediaSource::Ptr _pushSrc;
RtspMediaSourceImp::Ptr _pushSrc;
//rtcp统计,trackid idx 为数组下标
RtcpCounter _aRtcpCnt[2];
//rtcp发送时间,trackid idx 为数组下标
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论