Nack.cpp 6.71 KB
Newer Older
ziyue committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
/*
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
 *
 * This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit).
 *
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
 */

#include "Nack.h"

static constexpr uint32_t kMaxNackMS = 10 * 1000;

void NackList::push_back(RtpPacket::Ptr rtp) {
    auto seq = rtp->getSeq();
    _nack_cache_seq.emplace_back(seq);
    _nack_cache_pkt.emplace(seq, std::move(rtp));
    while (get_cache_ms() > kMaxNackMS) {
        //需要清除部分nack缓存
        pop_front();
    }
}

void NackList::for_each_nack(const FCI_NACK &nack, const function<void(const RtpPacket::Ptr &rtp)> &func) {
    auto seq = nack.getPid();
    for (auto bit : nack.getBitArray()) {
        if (bit) {
            //丢包
            RtpPacket::Ptr *ptr = get_rtp(seq);
            if (ptr) {
                func(*ptr);
            }
        }
        ++seq;
    }
}

void NackList::pop_front() {
    if (_nack_cache_seq.empty()) {
        return;
    }
    _nack_cache_pkt.erase(_nack_cache_seq.front());
    _nack_cache_seq.pop_front();
}

RtpPacket::Ptr *NackList::get_rtp(uint16_t seq) {
    auto it = _nack_cache_pkt.find(seq);
    if (it == _nack_cache_pkt.end()) {
        return nullptr;
    }
    return &it->second;
}

uint32_t NackList::get_cache_ms() {
    if (_nack_cache_seq.size() < 2) {
        return 0;
    }
    uint32_t back = _nack_cache_pkt[_nack_cache_seq.back()]->getStampMS();
    uint32_t front = _nack_cache_pkt[_nack_cache_seq.front()]->getStampMS();
61
    if (back >= front) {
ziyue committed
62 63 64 65 66 67 68 69
        return back - front;
    }
    //很有可能回环了
    return back + (UINT32_MAX - front);
}

////////////////////////////////////////////////////////////////////////////////////////////////

70
void NackContext::received(uint16_t seq, bool is_rtx) {
ziyue committed
71 72 73
    if (!_last_max_seq && _seq.empty()) {
        _last_max_seq = seq - 1;
    }
74 75 76 77 78 79 80
    if (is_rtx || (seq < _last_max_seq && !(seq < 1024 && _last_max_seq > UINT16_MAX - 1024))) {
        //重传包或
        //seq回退,且非回环,那么这个应该是重传包
        onRtx(seq);
        return;
    }

ziyue committed
81 82 83 84 85 86 87 88
    _seq.emplace(seq);
    auto max_seq = *_seq.rbegin();
    auto min_seq = *_seq.begin();
    auto diff = max_seq - min_seq;
    if (!diff) {
        return;
    }

wxf committed
89
    if (diff > UINT16_MAX / 2) {
ziyue committed
90 91 92
        //回环
        _seq.clear();
        _last_max_seq = min_seq;
ziyue committed
93
        _nack_send_status.clear();
ziyue committed
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
        return;
    }

    if (_seq.size() == diff + 1 && _last_max_seq + 1 == min_seq) {
        //都是连续的seq,未丢包
        _seq.clear();
        _last_max_seq = max_seq;
    } else {
        //seq不连续,有丢包
        if (min_seq == _last_max_seq + 1) {
            //前面部分seq是连续的,未丢包,移除之
            eraseFrontSeq();
        }

        //有丢包,丢包从_last_max_seq开始
109 110
        auto nack_rtp_count = FCI_NACK::kBitSize;
        if (max_seq - _last_max_seq > nack_rtp_count) {
ziyue committed
111
            vector<bool> vec;
112 113
            vec.resize(FCI_NACK::kBitSize, false);
            for (auto i = 0; i < nack_rtp_count; ++i) {
ziyue committed
114 115
                vec[i] = _seq.find(_last_max_seq + i + 2) == _seq.end();
            }
116 117
            doNack(FCI_NACK(_last_max_seq + 1, vec), true);
            _last_max_seq += nack_rtp_count + 1;
ziyue committed
118 119 120
            if (_last_max_seq >= max_seq) {
                _seq.clear();
            } else {
121
                auto it = _seq.emplace_hint(_seq.begin(), _last_max_seq + 1);
ziyue committed
122 123 124 125 126 127 128 129 130 131
                _seq.erase(_seq.begin(), it);
            }
        }
    }
}

