SrtTransport.cpp 29.9 KB
Newer Older
1
#include "Util/onceToken.h"
Dw9 committed
2 3
#include "Util/mini.h"

4
#include <iterator>
5
#include <stdlib.h>
6

ziyue committed
7 8 9
#include "Ack.hpp"
#include "Packet.hpp"
#include "SrtTransport.hpp"
10 11 12

namespace SRT {
#define SRT_FIELD "srt."
ziyue committed
13 14 15 16 17
// SRT receive timeout, in seconds
const std::string kTimeOutSec = SRT_FIELD "timeoutSec";
// Port of the single-port SRT UDP server
const std::string kPort = SRT_FIELD "port";
const std::string kLatencyMul = SRT_FIELD "latencyMul";
18
const std::string kPktBufSize = SRT_FIELD "pktBufSize";
19

Dw9 committed
20 21 22 23 24 25 26
// Stores the built-in values for the "srt." configuration keys into the
// global mINI instance.
static onceToken token([]() {
    auto &ini = mINI::Instance();
    ini[kTimeOutSec] = 5;
    ini[kPort] = 9000;
    ini[kLatencyMul] = 4;
    ini[kPktBufSize] = 8192;
});

ziyue committed
27
// Shared counter from which every SrtTransport constructor draws its local
// socket id (via fetch_add). It starts at 125; the reason for that particular
// start value is not visible in this file.
static std::atomic<uint32_t> s_srt_socket_id_generate { 125 };
28 29 30
////////////  SrtTransport //////////////////////////
// Builds a transport bound to `poller`: records the creation time, takes the
// next id from the shared socket-id counter and creates the receive-rate and
// link-capacity estimators, both seeded with the creation time.
SrtTransport::SrtTransport(const EventPoller::Ptr &poller)
    : _poller(poller) {
    _start_timestamp = SteadyClock::now();
    // Post-increment on the atomic yields the previous value, like fetch_add(1).
    _socket_id = s_srt_socket_id_generate++;

    _pkt_recv_rate_context = std::make_shared<PacketRecvRateContext>(_start_timestamp);
    _estimated_link_capacity_context = std::make_shared<EstimatedLinkCapacityContext>(_start_timestamp);
}
37

ziyue committed
38 39
// Destructor: only leaves a trace-level log entry; members clean up themselves.
SrtTransport::~SrtTransport() {
    TraceL << " ";
}
ziyue committed
41

42 43 44 45 46 47 48 49 50 51 52 53 54
// Returns the event poller this transport was constructed with.
const EventPoller::Ptr &SrtTransport::getPoller() const {
    return _poller;
}

// Makes `session` the session used for sending. Every session ever passed in
// is also remembered in _history_sessions (keyed by its raw pointer) so that
// onShutdown() can close all of them. When a session was already selected,
// the switch is logged as a network change.
void SrtTransport::setSession(Session::Ptr session) {
    _history_sessions.emplace(session.get(), session);

    const bool had_session = static_cast<bool>(_selected_session);
    if (had_session) {
        InfoL << "srt network changed: " << _selected_session->get_peer_ip() << ":"
              << _selected_session->get_peer_port() << " -> " << session->get_peer_ip() << ":"
              << session->get_peer_port() << ", id:" << _selected_session->getIdentifier();
    }

    _selected_session = session;
}
ziyue committed
55

56 57 58 59
// Returns the session currently selected for sending (may be empty).
const Session::Ptr &SrtTransport::getSession() const {
    return _selected_session;
}

ziyue committed
60
void SrtTransport::switchToOtherTransport(uint8_t *buf, int len, uint32_t socketid, struct sockaddr_storage *addr) {
61 62
    BufferRaw::Ptr tmp = BufferRaw::create();
    struct sockaddr_storage tmp_addr = *addr;
ziyue committed
63
    tmp->assign((char *)buf, len);
64
    auto trans = SrtTransportManager::Instance().getItem(std::to_string(socketid));
ziyue committed
65 66 67
    if (trans) {
        trans->getPoller()->async([tmp, tmp_addr, trans] {
            trans->inputSockData((uint8_t *)tmp->data(), tmp->size(), (struct sockaddr_storage *)&tmp_addr);
68 69 70 71
        });
    }
}

72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
void SrtTransport::createTimerForCheckAlive(){
    std::weak_ptr<SrtTransport> weak_self = std::static_pointer_cast<SrtTransport>(shared_from_this());
    auto timeoutSec = getTimeOutSec();
    _timer = std::make_shared<Timer>(
         timeoutSec/ 2,
        [weak_self,timeoutSec]() {
            auto strong_self = weak_self.lock();
            if (!strong_self) {
                return false;
            }
            if (strong_self->_alive_ticker.elapsedTime() > timeoutSec * 1000) {
                strong_self->onShutdown(SockException(Err_timeout, "接收srt数据超时"));
            }
            return true;
        },
        getPoller());
}

90
void SrtTransport::inputSockData(uint8_t *buf, int len, struct sockaddr_storage *addr) {
91 92 93 94
    _alive_ticker.resetTime();
    if(!_timer){
        createTimerForCheckAlive();
    }
ziyue committed
95
    using srt_control_handler = void (SrtTransport::*)(uint8_t * buf, int len, struct sockaddr_storage *addr);
96 97 98 99 100 101 102 103 104 105 106 107 108
    static std::unordered_map<uint16_t, srt_control_handler> s_control_functions;
    static onceToken token([]() {
        s_control_functions.emplace(ControlPacket::HANDSHAKE, &SrtTransport::handleHandshake);
        s_control_functions.emplace(ControlPacket::KEEPALIVE, &SrtTransport::handleKeeplive);
        s_control_functions.emplace(ControlPacket::ACK, &SrtTransport::handleACK);
        s_control_functions.emplace(ControlPacket::NAK, &SrtTransport::handleNAK);
        s_control_functions.emplace(ControlPacket::CONGESTIONWARNING, &SrtTransport::handleCongestionWarning);
        s_control_functions.emplace(ControlPacket::SHUTDOWN, &SrtTransport::handleShutDown);
        s_control_functions.emplace(ControlPacket::ACKACK, &SrtTransport::handleACKACK);
        s_control_functions.emplace(ControlPacket::DROPREQ, &SrtTransport::handleDropReq);
        s_control_functions.emplace(ControlPacket::PEERERROR, &SrtTransport::handlePeerError);
        s_control_functions.emplace(ControlPacket::USERDEFINEDTYPE, &SrtTransport::handleUserDefinedType);
    });
109
    _now = SteadyClock::now();
110 111
    // 处理srt数据
    if (DataPacket::isDataPacket(buf, len)) {
ziyue committed
112 113
        uint32_t socketId = DataPacket::getSocketID(buf, len);
        if (socketId == _socket_id) {
114 115 116
            if(_handleshake_timer){
                _handleshake_timer.reset();
            }
xiongguangjie committed
117
            _pkt_recv_rate_context->inputPacket(_now,len+UDP_HDR_SIZE);
118
            //_recv_rate_context->inputPacket(_now, len);
119 120

            handleDataPacket(buf, len, addr);
121
            checkAndSendAckNak();
ziyue committed
122
        } else {
123
            WarnL<<"DataPacket switch to other transport: "<<socketId;
ziyue committed
124
            switchToOtherTransport(buf, len, socketId, addr);
125 126 127
        }
    } else {
        if (ControlPacket::isControlPacket(buf, len)) {
ziyue committed
128 129 130
            uint32_t socketId = ControlPacket::getSocketID(buf, len);
            uint16_t type = ControlPacket::getControlType(buf, len);
            if (type != ControlPacket::HANDSHAKE && socketId != _socket_id && _socket_id != 0) {
131
                // socket id not same
132
                WarnL<<"ControlPacket: "<< (int)type <<" switch to other transport: "<<socketId;
ziyue committed
133
                switchToOtherTransport(buf, len, socketId, addr);
134 135
                return;
            }
136
            
137 138
            //_pkt_recv_rate_context->inputPacket(_now,len);
            //_estimated_link_capacity_context->inputPacket(_now);
139
            //_recv_rate_context->inputPacket(_now, len);
140 141 142

            auto it = s_control_functions.find(type);
            if (it == s_control_functions.end()) {
ziyue committed
143
                WarnL << " not support type ignore" << ControlPacket::getControlType(buf, len);
144
                return;
ziyue committed
145 146
            } else {
                (this->*(it->second))(buf, len, addr);
147
            }
148 149 150
            if(_is_handleshake_finished && isPusher()){
                checkAndSendAckNak();
            }
151 152 153 154 155 156
        } else {
            // not reach
            WarnL << "not reach this";
        }
    }
}
157

158 159 160
// Handles an INDUCTION handshake request.
//
// If a handshake response already exists (_handleshake_res):
//  - response is INDUCTION and the peer socket id matches: the request is a
//    retransmission, so the stored response is resent;
//  - the peer socket id differs (INDUCTION or CONCLUSION state): treated as a
//    new connection from the client and the transport is shut down;
//  - in all of these cases nothing else is done.
//
// Otherwise the connection parameters announced by the peer (initial sequence
// number, flow window, MTU, peer socket id) are recorded, an INDUCTION
// response carrying a SYN cookie is built, stored, registered with the
// manager and sent, and a 0.2 s timer is started that keeps resending the
// stored handshake response.
//
// @param pkt  parsed handshake request
// @param addr address of the peer that sent it
void SrtTransport::handleHandshakeInduction(HandshakePacket &pkt, struct sockaddr_storage *addr) {
    // Induction Phase
    if (_handleshake_res) {
        if (_handleshake_res->handshake_type == HandshakePacket::HS_TYPE_INDUCTION) {
            if (pkt.srt_socket_id == _handleshake_res->dst_socket_id) {
                TraceL << getIdentifier() <<" Induction repeate "<<SockUtil::inet_ntoa((struct sockaddr *)addr) << ":" << SockUtil::inet_port((struct sockaddr *)addr);
                sendControlPacket(_handleshake_res, true);
            } else {
                TraceL << getIdentifier() <<" new connection fron client "<<SockUtil::inet_ntoa((struct sockaddr *)addr) << ":" << SockUtil::inet_port((struct sockaddr *)addr);
                onShutdown(SockException(Err_other, "client new connection"));
            }
        } else if (_handleshake_res->handshake_type == HandshakePacket::HS_TYPE_CONCLUSION) {
            if (_handleshake_res->dst_socket_id != pkt.srt_socket_id) {
                TraceL << getIdentifier() <<" new connection fron client "<<SockUtil::inet_ntoa((struct sockaddr *)addr) << ":" << SockUtil::inet_port((struct sockaddr *)addr);
                onShutdown(SockException(Err_other, "client new connection"));
            }
        } else {
            WarnL<<"not reach this";
        }
        return;
    }

    TraceL << getIdentifier() <<" Induction from "<<SockUtil::inet_ntoa((struct sockaddr *)addr) << ":" << SockUtil::inet_port((struct sockaddr *)addr);

    // The induction time is the reference for the latency estimate made in
    // the conclusion phase and becomes the new timestamp origin.
    _induction_ts = _now;
    _start_timestamp = _now;
    _init_seq_number = pkt.initial_packet_sequence_number;
    _max_window_size = pkt.max_flow_window_size;
    _mtu = pkt.mtu;

    _last_pkt_seq = _init_seq_number - 1;
    _estimated_link_capacity_context->setLastSeq(_last_pkt_seq);

    _peer_socket_id = pkt.srt_socket_id;
    HandshakePacket::Ptr res = std::make_shared<HandshakePacket>();
    res->dst_socket_id = _peer_socket_id;
    res->timestamp = DurationCountMicroseconds(_start_timestamp.time_since_epoch());
    res->mtu = _mtu;
    res->max_flow_window_size = _max_window_size;
    res->initial_packet_sequence_number = _init_seq_number;
    res->version = 5;
    res->encryption_field = HandshakePacket::NO_ENCRYPTION;
    res->extension_field = 0x4A17;
    res->handshake_type = HandshakePacket::HS_TYPE_INDUCTION;
    res->srt_socket_id = _peer_socket_id;
    res->syn_cookie = HandshakePacket::generateSynCookie(addr, _start_timestamp);
    _sync_cookie = res->syn_cookie;
    // sizeof on the array already yields its size in bytes; multiplying by the
    // element size would over-copy for any element wider than one byte.
    memcpy(res->peer_ip_addr, pkt.peer_ip_addr, sizeof(pkt.peer_ip_addr));
    _handleshake_res = res;
    res->storeToData();

    registerSelfHandshake();
    sendControlPacket(res, true);

    // Resend the stored handshake response every 0.2 s until the timer is
    // reset elsewhere. Only a weak reference is captured so the callback can
    // never touch a destroyed transport (same pattern as the liveness timer).
    std::weak_ptr<SrtTransport> weak_self = std::static_pointer_cast<SrtTransport>(shared_from_this());
    _handleshake_timer = std::make_shared<Timer>(
        0.2,
        [weak_self]() -> bool {
            auto strong_self = weak_self.lock();
            if (!strong_self) {
                return false;
            }
            strong_self->sendControlPacket(strong_self->_handleshake_res, true);
            return true;
        },
        getPoller());
}
ziyue committed
217

218
// Handles a CONCLUSION handshake request.
//
// Requires that an induction response was produced before. If the stored
// response is already a CONCLUSION, the request is either a retransmission
// (same peer socket id: the stored response is resent) or a new connection
// (different id: the transport is shut down).
//
// On the first conclusion the HSREQ and stream-id extensions are read from the
// request, the TSBPD latency is negotiated, the CONCLUSION response is built
// and sent, the transport is re-registered under its socket id, and the
// receive/send queues are created before onHandShakeFinished() is invoked.
//
// @param pkt  parsed handshake request
// @param addr address of the peer that sent it
void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockaddr_storage *addr) {
    if (!_handleshake_res) {
        ErrorL << "must Induction Phase for handleshake ";
        return;
    }

    if (_handleshake_res->handshake_type != HandshakePacket::HS_TYPE_INDUCTION) {
        if (_handleshake_res->handshake_type == HandshakePacket::HS_TYPE_CONCLUSION) {
            if (_handleshake_res->dst_socket_id != pkt.srt_socket_id) {
                TraceL << getIdentifier() <<" new connection fron client "<<SockUtil::inet_ntoa((struct sockaddr *)addr) << ":" << SockUtil::inet_port((struct sockaddr *)addr);
                onShutdown(SockException(Err_other, "client new connection"));
            } else {
                TraceL << getIdentifier() <<" CONCLUSION repeate "<<SockUtil::inet_ntoa((struct sockaddr *)addr) << ":" << SockUtil::inet_port((struct sockaddr *)addr);
                sendControlPacket(_handleshake_res, true);
            }
        } else {
            WarnL<<"not reach this";
        }
        return;
    }

    // First conclusion request for this connection.
    HSExtMessage::Ptr req;
    HSExtStreamID::Ptr sid;
    uint32_t srt_flag = 0xbf;

    // Latency in milliseconds: time between induction and conclusion scaled by
    // the configured multiplier. It is clamped to [120, 65535] while still in
    // the wide type; narrowing to uint16_t first would wrap (or be undefined
    // for a floating-point value) when the product exceeds 65535.
    const auto scaled_ms = DurationCountMicroseconds(_now - _induction_ts) * getLatencyMul() / 1000;
    uint16_t delay = 120;
    if (scaled_ms > 0xFFFF) {
        delay = 0xFFFF;
    } else if (scaled_ms > 120) {
        delay = (uint16_t)scaled_ms;
    }

    // Pick the first HSREQ-style message extension and the first stream-id extension.
    for (auto& ext : pkt.ext_list) {
        if (!req) {
            req = std::dynamic_pointer_cast<HSExtMessage>(ext);
        }
        if (!sid) {
            sid = std::dynamic_pointer_cast<HSExtStreamID>(ext);
        }
    }
    if (sid) {
        _stream_id = sid->streamid;
    }
    if (req) {
        if (req->srt_flag != srt_flag) {
            WarnL << "  flag " << req->srt_flag;
        }
        srt_flag = req->srt_flag;
        // Never go below the latency the peer asked for.
        delay = delay <= req->recv_tsbpd_delay ? req->recv_tsbpd_delay : delay;
    }
    TraceL << getIdentifier() << " CONCLUSION Phase from"<<SockUtil::inet_ntoa((struct sockaddr *)addr) << ":" << SockUtil::inet_port((struct sockaddr *)addr);

    HandshakePacket::Ptr res = std::make_shared<HandshakePacket>();
    res->dst_socket_id = _peer_socket_id;
    res->timestamp = DurationCountMicroseconds(_now - _start_timestamp);
    res->mtu = _mtu;
    res->max_flow_window_size = _max_window_size;
    res->initial_packet_sequence_number = _init_seq_number;
    res->version = 5;
    res->encryption_field = HandshakePacket::NO_ENCRYPTION;
    res->extension_field = HandshakePacket::HS_EXT_FILED_HSREQ;
    res->handshake_type = HandshakePacket::HS_TYPE_CONCLUSION;
    res->srt_socket_id = _socket_id;
    res->syn_cookie = 0;
    res->assignPeerIP(addr);

    HSExtMessage::Ptr ext = std::make_shared<HSExtMessage>();
    ext->extension_type = HSExt::SRT_CMD_HSRSP;
    ext->srt_version = srtVersion(1, 5, 0);
    ext->srt_flag = srt_flag;
    ext->recv_tsbpd_delay = ext->send_tsbpd_delay = delay;
    res->ext_list.push_back(std::move(ext));
    res->storeToData();
    _handleshake_res = res;

    // From now on the transport is looked up by socket id, not by SYN cookie.
    unregisterSelfHandshake();
    registerSelf();
    sendControlPacket(res, true);
    TraceL << " buf size = " << res->max_flow_window_size << " init seq =" << _init_seq_number
           << " latency=" << delay;

    // The queues receive the latency multiplied by 1e3.
    _recv_buf = std::make_shared<PacketRecvQueue>(getPktBufSize(), _init_seq_number, delay * 1e3,srt_flag);
    _send_buf = std::make_shared<PacketSendQueue>(getPktBufSize(), delay * 1e3,srt_flag);
    _send_packet_seq_number = _init_seq_number;
    _buf_delay = delay;
    onHandShakeFinished(_stream_id, addr);

    if (!isPusher()) {
        // Non-pusher peers: stop resending the handshake response right away.
        // For pushers the timer is reset when the first data packet arrives.
        _handleshake_timer.reset();
    }

    _last_ack_pkt_seq = _init_seq_number;
}
ziyue committed
306 307

