From b55db11de314b2321213275b2d01cf1900b7f5f0 Mon Sep 17 00:00:00 2001
From: xiongziliang <771730766@qq.com>
Date: Mon, 13 Jan 2020 15:48:55 +0800
Subject: [PATCH] 解决rtmp过早注册的问题

---
 src/Common/MediaSink.cpp   | 80 ++++++++++++++++++++++++++++++++++++++++++++++++++------------------------------
 src/Common/MediaSink.h     |  3 ++-
 src/Common/Stamp.cpp       |  5 +++--
 src/Rtmp/RtmpMediaSource.h | 11 ++++++-----
 src/Rtmp/RtmpPlayerImp.h   |  6 ++++++
 src/Rtmp/RtmpSession.cpp   |  6 ++++++
 src/Rtmp/RtmpSession.h     |  1 +
 7 files changed, 74 insertions(+), 38 deletions(-)

diff --git a/src/Common/MediaSink.cpp b/src/Common/MediaSink.cpp
index 64049dc..eb7cf53 100644
--- a/src/Common/MediaSink.cpp
+++ b/src/Common/MediaSink.cpp
@@ -26,7 +26,11 @@
 #include "MediaSink.h"
 
 //最多等待未初始化的Track 10秒,超时之后会忽略未初始化的Track
