Commit fda4e83a by xiongguangjie

srt support tlpktdrop=0 feature

parent 56b8a860
...@@ -237,11 +237,16 @@ std::string PacketQueue::dump() { ...@@ -237,11 +237,16 @@ std::string PacketQueue::dump() {
//////////////////// PacketRecvQueue ////////////////////////////////// //////////////////// PacketRecvQueue //////////////////////////////////
PacketRecvQueue::PacketRecvQueue(uint32_t max_size, uint32_t init_seq, uint32_t latency) PacketRecvQueue::PacketRecvQueue(uint32_t max_size, uint32_t init_seq, uint32_t latency,uint32_t flag)
: _pkt_cap(max_size) : _pkt_cap(max_size)
, _pkt_latency(latency) , _pkt_latency(latency)
, _pkt_expected_seq(init_seq) , _pkt_expected_seq(init_seq)
, _pkt_buf(max_size) {} , _pkt_buf(max_size)
, _srt_flag(flag) {}
bool PacketRecvQueue::TLPKTDrop(){
return (_srt_flag&HSExtMessage::HS_EXT_MSG_TLPKTDROP) && (_srt_flag &HSExtMessage::HS_EXT_MSG_TSBPDRCV);
}
bool PacketRecvQueue::inputPacket(DataPacket::Ptr pkt, std::list<DataPacket::Ptr> &out) { bool PacketRecvQueue::inputPacket(DataPacket::Ptr pkt, std::list<DataPacket::Ptr> &out) {
// TraceL << dump() << " seq:" << pkt->packet_seq_number; // TraceL << dump() << " seq:" << pkt->packet_seq_number;
while (_size > 0 && _start == _end) { while (_size > 0 && _start == _end) {
...@@ -265,7 +270,7 @@ bool PacketRecvQueue::inputPacket(DataPacket::Ptr pkt, std::list<DataPacket::Ptr ...@@ -265,7 +270,7 @@ bool PacketRecvQueue::inputPacket(DataPacket::Ptr pkt, std::list<DataPacket::Ptr
_start = (_start + 1) % _pkt_cap; _start = (_start + 1) % _pkt_cap;
it = _pkt_buf[_start]; it = _pkt_buf[_start];
} }
while (timeLatency() > _pkt_latency) { while (timeLatency() > _pkt_latency && TLPKTDrop()) {
it = _pkt_buf[_start]; it = _pkt_buf[_start];
if (it) { if (it) {
_pkt_buf[_start] = nullptr; _pkt_buf[_start] = nullptr;
......
...@@ -65,7 +65,7 @@ class PacketRecvQueue : public PacketQueueInterface { ...@@ -65,7 +65,7 @@ class PacketRecvQueue : public PacketQueueInterface {
public: public:
using Ptr = std::shared_ptr<PacketRecvQueue>; using Ptr = std::shared_ptr<PacketRecvQueue>;
PacketRecvQueue(uint32_t max_size, uint32_t init_seq, uint32_t latency); PacketRecvQueue(uint32_t max_size, uint32_t init_seq, uint32_t latency,uint32_t flag = 0xbf);
~PacketRecvQueue() = default; ~PacketRecvQueue() = default;
bool inputPacket(DataPacket::Ptr pkt, std::list<DataPacket::Ptr> &out); bool inputPacket(DataPacket::Ptr pkt, std::list<DataPacket::Ptr> &out);
...@@ -85,12 +85,15 @@ private: ...@@ -85,12 +85,15 @@ private:
void insertToCycleBuf(DataPacket::Ptr pkt, uint32_t diff); void insertToCycleBuf(DataPacket::Ptr pkt, uint32_t diff);
DataPacket::Ptr getFirst(); DataPacket::Ptr getFirst();
DataPacket::Ptr getLast(); DataPacket::Ptr getLast();
bool TLPKTDrop();
private: private:
uint32_t _pkt_cap; uint32_t _pkt_cap;
uint32_t _pkt_latency; uint32_t _pkt_latency;
uint32_t _pkt_expected_seq; uint32_t _pkt_expected_seq;
uint32_t _srt_flag;
std::vector<DataPacket::Ptr> _pkt_buf; std::vector<DataPacket::Ptr> _pkt_buf;
uint32_t _start = 0; uint32_t _start = 0;
uint32_t _end = 0; uint32_t _end = 0;
......
...@@ -2,9 +2,10 @@ ...@@ -2,9 +2,10 @@
namespace SRT { namespace SRT {
PacketSendQueue::PacketSendQueue(uint32_t max_size, uint32_t latency) PacketSendQueue::PacketSendQueue(uint32_t max_size, uint32_t latency,uint32_t flag)
: _pkt_cap(max_size) : _pkt_cap(max_size)
, _pkt_latency(latency) {} , _pkt_latency(latency)
, _srt_flag(flag) {}
bool PacketSendQueue::drop(uint32_t num) { bool PacketSendQueue::drop(uint32_t num) {
decltype(_pkt_cache.begin()) it; decltype(_pkt_cache.begin()) it;
...@@ -24,12 +25,16 @@ bool PacketSendQueue::inputPacket(DataPacket::Ptr pkt) { ...@@ -24,12 +25,16 @@ bool PacketSendQueue::inputPacket(DataPacket::Ptr pkt) {
while (_pkt_cache.size() > _pkt_cap) { while (_pkt_cache.size() > _pkt_cap) {
_pkt_cache.pop_front(); _pkt_cache.pop_front();
} }
while (timeLatency() > _pkt_latency) { while (timeLatency() > _pkt_latency && TLPKTDrop()) {
_pkt_cache.pop_front(); _pkt_cache.pop_front();
} }
return true; return true;
} }
bool PacketSendQueue::TLPKTDrop(){
return (_srt_flag&HSExtMessage::HS_EXT_MSG_TLPKTDROP) && (_srt_flag &HSExtMessage::HS_EXT_MSG_TSBPDSND);
}
std::list<DataPacket::Ptr> PacketSendQueue::findPacketBySeq(uint32_t start, uint32_t end) { std::list<DataPacket::Ptr> PacketSendQueue::findPacketBySeq(uint32_t start, uint32_t end) {
std::list<DataPacket::Ptr> re; std::list<DataPacket::Ptr> re;
decltype(_pkt_cache.begin()) it; decltype(_pkt_cache.begin()) it;
......
...@@ -16,7 +16,7 @@ public: ...@@ -16,7 +16,7 @@ public:
using Ptr = std::shared_ptr<PacketSendQueue>; using Ptr = std::shared_ptr<PacketSendQueue>;
using LostPair = std::pair<uint32_t, uint32_t>; using LostPair = std::pair<uint32_t, uint32_t>;
PacketSendQueue(uint32_t max_size, uint32_t latency); PacketSendQueue(uint32_t max_size, uint32_t latency,uint32_t flag = 0xbf);
~PacketSendQueue() = default; ~PacketSendQueue() = default;
bool drop(uint32_t num); bool drop(uint32_t num);
...@@ -25,7 +25,9 @@ public: ...@@ -25,7 +25,9 @@ public:
private: private:
uint32_t timeLatency(); uint32_t timeLatency();
bool TLPKTDrop();
private: private:
uint32_t _srt_flag;
uint32_t _pkt_cap; uint32_t _pkt_cap;
uint32_t _pkt_latency; uint32_t _pkt_latency;
std::list<DataPacket::Ptr> _pkt_cache; std::list<DataPacket::Ptr> _pkt_cache;
......
...@@ -182,9 +182,9 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad ...@@ -182,9 +182,9 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad
} }
if (req) { if (req) {
if (req->srt_flag != srt_flag) { if (req->srt_flag != srt_flag) {
WarnL << " not support flag " << req->srt_flag; WarnL << " flag " << req->srt_flag;
} }
// srt_flag = req->srt_flag; srt_flag = req->srt_flag;
delay = delay <= req->recv_tsbpd_delay ? req->recv_tsbpd_delay : delay; delay = delay <= req->recv_tsbpd_delay ? req->recv_tsbpd_delay : delay;
} }
TraceL << getIdentifier() << " CONCLUSION Phase "; TraceL << getIdentifier() << " CONCLUSION Phase ";
...@@ -214,8 +214,8 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad ...@@ -214,8 +214,8 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad
sendControlPacket(res, true); sendControlPacket(res, true);
TraceL << " buf size = " << res->max_flow_window_size << " init seq =" << _init_seq_number TraceL << " buf size = " << res->max_flow_window_size << " init seq =" << _init_seq_number
<< " latency=" << delay; << " latency=" << delay;
_recv_buf = std::make_shared<PacketRecvQueue>(getPktBufSize(), _init_seq_number, delay * 1e3); _recv_buf = std::make_shared<PacketRecvQueue>(getPktBufSize(), _init_seq_number, delay * 1e3,srt_flag);
_send_buf = std::make_shared<PacketSendQueue>(getPktBufSize(), delay * 1e3); _send_buf = std::make_shared<PacketSendQueue>(getPktBufSize(), delay * 1e3,srt_flag);
_send_packet_seq_number = _init_seq_number; _send_packet_seq_number = _init_seq_number;
_buf_delay = delay; _buf_delay = delay;
onHandShakeFinished(_stream_id, addr); onHandShakeFinished(_stream_id, addr);
...@@ -413,6 +413,7 @@ void SrtTransport::sendACKPacket() { ...@@ -413,6 +413,7 @@ void SrtTransport::sendACKPacket() {
_last_ack_pkt_seq_num = pkt->last_ack_pkt_seq_number; _last_ack_pkt_seq_num = pkt->last_ack_pkt_seq_number;
sendControlPacket(pkt, true); sendControlPacket(pkt, true);
// TraceL<<"send ack "<<pkt->dump(); // TraceL<<"send ack "<<pkt->dump();
// TraceL<<_recv_buf->dump();
} }
void SrtTransport::sendLightACKPacket() { void SrtTransport::sendLightACKPacket() {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论