Commit 070bf19c by xiongziliang

重写rtp框架

parent 5175c52a
...@@ -195,8 +195,6 @@ sslport=19350 ...@@ -195,8 +195,6 @@ sslport=19350
#音频mtu大小,该参数限制rtp最大字节数,推荐不要超过1400 #音频mtu大小,该参数限制rtp最大字节数,推荐不要超过1400
#加大该值会明显增加直播延时 #加大该值会明显增加直播延时
audioMtuSize=600 audioMtuSize=600
#rtp时间戳回环时间,单位毫秒
cycleMS=46800000
#视频mtu大小,该参数限制rtp最大字节数,推荐不要超过1400 #视频mtu大小,该参数限制rtp最大字节数,推荐不要超过1400
videoMtuSize=1400 videoMtuSize=1400
......
...@@ -194,15 +194,12 @@ const string kAudioMtuSize = RTP_FIELD"audioMtuSize"; ...@@ -194,15 +194,12 @@ const string kAudioMtuSize = RTP_FIELD"audioMtuSize";
const string kMaxRtpCount = RTP_FIELD"maxRtpCount"; const string kMaxRtpCount = RTP_FIELD"maxRtpCount";
//如果RTP序列正确次数累计达到该数字就启动清空排序缓存 //如果RTP序列正确次数累计达到该数字就启动清空排序缓存
const string kClearCount = RTP_FIELD"clearCount"; const string kClearCount = RTP_FIELD"clearCount";
//最大RTP时间为13个小时,每13小时回环一次
const string kCycleMS = RTP_FIELD"cycleMS";
onceToken token([](){ onceToken token([](){
mINI::Instance()[kVideoMtuSize] = 1400; mINI::Instance()[kVideoMtuSize] = 1400;
mINI::Instance()[kAudioMtuSize] = 600; mINI::Instance()[kAudioMtuSize] = 600;
mINI::Instance()[kMaxRtpCount] = 50; mINI::Instance()[kMaxRtpCount] = 50;
mINI::Instance()[kClearCount] = 10; mINI::Instance()[kClearCount] = 10;
mINI::Instance()[kCycleMS] = 13*60*60*1000;
},nullptr); },nullptr);
} //namespace Rtsp } //namespace Rtsp
......
...@@ -251,8 +251,6 @@ extern const string kAudioMtuSize; ...@@ -251,8 +251,6 @@ extern const string kAudioMtuSize;
extern const string kMaxRtpCount; extern const string kMaxRtpCount;
//如果RTP序列正确次数累计达到该数字就启动清空排序缓存 //如果RTP序列正确次数累计达到该数字就启动清空排序缓存
extern const string kClearCount; extern const string kClearCount;
//最大RTP时间为13个小时,每13小时回环一次
extern const string kCycleMS;
} //namespace Rtsp } //namespace Rtsp
////////////组播配置/////////// ////////////组播配置///////////
......
...@@ -25,32 +25,30 @@ AACRtpEncoder::AACRtpEncoder(uint32_t ui32Ssrc, ...@@ -25,32 +25,30 @@ AACRtpEncoder::AACRtpEncoder(uint32_t ui32Ssrc,
} }
void AACRtpEncoder::inputFrame(const Frame::Ptr &frame) { void AACRtpEncoder::inputFrame(const Frame::Ptr &frame) {
GET_CONFIG(uint32_t, cycleMS, Rtp::kCycleMS); auto stamp = frame->dts();
auto uiStamp = frame->dts(); auto data = frame->data() + frame->prefixSize();
auto pcData = frame->data() + frame->prefixSize(); auto len = frame->size() - frame->prefixSize();
auto iLen = frame->size() - frame->prefixSize(); auto ptr = (char *) data;
auto remain_size = len;
uiStamp %= cycleMS; auto max_size = getMaxSize() - 4;
auto *ptr = (char *) pcData; while (remain_size > 0) {
auto iSize = iLen; if (remain_size <= max_size) {
while (iSize > 0) { _section_buf[0] = 0;
if (iSize <= _ui32MtuSize - 20) { _section_buf[1] = 16;
_aucSectionBuf[0] = 0; _section_buf[2] = (len >> 5) & 0xFF;
_aucSectionBuf[1] = 16; _section_buf[3] = ((len & 0x1F) << 3) & 0xFF;
_aucSectionBuf[2] = (iLen >> 5) & 0xFF; memcpy(_section_buf + 4, ptr, remain_size);
_aucSectionBuf[3] = ((iLen & 0x1F) << 3) & 0xFF; makeAACRtp(_section_buf, remain_size + 4, true, stamp);
memcpy(_aucSectionBuf + 4, ptr, iSize);
makeAACRtp(_aucSectionBuf, iSize + 4, true, uiStamp);
break; break;
} }
_aucSectionBuf[0] = 0; _section_buf[0] = 0;
_aucSectionBuf[1] = 16; _section_buf[1] = 16;
_aucSectionBuf[2] = ((iLen) >> 5) & 0xFF; _section_buf[2] = ((len) >> 5) & 0xFF;
_aucSectionBuf[3] = ((iLen & 0x1F) << 3) & 0xFF; _section_buf[3] = ((len & 0x1F) << 3) & 0xFF;
memcpy(_aucSectionBuf + 4, ptr, _ui32MtuSize - 20); memcpy(_section_buf + 4, ptr, max_size);
makeAACRtp(_aucSectionBuf, _ui32MtuSize - 16, false, uiStamp); makeAACRtp(_section_buf, max_size + 4, false, stamp);
ptr += (_ui32MtuSize - 20); ptr += max_size;
iSize -= (_ui32MtuSize - 20); remain_size -= max_size;
} }
} }
...@@ -82,15 +80,16 @@ void AACRtpDecoder::obtainFrame() { ...@@ -82,15 +80,16 @@ void AACRtpDecoder::obtainFrame() {
_frame->_codec_id = CodecAAC; _frame->_codec_id = CodecAAC;
} }
bool AACRtpDecoder::inputRtp(const RtpPacket::Ptr &rtppack, bool key_pos) { bool AACRtpDecoder::inputRtp(const RtpPacket::Ptr &rtp, bool key_pos) {
auto stamp = rtp->getStampMS();
//rtp数据开始部分 //rtp数据开始部分
uint8_t *ptr = (uint8_t *) rtppack->data() + rtppack->offset; auto ptr = rtp->getPayload();
//rtp数据末尾 //rtp数据末尾
uint8_t *end = (uint8_t *) rtppack->data() + rtppack->size(); auto end = ptr + rtp->getPayloadSize();
//首2字节表示Au-Header的个数,单位bit,所以除以16得到Au-Header个数 //首2字节表示Au-Header的个数,单位bit,所以除以16得到Au-Header个数
uint16_t au_header_count = ((ptr[0] << 8) | ptr[1]) >> 4; auto au_header_count = ((ptr[0] << 8) | ptr[1]) >> 4;
//记录au_header起始指针 //记录au_header起始指针
uint8_t *au_header_ptr = ptr + 2; auto au_header_ptr = ptr + 2;
ptr = au_header_ptr + au_header_count * 2; ptr = au_header_ptr + au_header_count * 2;
if (end < ptr) { if (end < ptr) {
...@@ -100,11 +99,11 @@ bool AACRtpDecoder::inputRtp(const RtpPacket::Ptr &rtppack, bool key_pos) { ...@@ -100,11 +99,11 @@ bool AACRtpDecoder::inputRtp(const RtpPacket::Ptr &rtppack, bool key_pos) {
if (!_last_dts) { if (!_last_dts) {
//记录第一个时间戳 //记录第一个时间戳
_last_dts = rtppack->timeStamp; _last_dts = stamp;
} }
//每个audio unit时间戳增量 //每个audio unit时间戳增量
auto dts_inc = (rtppack->timeStamp - _last_dts) / au_header_count; auto dts_inc = (stamp - _last_dts) / au_header_count;
if (dts_inc < 0 && dts_inc > 100) { if (dts_inc < 0 && dts_inc > 100) {
//时间戳增量异常,忽略 //时间戳增量异常,忽略
dts_inc = 0; dts_inc = 0;
...@@ -129,7 +128,7 @@ bool AACRtpDecoder::inputRtp(const RtpPacket::Ptr &rtppack, bool key_pos) { ...@@ -129,7 +128,7 @@ bool AACRtpDecoder::inputRtp(const RtpPacket::Ptr &rtppack, bool key_pos) {
} }
} }
//记录上次时间戳 //记录上次时间戳
_last_dts = rtppack->timeStamp; _last_dts = stamp;
return false; return false;
} }
......
...@@ -80,7 +80,7 @@ private: ...@@ -80,7 +80,7 @@ private:
void makeAACRtp(const void *pData, size_t uiLen, bool bMark, uint32_t uiStamp); void makeAACRtp(const void *pData, size_t uiLen, bool bMark, uint32_t uiStamp);
private: private:
unsigned char _aucSectionBuf[1600]; unsigned char _section_buf[1600];
}; };
}//namespace mediakit }//namespace mediakit
......
...@@ -29,14 +29,16 @@ void CommonRtpDecoder::obtainFrame() { ...@@ -29,14 +29,16 @@ void CommonRtpDecoder::obtainFrame() {
} }
bool CommonRtpDecoder::inputRtp(const RtpPacket::Ptr &rtp, bool){ bool CommonRtpDecoder::inputRtp(const RtpPacket::Ptr &rtp, bool){
auto payload = rtp->data() + rtp->offset; auto payload = rtp->getPayload();
auto size = rtp->size() - rtp->offset; auto size = rtp->getPayloadSize();
auto stamp = rtp->getStampMS();
auto seq = rtp->getSeq();
if (size <= 0) { if (size <= 0) {
//无实际负载 //无实际负载
return false; return false;
} }
if (_frame->_dts != rtp->timeStamp || _frame->_buffer.size() > _max_frame_size) { if (_frame->_dts != stamp || _frame->_buffer.size() > _max_frame_size) {
//时间戳发生变化或者缓存超过MAX_FRAME_SIZE,则清空上帧数据 //时间戳发生变化或者缓存超过MAX_FRAME_SIZE,则清空上帧数据
if (!_frame->_buffer.empty()) { if (!_frame->_buffer.empty()) {
//有有效帧,则输出 //有有效帧,则输出
...@@ -45,20 +47,20 @@ bool CommonRtpDecoder::inputRtp(const RtpPacket::Ptr &rtp, bool){ ...@@ -45,20 +47,20 @@ bool CommonRtpDecoder::inputRtp(const RtpPacket::Ptr &rtp, bool){
//新的一帧数据 //新的一帧数据
obtainFrame(); obtainFrame();
_frame->_dts = rtp->timeStamp; _frame->_dts = stamp;
_drop_flag = false; _drop_flag = false;
} else if (_last_seq != 0 && (uint16_t)(_last_seq + 1) != rtp->sequence) { } else if (_last_seq != 0 && (uint16_t)(_last_seq + 1) != seq) {
//时间戳未发生变化,但是seq却不连续,说明中间rtp丢包了,那么整帧应该废弃 //时间戳未发生变化,但是seq却不连续,说明中间rtp丢包了,那么整帧应该废弃
WarnL << "rtp丢包:" << _last_seq << " -> " << rtp->sequence; WarnL << "rtp丢包:" << _last_seq << " -> " << seq;
_drop_flag = true; _drop_flag = true;
_frame->_buffer.clear(); _frame->_buffer.clear();
} }
if (!_drop_flag) { if (!_drop_flag) {
_frame->_buffer.append(payload, size); _frame->_buffer.append((char *)payload, size);
} }
_last_seq = rtp->sequence; _last_seq = seq;
return false; return false;
} }
...@@ -70,18 +72,17 @@ CommonRtpEncoder::CommonRtpEncoder(CodecId codec, uint32_t ssrc, uint32_t mtu_si ...@@ -70,18 +72,17 @@ CommonRtpEncoder::CommonRtpEncoder(CodecId codec, uint32_t ssrc, uint32_t mtu_si
} }
void CommonRtpEncoder::inputFrame(const Frame::Ptr &frame){ void CommonRtpEncoder::inputFrame(const Frame::Ptr &frame){
GET_CONFIG(uint32_t, cycleMS, Rtp::kCycleMS); auto stamp = frame->dts();
auto stamp = frame->dts() % cycleMS;
auto ptr = frame->data() + frame->prefixSize(); auto ptr = frame->data() + frame->prefixSize();
auto len = frame->size() - frame->prefixSize(); auto len = frame->size() - frame->prefixSize();
auto remain_size = len; auto remain_size = len;
const auto max_rtp_size = _ui32MtuSize - 20; auto max_size = getMaxSize();
bool mark = false; bool mark = false;
while (remain_size > 0) { while (remain_size > 0) {
size_t rtp_size; size_t rtp_size;
if (remain_size > max_rtp_size) { if (remain_size > max_size) {
rtp_size = max_rtp_size; rtp_size = max_size;
} else { } else {
rtp_size = remain_size; rtp_size = remain_size;
mark = true; mark = true;
......
...@@ -46,7 +46,7 @@ bool H264RtpDecoder::inputRtp(const RtpPacket::Ptr &rtp, bool key_pos) { ...@@ -46,7 +46,7 @@ bool H264RtpDecoder::inputRtp(const RtpPacket::Ptr &rtp, bool key_pos) {
return decodeRtp(rtp); return decodeRtp(rtp);
} }
bool H264RtpDecoder::decodeRtp(const RtpPacket::Ptr &rtppack) { bool H264RtpDecoder::decodeRtp(const RtpPacket::Ptr &rtp) {
/** /**
* h264帧类型 * h264帧类型
* Type==1:P/B frame * Type==1:P/B frame
...@@ -71,8 +71,11 @@ bool H264RtpDecoder::decodeRtp(const RtpPacket::Ptr &rtppack) { ...@@ -71,8 +71,11 @@ bool H264RtpDecoder::decodeRtp(const RtpPacket::Ptr &rtppack) {
29 FU-B Fragmentation unit 5.8 29 FU-B Fragmentation unit 5.8
30-31 undefined - 30-31 undefined -
*/ */
const uint8_t *frame = (uint8_t *) rtppack->data() + rtppack->offset; auto frame = rtp->getPayload();
auto length = rtppack->size() - rtppack->offset; auto length = rtp->getPayloadSize();
auto stamp = rtp->getStampMS();
auto seq = rtp->getSeq();
int nal_type = *frame & 0x1F; int nal_type = *frame & 0x1F;
int nal_suffix = *frame & (~0x1F); int nal_suffix = *frame & (~0x1F);
...@@ -80,7 +83,7 @@ bool H264RtpDecoder::decodeRtp(const RtpPacket::Ptr &rtppack) { ...@@ -80,7 +83,7 @@ bool H264RtpDecoder::decodeRtp(const RtpPacket::Ptr &rtppack) {
//a full frame //a full frame
_h264frame->_buffer.assign("\x0\x0\x0\x1", 4); _h264frame->_buffer.assign("\x0\x0\x0\x1", 4);
_h264frame->_buffer.append((char *) frame, length); _h264frame->_buffer.append((char *) frame, length);
_h264frame->_pts = rtppack->timeStamp; _h264frame->_pts = stamp;
auto key = _h264frame->keyFrame(); auto key = _h264frame->keyFrame();
onGetH264(_h264frame); onGetH264(_h264frame);
return (key); //i frame return (key); //i frame
...@@ -107,7 +110,7 @@ bool H264RtpDecoder::decodeRtp(const RtpPacket::Ptr &rtppack) { ...@@ -107,7 +110,7 @@ bool H264RtpDecoder::decodeRtp(const RtpPacket::Ptr &rtppack) {
//有有效数据 //有有效数据
_h264frame->_buffer.assign("\x0\x0\x0\x1", 4); _h264frame->_buffer.assign("\x0\x0\x0\x1", 4);
_h264frame->_buffer.append((char *) ptr, len); _h264frame->_buffer.append((char *) ptr, len);
_h264frame->_pts = rtppack->timeStamp; _h264frame->_pts = stamp;
if ((ptr[0] & 0x1F) == H264Frame::NAL_IDR) { if ((ptr[0] & 0x1F) == H264Frame::NAL_IDR) {
haveIDR = true; haveIDR = true;
} }
...@@ -127,16 +130,16 @@ bool H264RtpDecoder::decodeRtp(const RtpPacket::Ptr &rtppack) { ...@@ -127,16 +130,16 @@ bool H264RtpDecoder::decodeRtp(const RtpPacket::Ptr &rtppack) {
_h264frame->_buffer.assign("\x0\x0\x0\x1", 4); _h264frame->_buffer.assign("\x0\x0\x0\x1", 4);
_h264frame->_buffer.push_back(nal_suffix | fu.type); _h264frame->_buffer.push_back(nal_suffix | fu.type);
_h264frame->_buffer.append((char *) frame + 2, length - 2); _h264frame->_buffer.append((char *) frame + 2, length - 2);
_h264frame->_pts = rtppack->timeStamp; _h264frame->_pts = stamp;
//该函数return时,保存下当前sequence,以便下次对比seq是否连续 //该函数return时,保存下当前sequence,以便下次对比seq是否连续
_lastSeq = rtppack->sequence; _lastSeq = seq;
return _h264frame->keyFrame(); return _h264frame->keyFrame();
} }
if (rtppack->sequence != (uint16_t)(_lastSeq + 1) && rtppack->sequence != 0) { if (seq != (uint16_t)(_lastSeq + 1) && seq != 0) {
//中间的或末尾的rtp包,其seq必须连续(如果回环了则判定为连续),否则说明rtp丢包,那么该帧不完整,必须得丢弃 //中间的或末尾的rtp包,其seq必须连续(如果回环了则判定为连续),否则说明rtp丢包,那么该帧不完整,必须得丢弃
_h264frame->_buffer.clear(); _h264frame->_buffer.clear();
WarnL << "rtp丢包: " << rtppack->sequence << " != " << _lastSeq << " + 1,该帧被废弃"; WarnL << "rtp丢包: " << seq << " != " << _lastSeq << " + 1,该帧被废弃";
return false; return false;
} }
...@@ -144,13 +147,13 @@ bool H264RtpDecoder::decodeRtp(const RtpPacket::Ptr &rtppack) { ...@@ -144,13 +147,13 @@ bool H264RtpDecoder::decodeRtp(const RtpPacket::Ptr &rtppack) {
//该帧的中间rtp包 FU-A mid //该帧的中间rtp包 FU-A mid
_h264frame->_buffer.append((char *) frame + 2, length - 2); _h264frame->_buffer.append((char *) frame + 2, length - 2);
//该函数return时,保存下当前sequence,以便下次对比seq是否连续 //该函数return时,保存下当前sequence,以便下次对比seq是否连续
_lastSeq = rtppack->sequence; _lastSeq = seq;
return false; return false;
} }
//该帧最后一个rtp包 FU-A end //该帧最后一个rtp包 FU-A end
_h264frame->_buffer.append((char *) frame + 2, length - 2); _h264frame->_buffer.append((char *) frame + 2, length - 2);
_h264frame->_pts = rtppack->timeStamp; _h264frame->_pts = stamp;
onGetH264(_h264frame); onGetH264(_h264frame);
return false; return false;
} }
...@@ -163,7 +166,7 @@ bool H264RtpDecoder::decodeRtp(const RtpPacket::Ptr &rtppack) { ...@@ -163,7 +166,7 @@ bool H264RtpDecoder::decodeRtp(const RtpPacket::Ptr &rtppack) {
// 0 udef // 0 udef
// 30 udef // 30 udef
// 31 udef // 31 udef
WarnL << "不支持的rtp类型:" << (int) nal_type << " " << rtppack->sequence; WarnL << "不支持的rtp类型:" << (int) nal_type << " " << seq;
return false; return false;
} }
} }
...@@ -193,15 +196,14 @@ H264RtpEncoder::H264RtpEncoder(uint32_t ui32Ssrc, ...@@ -193,15 +196,14 @@ H264RtpEncoder::H264RtpEncoder(uint32_t ui32Ssrc,
} }
void H264RtpEncoder::inputFrame(const Frame::Ptr &frame) { void H264RtpEncoder::inputFrame(const Frame::Ptr &frame) {
GET_CONFIG(uint32_t,cycleMS,Rtp::kCycleMS);
auto ptr = frame->data() + frame->prefixSize(); auto ptr = frame->data() + frame->prefixSize();
auto len = frame->size() - frame->prefixSize(); auto len = frame->size() - frame->prefixSize();
auto pts = frame->pts() % cycleMS; auto pts = frame->pts();
auto nal_type = H264_TYPE(ptr[0]); auto nal_type = H264_TYPE(ptr[0]);
size_t payload_size = _ui32MtuSize - 2; auto max_size = getMaxSize() - 2;
//超过MTU则按照FU-A模式打包 //超过MTU则按照FU-A模式打包
if (len > payload_size + 1) { if (len > max_size + 1) {
//最高位bit为forbidden_zero_bit, //最高位bit为forbidden_zero_bit,
//后面2bit为nal_ref_idc(帧重要程度),00:可以丢,11:不能丢 //后面2bit为nal_ref_idc(帧重要程度),00:可以丢,11:不能丢
//末尾5bit为nalu type,固定为28(FU-A) //末尾5bit为nalu type,固定为28(FU-A)
...@@ -211,10 +213,10 @@ void H264RtpEncoder::inputFrame(const Frame::Ptr &frame) { ...@@ -211,10 +213,10 @@ void H264RtpEncoder::inputFrame(const Frame::Ptr &frame) {
bool mark_bit = false; bool mark_bit = false;
size_t offset = 1; size_t offset = 1;
while (!mark_bit) { while (!mark_bit) {
if (len <= offset + payload_size) { if (len <= offset + max_size) {
//FU-A end //FU-A end
mark_bit = true; mark_bit = true;
payload_size = len - offset; max_size = len - offset;
s_e_r_flags = (1 << 6) | nal_type; s_e_r_flags = (1 << 6) | nal_type;
} else if (fu_a_start) { } else if (fu_a_start) {
//FU-A start //FU-A start
...@@ -226,19 +228,19 @@ void H264RtpEncoder::inputFrame(const Frame::Ptr &frame) { ...@@ -226,19 +228,19 @@ void H264RtpEncoder::inputFrame(const Frame::Ptr &frame) {
{ {
//传入nullptr先不做payload的内存拷贝 //传入nullptr先不做payload的内存拷贝
auto rtp = makeRtp(getTrackType(), nullptr, payload_size + 2, mark_bit, pts); auto rtp = makeRtp(getTrackType(), nullptr, max_size + 2, mark_bit, pts);
//rtp payload 负载部分 //rtp payload 负载部分
uint8_t *payload = (uint8_t*)rtp->data() + rtp->offset; uint8_t *payload = rtp->getPayload();
//FU-A 第1个字节 //FU-A 第1个字节
payload[0] = nal_fu_a; payload[0] = nal_fu_a;
//FU-A 第2个字节 //FU-A 第2个字节
payload[1] = s_e_r_flags; payload[1] = s_e_r_flags;
//H264 数据 //H264 数据
memcpy(payload + 2, (unsigned char *) ptr + offset, payload_size); memcpy(payload + 2, (unsigned char *) ptr + offset, max_size);
//输入到rtp环形缓存 //输入到rtp环形缓存
RtpCodec::inputRtp(rtp, fu_a_start && nal_type == H264Frame::NAL_IDR); RtpCodec::inputRtp(rtp, fu_a_start && nal_type == H264Frame::NAL_IDR);
} }
offset += payload_size; offset += max_size;
fu_a_start = false; fu_a_start = false;
} }
} else { } else {
......
...@@ -168,9 +168,11 @@ bool H265RtpDecoder::mergeFu(const uint8_t *ptr, ssize_t size, uint16_t seq, uin ...@@ -168,9 +168,11 @@ bool H265RtpDecoder::mergeFu(const uint8_t *ptr, ssize_t size, uint16_t seq, uin
return false; return false;
} }
bool H265RtpDecoder::inputRtp(const RtpPacket::Ptr &rtppack, bool ) { bool H265RtpDecoder::inputRtp(const RtpPacket::Ptr &rtp, bool ) {
const uint8_t *frame = (uint8_t *) rtppack->data() + rtppack->offset; auto frame = rtp->getPayload();
auto length = rtppack->size() - rtppack->offset; auto length = rtp->getPayloadSize();
auto stamp = rtp->getStampMS();
auto seq = rtp->getSeq();
int nal = H265_TYPE(frame[0]); int nal = H265_TYPE(frame[0]);
if (nal > 50){ if (nal > 50){
...@@ -185,13 +187,13 @@ bool H265RtpDecoder::inputRtp(const RtpPacket::Ptr &rtppack, bool ) { ...@@ -185,13 +187,13 @@ bool H265RtpDecoder::inputRtp(const RtpPacket::Ptr &rtppack, bool ) {
return false; return false;
case 48: case 48:
// aggregated packet (AP) - with two or more NAL units // aggregated packet (AP) - with two or more NAL units
return unpackAp(frame, length, rtppack->timeStamp); return unpackAp(frame, length, stamp);
case 49: case 49:
// fragmentation unit (FU) // fragmentation unit (FU)
return mergeFu(frame, length, rtppack->sequence, rtppack->timeStamp); return mergeFu(frame, length, seq, stamp);
default: default:
// 4.4.1. Single NAL Unit Packets (p24) // 4.4.1. Single NAL Unit Packets (p24)
return singleFrame(frame, length, rtppack->timeStamp); return singleFrame(frame, length, stamp);
} }
} }
...@@ -228,25 +230,24 @@ H265RtpEncoder::H265RtpEncoder(uint32_t ui32Ssrc, ...@@ -228,25 +230,24 @@ H265RtpEncoder::H265RtpEncoder(uint32_t ui32Ssrc,
} }
void H265RtpEncoder::inputFrame(const Frame::Ptr &frame) { void H265RtpEncoder::inputFrame(const Frame::Ptr &frame) {
GET_CONFIG(uint32_t, cycleMS, Rtp::kCycleMS);
auto ptr = (uint8_t *) frame->data() + frame->prefixSize(); auto ptr = (uint8_t *) frame->data() + frame->prefixSize();
auto len = frame->size() - frame->prefixSize(); auto len = frame->size() - frame->prefixSize();
auto pts = frame->pts() % cycleMS; auto pts = frame->pts();
auto nal_type = H265_TYPE(ptr[0]); //获取NALU的5bit 帧类型 auto nal_type = H265_TYPE(ptr[0]); //获取NALU的5bit 帧类型
size_t payload_size = _ui32MtuSize - 3; auto max_size = getMaxSize() - 3;
//超过MTU,按照FU方式打包 //超过MTU,按照FU方式打包
if (len > payload_size + 2) { if (len > max_size + 2) {
//获取帧头数据,1byte //获取帧头数据,1byte
unsigned char s_e_flags; unsigned char s_e_flags;
bool fu_start = true; bool fu_start = true;
bool mark_bit = false; bool mark_bit = false;
size_t offset = 2; size_t offset = 2;
while (!mark_bit) { while (!mark_bit) {
if (len <= offset + payload_size) { if (len <= offset + max_size) {
//FU end //FU end
mark_bit = true; mark_bit = true;
payload_size = len - offset; max_size = len - offset;
s_e_flags = (1 << 6) | nal_type; s_e_flags = (1 << 6) | nal_type;
} else if (fu_start) { } else if (fu_start) {
//FU start //FU start
...@@ -258,9 +259,9 @@ void H265RtpEncoder::inputFrame(const Frame::Ptr &frame) { ...@@ -258,9 +259,9 @@ void H265RtpEncoder::inputFrame(const Frame::Ptr &frame) {
{ {
//传入nullptr先不做payload的内存拷贝 //传入nullptr先不做payload的内存拷贝
auto rtp = makeRtp(getTrackType(), nullptr, payload_size + 3, mark_bit, pts); auto rtp = makeRtp(getTrackType(), nullptr, max_size + 3, mark_bit, pts);
//rtp payload 负载部分 //rtp payload 负载部分
uint8_t *payload = (uint8_t *) rtp->data() + rtp->offset; uint8_t *payload = rtp->getPayload();
//FU 第1个字节,表明为FU //FU 第1个字节,表明为FU
payload[0] = 49 << 1; payload[0] = 49 << 1;
//FU 第2个字节貌似固定为1 //FU 第2个字节貌似固定为1
...@@ -268,12 +269,12 @@ void H265RtpEncoder::inputFrame(const Frame::Ptr &frame) { ...@@ -268,12 +269,12 @@ void H265RtpEncoder::inputFrame(const Frame::Ptr &frame) {
//FU 第3个字节 //FU 第3个字节
payload[2] = s_e_flags; payload[2] = s_e_flags;
//H265 数据 //H265 数据
memcpy(payload + 3, ptr + offset, payload_size); memcpy(payload + 3, ptr + offset, max_size);
//输入到rtp环形缓存 //输入到rtp环形缓存
RtpCodec::inputRtp(rtp, fu_start && H265Frame::isKeyFrame(nal_type)); RtpCodec::inputRtp(rtp, fu_start && H265Frame::isKeyFrame(nal_type));
} }
offset += payload_size; offset += max_size;
fu_start = false; fu_start = false;
} }
} else { } else {
......
...@@ -35,8 +35,9 @@ bool GB28181Process::inputRtp(bool, const char *data, size_t data_len) { ...@@ -35,8 +35,9 @@ bool GB28181Process::inputRtp(bool, const char *data, size_t data_len) {
} }
void GB28181Process::onRtpSorted(const RtpPacket::Ptr &rtp, int) { void GB28181Process::onRtpSorted(const RtpPacket::Ptr &rtp, int) {
auto pt = rtp->getHeader()->pt;
if (!_rtp_decoder) { if (!_rtp_decoder) {
switch (rtp->PT) { switch (pt) {
case 98: { case 98: {
//H264负载 //H264负载
_rtp_decoder = std::make_shared<H264RtpDecoder>(); _rtp_decoder = std::make_shared<H264RtpDecoder>();
...@@ -44,8 +45,8 @@ void GB28181Process::onRtpSorted(const RtpPacket::Ptr &rtp, int) { ...@@ -44,8 +45,8 @@ void GB28181Process::onRtpSorted(const RtpPacket::Ptr &rtp, int) {
break; break;
} }
default: { default: {
if (rtp->PT != 33 && rtp->PT != 96) { if (pt != 33 && pt != 96) {
WarnL << "rtp payload type未识别(" << (int) rtp->PT << "),已按ts或ps负载处理"; WarnL << "rtp payload type未识别(" << (int) pt << "),已按ts或ps负载处理";
} }
//ts或ps负载 //ts或ps负载
_rtp_decoder = std::make_shared<CommonRtpDecoder>(CodecInvalid, 32 * 1024); _rtp_decoder = std::make_shared<CommonRtpDecoder>(CodecInvalid, 32 * 1024);
......
...@@ -28,7 +28,7 @@ void RtpCache::input(uint64_t stamp, Buffer::Ptr buffer) { ...@@ -28,7 +28,7 @@ void RtpCache::input(uint64_t stamp, Buffer::Ptr buffer) {
void RtpCachePS::onRTP(Buffer::Ptr buffer) { void RtpCachePS::onRTP(Buffer::Ptr buffer) {
auto rtp = static_pointer_cast<RtpPacket>(buffer); auto rtp = static_pointer_cast<RtpPacket>(buffer);
auto stamp = rtp->timeStamp; auto stamp = rtp->getStampMS();
input(stamp, std::move(buffer)); input(stamp, std::move(buffer));
} }
......
...@@ -13,43 +13,37 @@ ...@@ -13,43 +13,37 @@
namespace mediakit{ namespace mediakit{
RtpPacket::Ptr RtpInfo::makeRtp(TrackType type, const void* data, size_t len, bool mark, uint32_t stamp) { RtpPacket::Ptr RtpInfo::makeRtp(TrackType type, const void* data, size_t len, bool mark, uint32_t stamp) {
uint16_t payload_len = (uint16_t)(len + 12); uint16_t payload_len = (uint16_t) (len + RtpPacket::kRtpHeaderSize);
uint32_t ts = htonl((_ui32SampleRate / 1000) * stamp); auto rtp = ResourcePoolHelper<RtpPacket>::obtainObj();
uint16_t sq = htons(_ui16Sequence); rtp->setCapacity(payload_len + RtpPacket::kRtpTcpHeaderSize);
uint32_t sc = htonl(_ui32Ssrc); rtp->setSize(payload_len + RtpPacket::kRtpTcpHeaderSize);
rtp->sample_rate = _sample_rate;
auto rtp_ptr = ResourcePoolHelper<RtpPacket>::obtainObj(); rtp->type = type;
rtp_ptr->setCapacity(len + 16);
rtp_ptr->setSize(len + 16); //rtsp over tcp 头
auto ptr = (uint8_t *) rtp->data();
auto *rtp = (unsigned char *)rtp_ptr->data(); ptr[0] = '$';
rtp[0] = '$'; ptr[1] = _interleaved;
rtp[1] = _ui8Interleaved; ptr[2] = payload_len >> 8;
rtp[2] = payload_len >> 8; ptr[3] = payload_len & 0xFF;
rtp[3] = payload_len & 0xFF;
rtp[4] = 0x80; //rtp头
rtp[5] = (mark << 7) | _ui8PayloadType; auto header = rtp->getHeader();
memcpy(&rtp[6], &sq, 2); header->version = RtpPacket::kRtpVersion;
memcpy(&rtp[8], &ts, 4); header->padding = 0;
//ssrc header->ext = 0;
memcpy(&rtp[12], &sc, 4); header->csrc = 0;
header->mark = mark;
if(data){ header->pt = _pt;
//payload header->seq = htons(_seq++);
memcpy(&rtp[16], data, len); header->stamp = htonl(uint64_t(stamp) * _sample_rate / 1000);
header->ssrc = htonl(_ssrc);
//有效负载
if (data) {
memcpy(&ptr[RtpPacket::kRtpHeaderSize + RtpPacket::kRtpTcpHeaderSize], data, len);
} }
return rtp;
rtp_ptr->PT = _ui8PayloadType;
rtp_ptr->interleaved = _ui8Interleaved;
rtp_ptr->mark = mark;
rtp_ptr->sequence = _ui16Sequence;
rtp_ptr->timeStamp = stamp;
rtp_ptr->ssrc = _ui32Ssrc;
rtp_ptr->type = type;
rtp_ptr->offset = 16;
_ui16Sequence++;
_ui32TimeStamp = stamp;
return rtp_ptr;
} }
}//namespace mediakit }//namespace mediakit
......
...@@ -58,63 +58,41 @@ protected: ...@@ -58,63 +58,41 @@ protected:
RingType::Ptr _rtpRing; RingType::Ptr _rtpRing;
}; };
class RtpInfo : public ResourcePoolHelper<RtpPacket>{
class RtpInfo : public ResourcePoolHelper<RtpPacket>{
public: public:
typedef std::shared_ptr<RtpInfo> Ptr; typedef std::shared_ptr<RtpInfo> Ptr;
RtpInfo(uint32_t ui32Ssrc, RtpInfo(uint32_t ssrc, size_t mtu_size, uint32_t sample_rate, uint8_t pt, uint8_t interleaved) {
uint32_t ui32MtuSize, if (ssrc == 0) {
uint32_t ui32SampleRate, ssrc = ((uint64_t) this) & 0xFFFFFFFF;
uint8_t ui8PayloadType,
uint8_t ui8Interleaved) {
if(ui32Ssrc == 0){
ui32Ssrc = ((uint64_t)this) & 0xFFFFFFFF;
} }
_ui32Ssrc = ui32Ssrc; _pt = pt;
_ui32SampleRate = ui32SampleRate; _ssrc = ssrc;
_ui32MtuSize = ui32MtuSize; _mtu_size = mtu_size;
_ui8PayloadType = ui8PayloadType; _sample_rate = sample_rate;
_ui8Interleaved = ui8Interleaved; _interleaved = interleaved;
} }
virtual ~RtpInfo(){} ~RtpInfo() override {}
int getInterleaved() const {
return _ui8Interleaved;
}
int getPayloadType() const {
return _ui8PayloadType;
}
int getSampleRate() const { //返回rtp负载最大长度
return _ui32SampleRate; size_t getMaxSize() const {
return _mtu_size - RtpPacket::kRtpHeaderSize;
} }
uint32_t getSsrc() const { uint32_t getSsrc() const {
return _ui32Ssrc; return _ssrc;
} }
uint16_t getSeqence() const {
return _ui16Sequence;
}
uint32_t getTimestamp() const {
return _ui32TimeStamp;
}
uint32_t getMtuSize() const {
return _ui32MtuSize;
}
RtpPacket::Ptr makeRtp(TrackType type,const void *data, size_t len, bool mark, uint32_t stamp); RtpPacket::Ptr makeRtp(TrackType type,const void *data, size_t len, bool mark, uint32_t stamp);
protected: private:
uint32_t _ui32Ssrc; uint8_t _pt;
uint32_t _ui32SampleRate; uint8_t _interleaved;
uint32_t _ui32MtuSize; uint16_t _seq = 0;
uint8_t _ui8PayloadType; uint32_t _ssrc;
uint8_t _ui8Interleaved; uint32_t _sample_rate;
uint16_t _ui16Sequence = 0; size_t _mtu_size;
uint32_t _ui32TimeStamp = 0;
}; };
class RtpCodec : public RtpRing, public FrameDispatcher , public CodecInfo{ class RtpCodec : public RtpRing, public FrameDispatcher , public CodecInfo{
......
...@@ -11,10 +11,6 @@ ...@@ -11,10 +11,6 @@
#include "Common/config.h" #include "Common/config.h"
#include "RtpReceiver.h" #include "RtpReceiver.h"
#define AV_RB16(x) \
((((const uint8_t*)(x))[0] << 8) | \
((const uint8_t*)(x))[1])
#define RTP_MAX_SIZE (10 * 1024) #define RTP_MAX_SIZE (10 * 1024)
namespace mediakit { namespace mediakit {
...@@ -28,122 +24,67 @@ RtpReceiver::RtpReceiver() { ...@@ -28,122 +24,67 @@ RtpReceiver::RtpReceiver() {
++index; ++index;
} }
} }
RtpReceiver::~RtpReceiver() {} RtpReceiver::~RtpReceiver() {}
bool RtpReceiver::handleOneRtp(int track_index, TrackType type, int samplerate, uint8_t *rtp_raw_ptr, size_t rtp_raw_len) { bool RtpReceiver::handleOneRtp(int index, TrackType type, int sample_rate, uint8_t *ptr, size_t len) {
if (rtp_raw_len < 12) { if (len < RtpPacket::kRtpHeaderSize) {
WarnL << "rtp包太小:" << rtp_raw_len; WarnL << "rtp包太小:" << len;
return false; return false;
} }
if (len > RTP_MAX_SIZE) {
uint32_t version = rtp_raw_ptr[0] >> 6; WarnL << "超大的rtp包:" << len << " > " << RTP_MAX_SIZE;
uint8_t padding = 0; return false;
uint8_t ext = rtp_raw_ptr[0] & 0x10;
uint8_t csrc = rtp_raw_ptr[0] & 0x0f;
if (rtp_raw_ptr[0] & 0x20) {
//获取padding大小
padding = rtp_raw_ptr[rtp_raw_len - 1];
//移除padding flag
rtp_raw_ptr[0] &= ~0x20;
//移除padding字节
rtp_raw_len -= padding;
}
if (version != 2) {
throw std::invalid_argument("非法的rtp,version != 2");
} }
if (!sample_rate) {
auto rtp_ptr = _rtp_pool.obtain();
auto &rtp = *rtp_ptr;
rtp.type = type;
rtp.interleaved = 2 * type;
rtp.mark = rtp_raw_ptr[1] >> 7;
rtp.PT = rtp_raw_ptr[1] & 0x7F;
//序列号,内存对齐
memcpy(&rtp.sequence, rtp_raw_ptr + 2, 2);
rtp.sequence = ntohs(rtp.sequence);
//时间戳,内存对齐
memcpy(&rtp.timeStamp, rtp_raw_ptr + 4, 4);
rtp.timeStamp = ntohl(rtp.timeStamp);
if (!samplerate) {
//无法把时间戳转换成毫秒 //无法把时间戳转换成毫秒
return false; return false;
} }
//时间戳转换成毫秒 RtpHeader *header = (RtpHeader *) ptr;
rtp.timeStamp = rtp.timeStamp * 1000LL / samplerate; if (header->version != RtpPacket::kRtpVersion) {
throw std::invalid_argument("非法的rtp,version字段非法");
//ssrc,内存对齐
memcpy(&rtp.ssrc, rtp_raw_ptr + 8, 4);
rtp.ssrc = ntohl(rtp.ssrc);
if (_ssrc[track_index] != rtp.ssrc) {
if (_ssrc[track_index] == 0) {
//保存SSRC至track对象
_ssrc[track_index] = rtp.ssrc;
} else {
//ssrc错误
WarnL << "ssrc错误:" << rtp.ssrc << " != " << _ssrc[track_index];
if (_ssrc_err_count[track_index]++ > 10) {
//ssrc切换后清除老数据
WarnL << "ssrc更换:" << _ssrc[track_index] << " -> " << rtp.ssrc;
_rtp_sortor[track_index].clear();
_ssrc[track_index] = rtp.ssrc;
}
return false;
}
} }
if (!header->getPayloadSize(len)) {
//ssrc匹配正确,不匹配计数清零
_ssrc_err_count[track_index] = 0;
//rtp 12个固定字节头
rtp.offset = 12;
//rtp有csrc
rtp.offset += 4 * csrc;
if (ext) {
//rtp有ext
uint16_t reserved = AV_RB16(rtp_raw_ptr + rtp.offset);
uint16_t extlen = AV_RB16(rtp_raw_ptr + rtp.offset + 2) << 2;
rtp.offset += extlen + 4;
}
if (rtp_raw_len <= rtp.offset) {
//无有效负载的rtp包 //无有效负载的rtp包
return false; return false;
} }
if (rtp_raw_len > RTP_MAX_SIZE) { //比对缓存ssrc
WarnL << "超大的rtp包:" << rtp_raw_len << " > " << RTP_MAX_SIZE; auto ssrc = ntohl(header->ssrc);
if (!_ssrc[index]) {
//保存SSRC至track对象
_ssrc[index] = ssrc;
} else if (_ssrc[index] != ssrc) {
//ssrc错误
WarnL << "ssrc错误:" << ssrc << " != " << _ssrc[index];
return false; return false;
} }
//设置rtp负载长度 auto rtp = _rtp_pool.obtain();
rtp.setCapacity(rtp_raw_len + 4); //需要添加4个字节的rtp over tcp头
rtp.setSize(rtp_raw_len + 4); rtp->setCapacity(RtpPacket::kRtpTcpHeaderSize + len);
uint8_t *payload_ptr = (uint8_t *) rtp.data(); rtp->setSize(RtpPacket::kRtpTcpHeaderSize + len);
payload_ptr[0] = '$'; rtp->sample_rate = sample_rate;
payload_ptr[1] = rtp.interleaved; rtp->type = type;
payload_ptr[2] = (rtp_raw_len >> 8) & 0xFF;
payload_ptr[3] = rtp_raw_len & 0xFF; //赋值4个字节的rtp over tcp头
//添加rtp over tcp前4个字节的偏移量 uint8_t *data = (uint8_t *) rtp->data();
rtp.offset += 4; data[0] = '$';
//拷贝rtp负载 data[1] = 2 * type;
memcpy(payload_ptr + 4, rtp_raw_ptr, rtp_raw_len); data[2] = (len >> 8) & 0xFF;
//排序rtp data[3] = len & 0xFF;
auto seq = rtp_ptr->sequence; //拷贝rtp
onBeforeRtpSorted(rtp_ptr, track_index); memcpy(&data[4], ptr, len);
_rtp_sortor[track_index].sortPacket(seq, std::move(rtp_ptr));
onBeforeRtpSorted(rtp, index);
auto seq = rtp->getSeq();
_rtp_sortor[index].sortPacket(seq, std::move(rtp));
return true; return true;
} }
void RtpReceiver::clear() { void RtpReceiver::clear() {
CLEAR_ARR(_ssrc); CLEAR_ARR(_ssrc);
CLEAR_ARR(_ssrc_err_count);
for (auto &sortor : _rtp_sortor) { for (auto &sortor : _rtp_sortor) {
sortor.clear(); sortor.clear();
} }
...@@ -153,16 +94,16 @@ void RtpReceiver::setPoolSize(size_t size) { ...@@ -153,16 +94,16 @@ void RtpReceiver::setPoolSize(size_t size) {
_rtp_pool.setSize(size); _rtp_pool.setSize(size);
} }
size_t RtpReceiver::getJitterSize(int track_index) const{ size_t RtpReceiver::getJitterSize(int index) const{
return _rtp_sortor[track_index].getJitterSize(); return _rtp_sortor[index].getJitterSize();
} }
size_t RtpReceiver::getCycleCount(int track_index) const{ size_t RtpReceiver::getCycleCount(int index) const{
return _rtp_sortor[track_index].getCycleCount(); return _rtp_sortor[index].getCycleCount();
} }
uint32_t RtpReceiver::getSSRC(int track_index) const{ uint32_t RtpReceiver::getSSRC(int index) const{
return _ssrc[track_index]; return _ssrc[index];
} }
}//namespace mediakit }//namespace mediakit
...@@ -168,14 +168,14 @@ public: ...@@ -168,14 +168,14 @@ public:
protected: protected:
/** /**
* 输入数据指针生成并排序rtp包 * 输入数据指针生成并排序rtp包
* @param track_index track下标索引 * @param index track下标索引
* @param type track类型 * @param type track类型
* @param samplerate rtp时间戳基准时钟,视频为90000,音频为采样率 * @param samplerate rtp时间戳基准时钟,视频为90000,音频为采样率
* @param rtp_raw_ptr rtp数据指针 * @param ptr rtp数据指针
* @param rtp_raw_len rtp数据指针长度 * @param len rtp数据指针长度
* @return 解析成功返回true * @return 解析成功返回true
*/ */
bool handleOneRtp(int track_index, TrackType type, int samplerate, uint8_t *rtp_raw_ptr, size_t rtp_raw_len); bool handleOneRtp(int index, TrackType type, int samplerate, uint8_t *ptr, size_t len);
/** /**
* rtp数据包排序后输出 * rtp数据包排序后输出
...@@ -199,8 +199,6 @@ protected: ...@@ -199,8 +199,6 @@ protected:
private: private:
uint32_t _ssrc[2] = {0, 0}; uint32_t _ssrc[2] = {0, 0};
//ssrc不匹配计数
size_t _ssrc_err_count[2] = {0, 0};
//rtp排序缓存,根据seq排序 //rtp排序缓存,根据seq排序
PacketSortor<RtpPacket::Ptr> _rtp_sortor[2]; PacketSortor<RtpPacket::Ptr> _rtp_sortor[2];
//rtp循环池 //rtp循环池
......
...@@ -415,14 +415,101 @@ string printSSRC(uint32_t ui32Ssrc) { ...@@ -415,14 +415,101 @@ string printSSRC(uint32_t ui32Ssrc) {
} }
Buffer::Ptr makeRtpOverTcpPrefix(uint16_t size, uint8_t interleaved){ Buffer::Ptr makeRtpOverTcpPrefix(uint16_t size, uint8_t interleaved){
auto rtp_tcp = std::make_shared<BufferRaw>(4); auto rtp_tcp = std::make_shared<BufferRaw>(RtpPacket::kRtpTcpHeaderSize);
rtp_tcp->setSize(RtpPacket::kRtpTcpHeaderSize);
auto ptr = rtp_tcp->data(); auto ptr = rtp_tcp->data();
ptr[0] = '$'; ptr[0] = '$';
ptr[1] = interleaved; ptr[1] = interleaved;
ptr[2] = (size >> 8) & 0xFF; ptr[2] = (size >> 8) & 0xFF;
ptr[3] = size & 0xFF; ptr[3] = size & 0xFF;
rtp_tcp->setSize(4);
return rtp_tcp; return rtp_tcp;
} }
#define AV_RB16(x) \
((((const uint8_t*)(x))[0] << 8) | \
((const uint8_t*)(x))[1])
size_t RtpHeader::getCsrcSize() const {
//每个csrc占用4字节
return csrc << 2;
}
uint8_t *RtpHeader::getCsrcData() {
if (!csrc) {
return nullptr;
}
return &payload;
}
size_t RtpHeader::getExtSize() const {
//rtp有ext
if (!ext) {
return 0;
}
auto ext_ptr = &payload + getCsrcSize();
uint16_t reserved = AV_RB16(ext_ptr);
//每个ext占用4字节
return AV_RB16(ext_ptr + 2) << 2;
}
uint8_t *RtpHeader::getExtData() {
if (!ext) {
return nullptr;
}
auto ext_ptr = &payload + getCsrcSize();
//多出的4个字节分别为reserved、ext_len
return ext_ptr + 4 + getExtSize();
}
size_t RtpHeader::getPayloadOffset() const {
//有ext时,还需要忽略reserved、ext_len 4个字节
return getCsrcSize() + (ext ? (4 + getExtSize()) : 0);
}
uint8_t *RtpHeader::getPayloadData() {
return &payload + getPayloadOffset();
}
size_t RtpHeader::getPaddingSize(size_t rtp_size) const {
if (!padding) {
return 0;
}
auto end = (uint8_t *) this + rtp_size;
return *end;
}
size_t RtpHeader::getPayloadSize(size_t rtp_size){
auto invalid_size = getPayloadOffset() + getPaddingSize(rtp_size);
if (invalid_size + RtpPacket::kRtpHeaderSize >= rtp_size) {
return 0;
}
return rtp_size - invalid_size - RtpPacket::kRtpHeaderSize;
}
RtpHeader* RtpPacket::getHeader(){
//需除去rtcp over tcp 4个字节长度
return (RtpHeader*)(data() + RtpPacket::kRtpTcpHeaderSize);
}
uint16_t RtpPacket::getSeq(){
return ntohs(getHeader()->seq);
}
uint32_t RtpPacket::getStampMS(){
return ntohl(getHeader()->stamp) * uint64_t(1000) / sample_rate;
}
uint32_t RtpPacket::getSSRC(){
return ntohl(getHeader()->ssrc);
}
uint8_t* RtpPacket::getPayload(){
return getHeader()->getPayloadData();
}
size_t RtpPacket::getPayloadSize(){
//需除去rtcp over tcp 4个字节长度
return getHeader()->getPayloadSize(size() - kRtpTcpHeaderSize);
}
}//namespace mediakit }//namespace mediakit
\ No newline at end of file
...@@ -15,8 +15,9 @@ ...@@ -15,8 +15,9 @@
#include <string> #include <string>
#include <memory> #include <memory>
#include <unordered_map> #include <unordered_map>
#include "Common/config.h"
#include "Util/util.h" #include "Util/util.h"
#include "Common/config.h"
#include "Common/macros.h"
#include "Extension/Frame.h" #include "Extension/Frame.h"
using namespace std; using namespace std;
...@@ -68,18 +69,101 @@ typedef enum { ...@@ -68,18 +69,101 @@ typedef enum {
}; };
#if defined(_WIN32)
#pragma pack(push, 1)
#endif // defined(_WIN32)
class RtpHeader {
public:
#if __BYTE_ORDER == __BIG_ENDIAN
//版本号,固定为2
uint32_t version: 2;
//padding
uint32_t padding: 1;
//扩展
uint32_t ext: 1;
//csrc
uint32_t csrc: 4;
//mark
uint32_t mark: 1;
//负载类型
uint32_t pt: 7;
#else
//csrc
uint32_t csrc: 4;
//扩展
uint32_t ext: 1;
//padding
uint32_t padding: 1;
//版本号,固定为2
uint32_t version: 2;
//负载类型
uint32_t pt: 7;
//mark
uint32_t mark: 1;
#endif
//序列号
uint32_t seq: 16;
//时间戳
uint32_t stamp;
//ssrc
uint32_t ssrc;
//负载,如果有csrc和ext,前面为 4 * csrc + (4 + 4 * ext_len)
uint8_t payload;
public:
//返回csrc字段字节长度
size_t getCsrcSize() const;
//返回csrc字段首地址,不存在时返回nullptr
uint8_t *getCsrcData();
//返回ext字段字节长度
size_t getExtSize() const;
//返回ext段首地址,不存在时返回nullptr
uint8_t *getExtData();
//返回有效负载指针,跳过csrc、ext
uint8_t* getPayloadData();
//返回有效负载总长度,不包括csrc、ext、padding
size_t getPayloadSize(size_t rtp_size);
private:
//返回有效负载偏移量
size_t getPayloadOffset() const;
//返回padding长度
size_t getPaddingSize(size_t rtp_size) const;
} PACKED;
#if defined(_WIN32)
#pragma pack(pop)
#endif // defined(_WIN32)
//此rtp为rtp over tcp形式,需要忽略前4个字节
class RtpPacket : public BufferRaw{ class RtpPacket : public BufferRaw{
public: public:
typedef std::shared_ptr<RtpPacket> Ptr; using Ptr = std::shared_ptr<RtpPacket>;
bool mark; enum {
uint8_t interleaved; kRtpVersion = 2,
uint8_t PT; kRtpHeaderSize = 12,
kRtpTcpHeaderSize = 4
};
RtpHeader* getHeader();
//主机字节序的seq
uint16_t getSeq();
//主机字节序的时间戳,已经转换为毫秒
uint32_t getStampMS();
//主机字节序的ssrc
uint32_t getSSRC();
//有效负载,跳过csrc、ext
uint8_t* getPayload();
//有效负载长度,不包括csrc、ext、padding
size_t getPayloadSize();
//音视频类型
TrackType type; TrackType type;
uint16_t sequence; //音频为采样率,视频一般为90000
//时间戳,单位毫秒 uint32_t sample_rate;
uint32_t timeStamp;
uint32_t ssrc;
size_t offset;
}; };
Buffer::Ptr makeRtpOverTcpPrefix(uint16_t size, uint8_t interleaved); Buffer::Ptr makeRtpOverTcpPrefix(uint16_t size, uint8_t interleaved);
......
...@@ -160,10 +160,11 @@ public: ...@@ -160,10 +160,11 @@ public:
_speed[rtp->type] += rtp->size(); _speed[rtp->type] += rtp->size();
assert(rtp->type >= 0 && rtp->type < TrackMax); assert(rtp->type >= 0 && rtp->type < TrackMax);
auto &track = _tracks[rtp->type]; auto &track = _tracks[rtp->type];
auto stamp = rtp->getStampMS();
if (track) { if (track) {
track->_seq = rtp->sequence; track->_seq = rtp->getSeq();
track->_time_stamp = rtp->timeStamp; track->_time_stamp = stamp;
track->_ssrc = rtp->ssrc; track->_ssrc = rtp->getSSRC();
} }
if (!_ring) { if (!_ring) {
weak_ptr<RtspMediaSource> weakSelf = dynamic_pointer_cast<RtspMediaSource>(shared_from_this()); weak_ptr<RtspMediaSource> weakSelf = dynamic_pointer_cast<RtspMediaSource>(shared_from_this());
...@@ -183,7 +184,6 @@ public: ...@@ -183,7 +184,6 @@ public:
} }
} }
bool is_video = rtp->type == TrackVideo; bool is_video = rtp->type == TrackVideo;
auto stamp = rtp->timeStamp;
PacketCache<RtpPacket>::inputPacket(stamp, is_video, std::move(rtp), keyPos); PacketCache<RtpPacket>::inputPacket(stamp, is_video, std::move(rtp), keyPos);
} }
......
...@@ -478,10 +478,10 @@ void RtspPlayer::onRtpPacket(const char *data, size_t len) { ...@@ -478,10 +478,10 @@ void RtspPlayer::onRtpPacket(const char *data, size_t len) {
uint8_t interleaved = data[1]; uint8_t interleaved = data[1];
if(interleaved %2 == 0){ if(interleaved %2 == 0){
trackIdx = getTrackIndexByInterleaved(interleaved); trackIdx = getTrackIndexByInterleaved(interleaved);
handleOneRtp(trackIdx, _sdp_track[trackIdx]->_type, _sdp_track[trackIdx]->_samplerate, (uint8_t *)data + 4, len - 4); handleOneRtp(trackIdx, _sdp_track[trackIdx]->_type, _sdp_track[trackIdx]->_samplerate, (uint8_t *)data + RtpPacket::kRtpTcpHeaderSize, len - RtpPacket::kRtpTcpHeaderSize);
}else{ }else{
trackIdx = getTrackIndexByInterleaved(interleaved - 1); trackIdx = getTrackIndexByInterleaved(interleaved - 1);
onRtcpPacket(trackIdx, _sdp_track[trackIdx], (uint8_t *) data + 4, len - 4); onRtcpPacket(trackIdx, _sdp_track[trackIdx], (uint8_t *) data + RtpPacket::kRtpTcpHeaderSize, len - RtpPacket::kRtpTcpHeaderSize);
} }
} }
...@@ -494,7 +494,7 @@ void RtspPlayer::onRtcpPacket(int track_idx, SdpTrack::Ptr &track, uint8_t *data ...@@ -494,7 +494,7 @@ void RtspPlayer::onRtcpPacket(int track_idx, SdpTrack::Ptr &track, uint8_t *data
} }
void RtspPlayer::onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx){ void RtspPlayer::onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx){
_stamp[trackidx] = rtppt->timeStamp; _stamp[trackidx] = rtppt->getStampMS();
_rtp_recv_ticker.resetTime(); _rtp_recv_ticker.resetTime();
onRecvRTP(rtppt, _sdp_track[trackidx]); onRecvRTP(rtppt, _sdp_track[trackidx]);
} }
...@@ -593,7 +593,7 @@ void RtspPlayer::sendRtspRequest(const string &cmd, const string &url,const StrC ...@@ -593,7 +593,7 @@ void RtspPlayer::sendRtspRequest(const string &cmd, const string &url,const StrC
void RtspPlayer::onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int track_idx){ void RtspPlayer::onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int track_idx){
auto &rtcp_ctx = _rtcp_context[track_idx]; auto &rtcp_ctx = _rtcp_context[track_idx];
rtcp_ctx->onRtp(rtp->sequence, rtp->timeStamp, rtp->size() - 4); rtcp_ctx->onRtp(rtp->getSeq(), rtp->getStampMS(), rtp->size() - RtpPacket::kRtpTcpHeaderSize);
auto &ticker = _rtcp_send_ticker[track_idx]; auto &ticker = _rtcp_send_ticker[track_idx];
if (ticker.elapsedTime() < 3 * 1000) { if (ticker.elapsedTime() < 3 * 1000) {
...@@ -627,10 +627,11 @@ void RtspPlayer::onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int track_idx){ ...@@ -627,10 +627,11 @@ void RtspPlayer::onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int track_idx){
} }
}; };
auto rtcp = rtcp_ctx->createRtcpRR(rtp->ssrc + 1, rtp->ssrc); auto ssrc = rtp->getSSRC();
auto rtcp = rtcp_ctx->createRtcpRR(ssrc + 1, ssrc);
auto rtcp_sdes = RtcpSdes::create({SERVER_NAME}); auto rtcp_sdes = RtcpSdes::create({SERVER_NAME});
rtcp_sdes->items.type = (uint8_t) SdesType::RTCP_SDES_CNAME; rtcp_sdes->items.type = (uint8_t) SdesType::RTCP_SDES_CNAME;
rtcp_sdes->items.ssrc = htonl(rtp->ssrc); rtcp_sdes->items.ssrc = htonl(ssrc);
send_rtcp(this, track_idx, std::move(rtcp)); send_rtcp(this, track_idx, std::move(rtcp));
send_rtcp(this, track_idx, RtcpHeader::toBuffer(rtcp_sdes)); send_rtcp(this, track_idx, RtcpHeader::toBuffer(rtcp_sdes));
ticker.resetTime(); ticker.resetTime();
......
...@@ -154,7 +154,7 @@ void RtspPusher::onRtpPacket(const char *data, size_t len) { ...@@ -154,7 +154,7 @@ void RtspPusher::onRtpPacket(const char *data, size_t len) {
uint8_t interleaved = data[1]; uint8_t interleaved = data[1];
if (interleaved % 2 != 0) { if (interleaved % 2 != 0) {
trackIdx = getTrackIndexByInterleaved(interleaved - 1); trackIdx = getTrackIndexByInterleaved(interleaved - 1);
onRtcpPacket(trackIdx, _track_vec[trackIdx], (uint8_t *) data + 4, len - 4); onRtcpPacket(trackIdx, _track_vec[trackIdx], (uint8_t *) data + RtpPacket::kRtpTcpHeaderSize, len - RtpPacket::kRtpTcpHeaderSize);
} }
} }
...@@ -361,7 +361,7 @@ void RtspPusher::updateRtcpContext(const RtpPacket::Ptr &rtp){ ...@@ -361,7 +361,7 @@ void RtspPusher::updateRtcpContext(const RtpPacket::Ptr &rtp){
int track_index = getTrackIndexByTrackType(rtp->type); int track_index = getTrackIndexByTrackType(rtp->type);
auto &ticker = _rtcp_send_ticker[track_index]; auto &ticker = _rtcp_send_ticker[track_index];
auto &rtcp_ctx = _rtcp_context[track_index]; auto &rtcp_ctx = _rtcp_context[track_index];
rtcp_ctx->onRtp(rtp->sequence, rtp->timeStamp, rtp->size() - 4); rtcp_ctx->onRtp(rtp->getSeq(), rtp->getStampMS(), rtp->size() - RtpPacket::kRtpTcpHeaderSize);
//send rtcp every 5 second //send rtcp every 5 second
if (ticker.elapsedTime() > 5 * 1000) { if (ticker.elapsedTime() > 5 * 1000) {
...@@ -376,10 +376,11 @@ void RtspPusher::updateRtcpContext(const RtpPacket::Ptr &rtp){ ...@@ -376,10 +376,11 @@ void RtspPusher::updateRtcpContext(const RtpPacket::Ptr &rtp){
} }
}; };
auto rtcp = rtcp_ctx->createRtcpSR(rtp->ssrc + 1); auto ssrc = rtp->getSSRC();
auto rtcp = rtcp_ctx->createRtcpSR(ssrc + 1);
auto rtcp_sdes = RtcpSdes::create({SERVER_NAME}); auto rtcp_sdes = RtcpSdes::create({SERVER_NAME});
rtcp_sdes->items.type = (uint8_t) SdesType::RTCP_SDES_CNAME; rtcp_sdes->items.type = (uint8_t) SdesType::RTCP_SDES_CNAME;
rtcp_sdes->items.ssrc = htonl(rtp->ssrc); rtcp_sdes->items.ssrc = htonl(ssrc);
send_rtcp(this, track_index, std::move(rtcp)); send_rtcp(this, track_index, std::move(rtcp));
send_rtcp(this, track_index, RtcpHeader::toBuffer(rtcp_sdes)); send_rtcp(this, track_index, RtcpHeader::toBuffer(rtcp_sdes));
} }
......
...@@ -15,7 +15,6 @@ ...@@ -15,7 +15,6 @@
#include "RtspSession.h" #include "RtspSession.h"
#include "Util/MD5.h" #include "Util/MD5.h"
#include "Util/base64.h" #include "Util/base64.h"
#include "Rtcp/Rtcp.h"
using namespace std; using namespace std;
using namespace toolkit; using namespace toolkit;
...@@ -171,10 +170,10 @@ void RtspSession::onRtpPacket(const char *data, size_t len) { ...@@ -171,10 +170,10 @@ void RtspSession::onRtpPacket(const char *data, size_t len) {
return; return;
} }
auto track_idx = getTrackIndexByInterleaved(interleaved); auto track_idx = getTrackIndexByInterleaved(interleaved);
handleOneRtp(track_idx, _sdp_track[track_idx]->_type, _sdp_track[track_idx]->_samplerate, (uint8_t *) data + 4, len - 4); handleOneRtp(track_idx, _sdp_track[track_idx]->_type, _sdp_track[track_idx]->_samplerate, (uint8_t *) data + RtpPacket::kRtpTcpHeaderSize, len - RtpPacket::kRtpTcpHeaderSize);
} else { } else {
auto track_idx = getTrackIndexByInterleaved(interleaved - 1); auto track_idx = getTrackIndexByInterleaved(interleaved - 1);
onRtcpPacket(track_idx, _sdp_track[track_idx], data + 4, len - 4); onRtcpPacket(track_idx, _sdp_track[track_idx], data + RtpPacket::kRtpTcpHeaderSize, len - RtpPacket::kRtpTcpHeaderSize);
} }
} }
...@@ -913,12 +912,6 @@ void RtspSession::send_NotAcceptable() { ...@@ -913,12 +912,6 @@ void RtspSession::send_NotAcceptable() {
} }
void RtspSession::onRtpSorted(const RtpPacket::Ptr &rtp, int track_idx) { void RtspSession::onRtpSorted(const RtpPacket::Ptr &rtp, int track_idx) {
if (_start_stamp[track_idx] == -1) {
//记录起始时间戳
_start_stamp[track_idx] = rtp->timeStamp;
}
//时间戳增量
rtp->timeStamp -= _start_stamp[track_idx];
_push_src->onWrite(rtp, false); _push_src->onWrite(rtp, false);
} }
...@@ -1130,7 +1123,7 @@ void RtspSession::onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int track_index){ ...@@ -1130,7 +1123,7 @@ void RtspSession::onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int track_index){
void RtspSession::updateRtcpContext(const RtpPacket::Ptr &rtp){ void RtspSession::updateRtcpContext(const RtpPacket::Ptr &rtp){
int track_index = getTrackIndexByTrackType(rtp->type); int track_index = getTrackIndexByTrackType(rtp->type);
auto &rtcp_ctx = _rtcp_context[track_index]; auto &rtcp_ctx = _rtcp_context[track_index];
rtcp_ctx->onRtp(rtp->sequence, rtp->timeStamp, rtp->size() - 4); rtcp_ctx->onRtp(rtp->getSeq(), rtp->getStampMS(), rtp->size() - RtpPacket::kRtpTcpHeaderSize);
auto &ticker = _rtcp_send_tickers[track_index]; auto &ticker = _rtcp_send_tickers[track_index];
//send rtcp every 5 second //send rtcp every 5 second
...@@ -1147,10 +1140,11 @@ void RtspSession::updateRtcpContext(const RtpPacket::Ptr &rtp){ ...@@ -1147,10 +1140,11 @@ void RtspSession::updateRtcpContext(const RtpPacket::Ptr &rtp){
} }
}; };
auto rtcp = _push_src ? rtcp_ctx->createRtcpRR(rtp->ssrc + 1, rtp->ssrc) : rtcp_ctx->createRtcpSR(rtp->ssrc + 1); auto ssrc = rtp->getSSRC();
auto rtcp = _push_src ? rtcp_ctx->createRtcpRR(ssrc + 1, ssrc) : rtcp_ctx->createRtcpSR(ssrc + 1);
auto rtcp_sdes = RtcpSdes::create({SERVER_NAME}); auto rtcp_sdes = RtcpSdes::create({SERVER_NAME});
rtcp_sdes->items.type = (uint8_t)SdesType::RTCP_SDES_CNAME; rtcp_sdes->items.type = (uint8_t)SdesType::RTCP_SDES_CNAME;
rtcp_sdes->items.ssrc = htonl(rtp->ssrc); rtcp_sdes->items.ssrc = htonl(ssrc);
send_rtcp(this, track_index, std::move(rtcp)); send_rtcp(this, track_index, std::move(rtcp));
send_rtcp(this, track_index, RtcpHeader::toBuffer(rtcp_sdes)); send_rtcp(this, track_index, RtcpHeader::toBuffer(rtcp_sdes));
} }
......
...@@ -171,8 +171,6 @@ private: ...@@ -171,8 +171,6 @@ private:
Rtsp::eRtpType _rtp_type = Rtsp::RTP_Invalid; Rtsp::eRtpType _rtp_type = Rtsp::RTP_Invalid;
//收到的seq,回复时一致 //收到的seq,回复时一致
int _cseq = 0; int _cseq = 0;
//rtsp推流起始时间戳,目的是为了同步
int32_t _start_stamp[2] = {-1, -1};
//消耗的总流量 //消耗的总流量
uint64_t _bytes_usage = 0; uint64_t _bytes_usage = 0;
//ContentBase //ContentBase
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论