-#define MAX_WAIT_MS 10000
+#define MAX_WAIT_MS_READY 10000
+
+//如果添加Track,最多等待3秒
+#define MAX_WAIT_MS_ADD_TRACK 3000
+
 
 namespace mediakit{
 
@@ -34,23 +38,16 @@ void MediaSink::addTrack(const Track::Ptr &track_in) {
     lock_guard<recursive_mutex> lck(_mtx);
     //克隆Track,只拷贝其数据,不拷贝其数据转发关系
     auto track = track_in->clone();
-
     auto codec_id = track->getCodecId();
     _track_map[codec_id] = track;
-    auto lam = [this,track](){
+    _allTrackReady = false;
+    _trackReadyCallback[codec_id] = [this, track]() {
         onTrackReady(track);
     };
-    if(track->ready()){
-        lam();
-    }else{
-        _anyTrackUnReady = true;
-        _allTrackReady = false;
-        _trackReadyCallback[codec_id] = lam;
-        _ticker.resetTime();
-    }
+    _ticker.resetTime();
 
-    track->addDelegate(std::make_shared<FrameWriterInterfaceHelper>([this](const Frame::Ptr &frame){
-        if(!_anyTrackUnReady){
+    track->addDelegate(std::make_shared<FrameWriterInterfaceHelper>([this](const Frame::Ptr &frame) {
+        if (_allTrackReady) {
             onTrackFrame(frame);
         }
     }));
@@ -58,7 +55,6 @@ void MediaSink::addTrack(const Track::Ptr &track_in) {
 
 void MediaSink::resetTracks() {
     lock_guard<recursive_mutex> lck(_mtx);
-    _anyTrackUnReady = false;
     _allTrackReady = false;
     _track_map.clear();
     _trackReadyCallback.clear();
@@ -83,26 +79,50 @@ void MediaSink::inputFrame(const Frame::Ptr &frame) {
         }
     }
 
-    if(!_allTrackReady && (_trackReadyCallback.empty() || _ticker.elapsedTime() > MAX_WAIT_MS)){
-        _allTrackReady = true;
-        _anyTrackUnReady = false;
+    if(!_allTrackReady){
+        if(_ticker.elapsedTime() > MAX_WAIT_MS_READY){
+            //如果超过规定时间,那么不再等待并忽略未准备好的Track
+            emitAllTrackReady();
+            return;
+        }
+
         if(!_trackReadyCallback.empty()){
-            //这是超时强制忽略未准备好的Track
-            _trackReadyCallback.clear();
-            //移除未准备好的Track
-            for(auto it = _track_map.begin() ; it != _track_map.end() ; ){
-                if(!it->second->ready()){
-                    it = _track_map.erase(it);
-                    continue;
-                }
-                ++it;
-            }
+            //在超时时间内,如果存在未准备好的Track,那么继续等待
+            return;
         }
 
-        if(!_track_map.empty()){
-            //最少有一个有效的Track
-            onAllTrackReady();
+        if(_track_map.size() == 2){
+            //如果已经添加了音视频Track,并且不存在未准备好的Track,那么说明所有Track都准备好了
+            emitAllTrackReady();
+            return;
         }
+
+        if(_track_map.size() == 1 && _ticker.elapsedTime() > MAX_WAIT_MS_ADD_TRACK){
+            //如果只有一个Track,那么在该Track添加后,我们最多还等待若干时间(可能后面还会添加Track)
+            emitAllTrackReady();
+            return;
+        }
+    }
+}
+
+void MediaSink::emitAllTrackReady() {
+    _allTrackReady = true;
+    if(!_trackReadyCallback.empty()){
+        //这是超时强制忽略未准备好的Track
+        _trackReadyCallback.clear();
+        //移除未准备好的Track
+        for(auto it = _track_map.begin() ; it != _track_map.end() ; ){
+            if(!it->second->ready()){
+                it = _track_map.erase(it);
+                continue;
+            }
+            ++it;
+        }
+    }
+
+    if(!_track_map.empty()){
+        //最少有一个有效的Track
+        onAllTrackReady();
     }
 }
 
diff --git a/src/Common/MediaSink.h b/src/Common/MediaSink.h
index 6431738..e37980e 100644
--- a/src/Common/MediaSink.h
+++ b/src/Common/MediaSink.h
@@ -110,11 +110,12 @@ protected:
      */
     virtual void onTrackFrame(const Frame::Ptr &frame) {};
 private:
+    void emitAllTrackReady();
+private:
     mutable recursive_mutex _mtx;
     map<int,Track::Ptr> _track_map;
     map<int,function<void()> > _trackReadyCallback;
     bool _allTrackReady = false;
-    bool _anyTrackUnReady = false;
     Ticker _ticker;
 };
 
diff --git a/src/Common/Stamp.cpp b/src/Common/Stamp.cpp
index 7330de2..e01887e 100644
--- a/src/Common/Stamp.cpp
+++ b/src/Common/Stamp.cpp
@@ -26,7 +26,8 @@
 
 #include "Stamp.h"
 
-#define MAX_DELTA_STAMP 300
+#define MAX_DELTA_STAMP 1000
+#define MAX_CTS 500
 #define ABS(x) ((x) > 0 ? (x) : (-x))
 
 namespace mediakit {
@@ -77,7 +78,7 @@ void Stamp::revise(int64_t dts, int64_t pts, int64_t &dts_out, int64_t &pts_out,
     dts_out = _relativeStamp;
 
     //////////////以下是播放时间戳的计算//////////////////
-    if(pts_dts_diff > MAX_DELTA_STAMP || pts_dts_diff < -MAX_DELTA_STAMP){
+    if(ABS(pts_dts_diff) > MAX_CTS){
         //如果差值太大,则认为由于回环导致时间戳错乱了
         pts_dts_diff = 0;
     }
diff --git a/src/Rtmp/RtmpMediaSource.h b/src/Rtmp/RtmpMediaSource.h
index 982d440..928c906 100644
--- a/src/Rtmp/RtmpMediaSource.h
+++ b/src/Rtmp/RtmpMediaSource.h
@@ -72,7 +72,6 @@ public:
 					const string &stream_id,
 					int ring_size = 0) :
 			MediaSource(RTMP_SCHEMA, vhost, app, stream_id), _ring_size(ring_size) {
-		_metadata = TitleMeta().getMetadata();
 	}
 
 	virtual ~RtmpMediaSource() {}
@@ -117,6 +116,9 @@ public:
 	virtual void setMetaData(const AMFValue &metadata) {
 		lock_guard<recursive_mutex> lock(_mtx);
 		_metadata = metadata;
+		if(_ring){
+			regist();
+		}
 	}
 
 	/**
@@ -143,10 +145,9 @@ public:
 			_ring = std::make_shared<RingType>(_ring_size, std::move(lam));
 			onReaderChanged(0);
 
-			//如果输入了非config帧,
-			//那么说明不再可能获取config帧以及metadata,
-			//所以我们强制其为已注册
-			regist();
+			if(_metadata){
+				regist();
+			}
 		}
 		_track_stamps_map[pkt->typeId] = pkt->timeStamp;
 		_ring->write(pkt, pkt->isVideoKeyFrame());
diff --git a/src/Rtmp/RtmpPlayerImp.h b/src/Rtmp/RtmpPlayerImp.h
index b06be30..e90acc4 100644
--- a/src/Rtmp/RtmpPlayerImp.h
+++ b/src/Rtmp/RtmpPlayerImp.h
@@ -66,6 +66,7 @@ private:
         _pRtmpMediaSrc = dynamic_pointer_cast<RtmpMediaSource>(_pMediaSrc);
         if(_pRtmpMediaSrc){
             _pRtmpMediaSrc->setMetaData(val);
+            _set_meta_data = true;
         }
         _delegate.reset(new RtmpDemuxer);
         _delegate->loadMetaData(val);
@@ -73,6 +74,10 @@ private:
     }
     void onMediaData(const RtmpPacket::Ptr &chunkData) override {
     	if(_pRtmpMediaSrc){
+            if(!_set_meta_data && !chunkData->isCfgFrame()){
+                _set_meta_data = true;
+                _pRtmpMediaSrc->setMetaData(TitleMeta().getMetadata());
+            }
             _pRtmpMediaSrc->onWrite(chunkData);
         }
         if(!_delegate){
@@ -83,6 +88,7 @@ private:
     }
 private:
     RtmpMediaSource::Ptr _pRtmpMediaSrc;
+    bool _set_meta_data = false;
 };
 
 
diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp
index 6f067fe..061e512 100644
--- a/src/Rtmp/RtmpSession.cpp
+++ b/src/Rtmp/RtmpSession.cpp
@@ -434,6 +434,7 @@ void RtmpSession::setMetaData(AMFDecoder &dec) {
     auto metadata = dec.load<AMFValue>();
 //    dumpMetadata(metadata);
     _pPublisherSrc->setMetaData(metadata);
+    _set_meta_data = true;
 }
 
 void RtmpSession::onProcessCmd(AMFDecoder &dec) {
@@ -491,6 +492,11 @@ void RtmpSession::onRtmpChunk(RtmpPacket &chunkData) {
             _stamp[chunkData.typeId % 2].revise(chunkData.timeStamp, chunkData.timeStamp, dts_out, dts_out, true);
             chunkData.timeStamp = dts_out;
         }
+
+        if(!_set_meta_data && !chunkData.isCfgFrame()){
+            _set_meta_data = true;
+            _pPublisherSrc->setMetaData(TitleMeta().getMetadata());
+        }
         _pPublisherSrc->onWrite(std::make_shared<RtmpPacket>(std::move(chunkData)));
 	}
 		break;
diff --git a/src/Rtmp/RtmpSession.h b/src/Rtmp/RtmpSession.h
index 572d40f..f5b6df7 100644
--- a/src/Rtmp/RtmpSession.h
+++ b/src/Rtmp/RtmpSession.h
@@ -95,6 +95,7 @@ private:
 	std::string _strTcUrl;
 	MediaInfo _mediaInfo;
 	double _dNowReqID = 0;
+	bool _set_meta_data = false;
 	Ticker _ticker;//数据接收时间
 	RingBuffer<RtmpPacket::Ptr>::RingReader::Ptr _pRingReader;
 	std::shared_ptr<RtmpMediaSourceImp> _pPublisherSrc;
--
libgit2 0.26.0