void SrtTransport::handleHandshake(uint8_t *buf, int len, struct sockaddr_storage *addr) {
308
    HandshakePacket pkt;
309 310 311 312
    if(!pkt.loadFromData(buf, len)){
        WarnL<<"is not vaild HandshakePacket";
        return;
    }
ziyue committed
313 314 315 316 317 318 319

    if (pkt.handshake_type == HandshakePacket::HS_TYPE_INDUCTION) {
        handleHandshakeInduction(pkt, addr);
    } else if (pkt.handshake_type == HandshakePacket::HS_TYPE_CONCLUSION) {
        handleHandshakeConclusion(pkt, addr);
    } else {
        WarnL << " not support handshake type = " << pkt.handshake_type;
320
        WarnL <<pkt.dump();
321
    }
322 323
    _ack_ticker.resetTime(_now);
    _nak_ticker.resetTime(_now);
324
}
ziyue committed
325 326 327

void SrtTransport::handleKeeplive(uint8_t *buf, int len, struct sockaddr_storage *addr) {
    // TraceL;
328
    sendKeepLivePacket();
329
}
330

ziyue committed
331
void SrtTransport::sendKeepLivePacket() {
332 333
    KeepLivePacket::Ptr pkt = std::make_shared<KeepLivePacket>();
    pkt->dst_socket_id = _peer_socket_id;
ziyue committed
334
    pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp);
