Commit 62dfed39 by xiongguangjie

fix timelatency not take effect and avoid buffer is liner avoid cycle

parent 6f05cf80
......@@ -2,31 +2,62 @@
namespace SRT {
#define MAX_SEQ 0x7fffffff
#define MAX_TS 0xffffffff
inline uint32_t genExpectedSeq(uint32_t seq){
return 0x7fffffff&seq;
return MAX_SEQ & seq;
}
inline bool isSeqEdge(uint32_t seq,uint32_t cap){
if(seq >(MAX_SEQ - cap)){
return true;
}
return false;
}
inline bool isTSCycle(uint32_t first,uint32_t second){
uint32_t diff;
if(first>second){
diff = first - second;
}else{
diff = second - first;
}
if(diff > (MAX_TS>>1)){
return true;
}else{
return false;
}
}
PacketQueue::PacketQueue(uint32_t max_size, uint32_t init_seq, uint32_t lantency)
: _pkt_expected_seq(init_seq)
, _pkt_cap(max_size)
, _pkt_lantency(lantency) {
}
}
void PacketQueue::tryInsertPkt(DataPacket::Ptr pkt){
bool PacketQueue::inputPacket(DataPacket::Ptr pkt) {
if (pkt->packet_seq_number < _pkt_expected_seq) {
// TOO later drop this packet
return false;
if (_pkt_expected_seq <= pkt->packet_seq_number) {
auto diff = pkt->packet_seq_number - _pkt_expected_seq;
if(diff >= (MAX_SEQ>>1)){
TraceL << "drop packet too later for cycle "<< "expected seq=" << _pkt_expected_seq << " pkt seq=" << pkt->packet_seq_number;
return;
}else{
_pkt_map.emplace(pkt->packet_seq_number, pkt);
}
} else {
auto diff = _pkt_expected_seq - pkt->packet_seq_number;
if(diff >= (MAX_SEQ>>1)){
_pkt_map.emplace(pkt->packet_seq_number, pkt);
TraceL<<" cycle packet "<<"expected seq=" << _pkt_expected_seq << " pkt seq=" << pkt->packet_seq_number;
}else{
TraceL << "drop packet too later "<< "expected seq=" << _pkt_expected_seq << " pkt seq=" << pkt->packet_seq_number;
}
}
_pkt_map[pkt->packet_seq_number] = pkt;
return true;
}
std::list<DataPacket::Ptr> PacketQueue::tryGetPacket() {
std::list<DataPacket::Ptr> re;
bool PacketQueue::inputPacket(DataPacket::Ptr pkt,std::list<DataPacket::Ptr>& out) {
tryInsertPkt(pkt);
auto it = _pkt_map.find(_pkt_expected_seq);
while ( it != _pkt_map.end()) {
re.push_back(it->second);
out.push_back(it->second);
_pkt_map.erase(it);
_pkt_expected_seq = genExpectedSeq(_pkt_expected_seq+1);
it = _pkt_map.find(_pkt_expected_seq);
......@@ -36,66 +67,39 @@ std::list<DataPacket::Ptr> PacketQueue::tryGetPacket() {
// 防止回环
it = _pkt_map.find(_pkt_expected_seq);
if(it != _pkt_map.end()){
re.push_back(it->second);
out.push_back(it->second);
_pkt_map.erase(it);
}
_pkt_expected_seq = genExpectedSeq(_pkt_expected_seq + 1);
}
while (timeLantency() > _pkt_lantency) {
while (timeLantency() > _pkt_lantency) {
it = _pkt_map.find(_pkt_expected_seq);
if(it != _pkt_map.end()){
re.push_back(it->second);
out.push_back(it->second);
_pkt_map.erase(it);
}
_pkt_expected_seq = genExpectedSeq(_pkt_expected_seq + 1);
}
return re;
}
bool PacketQueue::dropForRecv(uint32_t first,uint32_t last){
if(first >= last){
return false;
}
if(_pkt_expected_seq <= last){
for(uint32_t i =first;i<=last;++i){
if(_pkt_map.find(i) != _pkt_map.end()){
_pkt_map.erase(i);
}
}
_pkt_expected_seq =genExpectedSeq(last+1);
return true;
}
return false;
return true;
}
bool PacketQueue::dropForSend(uint32_t num){
if(num <= _pkt_expected_seq){
return false;
}
bool PacketQueue::drop(uint32_t first, uint32_t last,std::list<DataPacket::Ptr>& out){
uint32_t end = genExpectedSeq(last+1);
decltype(_pkt_map.end()) it;
for(uint32_t i =_pkt_expected_seq;i< num;++i){
for(uint32_t i =_pkt_expected_seq;i< end;){
it = _pkt_map.find(i);
if(it != _pkt_map.end()){
out.push_back(it->second);
_pkt_map.erase(it);
}
i = genExpectedSeq(i+1);
}
_pkt_expected_seq =genExpectedSeq(num);
_pkt_expected_seq = end;
return true;
}
DataPacket::Ptr PacketQueue::findPacketBySeq(uint32_t seq){
auto it = _pkt_map.find(seq);
if(it != _pkt_map.end()){
return it->second;
}
return nullptr;
}
uint32_t PacketQueue::timeLantency() {
if (_pkt_map.empty()) {
return 0;
......@@ -111,8 +115,8 @@ uint32_t PacketQueue::timeLantency() {
}
if(dur > 0x80000000){
//WarnL<<"cycle dur "<<dur;
dur = 0xffffffff - dur;
dur = MAX_TS - dur;
WarnL<<"cycle dur "<<dur;
}
return dur;
......@@ -128,13 +132,24 @@ std::list<PacketQueue::LostPair> PacketQueue::getLostSeq() {
return re;
}
uint32_t end = 0;
uint32_t first,last;
first = _pkt_map.begin()->second->packet_seq_number;
last = _pkt_map.rbegin()->second->packet_seq_number;
if ((last - first) > (MAX_SEQ >> 1)) {
TraceL << " cycle seq first " << first << " last " << last << " size " << _pkt_map.size();
end = first;
} else {
end = last;
}
PacketQueue::LostPair lost;
lost.first = 0;
lost.second = 0;
uint32_t i = _pkt_expected_seq;
bool finish = true;
for(i = _pkt_expected_seq;i<=_pkt_map.rbegin()->first;){
for(i = _pkt_expected_seq;i<=end;){
if(_pkt_map.find(i) == _pkt_map.end()){
if(finish){
finish = false;
......@@ -144,7 +159,6 @@ std::list<PacketQueue::LostPair> PacketQueue::getLostSeq() {
lost.second = i+1;
}
}else{
if(!finish){
finish = true;
re.push_back(lost);
......@@ -164,21 +178,42 @@ size_t PacketQueue::getExpectedSize() {
if(_pkt_map.empty()){
return 0;
}
auto size = _pkt_map.rbegin()->first - _pkt_expected_seq+1;
if(size >= _pkt_cap){
// 回环
//WarnL<<"cycle size "<<size;
size = 0xffffffff - size;
uint32_t max = _pkt_map.rbegin()->first;
uint32_t min = _pkt_map.begin()->first;
if((max-min)>=(MAX_SEQ>>1)){
TraceL<<"cycle "<<"expected seq "<<_pkt_expected_seq<<" min "<<min<<" max "<<max<<" size "<<_pkt_map.size();
return MAX_SEQ-_pkt_expected_seq+min+1;
}else{
return max-_pkt_expected_seq+1;
}
return size;
}
size_t PacketQueue::getAvailableBufferSize(){
return _pkt_cap - getSize();
auto size = getExpectedSize();
if(_pkt_cap > size){
return _pkt_cap - size;
}
if(_pkt_cap > _pkt_map.size()){
return _pkt_cap - _pkt_map.size();
}
WarnL<<" cap "<<_pkt_cap<<" expected size "<<size<<" map size "<<_pkt_map.size();
return _pkt_cap;
}
uint32_t PacketQueue::getExpectedSeq(){
return _pkt_expected_seq;
}
std::string PacketQueue::dump(){
_StrPrinter printer;
if(_pkt_map.empty()){
printer<<" expected seq :"<<_pkt_expected_seq;
}else{
printer<<" expected seq :"<<_pkt_expected_seq<<" size:"<<_pkt_map.size()<<" first:"<<_pkt_map.begin()->second->packet_seq_number;
printer<<" last:"<<_pkt_map.rbegin()->second->packet_seq_number;
printer<<" latency:"<<timeLantency()/1e3;
}
return std::move(printer);
}
} // namespace SRT
\ No newline at end of file
#ifndef ZLMEDIAKIT_SRT_PACKET_QUEUE_H
#define ZLMEDIAKIT_SRT_PACKET_QUEUE_H
#include "Packet.hpp"
#include <algorithm>
#include <list>
#include <memory>
#include <map>
#include <list>
#include <utility>
#include <tuple>
#include <utility>
#include "Packet.hpp"
namespace SRT{
namespace SRT {
class PacketQueue
{
// for recv
class PacketQueue {
public:
using Ptr = std::shared_ptr<PacketQueue>;
using LostPair = std::pair<uint32_t,uint32_t>;
using LostPair = std::pair<uint32_t, uint32_t>;
PacketQueue(uint32_t max_size,uint32_t init_seq,uint32_t lantency);
PacketQueue(uint32_t max_size, uint32_t init_seq, uint32_t lantency);
~PacketQueue() = default;
bool inputPacket(DataPacket::Ptr pkt);
std::list<DataPacket::Ptr> tryGetPacket();
bool inputPacket(DataPacket::Ptr pkt,std::list<DataPacket::Ptr>& out);
uint32_t timeLantency();
std::list<LostPair> getLostSeq();
......@@ -28,21 +28,20 @@ public:
size_t getAvailableBufferSize();
uint32_t getExpectedSeq();
bool dropForRecv(uint32_t first,uint32_t last);
bool dropForSend(uint32_t num);
bool drop(uint32_t first, uint32_t last,std::list<DataPacket::Ptr>& out);
DataPacket::Ptr findPacketBySeq(uint32_t seq);
std::string dump();
private:
std::map<uint32_t,DataPacket::Ptr> _pkt_map;
void tryInsertPkt(DataPacket::Ptr pkt);
private:
std::map<uint32_t, DataPacket::Ptr> _pkt_map;
uint32_t _pkt_expected_seq = 0;
uint32_t _pkt_cap;
uint32_t _pkt_lantency;
};
}
} // namespace SRT
#endif //ZLMEDIAKIT_SRT_PACKET_QUEUE_H
\ No newline at end of file
#endif // ZLMEDIAKIT_SRT_PACKET_QUEUE_H
\ No newline at end of file
#include "PacketSendQueue.hpp"
namespace SRT {
PacketSendQueue::PacketSendQueue(uint32_t max_size, uint32_t lantency)
: _pkt_cap(max_size)
, _pkt_lantency(lantency) {}
bool PacketSendQueue::drop(uint32_t num) {
decltype(_pkt_cache.begin()) it;
for (it = _pkt_cache.begin(); it != _pkt_cache.end(); ++it) {
if ((*it)->packet_seq_number == num) {
break;
}
}
if (it != _pkt_cache.end()) {
_pkt_cache.erase(_pkt_cache.begin(), it);
}
return true;
}
bool PacketSendQueue::inputPacket(DataPacket::Ptr pkt) {
_pkt_cache.push_back(pkt);
while (_pkt_cache.size() > _pkt_cap) {
_pkt_cache.pop_front();
}
while (timeLantency() > _pkt_lantency) {
_pkt_cache.pop_front();
}
return true;
}
std::list<DataPacket::Ptr> PacketSendQueue::findPacketBySeq(uint32_t start, uint32_t end) {
std::list<DataPacket::Ptr> re;
decltype(_pkt_cache.begin()) it;
for (it = _pkt_cache.begin(); it != _pkt_cache.end(); ++it) {
if ((*it)->packet_seq_number == start) {
break;
}
}
if (start == end) {
if (it != _pkt_cache.end()) {
re.push_back(*it);
}
return re;
}
for (; it != _pkt_cache.end(); ++it) {
re.push_back(*it);
if ((*it)->packet_seq_number == end) {
break;
}
}
return re;
}
uint32_t PacketSendQueue::timeLantency() {
if (_pkt_cache.empty()) {
return 0;
}
auto first = _pkt_cache.front()->timestamp;
auto last = _pkt_cache.back()->timestamp;
uint32_t dur;
if (last > first) {
dur = last - first;
} else {
dur = first - last;
}
if (dur > (0x01 << 31)) {
TraceL << "cycle timeLantency " << dur;
dur = 0xffffffff - dur;
}
return dur;
}
} // namespace SRT
\ No newline at end of file
#ifndef ZLMEDIAKIT_SRT_PACKET_SEND_QUEUE_H
#define ZLMEDIAKIT_SRT_PACKET_SEND_QUEUE_H
#include "Packet.hpp"
#include <algorithm>
#include <list>
#include <memory>
#include <set>
#include <tuple>
#include <utility>
namespace SRT {
class PacketSendQueue {
public:
using Ptr = std::shared_ptr<PacketSendQueue>;
using LostPair = std::pair<uint32_t, uint32_t>;
PacketSendQueue(uint32_t max_size, uint32_t lantency);
~PacketSendQueue() = default;
bool drop(uint32_t num);
bool inputPacket(DataPacket::Ptr pkt);
std::list<DataPacket::Ptr> findPacketBySeq(uint32_t start,uint32_t end);
private:
uint32_t timeLantency();
private:
std::list<DataPacket::Ptr> _pkt_cache;
uint32_t _pkt_cap;
uint32_t _pkt_lantency;
};
} // namespace SRT
#endif // ZLMEDIAKIT_SRT_PACKET_SEND_QUEUE_H
\ No newline at end of file
......@@ -198,8 +198,8 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad
registerSelf();
sendControlPacket(res, true);
TraceL<<" buf size = "<<res->max_flow_window_size<<" init seq ="<<_init_seq_number<<" lantency="<<delay;
_recv_buf = std::make_shared<PacketQueue>(res->max_flow_window_size,_init_seq_number, delay*1e6);
_send_buf = std::make_shared<PacketQueue>(res->max_flow_window_size,_init_seq_number, delay*1e6);
_recv_buf = std::make_shared<PacketQueue>(res->max_flow_window_size,_init_seq_number, delay*1e3);
_send_buf = std::make_shared<PacketSendQueue>(res->max_flow_window_size, delay*1e3);
_send_packet_seq_number = _init_seq_number;
_buf_delay = delay;
onHandShakeFinished(_stream_id,addr);
......@@ -249,7 +249,7 @@ void SrtTransport::handleACK(uint8_t *buf, int len, struct sockaddr_storage *add
pkt->timestamp = DurationCountMicroseconds(_now -_start_timestamp);
pkt->ack_number = ack.ack_number;
pkt->storeToData();
_send_buf->dropForSend(ack.last_ack_pkt_seq_number);
_send_buf->drop(ack.last_ack_pkt_seq_number);
sendControlPacket(pkt,true);
//TraceL<<"ack number "<<ack.ack_number;
}
......@@ -267,17 +267,19 @@ void SrtTransport::handleNAK(uint8_t *buf, int len, struct sockaddr_storage *add
NAKPacket pkt;
pkt.loadFromData(buf,len);
bool empty = false;
bool flush = false;
for(auto it : pkt.lost_list){
if(pkt.lost_list.back() == it){
flush = true;
}
empty = true;
for(uint32_t i=it.first;i<it.second;++i){
auto data = _send_buf->findPacketBySeq(i);
if(data){
data->R = 1;
data->storeToHeader();
sendPacket(data,true);
empty = false;
}
auto re_list = _send_buf->findPacketBySeq(it.first,it.second-1);
for(auto pkt : re_list){
pkt->R = 1;
pkt->storeToHeader();
sendPacket(pkt,flush);
empty = false;
}
if(empty){
sendMsgDropReq(it.first,it.second-1);
......@@ -294,8 +296,45 @@ void SrtTransport::handleShutDown(uint8_t *buf, int len, struct sockaddr_storage
void SrtTransport::handleDropReq(uint8_t *buf, int len, struct sockaddr_storage *addr){
MsgDropReqPacket pkt;
pkt.loadFromData(buf,len);
std::list<DataPacket::Ptr> list;
//TraceL<<"drop "<<pkt.first_pkt_seq_num<<" last "<<pkt.last_pkt_seq_num;
_recv_buf->dropForRecv(pkt.first_pkt_seq_num,pkt.last_pkt_seq_num);
_recv_buf->drop(pkt.first_pkt_seq_num,pkt.last_pkt_seq_num,list);
if(list.empty()){
return;
}
for(auto data : list){
onSRTData(std::move(data));
}
auto nak_interval = (_rtt+_rtt_variance*4)/2;
if(nak_interval >= 20*1000){
nak_interval = 20*1000;
}
if(_nak_ticker.elapsedTime(_now)>nak_interval){
auto lost = _recv_buf->getLostSeq();
if(!lost.empty()){
sendNAKPacket(lost);
}
_nak_ticker.resetTime(_now);
}
if(_ack_ticker.elapsedTime(_now)>10*1000){
_light_ack_pkt_count = 0;
_ack_ticker.resetTime(_now);
// send a ack per 10 ms for receiver
sendACKPacket();
}else{
if(_light_ack_pkt_count >= 64){
// for high bitrate stream send light ack
// TODO
sendLightACKPacket();
TraceL<<"send light ack";
}
_light_ack_pkt_count = 0;
}
_light_ack_pkt_count++;
}
void SrtTransport::handleUserDefinedType(uint8_t *buf, int len, struct sockaddr_storage *addr){
TraceL;
......@@ -379,32 +418,23 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora
pkt->loadFromData(buf,len);
pkt->get_ts = _now;
std::list<DataPacket::Ptr> list;
//TraceL<<" seq="<< pkt->packet_seq_number<<" ts="<<pkt->timestamp<<" size="<<pkt->payloadSize()<<\
//" PP="<<(int)pkt->PP<<" O="<<(int)pkt->O<<" kK="<<(int)pkt->KK<<" R="<<(int)pkt->R;
#if 1
_recv_buf->inputPacket(pkt);
#else
if(pkt->packet_seq_number%100 == 0){
// drop
TraceL<<"drop packet";
TraceL<<"expected size "<<_recv_buf->getExpectedSize()<<" real size="<<_recv_buf->getSize();
}else{
_recv_buf->inputPacket(pkt);
}
#endif
//TraceL<<" data number size "<<list.size();
auto list = _recv_buf->tryGetPacket();
_recv_buf->inputPacket(pkt,list);
for(auto data : list){
onSRTData(std::move(data));
}
if(list.empty()){
//TraceL<<_recv_buf->dump();
}
auto nak_interval = (_rtt+_rtt_variance*4)/2;
if(nak_interval >= 20*1000){
if(nak_interval <= 20*1000){
nak_interval = 20*1000;
}
if(_nak_ticker.elapsedTime(_now)>nak_interval || list.empty()){
if(_nak_ticker.elapsedTime(_now)>nak_interval){
auto lost = _recv_buf->getLostSeq();
if(!lost.empty()){
sendNAKPacket(lost);
......@@ -428,7 +458,6 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora
_light_ack_pkt_count = 0;
}
_light_ack_pkt_count++;
//bufCheckInterval();
}
......
......@@ -13,6 +13,7 @@
#include "Common.hpp"
#include "Packet.hpp"
#include "PacketQueue.hpp"
#include "PacketSendQueue.hpp"
#include "Statistic.hpp"
namespace SRT {
......@@ -110,7 +111,7 @@ private:
uint32_t _send_packet_seq_number = 0;
uint32_t _send_msg_number = 1;
PacketQueue::Ptr _send_buf;
PacketSendQueue::Ptr _send_buf;
uint32_t _buf_delay = 120;
PacketQueue::Ptr _recv_buf;
uint32_t _rtt = 100*1000;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论