Commit 732eb2d1 by xiongziliang

完成FLV复用器改造

优化代码结构
parent ef9ebc89
...@@ -117,9 +117,6 @@ public: ...@@ -117,9 +117,6 @@ public:
unregist(); unregist();
} }
virtual bool regist() ;
virtual bool unregist() ;
static Ptr find(const string &schema, static Ptr find(const string &schema,
const string &vhost, const string &vhost,
const string &app, const string &app,
...@@ -179,6 +176,10 @@ public: ...@@ -179,6 +176,10 @@ public:
} }
} }
} }
protected:
bool regist() ;
bool unregist() ;
private: private:
template <typename FUN> template <typename FUN>
static bool searchMedia(const string &schema, static bool searchMedia(const string &schema,
......
...@@ -207,11 +207,9 @@ inline bool HttpSession::checkLiveFlvStream(){ ...@@ -207,11 +207,9 @@ inline bool HttpSession::checkLiveFlvStream(){
m_mediaInfo.m_streamid.erase(m_mediaInfo.m_streamid.size() - 4);//去除.flv后缀 m_mediaInfo.m_streamid.erase(m_mediaInfo.m_streamid.size() - 4);//去除.flv后缀
auto mediaSrc = dynamic_pointer_cast<RtmpMediaSource>(MediaSource::find(RTMP_SCHEMA,m_mediaInfo.m_vhost,m_mediaInfo.m_app,m_mediaInfo.m_streamid)); auto mediaSrc = dynamic_pointer_cast<RtmpMediaSource>(MediaSource::find(RTMP_SCHEMA,m_mediaInfo.m_vhost,m_mediaInfo.m_app,m_mediaInfo.m_streamid));
if(!mediaSrc || !mediaSrc->ready()){ if(!mediaSrc){
//该rtmp源不存在 //该rtmp源不存在
sendNotFound(true); return false;
shutdown();
return true;
} }
auto onRes = [this,mediaSrc](const string &err){ auto onRes = [this,mediaSrc](const string &err){
......
...@@ -41,7 +41,7 @@ namespace ZL { ...@@ -41,7 +41,7 @@ namespace ZL {
namespace Http { namespace Http {
class HttpSession: public TcpSession,FlvMuxer { class HttpSession: public TcpSession,public FlvMuxer {
public: public:
typedef StrCaseMap KeyValue; typedef StrCaseMap KeyValue;
typedef std::function<void(const string &codeOut, typedef std::function<void(const string &codeOut,
......
...@@ -21,9 +21,6 @@ void FlvMuxer::start(const RtmpMediaSource::Ptr &media) { ...@@ -21,9 +21,6 @@ void FlvMuxer::start(const RtmpMediaSource::Ptr &media) {
if(!media){ if(!media){
throw std::runtime_error("RtmpMediaSource 无效"); throw std::runtime_error("RtmpMediaSource 无效");
} }
if(!media->ready()){
throw std::runtime_error("RtmpMediaSource 未准备好");
}
onWriteFlvHeader(media); onWriteFlvHeader(media);
...@@ -176,6 +173,7 @@ void FlvRecorder::startRecord(const string &vhost, const string &app, const stri ...@@ -176,6 +173,7 @@ void FlvRecorder::startRecord(const string &vhost, const string &app, const stri
} }
void FlvRecorder::startRecord(const RtmpMediaSource::Ptr &media, const string &file_path) { void FlvRecorder::startRecord(const RtmpMediaSource::Ptr &media, const string &file_path) {
lock_guard<recursive_mutex> lck(_file_mtx);
//开辟文件写缓存 //开辟文件写缓存
std::shared_ptr<char> fileBuf(new char[FILE_BUF_SIZE],[](char *ptr){ std::shared_ptr<char> fileBuf(new char[FILE_BUF_SIZE],[](char *ptr){
if(ptr){ if(ptr){
...@@ -183,7 +181,7 @@ void FlvRecorder::startRecord(const RtmpMediaSource::Ptr &media, const string &f ...@@ -183,7 +181,7 @@ void FlvRecorder::startRecord(const RtmpMediaSource::Ptr &media, const string &f
} }
}); });
//新建文件 //新建文件
std::shared_ptr<FILE> _file(File::createfile_file(file_path.data(),"wb"),[fileBuf](FILE *fp){ _file.reset(File::createfile_file(file_path.data(),"wb"),[fileBuf](FILE *fp){
if(fp){ if(fp){
fflush(fp); fflush(fp);
fclose(fp); fclose(fp);
...@@ -221,6 +219,12 @@ std::shared_ptr<FlvMuxer> FlvRecorder::getSharedPtr() { ...@@ -221,6 +219,12 @@ std::shared_ptr<FlvMuxer> FlvRecorder::getSharedPtr() {
return shared_from_this(); return shared_from_this();
} }
FlvRecorder::FlvRecorder() {
}
FlvRecorder::~FlvRecorder() {
}
}//namespace Rtmp }//namespace Rtmp
}//namespace ZL }//namespace ZL
...@@ -16,6 +16,7 @@ namespace Rtmp { ...@@ -16,6 +16,7 @@ namespace Rtmp {
class FlvMuxer{ class FlvMuxer{
public: public:
typedef std::shared_ptr<FlvMuxer> Ptr;
FlvMuxer(); FlvMuxer();
virtual ~FlvMuxer(); virtual ~FlvMuxer();
void stop(); void stop();
...@@ -38,6 +39,7 @@ private: ...@@ -38,6 +39,7 @@ private:
class FlvRecorder : public FlvMuxer , public std::enable_shared_from_this<FlvRecorder>{ class FlvRecorder : public FlvMuxer , public std::enable_shared_from_this<FlvRecorder>{
public: public:
typedef std::shared_ptr<FlvRecorder> Ptr;
FlvRecorder(); FlvRecorder();
virtual ~FlvRecorder(); virtual ~FlvRecorder();
void startRecord(const string &vhost,const string &app,const string &stream,const string &file_path); void startRecord(const string &vhost,const string &app,const string &stream,const string &file_path);
......
...@@ -34,6 +34,7 @@ ...@@ -34,6 +34,7 @@
#include <unordered_map> #include <unordered_map>
#include "amf.h" #include "amf.h"
#include "Rtmp.h" #include "Rtmp.h"
#include "RtmpParser.h"
#include "Common/config.h" #include "Common/config.h"
#include "Common/MediaSender.h" #include "Common/MediaSender.h"
#include "Common/MediaSource.h" #include "Common/MediaSource.h"
...@@ -71,6 +72,7 @@ public: ...@@ -71,6 +72,7 @@ public:
} }
const AMFValue &getMetaData() const { const AMFValue &getMetaData() const {
lock_guard<recursive_mutex> lock(m_mtxMap);
return m_metadata; return m_metadata;
} }
template<typename FUN> template<typename FUN>
...@@ -80,29 +82,52 @@ public: ...@@ -80,29 +82,52 @@ public:
f(pr.second); f(pr.second);
} }
} }
bool ready() const {
lock_guard<recursive_mutex> lock(m_mtxMap);
return (m_mapCfgFrame.size() != 0);
}
virtual void onGetMetaData(const AMFValue &_metadata) { virtual void onGetMetaData(const AMFValue &_metadata) {
lock_guard<recursive_mutex> lock(m_mtxMap);
m_metadata = _metadata; m_metadata = _metadata;
RtmpParser parser(_metadata);
m_iCfgFrameSize = parser.containAudio() + parser.containVideo();
if(ready()){
MediaSource::regist();
m_bRegisted = true;
} else{
m_bAsyncRegist = true;
}
} }
virtual void onGetMedia(const RtmpPacket::Ptr &pkt) { virtual void onGetMedia(const RtmpPacket::Ptr &pkt) {
if (pkt->isCfgFrame()) { if(!m_bRegisted){
lock_guard<recursive_mutex> lock(m_mtxMap); lock_guard<recursive_mutex> lock(m_mtxMap);
m_mapCfgFrame.emplace(pkt->typeId, pkt); if (m_mapCfgFrame.size() != m_iCfgFrameSize && pkt->isCfgFrame()) {
m_mapCfgFrame.emplace(pkt->typeId, pkt);
if( m_mapCfgFrame.size() == m_iCfgFrameSize && m_bAsyncRegist){
m_bAsyncRegist = false;
MediaSource::regist();
m_bRegisted = true;
}
}
} }
auto _ring = m_pRing; auto _ring = m_pRing;
m_thPool.async([_ring,pkt]() { m_thPool.async([_ring,pkt]() {
_ring->write(pkt,pkt->isVideoKeyFrame()); _ring->write(pkt,pkt->isVideoKeyFrame());
}); });
} }
private:
bool ready(){
lock_guard<recursive_mutex> lock(m_mtxMap);
return m_iCfgFrameSize != -1 && m_iCfgFrameSize == m_mapCfgFrame.size();
}
protected: protected:
AMFValue m_metadata; AMFValue m_metadata;
unordered_map<int, RtmpPacket::Ptr> m_mapCfgFrame; unordered_map<int, RtmpPacket::Ptr> m_mapCfgFrame;
mutable recursive_mutex m_mtxMap; mutable recursive_mutex m_mtxMap;
RingBuffer<RtmpPacket::Ptr>::Ptr m_pRing; //rtp环形缓冲 RingBuffer<RtmpPacket::Ptr>::Ptr m_pRing; //rtp环形缓冲
ThreadPool &m_thPool; ThreadPool &m_thPool;
int m_iCfgFrameSize = -1;
bool m_bAsyncRegist = false;
bool m_bRegisted = false;
}; };
} /* namespace Rtmp */ } /* namespace Rtmp */
......
...@@ -194,10 +194,7 @@ inline void RtmpPusher::send_metaData(){ ...@@ -194,10 +194,7 @@ inline void RtmpPusher::send_metaData(){
if (!src) { if (!src) {
throw std::runtime_error("the media source was released"); throw std::runtime_error("the media source was released");
} }
if (!src->ready()) {
throw std::runtime_error("the media source is not ready");
}
AMFEncoder enc; AMFEncoder enc;
enc << "@setDataFrame" << "onMetaData" << src->getMetaData(); enc << "@setDataFrame" << "onMetaData" << src->getMetaData();
sendRequest(MSG_DATA, enc.data()); sendRequest(MSG_DATA, enc.data());
......
...@@ -167,7 +167,6 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { ...@@ -167,7 +167,6 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
shutdown(); shutdown();
return; return;
} }
m_bPublisherSrcRegisted = false;
m_pPublisherSrc.reset(new RtmpToRtspMediaSource(m_mediaInfo.m_vhost,m_mediaInfo.m_app,m_mediaInfo.m_streamid)); m_pPublisherSrc.reset(new RtmpToRtspMediaSource(m_mediaInfo.m_vhost,m_mediaInfo.m_app,m_mediaInfo.m_streamid));
m_pPublisherSrc->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this())); m_pPublisherSrc->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
}; };
...@@ -212,13 +211,6 @@ void RtmpSession::doPlayResponse(const string &err,bool tryDelay,const std::shar ...@@ -212,13 +211,6 @@ void RtmpSession::doPlayResponse(const string &err,bool tryDelay,const std::shar
m_mediaInfo.m_app, m_mediaInfo.m_app,
m_mediaInfo.m_streamid, m_mediaInfo.m_streamid,
true)); true));
if(src ){
if(!src->ready()){
//流未准备好那么相当于没有
src = nullptr;
}
}
//是否鉴权成功 //是否鉴权成功
bool authSuccess = err.empty(); bool authSuccess = err.empty();
if(authSuccess && !src && tryDelay ){ if(authSuccess && !src && tryDelay ){
...@@ -509,10 +501,6 @@ void RtmpSession::onRtmpChunk(RtmpPacket &chunkData) { ...@@ -509,10 +501,6 @@ void RtmpSession::onRtmpChunk(RtmpPacket &chunkData) {
chunkData.timeStamp = m_stampTicker[chunkData.typeId % 2].elapsedTime(); chunkData.timeStamp = m_stampTicker[chunkData.typeId % 2].elapsedTime();
} }
m_pPublisherSrc->onGetMedia(std::make_shared<RtmpPacket>(chunkData)); m_pPublisherSrc->onGetMedia(std::make_shared<RtmpPacket>(chunkData));
if(!m_bPublisherSrcRegisted && m_pPublisherSrc->ready()){
m_bPublisherSrcRegisted = true;
m_pPublisherSrc->regist();
}
} }
break; break;
default: default:
......
...@@ -102,7 +102,6 @@ private: ...@@ -102,7 +102,6 @@ private:
SmoothTicker m_stampTicker[2];//时间戳生产器 SmoothTicker m_stampTicker[2];//时间戳生产器
RingBuffer<RtmpPacket::Ptr>::RingReader::Ptr m_pRingReader; RingBuffer<RtmpPacket::Ptr>::RingReader::Ptr m_pRingReader;
std::shared_ptr<RtmpMediaSource> m_pPublisherSrc; std::shared_ptr<RtmpMediaSource> m_pPublisherSrc;
bool m_bPublisherSrcRegisted = false;
std::weak_ptr<RtmpMediaSource> m_pPlayerSrc; std::weak_ptr<RtmpMediaSource> m_pPlayerSrc;
uint32_t m_aui32FirstStamp[2] = {0}; uint32_t m_aui32FirstStamp[2] = {0};
//消耗的总流量 //消耗的总流量
......
...@@ -45,19 +45,6 @@ RtmpToRtspMediaSource::RtmpToRtspMediaSource(const string &vhost, ...@@ -45,19 +45,6 @@ RtmpToRtspMediaSource::RtmpToRtspMediaSource(const string &vhost,
} }
RtmpToRtspMediaSource::~RtmpToRtspMediaSource() {} RtmpToRtspMediaSource::~RtmpToRtspMediaSource() {}
bool RtmpToRtspMediaSource::regist() {
if (m_pRtspSrc) {
m_pRtspSrc->regist();
}
return MediaSource::regist();
}
bool RtmpToRtspMediaSource::unregist() {
if(m_pRtspSrc){
m_pRtspSrc->unregist();
}
return MediaSource::unregist();
}
void RtmpToRtspMediaSource::onGetH264(const H264Frame &frame) { void RtmpToRtspMediaSource::onGetH264(const H264Frame &frame) {
if(m_pRecorder){ if(m_pRecorder){
...@@ -166,7 +153,6 @@ void RtmpToRtspMediaSource::makeSDP() { ...@@ -166,7 +153,6 @@ void RtmpToRtspMediaSource::makeSDP() {
m_pRtspSrc.reset(new RtspMediaSource(getVhost(),getApp(),getId())); m_pRtspSrc.reset(new RtspMediaSource(getVhost(),getApp(),getId()));
m_pRtspSrc->setListener(m_listener); m_pRtspSrc->setListener(m_listener);
m_pRtspSrc->onGetSDP(strSDP); m_pRtspSrc->onGetSDP(strSDP);
m_pRtspSrc->regist();
} }
......
...@@ -63,9 +63,6 @@ public: ...@@ -63,9 +63,6 @@ public:
bool bEnableMp4 = false); bool bEnableMp4 = false);
virtual ~RtmpToRtspMediaSource(); virtual ~RtmpToRtspMediaSource();
bool regist() override;
bool unregist() override;
void onGetMetaData(const AMFValue &_metadata) override { void onGetMetaData(const AMFValue &_metadata) override {
try { try {
m_pParser.reset(new RtmpParser(_metadata)); m_pParser.reset(new RtmpParser(_metadata));
......
...@@ -87,6 +87,7 @@ public: ...@@ -87,6 +87,7 @@ public:
virtual void onGetSDP(const string& sdp) { virtual void onGetSDP(const string& sdp) {
//派生类设置该媒体源媒体描述信息 //派生类设置该媒体源媒体描述信息
m_strSdp = sdp; m_strSdp = sdp;
regist();
} }
virtual void onGetRTP(const RtpPacket::Ptr &rtppt, bool keyPos) { virtual void onGetRTP(const RtpPacket::Ptr &rtppt, bool keyPos) {
auto &trackRef = m_mapTracks[rtppt->type]; auto &trackRef = m_mapTracks[rtppt->type];
......
...@@ -50,20 +50,6 @@ RtspToRtmpMediaSource::~RtspToRtmpMediaSource() { ...@@ -50,20 +50,6 @@ RtspToRtmpMediaSource::~RtspToRtmpMediaSource() {
} }
bool RtspToRtmpMediaSource::regist() {
if (m_pRtmpSrc) {
m_pRtmpSrc->regist();
}
return MediaSource::regist();
}
bool RtspToRtmpMediaSource::unregist() {
if (m_pRtmpSrc) {
m_pRtmpSrc->unregist();
}
return MediaSource::unregist();
}
void RtspToRtmpMediaSource::makeVideoConfigPkt() { void RtspToRtmpMediaSource::makeVideoConfigPkt() {
int8_t flags = 7; //h.264 int8_t flags = 7; //h.264
flags |= (FLV_KEY_FRAME << 4); flags |= (FLV_KEY_FRAME << 4);
......
...@@ -68,8 +68,6 @@ public: ...@@ -68,8 +68,6 @@ public:
} }
RtspMediaSource::onGetRTP(pRtppkt, bKeyPos); RtspMediaSource::onGetRTP(pRtppkt, bKeyPos);
} }
virtual bool regist() override ;
virtual bool unregist() override;
int readerCount(){ int readerCount(){
return getRing()->readerCount() + (m_pRtmpSrc ? m_pRtmpSrc->getRing()->readerCount() : 0); return getRing()->readerCount() + (m_pRtmpSrc ? m_pRtmpSrc->getRing()->readerCount() : 0);
......
...@@ -34,6 +34,7 @@ ...@@ -34,6 +34,7 @@
#include "Http/HttpSession.h" #include "Http/HttpSession.h"
#include "Shell/ShellSession.h" #include "Shell/ShellSession.h"
#include "Util/MD5.h" #include "Util/MD5.h"
#include "Rtmp/FlvMuxer.h"
#ifdef ENABLE_OPENSSL #ifdef ENABLE_OPENSSL
#include "Util/SSLBox.h" #include "Util/SSLBox.h"
...@@ -133,6 +134,29 @@ static onceToken s_token([](){ ...@@ -133,6 +134,29 @@ static onceToken s_token([](){
}); });
}); });
//此处用于测试rtmp保存为flv录像,保存在http根目录下
NoticeCenter::Instance().addListener(nullptr,Config::Broadcast::kBroadcastMediaChanged,[](BroadcastMediaChangedArgs){
if(schema == RTMP_SCHEMA){
static map<string,FlvRecorder::Ptr> s_mapFlvRecorder;
static mutex s_mtxFlvRecorder;
lock_guard<mutex> lck(s_mtxFlvRecorder);
if(bRegist){
GET_CONFIG_AND_REGISTER(string,http_root,Config::Http::kRootPath);
auto path = http_root + "/" + vhost + "/" + app + "/" + stream + "_" + to_string(time(NULL)) + ".flv";
FlvRecorder::Ptr recorder(new FlvRecorder);
try{
recorder->startRecord(dynamic_pointer_cast<RtmpMediaSource>(sender.shared_from_this()),path);
s_mapFlvRecorder[vhost + "/" + app + "/" + stream] = recorder;
}catch(std::exception &ex){
WarnL << ex.what();
}
}else{
s_mapFlvRecorder.erase(vhost + "/" + app + "/" + stream);
}
}
});
}, nullptr); }, nullptr);
#if !defined(SIGHUP) #if !defined(SIGHUP)
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论