335
    pkt->storeToData();
ziyue committed
336 337 338 339 340
    sendControlPacket(pkt, true);
}

void SrtTransport::handleACK(uint8_t *buf, int len, struct sockaddr_storage *addr) {
    // TraceL;
341
    ACKPacket ack;
ziyue committed
342
    if (!ack.loadFromData(buf, len)) {
xiongguangjie committed
343 344
        return;
    }
345 346 347

    ACKACKPacket::Ptr pkt = std::make_shared<ACKACKPacket>();
    pkt->dst_socket_id = _peer_socket_id;
ziyue committed
348
    pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp);
349 350
    pkt->ack_number = ack.ack_number;
    pkt->storeToData();
351
    _send_buf->drop(ack.last_ack_pkt_seq_number);
ziyue committed
352 353
    sendControlPacket(pkt, true);
    // TraceL<<"ack number "<<ack.ack_number;
354
}
ziyue committed
355 356

void SrtTransport::sendMsgDropReq(uint32_t first, uint32_t last) {
xiongguangjie committed
357 358
    MsgDropReqPacket::Ptr pkt = std::make_shared<MsgDropReqPacket>();
    pkt->dst_socket_id = _peer_socket_id;
ziyue committed
359
    pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp);
xiongguangjie committed
360 361 362
    pkt->first_pkt_seq_num = first;
    pkt->last_pkt_seq_num = last;
    pkt->storeToData();
