Commit c353c626 by xiongziliang

1、根据PS获取时间戳

2、修复rtp代理可能花屏的问题
parent bfa9a3b6
...@@ -33,6 +33,35 @@ ...@@ -33,6 +33,35 @@
namespace mediakit{ namespace mediakit{
/**
* 合并一些时间戳相同的frame
*/
class FrameMerger {
public:
FrameMerger() = default;
virtual ~FrameMerger() = default;
void inputFrame(const Frame::Ptr &frame,const function<void(uint32_t dts,uint32_t pts,const Buffer::Ptr &buffer)> &cb){
if (!_frameCached.empty() && _frameCached.back()->dts() != frame->dts()) {
Frame::Ptr back = _frameCached.back();
Buffer::Ptr merged_frame = back;
if(_frameCached.size() != 1){
string merged;
_frameCached.for_each([&](const Frame::Ptr &frame){
merged.append(frame->data(),frame->size());
});
merged_frame = std::make_shared<BufferString>(std::move(merged));
}
cb(back->dts(),back->pts(),merged_frame);
_frameCached.clear();
}
_frameCached.emplace_back(Frame::getCacheAbleFrame(frame));
}
private:
List<Frame::Ptr> _frameCached;
};
string printSSRC(uint32_t ui32Ssrc) { string printSSRC(uint32_t ui32Ssrc) {
char tmp[9] = { 0 }; char tmp[9] = { 0 };
ui32Ssrc = htonl(ui32Ssrc); ui32Ssrc = htonl(ui32Ssrc);
...@@ -90,6 +119,7 @@ RtpProcess::RtpProcess(uint32_t ssrc) { ...@@ -90,6 +119,7 @@ RtpProcess::RtpProcess(uint32_t ssrc) {
}); });
} }
} }
_merger = std::make_shared<FrameMerger>();
} }
RtpProcess::~RtpProcess() { RtpProcess::~RtpProcess() {
...@@ -101,7 +131,7 @@ RtpProcess::~RtpProcess() { ...@@ -101,7 +131,7 @@ RtpProcess::~RtpProcess() {
} }
} }
bool RtpProcess::inputRtp(const char *data, int data_len,const struct sockaddr *addr) { bool RtpProcess::inputRtp(const char *data, int data_len,const struct sockaddr *addr,uint32_t *dts_out) {
GET_CONFIG(bool,check_source,RtpProxy::kCheckSource); GET_CONFIG(bool,check_source,RtpProxy::kCheckSource);
//检查源是否合法 //检查源是否合法
if(!_addr){ if(!_addr){
...@@ -116,7 +146,11 @@ bool RtpProcess::inputRtp(const char *data, int data_len,const struct sockaddr * ...@@ -116,7 +146,11 @@ bool RtpProcess::inputRtp(const char *data, int data_len,const struct sockaddr *
} }
_last_rtp_time.resetTime(); _last_rtp_time.resetTime();
return handleOneRtp(0,_track,(unsigned char *)data,data_len); bool ret = handleOneRtp(0,_track,(unsigned char *)data,data_len);
if(dts_out){
*dts_out = _dts;
}
return ret;
} }
void RtpProcess::onRtpSorted(const RtpPacket::Ptr &rtp, int) { void RtpProcess::onRtpSorted(const RtpPacket::Ptr &rtp, int) {
...@@ -156,6 +190,7 @@ void RtpProcess::onPSDecode(int stream, ...@@ -156,6 +190,7 @@ void RtpProcess::onPSDecode(int stream,
int bytes) { int bytes) {
pts /= 90; pts /= 90;
dts /= 90; dts /= 90;
_dts = dts;
_stamps[codecid].revise(dts,pts,dts,pts,false); _stamps[codecid].revise(dts,pts,dts,pts,false);
switch (codecid) { switch (codecid) {
...@@ -176,8 +211,10 @@ void RtpProcess::onPSDecode(int stream, ...@@ -176,8 +211,10 @@ void RtpProcess::onPSDecode(int stream,
if(_save_file_video){ if(_save_file_video){
fwrite((uint8_t *)data,bytes, 1, _save_file_video.get()); fwrite((uint8_t *)data,bytes, 1, _save_file_video.get());
} }
auto frame = std::make_shared<H264FrameNoCacheAble>((char *) data, bytes, dts, pts,4); auto frame = std::make_shared<H264FrameNoCacheAble>((char *) data, bytes, dts, pts,0);
_muxer->inputFrame(frame); _merger->inputFrame(frame,[this](uint32_t dts, uint32_t pts, const Buffer::Ptr &buffer) {
_muxer->inputFrame(std::make_shared<H264FrameNoCacheAble>(buffer->data(), buffer->size(), dts, pts,4));
});
break; break;
} }
...@@ -196,8 +233,10 @@ void RtpProcess::onPSDecode(int stream, ...@@ -196,8 +233,10 @@ void RtpProcess::onPSDecode(int stream,
if(_save_file_video){ if(_save_file_video){
fwrite((uint8_t *)data,bytes, 1, _save_file_video.get()); fwrite((uint8_t *)data,bytes, 1, _save_file_video.get());
} }
auto frame = std::make_shared<H265FrameNoCacheAble>((char *) data, bytes, dts, pts, 4); auto frame = std::make_shared<H265FrameNoCacheAble>((char *) data, bytes, dts, pts, 0);
_muxer->inputFrame(frame); _merger->inputFrame(frame,[this](uint32_t dts, uint32_t pts, const Buffer::Ptr &buffer) {
_muxer->inputFrame(std::make_shared<H265FrameNoCacheAble>(buffer->data(), buffer->size(), dts, pts, 4));
});
break; break;
} }
......
...@@ -39,13 +39,13 @@ using namespace mediakit; ...@@ -39,13 +39,13 @@ using namespace mediakit;
namespace mediakit{ namespace mediakit{
string printSSRC(uint32_t ui32Ssrc); string printSSRC(uint32_t ui32Ssrc);
class FrameMerger;
class RtpProcess : public RtpReceiver , public RtpDecoder , public PSDecoder { class RtpProcess : public RtpReceiver , public RtpDecoder , public PSDecoder {
public: public:
typedef std::shared_ptr<RtpProcess> Ptr; typedef std::shared_ptr<RtpProcess> Ptr;
RtpProcess(uint32_t ssrc); RtpProcess(uint32_t ssrc);
~RtpProcess(); ~RtpProcess();
bool inputRtp(const char *data,int data_len, const struct sockaddr *addr); bool inputRtp(const char *data,int data_len, const struct sockaddr *addr , uint32_t *dts_out = nullptr);
bool alive(); bool alive();
string get_peer_ip(); string get_peer_ip();
uint16_t get_peer_port(); uint16_t get_peer_port();
...@@ -70,8 +70,10 @@ private: ...@@ -70,8 +70,10 @@ private:
int _codecid_video = 0; int _codecid_video = 0;
int _codecid_audio = 0; int _codecid_audio = 0;
MultiMediaSourceMuxer::Ptr _muxer; MultiMediaSourceMuxer::Ptr _muxer;
std::shared_ptr<FrameMerger> _merger;
Ticker _last_rtp_time; Ticker _last_rtp_time;
map<int,Stamp> _stamps; map<int,Stamp> _stamps;
uint32_t _dts = 0;
}; };
}//namespace mediakit }//namespace mediakit
......
...@@ -31,7 +31,7 @@ namespace mediakit{ ...@@ -31,7 +31,7 @@ namespace mediakit{
INSTANCE_IMP(RtpSelector); INSTANCE_IMP(RtpSelector);
bool RtpSelector::inputRtp(const char *data, int data_len,const struct sockaddr *addr) { bool RtpSelector::inputRtp(const char *data, int data_len,const struct sockaddr *addr,uint32_t *dts_out) {
if(_last_rtp_time.elapsedTime() > 3000){ if(_last_rtp_time.elapsedTime() > 3000){
_last_rtp_time.resetTime(); _last_rtp_time.resetTime();
onManager(); onManager();
...@@ -43,7 +43,7 @@ bool RtpSelector::inputRtp(const char *data, int data_len,const struct sockaddr ...@@ -43,7 +43,7 @@ bool RtpSelector::inputRtp(const char *data, int data_len,const struct sockaddr
} }
auto process = getProcess(ssrc, true); auto process = getProcess(ssrc, true);
if(process){ if(process){
return process->inputRtp(data,data_len, addr); return process->inputRtp(data,data_len, addr,dts_out);
} }
return false; return false;
} }
......
...@@ -41,7 +41,7 @@ public: ...@@ -41,7 +41,7 @@ public:
~RtpSelector(); ~RtpSelector();
static RtpSelector &Instance(); static RtpSelector &Instance();
bool inputRtp(const char *data,int data_len,const struct sockaddr *addr); bool inputRtp(const char *data,int data_len,const struct sockaddr *addr ,uint32_t *dts_out = nullptr );
static uint32_t getSSRC(const char *data,int data_len); static uint32_t getSSRC(const char *data,int data_len);
RtpProcess::Ptr getProcess(uint32_t ssrc,bool makeNew); RtpProcess::Ptr getProcess(uint32_t ssrc,bool makeNew);
void delProcess(uint32_t ssrc,const RtpProcess *ptr); void delProcess(uint32_t ssrc,const RtpProcess *ptr);
......
...@@ -71,19 +71,14 @@ static bool loadFile(const char *path){ ...@@ -71,19 +71,14 @@ static bool loadFile(const char *path){
} }
uint32_t timeStamp; uint32_t timeStamp;
memcpy(&timeStamp, rtp + 4, 4); RtpSelector::Instance().inputRtp(rtp,len, &addr,&timeStamp);
timeStamp = ntohl(timeStamp);
timeStamp /= 90;
if(timeStamp_last){ if(timeStamp_last){
auto diff = timeStamp - timeStamp_last; auto diff = timeStamp - timeStamp_last;
if(diff > 0){ if(diff > 0){
usleep(diff * 1000); usleep(diff * 1000);
} }
} }
timeStamp_last = timeStamp; timeStamp_last = timeStamp;
RtpSelector::Instance().inputRtp(rtp,len, &addr);
} }
fclose(fp); fclose(fp);
return true; return true;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论