void NackContext::setOnNack(onNack cb) {
    _cb = std::move(cb);
}

132 133 134 135
void NackContext::doNack(const FCI_NACK &nack, bool record_nack) {
    if (record_nack) {
        recordNack(nack);
    }
ziyue committed
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
    if (_cb) {
        _cb(nack);
    }
}

void NackContext::eraseFrontSeq() {
    //前面部分seq是连续的,未丢包,移除之
    for (auto it = _seq.begin(); it != _seq.end();) {
        if (*it != _last_max_seq + 1) {
            //seq不连续,丢包了
            break;
        }
        _last_max_seq = *it;
        it = _seq.erase(it);
    }
151 152 153
}

void NackContext::onRtx(uint16_t seq) {
ziyue committed
154 155
    auto it = _nack_send_status.find(seq);
    if (it == _nack_send_status.end()) {
156 157 158
        return;
    }
    auto rtt = getCurrentMillisecond() - it->second.update_stamp;
ziyue committed
159
    _nack_send_status.erase(it);
160 161 162 163 164 165 166 167 168 169 170 171 172

    if (rtt >= 0) {
        //rtt不肯小于0
        _rtt = rtt;
        //InfoL << "rtt:" << rtt;
    }
}

void NackContext::recordNack(const FCI_NACK &nack) {
    auto now = getCurrentMillisecond();
    auto i = nack.getPid();
    for (auto flag : nack.getBitArray()) {
        if (flag) {
ziyue committed
173
            auto &ref = _nack_send_status[i];
174 175 176 177 178 179 180
            ref.first_stamp = now;
            ref.update_stamp = now;
            ref.nack_count = 1;
        }
        ++i;
    }
    //记录太多了,移除一部分早期的记录
ziyue committed
181 182
    while (_nack_send_status.size() > kNackMaxSize) {
        _nack_send_status.erase(_nack_send_status.begin());
183 184 185 186 187 188
    }
}

uint64_t NackContext::reSendNack() {
    set<uint16_t> nack_rtp;
    auto now = getCurrentMillisecond();
ziyue committed
189
    for (auto it = _nack_send_status.begin(); it != _nack_send_status.end();) {
190 191
        if (now - it->second.first_stamp > kNackMaxMS) {
            //该rtp丢失太久了,不再要求重传
ziyue committed
192
            it = _nack_send_status.erase(it);
193 194
            continue;
        }
ziyue committed
195
        if (now - it->second.update_stamp < kNackIntervalRatio * _rtt) {
196 197 198 199 200 201 202 203 204 205
            //距离上次nack不足2倍的rtt,不用再发送nack
            ++it;
            continue;
        }
        //此rtp需要请求重传
        nack_rtp.emplace(it->first);
        //更新nack发送时间戳
        it->second.update_stamp = now;
        if (++(it->second.nack_count) == kNackMaxCount) {
            //nack次数太多,移除之
ziyue committed
206
            it = _nack_send_status.erase(it);
207 208 209 210 211
            continue;
        }
        ++it;
    }

ziyue committed
212
    if (_nack_send_status.empty()) {
213 214 215 216 217 218 219 220 221
        //不需要再发送nack
        return 0;
    }

    int pid = -1;
    vector<bool> vec;
    for (auto it = nack_rtp.begin(); it != nack_rtp.end();) {
        if (pid == -1) {
            pid = *it;
ziyue committed
222
            vec.resize(FCI_NACK::kBitSize, false);
223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241
            ++it;
            continue;
        }
        auto inc = *it - pid;
        if (inc > FCI_NACK::kBitSize) {
            //新的nack包
            doNack(FCI_NACK(pid, vec), false);
            pid = -1;
            continue;
        }
        //这个包丢了
        vec[inc - 1] = true;
        ++it;
    }
    if (pid != -1) {
        doNack(FCI_NACK(pid, vec), false);
    }

    //重传间隔不得低于5ms
242
    return max(_rtt, 5);
243
}