ziyue committed
363
    sendControlPacket(pkt, true);
364
}
ziyue committed
365 366 367

void SrtTransport::handleNAK(uint8_t *buf, int len, struct sockaddr_storage *addr) {
    // TraceL;
368
    NAKPacket pkt;
ziyue committed
369
    pkt.loadFromData(buf, len);
370
    bool empty = false;
371
    bool flush = false;
372

373
    for (auto& it : pkt.lost_list) {
ziyue committed
374
        if (pkt.lost_list.back() == it) {
375 376
            flush = true;
        }
377
        empty = true;
ziyue committed
378
        auto re_list = _send_buf->findPacketBySeq(it.first, it.second - 1);
379
        for (auto& pkt : re_list) {
380 381
            pkt->R = 1;
            pkt->storeToHeader();
ziyue committed
382
            sendPacket(pkt, flush);
383
            empty = false;
384
        }
ziyue committed
385 386
        if (empty) {
            sendMsgDropReq(it.first, it.second - 1);
387 388
        }
    }
389
}
ziyue committed
390 391

void SrtTransport::handleCongestionWarning(uint8_t *buf, int len, struct sockaddr_storage *addr) {
392 393
    TraceL;
}
ziyue committed
394 395

void SrtTransport::handleShutDown(uint8_t *buf, int len, struct sockaddr_storage *addr) {
396 397 398
    TraceL;
    onShutdown(SockException(Err_shutdown, "peer close connection"));
}
ziyue committed
399 400

