/*
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
 *
 * This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit).
 *
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
 */

#ifndef ZLMEDIAKIT_RTPRECEIVER_H
#define ZLMEDIAKIT_RTPRECEIVER_H

#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include "RtpCodec.h"
#include "RtspMediaSource.h"
using namespace std;
using namespace toolkit;

namespace mediakit {

24
/**
 * Reorders packets by sequence number before delivering them.
 *
 * Packets handed to sortPacket() are kept in a std::map keyed by sequence
 * number and released through the callback installed with setOnSort(), either
 * as soon as the next expected sequence number is at the front of the cache or
 * once the cache grows past an adaptive limit (kMin + current cache size,
 * capped at kMax).
 *
 * @tparam T    packet payload type
 * @tparam SEQ  sequence number type; the wrap-around arithmetic below assumes
 *              an unsigned integer type
 * @tparam kMax cap of the adaptive cache limit; also the width of the window in
 *              which packets older than the next expected one are dropped
 * @tparam kMin base (and initial value) of the adaptive cache limit
 */
template<typename T, typename SEQ = uint16_t, size_t kMax = 256, size_t kMin = 10>
class PacketSortor {
public:
    PacketSortor() = default;
    ~PacketSortor() = default;

    /**
     * Install the callback that receives packets in sorted order.
     * Must be set before any packet is emitted: the callback is invoked
     * unconditionally, so an empty std::function would be called otherwise.
     * @param cb invoked with the sequence number and the packet
     */
    void setOnSort(std::function<void(SEQ seq, T &packet)> cb) {
        _cb = std::move(cb);
    }

    /**
     * Reset the sorting state (cycle counter, cache, next expected sequence
     * number and cache limit). The installed callback is kept.
     */
    void clear() {
        _seq_cycle_count = 0;
        _rtp_sort_cache_map.clear();
        _next_seq_out = 0;
        _max_sort_size = kMin;
    }

    /**
     * @return number of packets currently held in the sort cache
     */
    size_t getJitterSize() const {
        return _rtp_sort_cache_map.size();
    }

    /**
     * @return number of sequence wrap-arounds handled by the cache-overflow path
     */
    size_t getCycleCount() const {
        return _seq_cycle_count;
    }

    /**
     * Feed one packet into the sorter.
     * @param seq sequence number of the packet
     * @param packet packet payload
     */
    void sortPacket(SEQ seq, T packet) {
        if (seq < _next_seq_out) {
            if (_next_seq_out < seq + kMax) {
                // Slightly older than what was already output: a late duplicate or
                // a rollback, drop it. Anything further behind may be a wrapped
                // sequence number and is kept.
                return;
            }
        } else if (_next_seq_out && seq - _next_seq_out > seqHalfRange()) {
            // Jumped ahead by more than half of the sequence space: most likely an
            // out-of-order packet from before a wrap-around, drop it.
            return;
        }

        // Store in the sort cache (a sequence number already cached is not replaced).
        _rtp_sort_cache_map.emplace(seq, std::move(packet));
        // Emit whatever is now deliverable in order.
        tryPopPacket();
    }

    /**
     * Emit every cached packet in ascending sequence order and empty the cache.
     */
    void flush() {
        while (!_rtp_sort_cache_map.empty()) {
            popIterator(_rtp_sort_cache_map.begin());
        }
    }

private:
    /**
     * Half of the value range of SEQ (32767 for uint16_t). Distances larger than
     * this are interpreted as a wrap-around instead of a genuine jump.
     * Derived from SEQ so the sorter also works for other sequence widths.
     */
    static constexpr SEQ seqHalfRange() {
        return static_cast<SEQ>(static_cast<SEQ>(~static_cast<SEQ>(0)) >> 1);
    }

    /**
     * Handle the packet with the smallest cached sequence number.
     * Precondition: the cache is not empty.
     */
    void popPacket() {
        auto it = _rtp_sort_cache_map.begin();
        if (it->first >= _next_seq_out) {
            // At or ahead of the expected sequence number: deliver it.
            popIterator(it);
            return;
        }

        if (_next_seq_out - it->first > seqHalfRange()) {
            // The smallest key is far behind the expected one: the sequence wrapped.
            if (_rtp_sort_cache_map.size() < 2 * kMin) {
                // Wait for more data first, large pre-wrap sequence numbers may
                // still arrive.
                return;
            }
            ++_seq_cycle_count;
            // Deliver and drop the remaining large (pre-wrap) sequence numbers, then
            // restart sorting from the small ones.
            auto hit = _rtp_sort_cache_map.upper_bound(
                static_cast<SEQ>(_next_seq_out - _rtp_sort_cache_map.size()));
            while (hit != _rtp_sort_cache_map.end()) {
                _cb(hit->first, hit->second);
                hit = _rtp_sort_cache_map.erase(hit);
            }
            // First packet of the new cycle. The guard covers the case where the
            // loop above drained the whole cache.
            if (!_rtp_sort_cache_map.empty()) {
                popIterator(_rtp_sort_cache_map.begin());
            }
            return;
        }
        // Behind the expected sequence number but not a wrap: discard silently.
        _rtp_sort_cache_map.erase(it);
    }

