RtpReceiver.h 4.96 KB
Newer Older
xiongziliang committed
1
/*
xiongziliang committed
2
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
xiongziliang committed
3 4 5
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
xiongziliang committed
6 7 8
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
xiongziliang committed
9 10 11 12 13
 */

#ifndef ZLMEDIAKIT_RTPRECEIVER_H
#define ZLMEDIAKIT_RTPRECEIVER_H

xiongziliang committed
14
#include <map>
xiongziliang committed
15 16
#include <string>
#include <memory>
xiongziliang committed
17
#include "RtpCodec.h"
xiongziliang committed
18 19 20 21 22 23
#include "RtspMediaSource.h"
using namespace std;
using namespace toolkit;

namespace mediakit {

xiongziliang committed
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
template<typename T, typename SEQ = uint16_t>
class PacketSortor {
public:
    PacketSortor() = default;
    ~PacketSortor() = default;

    /**
     * 设置参数
     * @param max_sort_size 最大排序缓存长度
     * @param clear_sort_size seq连续次数超过该值后,清空并关闭排序缓存
     */
    void setup(uint32_t max_sort_size, uint32_t clear_sort_size) {
        _max_sort_size = max_sort_size;
        _clear_sort_size = clear_sort_size;
    }

    void setOnSort(function<void(SEQ seq, const T &packet)> cb){
        _cb = std::move(cb);
    }

    /**
     * 清空状态
     */
    void clear() {
        _last_seq = 0;
        _seq_ok_count = 0;
        _sort_started = 0;
        _seq_cycle_count = 0;
        _rtp_sort_cache_map.clear();
    }

    /**
     * 获取排序缓存长度
     */
    int getJitterSize(){
        return _rtp_sort_cache_map.size();
    }

    /**
     * 获取seq回环次数
     */
    int getCycleCount(){
        return _seq_cycle_count;
    }

    /**
     * 输入并排序
     * @param seq 序列号
     * @param packet 包负载
     */
    void sortPacket(SEQ seq, const T &packet){
        if (seq != _last_seq + 1 && _last_seq != 0) {
            //包乱序或丢包
            _seq_ok_count = 0;
            _sort_started = true;
            if (_last_seq > seq && _last_seq - seq > 0xFF) {
                //sequence回环,清空所有排序缓存
                while (_rtp_sort_cache_map.size()) {
                    popPacket();
                }
                ++_seq_cycle_count;
            }
        } else {
            //正确序列的包
            _seq_ok_count++;
        }

        _last_seq = seq;

        //开始排序缓存
        if (_sort_started) {
            _rtp_sort_cache_map.emplace(seq, packet);
            if (_seq_ok_count >= _clear_sort_size) {
                //网络环境改善,需要清空排序缓存
                _seq_ok_count = 0;
                _sort_started = false;
                while (_rtp_sort_cache_map.size()) {
                    popPacket();
                }
            } else if (_rtp_sort_cache_map.size() >= _max_sort_size) {
                //排序缓存溢出
                popPacket();
            }
        } else {
            //正确序列
            onPacketSorted(seq, packet);
        }
    }

private:
    void popPacket() {
        auto it = _rtp_sort_cache_map.begin();
        onPacketSorted(it->first, it->second);
        _rtp_sort_cache_map.erase(it);
    }

    void onPacketSorted(SEQ seq, const T &packet) {
        _cb(seq, packet);
    }

private:
    //是否开始seq排序
    bool _sort_started = false;
    //上次seq
    SEQ _last_seq = 0;
    //seq连续次数计数
    uint32_t _seq_ok_count = 0;
    //seq回环次数计数
    uint32_t _seq_cycle_count = 0;
    //排序缓存长度
    uint32_t _max_sort_size;
    //seq连续次数超过该值后,清空并关闭排序缓存
    uint32_t _clear_sort_size;
    //rtp排序缓存,根据seq排序
    map<SEQ, T> _rtp_sort_cache_map;
    //回调
    function<void(SEQ seq, const T &packet)> _cb;
};

xiongziliang committed
143 144 145 146 147
class RtpReceiver {
public:
    RtpReceiver();
    virtual ~RtpReceiver();

xiongziliang committed
148
protected:
xiongziliang committed
149 150
    /**
     * 输入数据指针生成并排序rtp包
151
     * @param track_index track下标索引
xiongziliang committed
152 153
     * @param type track类型
     * @param samplerate rtp时间戳基准时钟,视频为90000,音频为采样率
154 155
     * @param rtp_raw_ptr rtp数据指针
     * @param rtp_raw_len rtp数据指针长度
xiongziliang committed
156 157
     * @return 解析成功返回true
     */
xiongziliang committed
158
    bool handleOneRtp(int track_index, TrackType type, int samplerate, unsigned char *rtp_raw_ptr, unsigned int rtp_raw_len);
xiongziliang committed
159 160 161

    /**
     * rtp数据包排序后输出
xiongziliang committed
162 163
     * @param rtp rtp数据包
     * @param track_index track索引
xiongziliang committed
164
     */
xiongziliang committed
165
    virtual void onRtpSorted(const RtpPacket::Ptr &rtp, int track_index){}
xiongziliang committed
166

xiongziliang committed
167 168
    void clear();
    void setPoolSize(int size);
xiongziliang committed
169 170
    int getJitterSize(int track_index);
    int getCycleCount(int track_index);
xiongziliang committed
171

xiongziliang committed
172
private:
173
    void sortRtp(const RtpPacket::Ptr &rtp , int track_index);
xiongziliang committed
174

175
private:
xiongziliang committed
176
    uint32_t _ssrc[2] = { 0, 0 };
177 178 179
    //ssrc不匹配计数
    uint32_t _ssrc_err_count[2] = { 0, 0 };
    //rtp排序缓存,根据seq排序
xiongziliang committed
180
    PacketSortor<RtpPacket::Ptr> _rtp_sortor[2];
181 182
    //rtp循环池
    RtspMediaSource::PoolType _rtp_pool;
xiongziliang committed
183 184 185 186 187 188
};

}//namespace mediakit


#endif //ZLMEDIAKIT_RTPRECEIVER_H