void SrtTransport::handleDropReq(uint8_t *buf, int len, struct sockaddr_storage *addr) {
401
    MsgDropReqPacket pkt;
ziyue committed
402
    pkt.loadFromData(buf, len);
403
    std::list<DataPacket::Ptr> list;
ziyue committed
404 405
    // TraceL<<"drop "<<pkt.first_pkt_seq_num<<" last "<<pkt.last_pkt_seq_num;
    _recv_buf->drop(pkt.first_pkt_seq_num, pkt.last_pkt_seq_num, list);
406
    //checkAndSendAckNak();
ziyue committed
407
    if (list.empty()) {
408 409
        return;
    }
xiongziliang committed
410
    // uint32_t max_seq = 0;
411
    for (auto& data : list) {
xiongziliang committed
412
        // max_seq = data->packet_seq_number;
413 414 415 416
        if (_last_pkt_seq + 1 != data->packet_seq_number) {
            TraceL << "pkt lost " << _last_pkt_seq + 1 << "->" << data->packet_seq_number;
        }
        _last_pkt_seq = data->packet_seq_number;
417 418
        onSRTData(std::move(data));
    }
xiongguangjie committed
419
    /*
420 421 422 423 424 425 426 427 428 429
    _recv_nack.drop(max_seq);

    auto lost = _recv_buf->getLostSeq();
    _recv_nack.update(_now, lost);
    lost.clear();
    _recv_nack.getLostList(_now, _rtt, _rtt_variance, lost);
    if (!lost.empty()) {
        sendNAKPacket(lost);
        // TraceL << "check lost send nack";
    }
xiongguangjie committed
430
    */
431 432
}
void SrtTransport::checkAndSendAckNak(){
ziyue committed
433 434 435
    auto nak_interval = (_rtt + _rtt_variance * 4) / 2;
    if (nak_interval <= 20 * 1000) {
        nak_interval = 20 * 1000;
436
    }
ziyue committed
437
    if (_nak_ticker.elapsedTime(_now) > nak_interval) {
438
        auto lost = _recv_buf->getLostSeq();
ziyue committed
439 440
        if (!lost.empty()) {
            sendNAKPacket(lost);
441 442 443 444
        }
        _nak_ticker.resetTime(_now);
    }

ziyue committed
445
    if (_ack_ticker.elapsedTime(_now) > 10 * 1000) {
446 447
        _light_ack_pkt_count = 0;
        _ack_ticker.resetTime(_now);
ziyue committed
448
        // send a ack per 10 ms for receiver
449 450 451 452 453 454 455
        if(_last_ack_pkt_seq != _recv_buf->getExpectedSeq()){
            //TraceL<<"send a ack packet";
            sendACKPacket();
        }else{
            //TraceL<<" ignore repeate ack packet";
        }
        
ziyue committed
456 457
    } else {
        if (_light_ack_pkt_count >= 64) {
458
            // for high bitrate stream send light ack
ziyue committed
459
            // TODO
460
            sendLightACKPacket();
ziyue committed
461
            TraceL << "send light ack";
462 463 464 465
        }
        _light_ack_pkt_count = 0;
    }
    _light_ack_pkt_count++;
466
}
ziyue committed
467
void SrtTransport::handleUserDefinedType(uint8_t *buf, int len, struct sockaddr_storage *addr) {
468 469 470
    TraceL;
}

ziyue committed
471 472
void SrtTransport::handleACKACK(uint8_t *buf, int len, struct sockaddr_storage *addr) {
    // TraceL;
473
    ACKACKPacket::Ptr pkt = std::make_shared<ACKACKPacket>();
ziyue committed
474
    pkt->loadFromData(buf, len);
475

476 477 478 479 480 481
    if(_ack_send_timestamp.find(pkt->ack_number)!=_ack_send_timestamp.end()){
        uint32_t rtt = DurationCountMicroseconds(_now - _ack_send_timestamp[pkt->ack_number]);
        _rtt_variance = (3 * _rtt_variance + abs((long)_rtt - (long)rtt)) / 4;
        _rtt = (7 * rtt + _rtt) / 8;
        // TraceL<<" rtt:"<<_rtt<<" rtt variance:"<<_rtt_variance;
        _ack_send_timestamp.erase(pkt->ack_number);
482

483 484 485 486 487 488 489 490
        if(_last_recv_ackack_seq_num < pkt->ack_number){
            _last_recv_ackack_seq_num = pkt->ack_number;
        }else{
            if((_last_recv_ackack_seq_num-pkt->ack_number)>(MAX_TS>>1)){
                _last_recv_ackack_seq_num = pkt->ack_number;
            }
        }

491 492 493 494 495 496 497 498 499 500 501 502
        if(_ack_send_timestamp.size()>1000){
            // clear data
            for(auto it = _ack_send_timestamp.begin(); it != _ack_send_timestamp.end();){
                if(DurationCountMicroseconds(_now-it->second)>5e6){
                    // 超过五秒没有ackack 丢弃
                    it = _ack_send_timestamp.erase(it);
                }else{
                    it++;
                }
            }
        }

503
    }
504 505
}

ziyue committed
506
void SrtTransport::handlePeerError(uint8_t *buf, int len, struct sockaddr_storage *addr) {
507 508 509 510
    TraceL;
}