    /**
     * Remove the entry from the cache, advance the expected sequence number past
     * it and hand the packet to the callback.
     */
    void popIterator(typename std::map<SEQ, T>::iterator it) {
        auto seq = it->first;
        auto data = std::move(it->second);
        // Erase before invoking the callback so the cache is consistent while it runs.
        _rtp_sort_cache_map.erase(it);
        _next_seq_out = seq + 1;
        _cb(seq, data);
    }

    /**
     * Emit all packets that are deliverable in order; if none is and the cache
     * exceeds its limit, force the oldest entry out.
     */
    void tryPopPacket() {
        int count = 0;
        while (!_rtp_sort_cache_map.empty() && _rtp_sort_cache_map.begin()->first == _next_seq_out) {
            // The next expected packet is at the front: emit it directly.
            popPacket();
            ++count;
        }

        if (count) {
            setSortSize();
        } else if (_rtp_sort_cache_map.size() > _max_sort_size) {
            // Cache overflow: stop waiting for the missing packet.
            popPacket();
            setSortSize();
        }
    }

    /**
     * Recompute the cache limit: kMin on top of the current cache size, capped at kMax.
     */
    void setSortSize() {
        _max_sort_size = kMin + _rtp_sort_cache_map.size();
        if (_max_sort_size > kMax) {
            _max_sort_size = kMax;
        }
    }

private:
    // Sequence number expected to be emitted next
    SEQ _next_seq_out = 0;
    // Number of sequence wrap-arounds counted so far
    size_t _seq_cycle_count = 0;
    // Current limit of the sort cache size
    size_t _max_sort_size = kMin;
    // Sort cache, ordered by sequence number
    std::map<SEQ, T> _rtp_sort_cache_map;
    // Output callback
    std::function<void(SEQ seq, T &packet)> _cb;
};

xiongziliang committed
163 164 165 166 167
class RtpReceiver {
public:
    RtpReceiver();
    virtual ~RtpReceiver();

168 169 170 171 172 173 174
    class BadRtpException : public invalid_argument {
    public:
        template<typename Type>
        BadRtpException(Type &&type) : invalid_argument(std::forward<Type>(type)) {}
        ~BadRtpException() = default;
    };

xiongziliang committed
175
protected:
xiongziliang committed
176 177
    /**
     * 输入数据指针生成并排序rtp包
xiongziliang committed
178
     * @param index track下标索引
xiongziliang committed
179 180
     * @param type track类型
     * @param samplerate rtp时间戳基准时钟,视频为90000,音频为采样率
xiongziliang committed
181 182
     * @param ptr rtp数据指针
     * @param len rtp数据指针长度
xiongziliang committed
183 184
     * @return 解析成功返回true
     */
xiongziliang committed
185
    bool handleOneRtp(int index, TrackType type, int samplerate, uint8_t *ptr, size_t len);
xiongziliang committed
186 187 188

    /**
     * rtp数据包排序后输出
xiongziliang committed
189 190
     * @param rtp rtp数据包
     * @param track_index track索引
xiongziliang committed
191
     */
xia-chu committed
192
    virtual void onRtpSorted(RtpPacket::Ptr rtp, int track_index) {}
xiongziliang committed
193

xiongziliang committed
194 195 196 197 198 199 200
    /**
     * 解析出rtp但还未排序
     * @param rtp rtp数据包
     * @param track_index track索引
     */
    virtual void onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int track_index) {}

xiongziliang committed
201
    void clear();
202 203
    size_t getJitterSize(int track_index) const;
    size_t getCycleCount(int track_index) const;
ziyue committed
204
    uint32_t getSSRC(int track_index) const;
xiongziliang committed
205

xiongziliang committed
206
private:
207
    uint32_t _ssrc[2] = {0, 0};
208
    Ticker _ssrc_alive[2];
209
    //rtp排序缓存,根据seq排序
xiongziliang committed
210
    PacketSortor<RtpPacket::Ptr> _rtp_sortor[2];
xiongziliang committed
211 212 213 214 215 216
};

}//namespace mediakit


#endif //ZLMEDIAKIT_RTPRECEIVER_H