Commit 6d8d64ca by xiongguangjie

for mergen

parents e7e7906e 8c670c65
...@@ -68,7 +68,6 @@ bool H265Frame::isKeyFrame(int type, const char *ptr) { ...@@ -68,7 +68,6 @@ bool H265Frame::isKeyFrame(int type, const char *ptr) {
return (((*((uint8_t *) ptr + 2)) >> 7) & 0x01) == 1 && (type == NAL_IDR_N_LP || type == NAL_IDR_W_RADL); return (((*((uint8_t *) ptr + 2)) >> 7) & 0x01) == 1 && (type == NAL_IDR_N_LP || type == NAL_IDR_W_RADL);
} }
return false; return false;
} }
H265Frame::H265Frame(){ H265Frame::H265Frame(){
......
...@@ -18,8 +18,7 @@ void RtcpContext::clear() { ...@@ -18,8 +18,7 @@ void RtcpContext::clear() {
memset(this, 0, sizeof(RtcpContext)); memset(this, 0, sizeof(RtcpContext));
} }
RtcpContext::RtcpContext(uint32_t sample_rate, bool is_receiver) { RtcpContext::RtcpContext(bool is_receiver) {
_sample_rate = sample_rate;
_is_receiver = is_receiver; _is_receiver = is_receiver;
} }
...@@ -35,7 +34,6 @@ void RtcpContext::onRtp(uint16_t seq, uint32_t stamp, size_t bytes) { ...@@ -35,7 +34,6 @@ void RtcpContext::onRtp(uint16_t seq, uint32_t stamp, size_t bytes) {
diff = -diff; diff = -diff;
} }
//抖动单位为采样次数 //抖动单位为采样次数
diff *= (_sample_rate / 1000.0);
_jitter += (diff - _jitter) / 16.0; _jitter += (diff - _jitter) / 16.0;
} else { } else {
_jitter = 0; _jitter = 0;
...@@ -129,7 +127,7 @@ Buffer::Ptr RtcpContext::createRtcpSR(uint32_t rtcp_ssrc) { ...@@ -129,7 +127,7 @@ Buffer::Ptr RtcpContext::createRtcpSR(uint32_t rtcp_ssrc) {
rtcp->setNtpStamp(tv); rtcp->setNtpStamp(tv);
//转换成rtp时间戳 //转换成rtp时间戳
rtcp->rtpts = htonl(uint32_t(_last_rtp_stamp * (_sample_rate / 1000.0))); rtcp->rtpts = htonl(_last_rtp_stamp);
rtcp->packet_count = htonl((uint32_t) _packets); rtcp->packet_count = htonl((uint32_t) _packets);
rtcp->octet_count = htonl((uint32_t) _bytes); rtcp->octet_count = htonl((uint32_t) _bytes);
return RtcpHeader::toBuffer(std::move(rtcp)); return RtcpHeader::toBuffer(std::move(rtcp));
......
...@@ -22,15 +22,14 @@ public: ...@@ -22,15 +22,14 @@ public:
using Ptr = std::shared_ptr<RtcpContext>; using Ptr = std::shared_ptr<RtcpContext>;
/** /**
* 创建rtcp上下文 * 创建rtcp上下文
* @param sample_rate 音频采用率,视频一般为90000
* @param is_receiver 是否为rtp接收者,接收者更消耗性能 * @param is_receiver 是否为rtp接收者,接收者更消耗性能
*/ */
RtcpContext(uint32_t sample_rate, bool is_receiver); RtcpContext(bool is_receiver);
/** /**
* 输出或输入rtp时调用 * 输出或输入rtp时调用
* @param seq rtp的seq * @param seq rtp的seq
* @param stamp rtp的时间戳,单位毫秒 * @param stamp rtp的时间戳,单位采样数(非毫秒)
* @param bytes rtp数据长度 * @param bytes rtp数据长度
*/ */
void onRtp(uint16_t seq, uint32_t stamp, size_t bytes); void onRtp(uint16_t seq, uint32_t stamp, size_t bytes);
...@@ -87,8 +86,6 @@ private: ...@@ -87,8 +86,6 @@ private:
bool _is_receiver; bool _is_receiver;
//时间戳抖动值 //时间戳抖动值
double _jitter = 0; double _jitter = 0;
//视频默认90000,音频为采样率
uint32_t _sample_rate;
//收到或发送的rtp的字节数 //收到或发送的rtp的字节数
size_t _bytes = 0; size_t _bytes = 0;
//收到或发送的rtp的个数 //收到或发送的rtp的个数
......
...@@ -24,36 +24,23 @@ static inline bool checkTS(const uint8_t *packet, size_t bytes){ ...@@ -24,36 +24,23 @@ static inline bool checkTS(const uint8_t *packet, size_t bytes){
return bytes % TS_PACKET_SIZE == 0 && packet[0] == TS_SYNC_BYTE; return bytes % TS_PACKET_SIZE == 0 && packet[0] == TS_SYNC_BYTE;
} }
class RtpReceiverImp : public RtpReceiver { class RtpReceiverImp : public RtpTrackImp {
public: public:
using Ptr = std::shared_ptr<RtpReceiverImp>; using Ptr = std::shared_ptr<RtpReceiverImp>;
RtpReceiverImp(int sample_rate, function<void(RtpPacket::Ptr rtp)> cb, function<void(const RtpPacket::Ptr &rtp)> cb_before = nullptr){ RtpReceiverImp(int sample_rate, RtpTrackImp::OnSorted cb, RtpTrackImp::BeforeSorted cb_before = nullptr){
_sample_rate = sample_rate; _sample_rate = sample_rate;
_on_sort = std::move(cb); setOnSorted(std::move(cb));
_on_before_sort = std::move(cb_before); setBeforeSorted(std::move(cb_before));
} }
~RtpReceiverImp() override = default; ~RtpReceiverImp() override = default;
bool inputRtp(TrackType type, uint8_t *ptr, size_t len){ bool inputRtp(TrackType type, uint8_t *ptr, size_t len){
return handleOneRtp((int) type, type, _sample_rate, ptr, len); return RtpTrack::inputRtp(type, _sample_rate, ptr, len);
}
protected:
void onRtpSorted(RtpPacket::Ptr rtp, int track_index) override {
_on_sort(std::move(rtp));
}
void onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int track_index) override {
if (_on_before_sort) {
_on_before_sort(rtp);
}
} }
private: private:
int _sample_rate; int _sample_rate;
function<void(RtpPacket::Ptr rtp)> _on_sort;
function<void(const RtpPacket::Ptr &rtp)> _on_before_sort;
}; };
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
......
...@@ -27,7 +27,7 @@ class RtcpHelper : public RtcpContext, public std::enable_shared_from_this<RtcpH ...@@ -27,7 +27,7 @@ class RtcpHelper : public RtcpContext, public std::enable_shared_from_this<RtcpH
public: public:
using Ptr = std::shared_ptr<RtcpHelper>; using Ptr = std::shared_ptr<RtcpHelper>;
RtcpHelper(Socket::Ptr rtcp_sock, uint32_t sample_rate) : RtcpContext(sample_rate, true){ RtcpHelper(Socket::Ptr rtcp_sock, uint32_t sample_rate) : RtcpContext(true){
_rtcp_sock = std::move(rtcp_sock); _rtcp_sock = std::move(rtcp_sock);
_sample_rate = sample_rate; _sample_rate = sample_rate;
} }
...@@ -35,7 +35,7 @@ public: ...@@ -35,7 +35,7 @@ public:
void onRecvRtp(const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len){ void onRecvRtp(const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len){
//统计rtp接受情况,用于发送rr包 //统计rtp接受情况,用于发送rr包
auto header = (RtpHeader *) buf->data(); auto header = (RtpHeader *) buf->data();
onRtp(ntohs(header->seq), ntohl(header->stamp) * uint64_t(1000) / _sample_rate, buf->size()); onRtp(ntohs(header->seq), ntohl(header->stamp), buf->size());
sendRtcp(ntohl(header->ssrc), addr, addr_len); sendRtcp(ntohl(header->ssrc), addr, addr_len);
} }
......
...@@ -103,7 +103,7 @@ void RtpSession::onRtpPacket(const char *data, size_t len) { ...@@ -103,7 +103,7 @@ void RtpSession::onRtpPacket(const char *data, size_t len) {
} }
try { try {
_process->inputRtp(false, getSock(), data, len, &_addr); _process->inputRtp(false, getSock(), data, len, &_addr);
} catch (RtpReceiver::BadRtpException &ex) { } catch (RtpTrack::BadRtpException &ex) {
if (!_is_udp) { if (!_is_udp) {
WarnL << ex.what() << ",开始搜索ssrc以便恢复上下文"; WarnL << ex.what() << ",开始搜索ssrc以便恢复上下文";
_search_rtp = true; _search_rtp = true;
......
...@@ -15,19 +15,23 @@ ...@@ -15,19 +15,23 @@
namespace mediakit { namespace mediakit {
RtpReceiver::RtpReceiver() { RtpTrack::RtpTrack() {
int index = 0; setOnSort([this](uint16_t seq, RtpPacket::Ptr &packet) {
for (auto &sortor : _rtp_sortor) { onRtpSorted(std::move(packet));
sortor.setOnSort([this, index](uint16_t seq, RtpPacket::Ptr &packet) { });
onRtpSorted(std::move(packet), index); }
});
++index; uint32_t RtpTrack::getSSRC() const {
} return _ssrc;
} }
RtpReceiver::~RtpReceiver() {} void RtpTrack::clear() {
_ssrc = 0;
_ssrc_alive.resetTime();
PacketSortor<RtpPacket::Ptr>::clear();
}
bool RtpReceiver::handleOneRtp(int index, TrackType type, int sample_rate, uint8_t *ptr, size_t len) { bool RtpTrack::inputRtp(TrackType type, int sample_rate, uint8_t *ptr, size_t len) {
if (len < RtpPacket::kRtpHeaderSize) { if (len < RtpPacket::kRtpHeaderSize) {
WarnL << "rtp包太小:" << len; WarnL << "rtp包太小:" << len;
return false; return false;
...@@ -52,23 +56,23 @@ bool RtpReceiver::handleOneRtp(int index, TrackType type, int sample_rate, uint8 ...@@ -52,23 +56,23 @@ bool RtpReceiver::handleOneRtp(int index, TrackType type, int sample_rate, uint8
//比对缓存ssrc //比对缓存ssrc
auto ssrc = ntohl(header->ssrc); auto ssrc = ntohl(header->ssrc);
if (!_ssrc[index]) { if (!_ssrc) {
//记录并锁定ssrc //记录并锁定ssrc
_ssrc[index] = ssrc; _ssrc = ssrc;
_ssrc_alive[index].resetTime(); _ssrc_alive.resetTime();
} else if (_ssrc[index] == ssrc) { } else if (_ssrc == ssrc) {
//ssrc匹配正确,刷新计时器 //ssrc匹配正确,刷新计时器
_ssrc_alive[index].resetTime(); _ssrc_alive.resetTime();
} else { } else {
//ssrc错误 //ssrc错误
if (_ssrc_alive[index].elapsedTime() < 3 * 1000) { if (_ssrc_alive.elapsedTime() < 3 * 1000) {
//接受正确ssrc的rtp在10秒内,那么我们认为存在多路rtp,忽略掉ssrc不匹配的rtp //接受正确ssrc的rtp在10秒内,那么我们认为存在多路rtp,忽略掉ssrc不匹配的rtp
WarnL << "ssrc不匹配,rtp已丢弃:" << ssrc << " != " << _ssrc[index]; WarnL << "ssrc不匹配,rtp已丢弃:" << ssrc << " != " << _ssrc;
return false; return false;
} }
InfoL << "rtp流ssrc切换:" << _ssrc[index] << " -> " << ssrc; InfoL << "rtp流ssrc切换:" << _ssrc << " -> " << ssrc;
_ssrc[index] = ssrc; _ssrc = ssrc;
_ssrc_alive[index].resetTime(); _ssrc_alive.resetTime();
} }
auto rtp = RtpPacket::create(); auto rtp = RtpPacket::create();
...@@ -87,29 +91,32 @@ bool RtpReceiver::handleOneRtp(int index, TrackType type, int sample_rate, uint8 ...@@ -87,29 +91,32 @@ bool RtpReceiver::handleOneRtp(int index, TrackType type, int sample_rate, uint8
//拷贝rtp //拷贝rtp
memcpy(&data[4], ptr, len); memcpy(&data[4], ptr, len);
onBeforeRtpSorted(rtp, index); onBeforeRtpSorted(rtp);
auto seq = rtp->getSeq(); auto seq = rtp->getSeq();
_rtp_sortor[index].sortPacket(seq, std::move(rtp)); sortPacket(seq, std::move(rtp));
return true; return true;
} }
void RtpReceiver::clear() { ////////////////////////////////////////////////////////////////////////////////////
CLEAR_ARR(_ssrc);
for (auto &sortor : _rtp_sortor) { void RtpTrackImp::setOnSorted(OnSorted cb) {
sortor.clear(); _on_sorted = std::move(cb);
}
} }
size_t RtpReceiver::getJitterSize(int index) const{ void RtpTrackImp::setBeforeSorted(BeforeSorted cb) {
return _rtp_sortor[index].getJitterSize(); _on_before_sorted = std::move(cb);
} }
size_t RtpReceiver::getCycleCount(int index) const{ void RtpTrackImp::onRtpSorted(RtpPacket::Ptr rtp) {
return _rtp_sortor[index].getCycleCount(); if (_on_sorted) {
_on_sorted(std::move(rtp));
}
} }
uint32_t RtpReceiver::getSSRC(int index) const{ void RtpTrackImp::onBeforeRtpSorted(const RtpPacket::Ptr &rtp) {
return _ssrc[index]; if (_on_before_sorted) {
_on_before_sorted(rtp);
}
} }
}//namespace mediakit }//namespace mediakit
...@@ -160,11 +160,8 @@ private: ...@@ -160,11 +160,8 @@ private:
function<void(SEQ seq, T &packet)> _cb; function<void(SEQ seq, T &packet)> _cb;
}; };
class RtpReceiver { class RtpTrack : private PacketSortor<RtpPacket::Ptr>{
public: public:
RtpReceiver();
virtual ~RtpReceiver();
class BadRtpException : public invalid_argument { class BadRtpException : public invalid_argument {
public: public:
template<typename Type> template<typename Type>
...@@ -172,7 +169,60 @@ public: ...@@ -172,7 +169,60 @@ public:
~BadRtpException() = default; ~BadRtpException() = default;
}; };
RtpTrack();
virtual ~RtpTrack() = default;
void clear();
uint32_t getSSRC() const;
bool inputRtp(TrackType type, int sample_rate, uint8_t *ptr, size_t len);
protected: protected:
virtual void onRtpSorted(RtpPacket::Ptr rtp) {}
virtual void onBeforeRtpSorted(const RtpPacket::Ptr &rtp) {}
private:
uint32_t _ssrc = 0;
Ticker _ssrc_alive;
};
class RtpTrackImp : public RtpTrack{
public:
using OnSorted = function<void(RtpPacket::Ptr)>;
using BeforeSorted = function<void(const RtpPacket::Ptr &)>;
RtpTrackImp() = default;
~RtpTrackImp() override = default;
void setOnSorted(OnSorted cb);
void setBeforeSorted(BeforeSorted cb);
protected:
void onRtpSorted(RtpPacket::Ptr rtp) override;
void onBeforeRtpSorted(const RtpPacket::Ptr &rtp) override;
private:
OnSorted _on_sorted;
BeforeSorted _on_before_sorted;
};
template<int kCount = 2>
class RtpMultiReceiver {
public:
RtpMultiReceiver() {
int index = 0;
for (auto &track : _track) {
track.setOnSorted([this, index](RtpPacket::Ptr rtp) {
onRtpSorted(std::move(rtp), index);
});
track.setBeforeSorted([this, index](const RtpPacket::Ptr &rtp) {
onBeforeRtpSorted(rtp, index);
});
++index;
}
}
virtual ~RtpMultiReceiver() = default;
/** /**
* 输入数据指针生成并排序rtp包 * 输入数据指针生成并排序rtp包
* @param index track下标索引 * @param index track下标索引
...@@ -182,34 +232,49 @@ protected: ...@@ -182,34 +232,49 @@ protected:
* @param len rtp数据指针长度 * @param len rtp数据指针长度
* @return 解析成功返回true * @return 解析成功返回true
*/ */
bool handleOneRtp(int index, TrackType type, int samplerate, uint8_t *ptr, size_t len); bool handleOneRtp(int index, TrackType type, int sample_rate, uint8_t *ptr, size_t len){
return _track[index].inputRtp(type, sample_rate, ptr, len);
}
void clear() {
for (auto &track : _track) {
track.clear();
}
}
size_t getJitterSize(int index) const {
return _track[index].getJitterSize();
}
size_t getCycleCount(int index) const {
return _track[index].getCycleCount();
}
uint32_t getSSRC(int index) const {
return _track[index].getSSRC();
}
protected:
/** /**
* rtp数据包排序后输出 * rtp数据包排序后输出
* @param rtp rtp数据包 * @param rtp rtp数据包
* @param track_index track索引 * @param track_index track索引
*/ */
virtual void onRtpSorted(RtpPacket::Ptr rtp, int track_index) {} virtual void onRtpSorted(RtpPacket::Ptr rtp, int index) {}
/** /**
* 解析出rtp但还未排序 * 解析出rtp但还未排序
* @param rtp rtp数据包 * @param rtp rtp数据包
* @param track_index track索引 * @param track_index track索引
*/ */
virtual void onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int track_index) {} virtual void onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int index) {}
void clear();
size_t getJitterSize(int track_index) const;
size_t getCycleCount(int track_index) const;
uint32_t getSSRC(int track_index) const;
private: private:
uint32_t _ssrc[2] = {0, 0}; RtpTrackImp _track[kCount];
Ticker _ssrc_alive[2];
//rtp排序缓存,根据seq排序
PacketSortor<RtpPacket::Ptr> _rtp_sortor[2];
}; };
using RtpReceiver = RtpMultiReceiver<2>;
}//namespace mediakit }//namespace mediakit
......
...@@ -205,7 +205,7 @@ void RtspPlayer::handleResDESCRIBE(const Parser& parser) { ...@@ -205,7 +205,7 @@ void RtspPlayer::handleResDESCRIBE(const Parser& parser) {
} }
_rtcp_context.clear(); _rtcp_context.clear();
for (auto &track : _sdp_track) { for (auto &track : _sdp_track) {
_rtcp_context.emplace_back(std::make_shared<RtcpContext>(track->_samplerate, true)); _rtcp_context.emplace_back(std::make_shared<RtcpContext>(true));
} }
sendSetup(0); sendSetup(0);
} }
...@@ -591,7 +591,7 @@ void RtspPlayer::sendRtspRequest(const string &cmd, const string &url,const StrC ...@@ -591,7 +591,7 @@ void RtspPlayer::sendRtspRequest(const string &cmd, const string &url,const StrC
void RtspPlayer::onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int track_idx){ void RtspPlayer::onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int track_idx){
auto &rtcp_ctx = _rtcp_context[track_idx]; auto &rtcp_ctx = _rtcp_context[track_idx];
rtcp_ctx->onRtp(rtp->getSeq(), rtp->getStampMS(), rtp->size() - RtpPacket::kRtpTcpHeaderSize); rtcp_ctx->onRtp(rtp->getSeq(), ntohl(rtp->getHeader()->stamp), rtp->size() - RtpPacket::kRtpTcpHeaderSize);
auto &ticker = _rtcp_send_ticker[track_idx]; auto &ticker = _rtcp_send_ticker[track_idx];
if (ticker.elapsedTime() < 3 * 1000) { if (ticker.elapsedTime() < 3 * 1000) {
......
...@@ -179,7 +179,7 @@ void RtspPusher::sendAnnounce() { ...@@ -179,7 +179,7 @@ void RtspPusher::sendAnnounce() {
} }
_rtcp_context.clear(); _rtcp_context.clear();
for (auto &track : _track_vec) { for (auto &track : _track_vec) {
_rtcp_context.emplace_back(std::make_shared<RtcpContext>(track->_samplerate, false)); _rtcp_context.emplace_back(std::make_shared<RtcpContext>(false));
} }
_on_res_func = std::bind(&RtspPusher::handleResAnnounce, this, placeholders::_1); _on_res_func = std::bind(&RtspPusher::handleResAnnounce, this, placeholders::_1);
sendRtspRequest("ANNOUNCE", _url, {}, src->getSdp()); sendRtspRequest("ANNOUNCE", _url, {}, src->getSdp());
...@@ -360,7 +360,7 @@ void RtspPusher::updateRtcpContext(const RtpPacket::Ptr &rtp){ ...@@ -360,7 +360,7 @@ void RtspPusher::updateRtcpContext(const RtpPacket::Ptr &rtp){
int track_index = getTrackIndexByTrackType(rtp->type); int track_index = getTrackIndexByTrackType(rtp->type);
auto &ticker = _rtcp_send_ticker[track_index]; auto &ticker = _rtcp_send_ticker[track_index];
auto &rtcp_ctx = _rtcp_context[track_index]; auto &rtcp_ctx = _rtcp_context[track_index];
rtcp_ctx->onRtp(rtp->getSeq(), rtp->getStampMS(), rtp->size() - RtpPacket::kRtpTcpHeaderSize); rtcp_ctx->onRtp(rtp->getSeq(), ntohl(rtp->getHeader()->stamp), rtp->size() - RtpPacket::kRtpTcpHeaderSize);
//send rtcp every 5 second //send rtcp every 5 second
if (ticker.elapsedTime() > 5 * 1000) { if (ticker.elapsedTime() > 5 * 1000) {
......
...@@ -252,7 +252,7 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) { ...@@ -252,7 +252,7 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) {
} }
_rtcp_context.clear(); _rtcp_context.clear();
for (auto &track : _sdp_track) { for (auto &track : _sdp_track) {
_rtcp_context.emplace_back(std::make_shared<RtcpContext>(track->_samplerate, true)); _rtcp_context.emplace_back(std::make_shared<RtcpContext>(true));
} }
_push_src = std::make_shared<RtspMediaSourceImp>(_media_info._vhost, _media_info._app, _media_info._streamid); _push_src = std::make_shared<RtspMediaSourceImp>(_media_info._vhost, _media_info._app, _media_info._streamid);
_push_src->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this())); _push_src->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
...@@ -413,7 +413,7 @@ void RtspSession::onAuthSuccess() { ...@@ -413,7 +413,7 @@ void RtspSession::onAuthSuccess() {
} }
strongSelf->_rtcp_context.clear(); strongSelf->_rtcp_context.clear();
for (auto &track : strongSelf->_sdp_track) { for (auto &track : strongSelf->_sdp_track) {
strongSelf->_rtcp_context.emplace_back(std::make_shared<RtcpContext>(track->_samplerate, false)); strongSelf->_rtcp_context.emplace_back(std::make_shared<RtcpContext>(false));
} }
strongSelf->_sessionid = makeRandStr(12); strongSelf->_sessionid = makeRandStr(12);
strongSelf->_play_src = rtsp_src; strongSelf->_play_src = rtsp_src;
...@@ -1126,7 +1126,7 @@ void RtspSession::onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int track_index){ ...@@ -1126,7 +1126,7 @@ void RtspSession::onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int track_index){
void RtspSession::updateRtcpContext(const RtpPacket::Ptr &rtp){ void RtspSession::updateRtcpContext(const RtpPacket::Ptr &rtp){
int track_index = getTrackIndexByTrackType(rtp->type); int track_index = getTrackIndexByTrackType(rtp->type);
auto &rtcp_ctx = _rtcp_context[track_index]; auto &rtcp_ctx = _rtcp_context[track_index];
rtcp_ctx->onRtp(rtp->getSeq(), rtp->getStampMS(), rtp->size() - RtpPacket::kRtpTcpHeaderSize); rtcp_ctx->onRtp(rtp->getSeq(), ntohl(rtp->getHeader()->stamp), rtp->size() - RtpPacket::kRtpTcpHeaderSize);
auto &ticker = _rtcp_send_tickers[track_index]; auto &ticker = _rtcp_send_tickers[track_index];
//send rtcp every 5 second //send rtcp every 5 second
......
/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#include "Nack.h"
static constexpr uint32_t kMaxNackMS = 10 * 1000;
void NackList::push_back(RtpPacket::Ptr rtp) {
auto seq = rtp->getSeq();
_nack_cache_seq.emplace_back(seq);
_nack_cache_pkt.emplace(seq, std::move(rtp));
while (get_cache_ms() > kMaxNackMS) {
//需要清除部分nack缓存
pop_front();
}
}
void NackList::for_each_nack(const FCI_NACK &nack, const function<void(const RtpPacket::Ptr &rtp)> &func) {
auto seq = nack.getPid();
for (auto bit : nack.getBitArray()) {
if (bit) {
//丢包
RtpPacket::Ptr *ptr = get_rtp(seq);
if (ptr) {
func(*ptr);
}
}
++seq;
}
}
void NackList::pop_front() {
if (_nack_cache_seq.empty()) {
return;
}
_nack_cache_pkt.erase(_nack_cache_seq.front());
_nack_cache_seq.pop_front();
}
RtpPacket::Ptr *NackList::get_rtp(uint16_t seq) {
auto it = _nack_cache_pkt.find(seq);
if (it == _nack_cache_pkt.end()) {
return nullptr;
}
return &it->second;
}
uint32_t NackList::get_cache_ms() {
if (_nack_cache_seq.size() < 2) {
return 0;
}
uint32_t back = _nack_cache_pkt[_nack_cache_seq.back()]->getStampMS();
uint32_t front = _nack_cache_pkt[_nack_cache_seq.front()]->getStampMS();
if (back > front) {
return back - front;
}
//很有可能回环了
return back + (UINT32_MAX - front);
}
////////////////////////////////////////////////////////////////////////////////////////////////
void NackContext::received(uint16_t seq) {
if (!_last_max_seq && _seq.empty()) {
_last_max_seq = seq - 1;
}
_seq.emplace(seq);
auto max_seq = *_seq.rbegin();
auto min_seq = *_seq.begin();
auto diff = max_seq - min_seq;
if (!diff) {
return;
}
if (diff > UINT32_MAX / 2) {
//回环
_seq.clear();
_last_max_seq = min_seq;
return;
}
if (_seq.size() == diff + 1 && _last_max_seq + 1 == min_seq) {
//都是连续的seq,未丢包
_seq.clear();
_last_max_seq = max_seq;
} else {
//seq不连续,有丢包
if (min_seq == _last_max_seq + 1) {
//前面部分seq是连续的,未丢包,移除之
eraseFrontSeq();
}
//有丢包,丢包从_last_max_seq开始
if (max_seq - _last_max_seq > FCI_NACK::kBitSize) {
vector<bool> vec;
vec.resize(FCI_NACK::kBitSize);
for (auto i = 0; i < FCI_NACK::kBitSize; ++i) {
vec[i] = _seq.find(_last_max_seq + i + 2) == _seq.end();
}
doNack(FCI_NACK(_last_max_seq + 1, vec));
_last_max_seq += FCI_NACK::kBitSize + 1;
if (_last_max_seq >= max_seq) {
_seq.clear();
} else {
auto it = _seq.emplace_hint(_seq.begin(), _last_max_seq);
_seq.erase(_seq.begin(), it);
}
}
}
}
void NackContext::setOnNack(onNack cb) {
_cb = std::move(cb);
}
void NackContext::doNack(const FCI_NACK &nack) {
if (_cb) {
_cb(nack);
}
}
void NackContext::eraseFrontSeq() {
//前面部分seq是连续的,未丢包,移除之
for (auto it = _seq.begin(); it != _seq.end();) {
if (*it != _last_max_seq + 1) {
//seq不连续,丢包了
break;
}
_last_max_seq = *it;
it = _seq.erase(it);
}
}
\ No newline at end of file
/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#ifndef ZLMEDIAKIT_NACK_H
#define ZLMEDIAKIT_NACK_H
#include "Rtsp/Rtsp.h"
#include "Rtcp/RtcpFCI.h"
using namespace mediakit;
class NackList {
public:
NackList() = default;
~NackList() = default;
void push_back(RtpPacket::Ptr rtp);
void for_each_nack(const FCI_NACK &nack, const function<void(const RtpPacket::Ptr &rtp)> &cb);
private:
void pop_front();
uint32_t get_cache_ms();
RtpPacket::Ptr *get_rtp(uint16_t seq);
private:
deque<uint16_t> _nack_cache_seq;
unordered_map<uint16_t, RtpPacket::Ptr> _nack_cache_pkt;
};
class NackContext {
public:
using onNack = function<void(const FCI_NACK &nack)>;
NackContext() = default;
~NackContext() = default;
void received(uint16_t seq);
void setOnNack(onNack cb);
private:
void eraseFrontSeq();
void doNack(const FCI_NACK &nack);
private:
onNack _cb;
set<uint16_t> _seq;
uint16_t _last_max_seq = 0;
};
#endif //ZLMEDIAKIT_NACK_H
...@@ -561,4 +561,90 @@ void RtpExt::setType(RtpExtType type) { ...@@ -561,4 +561,90 @@ void RtpExt::setType(RtpExtType type) {
RtpExtType RtpExt::getType() const { RtpExtType RtpExt::getType() const {
return _type; return _type;
} }
\ No newline at end of file
RtpExtContext::RtpExtContext(const RtcMedia &m){
for (auto &ext : m.extmap) {
auto ext_type = RtpExt::getExtType(ext.ext);
_rtp_ext_id_to_type.emplace(ext.id, ext_type);
_rtp_ext_type_to_id.emplace(ext_type, ext.id);
}
}
string RtpExtContext::getRid(uint32_t ssrc) const{
auto it = _ssrc_to_rid.find(ssrc);
if (it == _ssrc_to_rid.end()) {
return "";
}
return it->second;
}
void RtpExtContext::setRid(uint32_t ssrc, const string &rid) {
_ssrc_to_rid[ssrc] = rid;
}
void RtpExtContext::changeRtpExtId(const RtpHeader *header, bool is_recv, string *rid_ptr) {
string rid, repaired_rid;
auto ext_map = RtpExt::getExtValue(header);
for (auto &pr : ext_map) {
if (is_recv) {
auto it = _rtp_ext_id_to_type.find(pr.first);
if (it == _rtp_ext_id_to_type.end()) {
WarnL << "接收rtp时,忽略不识别的rtp ext, id=" << (int) pr.first;
pr.second.clearExt();
continue;
}
pr.second.setType(it->second);
//重新赋值ext id为 ext type,作为后面处理ext的统一中间类型
pr.second.setExtId((uint8_t) it->second);
switch (it->second) {
case RtpExtType::sdes_rtp_stream_id : rid = pr.second.getRtpStreamId(); break;
case RtpExtType::sdes_repaired_rtp_stream_id : repaired_rid = pr.second.getRepairedRtpStreamId(); break;
default : break;
}
} else {
pr.second.setType((RtpExtType) pr.first);
auto it = _rtp_ext_type_to_id.find((RtpExtType) pr.first);
if (it == _rtp_ext_type_to_id.end()) {
WarnL << "发送rtp时, 忽略不被客户端支持rtp ext:" << pr.second.dumpString();
pr.second.clearExt();
continue;
}
//重新赋值ext id为客户端sdp声明的类型
pr.second.setExtId(it->second);
}
}
if (!is_recv) {
return;
}
if (rid.empty()) {
rid = repaired_rid;
}
auto ssrc = ntohl(header->ssrc);
if (rid.empty()) {
//获取rid
rid = _ssrc_to_rid[ssrc];
} else {
//设置rid
auto it = _ssrc_to_rid.find(ssrc);
if (it == _ssrc_to_rid.end() || it->second != rid) {
_ssrc_to_rid[ssrc] = rid;
onGetRtp(header->pt, ssrc, rid);
}
}
if (rid_ptr) {
*rid_ptr = rid;
}
}
void RtpExtContext::setOnGetRtp(OnGetRtp cb) {
_cb = std::move(cb);
}
void RtpExtContext::onGetRtp(uint8_t pt, uint32_t ssrc, const string &rid){
if (_cb) {
_cb(pt, ssrc, rid);
}
}
...@@ -108,5 +108,31 @@ private: ...@@ -108,5 +108,31 @@ private:
RtpExtType _type = RtpExtType::padding; RtpExtType _type = RtpExtType::padding;
}; };
class RtcMedia;
class RtpExtContext {
public:
using Ptr = std::shared_ptr<RtpExtContext>;
using OnGetRtp = function<void(uint8_t pt, uint32_t ssrc, const string &rid)>;
RtpExtContext(const RtcMedia &media);
~RtpExtContext() = default;
void setOnGetRtp(OnGetRtp cb);
string getRid(uint32_t ssrc) const;
void setRid(uint32_t ssrc, const string &rid);
void changeRtpExtId(const RtpHeader *header, bool is_recv, string *rid_ptr = nullptr);
private:
void onGetRtp(uint8_t pt, uint32_t ssrc, const string &rid);
private:
OnGetRtp _cb;
//发送rtp时需要修改rtp ext id
map<RtpExtType, uint8_t> _rtp_ext_type_to_id;
//接收rtp时需要修改rtp ext id
unordered_map<uint8_t, RtpExtType> _rtp_ext_id_to_type;
//ssrc --> rid
unordered_map<uint32_t/*simulcast ssrc*/, string/*rid*/> _ssrc_to_rid;
};
#endif //ZLMEDIAKIT_RTPEXT_H #endif //ZLMEDIAKIT_RTPEXT_H
...@@ -1636,7 +1636,8 @@ RETRY: ...@@ -1636,7 +1636,8 @@ RETRY:
if (configure.direction != RtpDirection::recvonly && if (configure.direction != RtpDirection::recvonly &&
configure.direction != RtpDirection::sendrecv) { configure.direction != RtpDirection::sendrecv) {
//我们不支持接收 //我们不支持接收
continue; answer_media.direction = RtpDirection::inactive;
break;
} }
answer_media.direction = RtpDirection::recvonly; answer_media.direction = RtpDirection::recvonly;
break; break;
...@@ -1645,7 +1646,8 @@ RETRY: ...@@ -1645,7 +1646,8 @@ RETRY:
if (configure.direction != RtpDirection::sendonly && if (configure.direction != RtpDirection::sendonly &&
configure.direction != RtpDirection::sendrecv) { configure.direction != RtpDirection::sendrecv) {
//我们不支持发送 //我们不支持发送
continue; answer_media.direction = RtpDirection::inactive;
break;
} }
answer_media.direction = RtpDirection::sendonly; answer_media.direction = RtpDirection::sendonly;
break; break;
......
...@@ -22,6 +22,8 @@ ...@@ -22,6 +22,8 @@
#include "Rtsp/RtspMediaSourceImp.h" #include "Rtsp/RtspMediaSourceImp.h"
#include "Rtcp/RtcpContext.h" #include "Rtcp/RtcpContext.h"
#include "Rtcp/RtcpFCI.h" #include "Rtcp/RtcpFCI.h"
#include "Nack.h"
using namespace toolkit; using namespace toolkit;
using namespace mediakit; using namespace mediakit;
...@@ -125,151 +127,26 @@ private: ...@@ -125,151 +127,26 @@ private:
RtcSession::Ptr _answer_sdp; RtcSession::Ptr _answer_sdp;
}; };
class RtpReceiverImp; class RtpChannel;
class MediaTrack {
class NackList {
public:
void push_back(RtpPacket::Ptr rtp) {
auto seq = rtp->getSeq();
_nack_cache_seq.emplace_back(seq);
_nack_cache_pkt.emplace(seq, std::move(rtp));
while (get_cache_ms() > kMaxNackMS) {
//需要清除部分nack缓存
pop_front();
}
}
template<typename FUNC>
void for_each_nack(const FCI_NACK &nack, const FUNC &func) {
auto seq = nack.getPid();
for (auto bit : nack.getBitArray()) {
if (bit) {
//丢包
RtpPacket::Ptr *ptr = get_rtp(seq);
if (ptr) {
func(*ptr);
}
}
++seq;
}
}
private:
void pop_front() {
if (_nack_cache_seq.empty()) {
return;
}
_nack_cache_pkt.erase(_nack_cache_seq.front());
_nack_cache_seq.pop_front();
}
RtpPacket::Ptr *get_rtp(uint16_t seq) {
auto it = _nack_cache_pkt.find(seq);
if (it == _nack_cache_pkt.end()) {
return nullptr;
}
return &it->second;
}
uint32_t get_cache_ms() {
if (_nack_cache_seq.size() < 2) {
return 0;
}
uint32_t back = _nack_cache_pkt[_nack_cache_seq.back()]->getStampMS();
uint32_t front = _nack_cache_pkt[_nack_cache_seq.front()]->getStampMS();
if (back > front) {
return back - front;
}
//很有可能回环了
return back + (UINT32_MAX - front);
}
private:
static constexpr uint32_t kMaxNackMS = 10 * 1000;
deque<uint16_t> _nack_cache_seq;
unordered_map<uint16_t, RtpPacket::Ptr > _nack_cache_pkt;
};
class NackContext {
public: public:
using onNack = function<void(const FCI_NACK &nack)>; using Ptr = std::shared_ptr<MediaTrack>;
const RtcCodecPlan *plan_rtp;
void received(uint16_t seq) { const RtcCodecPlan *plan_rtx;
if (!_last_max_seq && _seq.empty()) { uint32_t offer_ssrc_rtp = 0;
_last_max_seq = seq - 1; uint32_t offer_ssrc_rtx = 0;
} uint32_t answer_ssrc_rtp = 0;
_seq.emplace(seq); uint32_t answer_ssrc_rtx = 0;
auto max_seq = *_seq.rbegin(); const RtcMedia *media;
auto min_seq = *_seq.begin(); RtpExtContext::Ptr rtp_ext_ctx;
auto diff = max_seq - min_seq;
if (!diff) { //for send rtp
return; NackList nack_list;
} RtcpContext::Ptr rtcp_context_send;
if (diff > UINT32_MAX / 2) { //for recv rtp
//回环 unordered_map<string/*rid*/, std::shared_ptr<RtpChannel> > rtp_channel;
_seq.clear(); std::shared_ptr<RtpChannel> getRtpChannel(uint32_t ssrc) const;
_last_max_seq = min_seq;
return;
}
if (_seq.size() == diff + 1 && _last_max_seq + 1 == min_seq) {
//都是连续的seq,未丢包
_seq.clear();
_last_max_seq = max_seq;
} else {
//seq不连续,有丢包
if (min_seq == _last_max_seq + 1) {
//前面部分seq是连续的,未丢包,移除之
eraseFrontSeq();
}
//有丢包,丢包从_last_max_seq开始
if (max_seq - _last_max_seq > FCI_NACK::kBitSize) {
vector<bool> vec;
vec.resize(FCI_NACK::kBitSize);
for (auto i = 0; i < FCI_NACK::kBitSize; ++i) {
vec[i] = _seq.find(_last_max_seq + i + 2) == _seq.end();
}
doNack(FCI_NACK(_last_max_seq + 1, vec));
_last_max_seq += FCI_NACK::kBitSize + 1;
if (_last_max_seq >= max_seq) {
_seq.clear();
} else {
auto it = _seq.emplace_hint(_seq.begin(), _last_max_seq);
_seq.erase(_seq.begin(), it);
}
}
}
}
void setOnNack(onNack cb) {
_cb = std::move(cb);
}
private:
void doNack(const FCI_NACK &nack) {
if (_cb) {
_cb(nack);
}
}
void eraseFrontSeq(){
//前面部分seq是连续的,未丢包,移除之
for (auto it = _seq.begin(); it != _seq.end();) {
if (*it != _last_max_seq + 1) {
//seq不连续,丢包了
break;
}
_last_max_seq = *it;
it = _seq.erase(it);
}
}
private:
onNack _cb;
set<uint16_t> _seq;
uint16_t _last_max_seq = 0;
}; };
class WebRtcTransportImp : public WebRtcTransport, public MediaSourceEvent, public SockInfo, public std::enable_shared_from_this<WebRtcTransportImp>{ class WebRtcTransportImp : public WebRtcTransport, public MediaSourceEvent, public SockInfo, public std::enable_shared_from_this<WebRtcTransportImp>{
...@@ -298,8 +175,6 @@ protected: ...@@ -298,8 +175,6 @@ protected:
void onRtcConfigure(RtcConfigure &configure) const override; void onRtcConfigure(RtcConfigure &configure) const override;
void onRtp(const char *buf, size_t len) override; void onRtp(const char *buf, size_t len) override;
void onRtp_l(const char *buf, size_t len, bool rtx);
void onRtcp(const char *buf, size_t len) override; void onRtcp(const char *buf, size_t len) override;
void onBeforeEncryptRtp(const char *buf, size_t &len, void *ctx) override; void onBeforeEncryptRtp(const char *buf, size_t &len, void *ctx) override;
void onBeforeEncryptRtcp(const char *buf, size_t &len, void *ctx) override {}; void onBeforeEncryptRtcp(const char *buf, size_t &len, void *ctx) override {};
...@@ -339,28 +214,9 @@ private: ...@@ -339,28 +214,9 @@ private:
bool canSendRtp() const; bool canSendRtp() const;
bool canRecvRtp() const; bool canRecvRtp() const;
class RtpPayloadInfo { void onSortedRtp(MediaTrack &track, const string &rid, RtpPacket::Ptr rtp);
public: void onSendNack(MediaTrack &track, const FCI_NACK &nack, uint32_t ssrc);
using Ptr = std::shared_ptr<RtpPayloadInfo>; void createRtpChannel(const string &rid, uint32_t ssrc, const MediaTrack::Ptr &track);
const RtcCodecPlan *plan_rtp;
const RtcCodecPlan *plan_rtx;
uint32_t offer_ssrc_rtp = 0;
uint32_t offer_ssrc_rtx = 0;
uint32_t answer_ssrc_rtp = 0;
uint32_t answer_ssrc_rtx = 0;
const RtcMedia *media;
NackList nack_list;
RtcpContext::Ptr rtcp_context_send;
unordered_map<string/*rid*/, std::pair<uint32_t/*rtp ssrc*/, uint32_t/*rtx ssrc*/> > rid_ssrc;
unordered_map<uint32_t/*rtx ssrc*/, uint32_t/*rtp ssrc*/> rtx_ssrc_to_rtp_ssrc;
unordered_map<uint32_t/*simulcast ssrc*/, NackContext> nack_ctx;
unordered_map<uint32_t/*simulcast ssrc*/, RtcpContext::Ptr> rtcp_context_recv;
unordered_map<uint32_t/*simulcast ssrc*/, std::shared_ptr<RtpReceiverImp> > receiver;
};
void onSortedRtp(RtpPayloadInfo &info, const string &rid, RtpPacket::Ptr rtp);
void onSendNack(RtpPayloadInfo &info, const FCI_NACK &nack, uint32_t ssrc);
void changeRtpExtId(RtpPayloadInfo &info, const RtpHeader *header, bool is_recv, bool is_rtx = false, string *rid_ptr = nullptr) const;
private: private:
uint16_t _rtx_seq[2] = {0, 0}; uint16_t _rtx_seq[2] = {0, 0};
...@@ -386,13 +242,9 @@ private: ...@@ -386,13 +242,9 @@ private:
//播放rtsp源的reader对象 //播放rtsp源的reader对象
RtspMediaSource::RingType::RingReader::Ptr _reader; RtspMediaSource::RingType::RingReader::Ptr _reader;
//根据发送rtp的track类型获取相关信息 //根据发送rtp的track类型获取相关信息
RtpPayloadInfo::Ptr _send_rtp_info[2]; MediaTrack::Ptr _type_to_track[2];
//根据接收rtp的pt获取相关信息 //根据接收rtp的pt获取相关信息
unordered_map<uint8_t/*pt*/, std::pair<bool/*is rtx*/,RtpPayloadInfo::Ptr> > _rtp_info_pt; unordered_map<uint8_t/*pt*/, std::pair<bool/*is rtx*/,MediaTrack::Ptr> > _pt_to_track;
//根据rtcp的ssrc获取相关信息 //根据rtcp的ssrc获取相关信息,收发rtp和rtx的ssrc都会记录
unordered_map<uint32_t/*ssrc*/, std::pair<bool/*is rtx*/,RtpPayloadInfo::Ptr> > _rtp_info_ssrc; unordered_map<uint32_t/*ssrc*/, MediaTrack::Ptr> _ssrc_to_track;
//发送rtp时需要修改rtp ext id
map<RtpExtType, uint8_t> _rtp_ext_type_to_id;
//接收rtp时需要修改rtp ext id
unordered_map<uint8_t, RtpExtType> _rtp_ext_id_to_type;
}; };
\ No newline at end of file
...@@ -7399,18 +7399,11 @@ var ZLMRTCClient = (function (exports) { ...@@ -7399,18 +7399,11 @@ var ZLMRTCClient = (function (exports) {
}; };
if (this.options.simulcast && stream.getVideoTracks().length > 0) { if (this.options.simulcast && stream.getVideoTracks().length > 0) {
VideoTransceiverInit.sendEncodings = [{ VideoTransceiverInit.sendEncodings = [
rid: 'q', { rid: "h", active: true, maxBitrate: 1000000 },
active: true, { rid: "m", active: true, maxBitrate: 500000, scaleResolutionDownBy: 2 },
scaleResolutionDownBy: 4.0 { rid: "l", active: true, maxBitrate: 200000, scaleResolutionDownBy: 4 }
}, { ];
rid: 'h',
active: true,
scaleResolutionDownBy: 2.0
}, {
rid: 'f',
active: true
}];
} }
if (stream.getAudioTracks().length > 0) { if (stream.getAudioTracks().length > 0) {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论