void SrtTransport::sendACKPacket() {
511 512
    uint32_t recv_rate = 0;

ziyue committed
513
    ACKPacket::Ptr pkt = std::make_shared<ACKPacket>();
514
    pkt->dst_socket_id = _peer_socket_id;
515
    pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp);
516 517 518 519 520
    pkt->ack_number = ++_ack_number_count;
    pkt->last_ack_pkt_seq_number = _recv_buf->getExpectedSeq();
    pkt->rtt = _rtt;
    pkt->rtt_variance = _rtt_variance;
    pkt->available_buf_size = _recv_buf->getAvailableBufferSize();
521
    pkt->pkt_recv_rate = _pkt_recv_rate_context->getPacketRecvRate(recv_rate);
522
    pkt->estimated_link_capacity = _estimated_link_capacity_context->getEstimatedLinkCapacity();
523
    pkt->recv_rate = recv_rate;
xiongguangjie committed
524
    if(0){
525 526 527 528 529 530 531 532 533
        TraceL<<pkt->pkt_recv_rate<<" pkt/s "<<recv_rate<<" byte/s "<<pkt->estimated_link_capacity<<" pkt/s (cap) "<<pkt->available_buf_size<<" available buf";
        //TraceL<<_pkt_recv_rate_context->dump();
        //TraceL<<"recv estimated:";
        //TraceL<< _pkt_recv_rate_context->dump();
        //TraceL<<"recv queue:";
        //TraceL<<_recv_buf->dump();
    }
    if(pkt->available_buf_size<2){
        pkt->available_buf_size = 2;
xiongguangjie committed
534
    }
535
    pkt->storeToData();
536
    _ack_send_timestamp[pkt->ack_number] = _now;
537
    _last_ack_pkt_seq = pkt->last_ack_pkt_seq_number;
ziyue committed
538 539
    sendControlPacket(pkt, true);
    // TraceL<<"send  ack "<<pkt->dump();
540
    // TraceL<<_recv_buf->dump();
541
}
ziyue committed
542

543
void SrtTransport::sendLightACKPacket() {
ziyue committed
544 545
    ACKPacket::Ptr pkt = std::make_shared<ACKPacket>();

546
    pkt->dst_socket_id = _peer_socket_id;
547
    pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp);
548 549 550 551 552 553 554 555 556
    pkt->ack_number = 0;
    pkt->last_ack_pkt_seq_number = _recv_buf->getExpectedSeq();
    pkt->rtt = 0;
    pkt->rtt_variance = 0;
    pkt->available_buf_size = 0;
    pkt->pkt_recv_rate = 0;
    pkt->estimated_link_capacity = 0;
    pkt->recv_rate = 0;
    pkt->storeToData();
557
    _last_ack_pkt_seq = pkt->last_ack_pkt_seq_number;
ziyue committed
558 559
    sendControlPacket(pkt, true);
    TraceL << "send  ack " << pkt->dump();
560 561
}

ziyue committed
562
// Reports the lost ranges in `lost_list` to the peer.
//
// When the encoded loss list fits into one payload, a single NAK is sent.
// Otherwise the list is split into consecutive chunks of payload_size / 8
// entries (8 presumably being the worst-case encoded size of one range —
// confirm against NAKPacket) and one NAK is sent per chunk.
//
// @param lost_list lost ranges to report; not modified
void SrtTransport::sendNAKPacket(std::list<PacketQueue::LostPair> &lost_list) {
    NAKPacket::Ptr pkt = std::make_shared<NAKPacket>();
    const auto size = NAKPacket::getCIFSize(lost_list);
    const size_t payload_size = getPayloadSize();

    if (!(size > payload_size)) {
        pkt->dst_socket_id = _peer_socket_id;
        pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp);
        pkt->lost_list = lost_list;
        pkt->storeToData();
        sendControlPacket(pkt, true);
        return;
    }

    WarnL << "loss report cif size " << size;

    // Entries per NAK. A payload smaller than 8 bytes (very small MTU) would
    // give 0 here and lead to a division by zero / endless split, so at least
    // one entry is always sent per packet.
    size_t num = payload_size / 8;
    if (num == 0) {
        num = 1;
    }

    // Walk the list once, cutting it into chunks of at most `num` entries.
    std::list<PacketQueue::LostPair> chunk;
    auto cur = lost_list.begin();
    while (cur != lost_list.end()) {
        auto next = cur;
        for (size_t i = 0; i < num && next != lost_list.end(); ++i) {
            ++next;
        }
        chunk.assign(cur, next);

        pkt->dst_socket_id = _peer_socket_id;
        pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp);
        pkt->lost_list = chunk;
        pkt->storeToData();
        sendControlPacket(pkt, true);

        cur = next;
    }
}
601

ziyue committed
602
void SrtTransport::sendShutDown() {
603 604 605 606
    ShutDownPacket::Ptr pkt = std::make_shared<ShutDownPacket>();
    pkt->dst_socket_id = _peer_socket_id;
    pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp);
    pkt->storeToData();
ziyue committed
607
    sendControlPacket(pkt, true);
608
}
ziyue committed
609 610

void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_storage *addr) {
611
    DataPacket::Ptr pkt = std::make_shared<DataPacket>();
ziyue committed
612
    pkt->loadFromData(buf, len);
613

614 615
    _estimated_link_capacity_context->inputPacket(_now,pkt);

616
    std::list<DataPacket::Ptr> list;
617
    //TraceL<<" seq="<< pkt->packet_seq_number<<" ts="<<pkt->timestamp<<" size="<<pkt->payloadSize()<<\
618
    //" PP="<<(int)pkt->PP<<" O="<<(int)pkt->O<<" kK="<<(int)pkt->KK<<" R="<<(int)pkt->R;
ziyue committed
619
    _recv_buf->inputPacket(pkt, list);
620 621 622
    if (list.empty()) {
        // when no data ok send nack to sender immediately
    } else {
xiongziliang committed
623
        // uint32_t last_seq;
624
        for (auto& data : list) {
xiongziliang committed
625
            // last_seq = data->packet_seq_number;
626 627 628 629
            if (_last_pkt_seq + 1 != data->packet_seq_number) {
                TraceL << "pkt lost " << _last_pkt_seq + 1 << "->" << data->packet_seq_number;
            }
            _last_pkt_seq = data->packet_seq_number;
630 631 632
            onSRTData(std::move(data));
        }

xiongguangjie committed
633 634 635
        //_recv_nack.drop(last_seq);
    }
    /*
636 637 638 639 640 641 642
    auto lost = _recv_buf->getLostSeq();
    _recv_nack.update(_now, lost);
    lost.clear();
    _recv_nack.getLostList(_now, _rtt, _rtt_variance, lost);
    if (!lost.empty()) {
        // TraceL << "check lost send nack immediately";
        sendNAKPacket(lost);
xiongguangjie committed
643
    }
xiongguangjie committed
644
    */
645
   /*
ziyue committed
646 647 648
    auto nak_interval = (_rtt + _rtt_variance * 4) / 2;
    if (nak_interval <= 20 * 1000) {
        nak_interval = 20 * 1000;
649
    }
650

ziyue committed
651
    if (_nak_ticker.elapsedTime(_now) > nak_interval) {
652
        // Periodic NAK reports
653
        auto lost = _recv_buf->getLostSeq();
ziyue committed
654 655 656 657 658
        if (!lost.empty()) {
            sendNAKPacket(lost);
            // TraceL<<"send NAK";
        } else {
            // TraceL<<"lost is empty";
659
        }
660
        _nak_ticker.resetTime(_now);
661 662
    }

ziyue committed
663
    if (_ack_ticker.elapsedTime(_now) > 10 * 1000) {
xiongguangjie committed
664
        _light_ack_pkt_count = 0;
665
        _ack_ticker.resetTime(_now);
ziyue committed
666
        // send a ack per 10 ms for receiver
xiongguangjie committed
667
        sendACKPacket();
ziyue committed
668 669
    } else {
        if (_light_ack_pkt_count >= 64) {
xiongguangjie committed
670
            // for high bitrate stream send light ack
ziyue committed
671
            // TODO
xiongguangjie committed
672
            sendLightACKPacket();
ziyue committed
673
            TraceL << "send light ack";
xiongguangjie committed
674 675
        }
        _light_ack_pkt_count = 0;
676
    }
xiongguangjie committed
677
    _light_ack_pkt_count++;
678
    */
ziyue committed
679
    // bufCheckInterval();
680 681
}

ziyue committed
682 683 684
// Serialises `len` bytes from `buf` into `pkt`, sends the packet and then
// hands it to the send queue, which handleNAK() later searches for packets
// to retransmit.
void SrtTransport::sendDataPacket(DataPacket::Ptr pkt, char *buf, int len, bool flush) {
    auto *payload = reinterpret_cast<uint8_t *>(buf);
    pkt->storeToData(payload, len);

    sendPacket(pkt, flush);
    _send_buf->inputPacket(pkt);
}
ziyue committed
687 688 689 690 691 692 693 694 695 696 697 698 699

// Sends a control packet; a thin wrapper that forwards to sendPacket().
void SrtTransport::sendControlPacket(ControlPacket::Ptr pkt, bool flush) {
    sendPacket(pkt, flush);
}

// Sends the bytes of `pkt` through the currently selected session. The data
// is copied into a buffer obtained from the packet pool first, so `pkt`
// itself is not handed to the session. Without a selected session only a
// warning is logged.
void SrtTransport::sendPacket(Buffer::Ptr pkt, bool flush) {
    if (!_selected_session) {
        WarnL << "not reach this";
        return;
    }

    auto out = _packet_pool.obtain2();
    out->assign(pkt->data(), pkt->size());

    _selected_session->setSendFlushFlag(flush);
    _selected_session->send(std::move(out));
}
ziyue committed
702 703

std::string SrtTransport::getIdentifier() {
704 705 706
    return _selected_session ? _selected_session->getIdentifier() : "";
}

ziyue committed
707 708
// Registers this transport with the manager as a handshake item, keyed by the
// decimal string of the SYN cookie.
void SrtTransport::registerSelfHandshake() {
    const std::string key = std::to_string(_sync_cookie);
    SrtTransportManager::Instance().addHandshakeItem(key, shared_from_this());
}
ziyue committed
710 711 712

// Removes the handshake registration keyed by the SYN cookie. A cookie of 0
// is skipped, i.e. nothing is removed in that case.
void SrtTransport::unregisterSelfHandshake() {
    if (_sync_cookie != 0) {
        SrtTransportManager::Instance().removeHandshakeItem(std::to_string(_sync_cookie));
    }
}

void SrtTransport::registerSelf() {
ziyue committed
719
    if (_socket_id == 0) {
720 721
        return;
    }
ziyue committed
722
    SrtTransportManager::Instance().addItem(std::to_string(_socket_id), shared_from_this());
723
}
ziyue committed
724 725

void SrtTransport::unregisterSelf() {
726 727 728
    SrtTransportManager::Instance().removeItem(std::to_string(_socket_id));
}

ziyue committed
729
void SrtTransport::onShutdown(const SockException &ex) {
730
    sendShutDown();
731 732 733 734 735 736 737 738 739 740
    WarnL << ex.what();
    unregisterSelfHandshake();
    unregisterSelf();
    for (auto &pr : _history_sessions) {
        auto session = pr.second.lock();
        if (session) {
            session->shutdown(ex);
        }
    }
}
ziyue committed
741 742 743

// Returns the payload size used when splitting TS data into SRT data packets:
// the largest multiple of 188 that fits into the MTU after 28 + 16 bytes of
// overhead are subtracted.
//
// NOTE(review): 188 is presumably the MPEG-TS packet size, 28 the IPv4 + UDP
// header size and 16 the SRT data packet header size — confirm.
size_t SrtTransport::getPayloadSize() {
    const auto usable = _mtu - 28 - 16;
    // Integer division followed by multiplication rounds down to a multiple of 188
    return usable / 188 * 188;
}
ziyue committed
746 747 748

// Splits a buffer of TS data into SRT data packets and sends them.
//
// The buffer is cut into chunks of getPayloadSize() bytes. Any remainder
// shorter than that is sent as one final, smaller packet. Each chunk gets its
// own sequence number, message number and timestamp.
//
// buffer: data to send; an empty buffer sends nothing
// flush:  forwarded unchanged to sendDataPacket() for every packet
void SrtTransport::onSendTSData(const Buffer::Ptr &buffer, bool flush) {
    const size_t payload_size = getPayloadSize();
    if (payload_size == 0) {
        // With a zero chunk size the split loop below could never advance
        // through the buffer and would emit empty packets forever.
        WarnL << "srt payload size is 0, drop ts data";
        return;
    }

    // Builds one data packet around [data, data + len) and sends it
    auto send_chunk = [this, flush](char *data, size_t len) {
        DataPacket::Ptr pkt = std::make_shared<DataPacket>();
        pkt->f = 0;
        // The sequence number is kept within 31 bits and wraps around
        pkt->packet_seq_number = _send_packet_seq_number & 0x7fffffff;
        _send_packet_seq_number = (_send_packet_seq_number + 1) & 0x7fffffff;
        // NOTE(review): PP = 3 presumably marks a single-packet message per the SRT spec — confirm
        pkt->PP = 3;
        pkt->O = 0;
        pkt->KK = 0;
        pkt->R = 0;
        pkt->msg_number = _send_msg_number++;
        pkt->dst_socket_id = _peer_socket_id;
        // Timestamp is the time elapsed since _start_timestamp, in microseconds
        pkt->timestamp = DurationCountMicroseconds(SteadyClock::now() - _start_timestamp);
        sendDataPacket(std::move(pkt), data, static_cast<int>(len), flush);
    };

    char *ptr = buffer->data();
    size_t remaining = buffer->size();

    // Full-sized chunks
    while (remaining >= payload_size) {
        send_chunk(ptr, payload_size);
        ptr += payload_size;
        remaining -= payload_size;
    }

    // Trailing partial chunk, if any
    if (remaining > 0) {
        send_chunk(ptr, remaining);
    }
}
ziyue committed
787

788
////////////  SrtTransportManager //////////////////////////
ziyue committed
789

790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820
// Returns the single shared SrtTransportManager.
// The instance is a function-local static, so it is constructed on the first call.
SrtTransportManager &SrtTransportManager::Instance() {
    static SrtTransportManager s_instance;
    return s_instance;
}

// Stores ptr in _map under key, replacing any entry already stored for that key.
// _mtx is held for the duration of the update.
void SrtTransportManager::addItem(const std::string &key, const SrtTransport::Ptr &ptr) {
    std::lock_guard<std::mutex> guard(_mtx);
    _map[key] = ptr;
}

// Looks up the transport registered under key in _map.
//
// Returns nullptr for an empty key or when no entry exists. Otherwise returns
// the result of lock() on the stored entry (presumably a weak_ptr, so the
// result is empty if the transport is already gone — confirm).
// _mtx is held during the lookup.
SrtTransport::Ptr SrtTransportManager::getItem(const std::string &key) {
    if (!key.empty()) {
        std::lock_guard<std::mutex> guard(_mtx);
        auto found = _map.find(key);
        if (found != _map.end()) {
            return found->second.lock();
        }
    }
    return nullptr;
}

// Erases the entry stored under key from _map while holding _mtx.
void SrtTransportManager::removeItem(const std::string &key) {
    std::lock_guard<std::mutex> guard(_mtx);
    _map.erase(key);
}

// Stores ptr in _handshake_map under key, replacing any entry already stored
// for that key. _handshake_mtx is held for the duration of the update.
void SrtTransportManager::addHandshakeItem(const std::string &key, const SrtTransport::Ptr &ptr) {
    std::lock_guard<std::mutex> guard(_handshake_mtx);
    _handshake_map[key] = ptr;
}
ziyue committed
821

822
void SrtTransportManager::removeHandshakeItem(const std::string &key) {
ziyue committed
823
    std::lock_guard<std::mutex> lck(_handshake_mtx);
824 825
    _handshake_map.erase(key);
}
ziyue committed
826

827 828 829 830 831 832 833 834 835 836 837 838 839
// Looks up the transport registered under key in _handshake_map.
//
// Returns nullptr for an empty key or when no entry exists. Otherwise returns
// the result of lock() on the stored entry (presumably a weak_ptr, so the
// result is empty if the transport is already gone — confirm).
// _handshake_mtx is held during the lookup.
SrtTransport::Ptr SrtTransportManager::getHandshakeItem(const std::string &key) {
    if (!key.empty()) {
        std::lock_guard<std::mutex> guard(_handshake_mtx);
        auto found = _handshake_map.find(key);
        if (found != _handshake_map.end()) {
            return found->second.lock();
        }
    }
    return nullptr;
}

} // namespace SRT