Commit c92fc8a4 by xiongguangjie

format code and remove some useless code

parent 83d75c9a
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
namespace SRT { namespace SRT {
bool ACKPacket::loadFromData(uint8_t *buf, size_t len) { bool ACKPacket::loadFromData(uint8_t *buf, size_t len) {
if(len < ACK_CIF_SIZE + ControlPacket::HEADER_SIZE){ if (len < ACK_CIF_SIZE + ControlPacket::HEADER_SIZE) {
return false; return false;
} }
...@@ -11,7 +11,7 @@ bool ACKPacket::loadFromData(uint8_t *buf, size_t len) { ...@@ -11,7 +11,7 @@ bool ACKPacket::loadFromData(uint8_t *buf, size_t len) {
_data->assign((char *)(buf), len); _data->assign((char *)(buf), len);
ControlPacket::loadHeader(); ControlPacket::loadHeader();
ack_number = loadUint32(type_specific_info); ack_number = loadUint32(type_specific_info);
uint8_t* ptr = (uint8_t*)_data->data()+ControlPacket::HEADER_SIZE; uint8_t *ptr = (uint8_t *)_data->data() + ControlPacket::HEADER_SIZE;
last_ack_pkt_seq_number = loadUint32(ptr); last_ack_pkt_seq_number = loadUint32(ptr);
ptr += 4; ptr += 4;
...@@ -43,41 +43,41 @@ bool ACKPacket::storeToData() { ...@@ -43,41 +43,41 @@ bool ACKPacket::storeToData() {
control_type = ControlPacket::ACK; control_type = ControlPacket::ACK;
sub_type = 0; sub_type = 0;
storeUint32(type_specific_info,ack_number); storeUint32(type_specific_info, ack_number);
storeToHeader(); storeToHeader();
uint8_t* ptr = (uint8_t*)_data->data()+ControlPacket::HEADER_SIZE; uint8_t *ptr = (uint8_t *)_data->data() + ControlPacket::HEADER_SIZE;
storeUint32(ptr,last_ack_pkt_seq_number); storeUint32(ptr, last_ack_pkt_seq_number);
ptr += 4; ptr += 4;
storeUint32(ptr,rtt); storeUint32(ptr, rtt);
ptr += 4; ptr += 4;
storeUint32(ptr,rtt_variance); storeUint32(ptr, rtt_variance);
ptr += 4; ptr += 4;
storeUint32(ptr,pkt_recv_rate); storeUint32(ptr, pkt_recv_rate);
ptr += 4; ptr += 4;
storeUint32(ptr,available_buf_size); storeUint32(ptr, available_buf_size);
ptr += 4; ptr += 4;
storeUint32(ptr,estimated_link_capacity); storeUint32(ptr, estimated_link_capacity);
ptr += 4; ptr += 4;
storeUint32(ptr,recv_rate); storeUint32(ptr, recv_rate);
ptr += 4; ptr += 4;
return true; return true;
} }
std::string ACKPacket::dump(){ std::string ACKPacket::dump() {
_StrPrinter printer; _StrPrinter printer;
printer << "last_ack_pkt_seq_number="<<last_ack_pkt_seq_number<<\ printer << "last_ack_pkt_seq_number=" << last_ack_pkt_seq_number << " rtt=" << rtt
" rtt="<<rtt<<" rtt_variance="<<rtt_variance<<\ << " rtt_variance=" << rtt_variance << " pkt_recv_rate=" << pkt_recv_rate
" pkt_recv_rate="<<pkt_recv_rate<<" available_buf_size="<<available_buf_size<<\ << " available_buf_size=" << available_buf_size << " estimated_link_capacity=" << estimated_link_capacity
" estimated_link_capacity="<<estimated_link_capacity<<" recv_rate="<<recv_rate; << " recv_rate=" << recv_rate;
return std::move(printer); return std::move(printer);
} }
} // namespace } // namespace SRT
\ No newline at end of file \ No newline at end of file
...@@ -2,8 +2,7 @@ ...@@ -2,8 +2,7 @@
#define ZLMEDIAKIT_SRT_ACK_H #define ZLMEDIAKIT_SRT_ACK_H
#include "Packet.hpp" #include "Packet.hpp"
namespace SRT {
namespace SRT{
/* /*
0 1 2 3 0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
...@@ -33,16 +32,13 @@ namespace SRT{ ...@@ -33,16 +32,13 @@ namespace SRT{
Figure 13: ACK control packet Figure 13: ACK control packet
https://haivision.github.io/srt-rfc/draft-sharabayko-srt.html#name-ack-acknowledgment https://haivision.github.io/srt-rfc/draft-sharabayko-srt.html#name-ack-acknowledgment
*/ */
class ACKPacket : public ControlPacket class ACKPacket : public ControlPacket {
{
public: public:
using Ptr = std::shared_ptr<ACKPacket>; using Ptr = std::shared_ptr<ACKPacket>;
ACKPacket() = default; ACKPacket() = default;
~ACKPacket() = default; ~ACKPacket() = default;
enum{ enum { ACK_CIF_SIZE = 7 * 4 };
ACK_CIF_SIZE = 7*4
};
std::string dump(); std::string dump();
///////ControlPacket override/////// ///////ControlPacket override///////
bool loadFromData(uint8_t *buf, size_t len) override; bool loadFromData(uint8_t *buf, size_t len) override;
...@@ -59,15 +55,14 @@ public: ...@@ -59,15 +55,14 @@ public:
uint32_t recv_rate; uint32_t recv_rate;
}; };
class ACKACKPacket : public ControlPacket {
class ACKACKPacket : public ControlPacket{
public: public:
using Ptr = std::shared_ptr<ACKACKPacket>; using Ptr = std::shared_ptr<ACKACKPacket>;
ACKACKPacket() = default; ACKACKPacket() = default;
~ACKACKPacket() = default; ~ACKACKPacket() = default;
///////ControlPacket override/////// ///////ControlPacket override///////
bool loadFromData(uint8_t *buf, size_t len) override{ bool loadFromData(uint8_t *buf, size_t len) override {
if(len < ControlPacket::HEADER_SIZE){ if (len < ControlPacket::HEADER_SIZE) {
return false; return false;
} }
_data = BufferRaw::create(); _data = BufferRaw::create();
...@@ -76,21 +71,20 @@ public: ...@@ -76,21 +71,20 @@ public:
ack_number = loadUint32(type_specific_info); ack_number = loadUint32(type_specific_info);
return true; return true;
} }
bool storeToData() override{ bool storeToData() override {
_data = BufferRaw::create(); _data = BufferRaw::create();
_data->setCapacity(HEADER_SIZE); _data->setCapacity(HEADER_SIZE);
_data->setSize(HEADER_SIZE ); _data->setSize(HEADER_SIZE);
control_type = ControlPacket::ACKACK; control_type = ControlPacket::ACKACK;
sub_type = 0; sub_type = 0;
storeUint32(type_specific_info,ack_number); storeUint32(type_specific_info, ack_number);
storeToHeader(); storeToHeader();
return true; return true;
} }
uint32_t ack_number; uint32_t ack_number;
}; };
} //namespace SRT } // namespace SRT
#endif // ZLMEDIAKIT_SRT_ACK_H #endif // ZLMEDIAKIT_SRT_ACK_H
\ No newline at end of file
#ifndef ZLMEDIAKIT_SRT_COMMON_H #ifndef ZLMEDIAKIT_SRT_COMMON_H
#define ZLMEDIAKIT_SRT_COMMON_H #define ZLMEDIAKIT_SRT_COMMON_H
#if defined(_WIN32)
#include <Iphlpapi.h>
#include <winsock2.h>
#include <ws2tcpip.h>
#pragma comment(lib, "Ws2_32.lib")
#pragma comment(lib, "Iphlpapi.lib")
#else
#include <netdb.h>
#include <sys/socket.h>
#endif // defined(_WIN32)
#include <chrono> #include <chrono>
namespace SRT namespace SRT {
{
using SteadyClock = std::chrono::steady_clock; using SteadyClock = std::chrono::steady_clock;
using TimePoint = std::chrono::time_point<SteadyClock>; using TimePoint = std::chrono::time_point<SteadyClock>;
using Microseconds = std::chrono::microseconds; using Microseconds = std::chrono::microseconds;
using Milliseconds = std::chrono::milliseconds; using Milliseconds = std::chrono::milliseconds;
inline int64_t DurationCountMicroseconds( SteadyClock::duration dur){ inline int64_t DurationCountMicroseconds(SteadyClock::duration dur) {
return std::chrono::duration_cast<std::chrono::microseconds>(dur).count(); return std::chrono::duration_cast<std::chrono::microseconds>(dur).count();
} }
...@@ -37,48 +47,38 @@ inline void storeUint32LE(uint8_t *buf, uint32_t val) { ...@@ -37,48 +47,38 @@ inline void storeUint32LE(uint8_t *buf, uint32_t val) {
buf[0] = val & 0xff; buf[0] = val & 0xff;
buf[1] = (val >> 8) & 0xff; buf[1] = (val >> 8) & 0xff;
buf[2] = (val >> 16) & 0xff; buf[2] = (val >> 16) & 0xff;
buf[3] = (val >>24) & 0xff; buf[3] = (val >> 24) & 0xff;
} }
inline void storeUint16LE(uint8_t *buf, uint16_t val) { inline void storeUint16LE(uint8_t *buf, uint16_t val) {
buf[0] = val & 0xff; buf[0] = val & 0xff;
buf[1] = (val>>8) & 0xff; buf[1] = (val >> 8) & 0xff;
} }
inline uint32_t srtVersion(int major, int minor, int patch) inline uint32_t srtVersion(int major, int minor, int patch) {
{ return patch + minor * 0x100 + major * 0x10000;
return patch + minor*0x100 + major*0x10000;
} }
class UTicker { class UTicker {
public: public:
UTicker() { UTicker() { _created = _begin = SteadyClock::now(); }
_created = _begin = SteadyClock::now();
}
~UTicker() { ~UTicker() {}
}
/** /**
* 获取创建时间,单位微妙 * 获取创建时间,单位微妙
*/ */
int64_t elapsedTime(TimePoint now) const { int64_t elapsedTime(TimePoint now) const { return DurationCountMicroseconds(now - _begin); }
return DurationCountMicroseconds(now - _begin);
}
/** /**
* 获取上次resetTime后至今的时间,单位毫秒 * 获取上次resetTime后至今的时间,单位毫秒
*/ */
int64_t createdTime(TimePoint now) const { int64_t createdTime(TimePoint now) const { return DurationCountMicroseconds(now - _created); }
return DurationCountMicroseconds(now - _created);
}
/** /**
* 重置计时器 * 重置计时器
*/ */
void resetTime(TimePoint now) { void resetTime(TimePoint now) { _begin = now; }
_begin = now;
}
private: private:
TimePoint _begin; TimePoint _begin;
...@@ -87,4 +87,4 @@ private: ...@@ -87,4 +87,4 @@ private:
} // namespace SRT } // namespace SRT
#endif //ZLMEDIAKIT_SRT_COMMON_H #endif // ZLMEDIAKIT_SRT_COMMON_H
\ No newline at end of file \ No newline at end of file
...@@ -2,18 +2,18 @@ ...@@ -2,18 +2,18 @@
namespace SRT { namespace SRT {
bool HSExtMessage::loadFromData(uint8_t *buf, size_t len) { bool HSExtMessage::loadFromData(uint8_t *buf, size_t len) {
if(buf == NULL || len != HSEXT_MSG_SIZE){ if (buf == NULL || len != HSEXT_MSG_SIZE) {
return false; return false;
} }
_data = BufferRaw::create(); _data = BufferRaw::create();
_data->assign((char*)buf,len); _data->assign((char *)buf, len);
extension_length = 3; extension_length = 3;
HSExt::loadHeader(); HSExt::loadHeader();
assert(extension_type == SRT_CMD_HSREQ || extension_type == SRT_CMD_HSRSP); assert(extension_type == SRT_CMD_HSREQ || extension_type == SRT_CMD_HSRSP);
uint8_t* ptr = (uint8_t*)_data->data()+4; uint8_t *ptr = (uint8_t *)_data->data() + 4;
srt_version = loadUint32(ptr); srt_version = loadUint32(ptr);
ptr += 4; ptr += 4;
...@@ -27,105 +27,103 @@ bool HSExtMessage::loadFromData(uint8_t *buf, size_t len) { ...@@ -27,105 +27,103 @@ bool HSExtMessage::loadFromData(uint8_t *buf, size_t len) {
ptr += 2; ptr += 2;
return true; return true;
}
} std::string HSExtMessage::dump() {
std::string HSExtMessage::dump(){
_StrPrinter printer; _StrPrinter printer;
printer << "srt version : "<<std::hex<<srt_version<<" srt flag : "<<std::hex<<srt_flag<<\ printer << "srt version : " << std::hex << srt_version << " srt flag : " << std::hex << srt_flag
" recv_tsbpd_delay="<<recv_tsbpd_delay<<" send_tsbpd_delay = "<<send_tsbpd_delay; << " recv_tsbpd_delay=" << recv_tsbpd_delay << " send_tsbpd_delay = " << send_tsbpd_delay;
return std::move(printer); return std::move(printer);
} }
bool HSExtMessage::storeToData() { bool HSExtMessage::storeToData() {
_data = BufferRaw::create(); _data = BufferRaw::create();
_data->setCapacity(HSEXT_MSG_SIZE); _data->setCapacity(HSEXT_MSG_SIZE);
_data->setSize(HSEXT_MSG_SIZE); _data->setSize(HSEXT_MSG_SIZE);
extension_length = 3; extension_length = 3;
HSExt::storeHeader(); HSExt::storeHeader();
uint8_t* ptr = (uint8_t*)_data->data()+4; uint8_t *ptr = (uint8_t *)_data->data() + 4;
storeUint32(ptr,srt_version); storeUint32(ptr, srt_version);
ptr += 4; ptr += 4;
storeUint32(ptr,srt_flag); storeUint32(ptr, srt_flag);
ptr += 4; ptr += 4;
storeUint16(ptr,recv_tsbpd_delay); storeUint16(ptr, recv_tsbpd_delay);
ptr += 2; ptr += 2;
storeUint16(ptr,send_tsbpd_delay); storeUint16(ptr, send_tsbpd_delay);
ptr += 2; ptr += 2;
return true; return true;
} }
bool HSExtStreamID::loadFromData(uint8_t *buf, size_t len) { bool HSExtStreamID::loadFromData(uint8_t *buf, size_t len) {
if(buf == NULL || len < 4){ if (buf == NULL || len < 4) {
return false; return false;
} }
_data = BufferRaw::create(); _data = BufferRaw::create();
_data->assign((char*)buf,len); _data->assign((char *)buf, len);
HSExt::loadHeader(); HSExt::loadHeader();
size_t content_size = extension_length*4; size_t content_size = extension_length * 4;
if(len < content_size+4){ if (len < content_size + 4) {
return false; return false;
} }
streamid.clear(); streamid.clear();
char* ptr = _data->data()+4; char *ptr = _data->data() + 4;
for(size_t i = 0; i<extension_length; ++i){ for (size_t i = 0; i < extension_length; ++i) {
streamid.push_back(*(ptr+3)); streamid.push_back(*(ptr + 3));
streamid.push_back(*(ptr+2)); streamid.push_back(*(ptr + 2));
streamid.push_back(*(ptr+1)); streamid.push_back(*(ptr + 1));
streamid.push_back(*(ptr)); streamid.push_back(*(ptr));
ptr+=4; ptr += 4;
} }
char zero = 0x00; char zero = 0x00;
if(streamid.back() == zero){ if (streamid.back() == zero) {
streamid.erase(streamid.find_first_of(zero),streamid.size()); streamid.erase(streamid.find_first_of(zero), streamid.size());
} }
return true; return true;
}
}
bool HSExtStreamID::storeToData() { bool HSExtStreamID::storeToData() {
size_t content_size = ((streamid.length()+4)+3)/4*4; size_t content_size = ((streamid.length() + 4) + 3) / 4 * 4;
_data = BufferRaw::create(); _data = BufferRaw::create();
_data->setCapacity(content_size); _data->setCapacity(content_size);
_data->setSize(content_size); _data->setSize(content_size);
extension_length = (content_size-4)/4; extension_length = (content_size - 4) / 4;
extension_type = SRT_CMD_SID; extension_type = SRT_CMD_SID;
HSExt::storeHeader(); HSExt::storeHeader();
auto ptr = _data->data()+4; auto ptr = _data->data() + 4;
memset(ptr,0,extension_length*4); memset(ptr, 0, extension_length * 4);
const char* src = streamid.c_str(); const char *src = streamid.c_str();
for(size_t i = 0; i< streamid.length()/4;++i){ for (size_t i = 0; i < streamid.length() / 4; ++i) {
*ptr = *(src+3+i*4); *ptr = *(src + 3 + i * 4);
ptr++; ptr++;
*ptr = *(src+2+i*4); *ptr = *(src + 2 + i * 4);
ptr++; ptr++;
*ptr = *(src+1+i*4); *ptr = *(src + 1 + i * 4);
ptr++; ptr++;
*ptr = *(src+0+i*4); *ptr = *(src + 0 + i * 4);
ptr++; ptr++;
} }
ptr += 3; ptr += 3;
size_t offset = streamid.length()/4*4; size_t offset = streamid.length() / 4 * 4;
for(size_t i = 0; i<streamid.length()%4;++i){ for (size_t i = 0; i < streamid.length() % 4; ++i) {
*ptr = *(src+offset+i); *ptr = *(src + offset + i);
ptr -= 1; ptr -= 1;
} }
return true; return true;
} }
std::string HSExtStreamID::dump(){ std::string HSExtStreamID::dump() {
_StrPrinter printer; _StrPrinter printer;
printer << " streamid : "<< streamid; printer << " streamid : " << streamid;
return std::move(printer); return std::move(printer);
} }
} // namespace SRT } // namespace SRT
\ No newline at end of file
...@@ -62,8 +62,6 @@ public: ...@@ -62,8 +62,6 @@ public:
uint32_t timestamp; uint32_t timestamp;
uint32_t dst_socket_id; uint32_t dst_socket_id;
TimePoint get_ts; // recv or send time
private: private:
BufferRaw::Ptr _data; BufferRaw::Ptr _data;
}; };
...@@ -182,17 +180,16 @@ public: ...@@ -182,17 +180,16 @@ public:
enum { HS_EXT_FILED_HSREQ = 0x00000001, HS_EXT_FILED_KMREQ = 0x00000002, HS_EXT_FILED_CONFIG = 0x00000004 }; enum { HS_EXT_FILED_HSREQ = 0x00000001, HS_EXT_FILED_KMREQ = 0x00000002, HS_EXT_FILED_CONFIG = 0x00000004 };
HandshakePacket() = default; HandshakePacket() = default;
~HandshakePacket() = default; ~HandshakePacket() = default;
static bool isHandshakePacket(uint8_t *buf, size_t len); static bool isHandshakePacket(uint8_t *buf, size_t len);
static uint32_t getHandshakeType(uint8_t *buf, size_t len); static uint32_t getHandshakeType(uint8_t *buf, size_t len);
static uint32_t getSynCookie(uint8_t *buf, size_t len); static uint32_t getSynCookie(uint8_t *buf, size_t len);
static uint32_t generateSynCookie(struct sockaddr_storage* addr,TimePoint ts,uint32_t current_cookie = 0, int correction = 0); static uint32_t
generateSynCookie(struct sockaddr_storage *addr, TimePoint ts, uint32_t current_cookie = 0, int correction = 0);
void assignPeerIP(struct sockaddr_storage* addr); void assignPeerIP(struct sockaddr_storage *addr);
///////ControlPacket override/////// ///////ControlPacket override///////
bool loadFromData(uint8_t *buf, size_t len) override; bool loadFromData(uint8_t *buf, size_t len) override;
bool storeToData() override; bool storeToData() override;
...@@ -209,8 +206,9 @@ public: ...@@ -209,8 +206,9 @@ public:
uint8_t peer_ip_addr[16]; uint8_t peer_ip_addr[16];
std::vector<HSExt::Ptr> ext_list; std::vector<HSExt::Ptr> ext_list;
private: private:
bool loadExtMessage(uint8_t *buf,size_t len); bool loadExtMessage(uint8_t *buf, size_t len);
bool storeExtMessage(); bool storeExtMessage();
size_t getExtSize(); size_t getExtSize();
}; };
...@@ -229,8 +227,7 @@ private: ...@@ -229,8 +227,7 @@ private:
Figure 12: Keep-Alive control packet Figure 12: Keep-Alive control packet
https://haivision.github.io/srt-rfc/draft-sharabayko-srt.html#name-keep-alive https://haivision.github.io/srt-rfc/draft-sharabayko-srt.html#name-keep-alive
*/ */
class KeepLivePacket : public ControlPacket class KeepLivePacket : public ControlPacket {
{
public: public:
using Ptr = std::shared_ptr<KeepLivePacket>; using Ptr = std::shared_ptr<KeepLivePacket>;
KeepLivePacket() = default; KeepLivePacket() = default;
...@@ -265,11 +262,10 @@ An SRT NAK packet is formatted as follows: ...@@ -265,11 +262,10 @@ An SRT NAK packet is formatted as follows:
Figure 14: NAK control packet Figure 14: NAK control packet
https://haivision.github.io/srt-rfc/draft-sharabayko-srt.html#name-nak-control-packet https://haivision.github.io/srt-rfc/draft-sharabayko-srt.html#name-nak-control-packet
*/ */
class NAKPacket : public ControlPacket class NAKPacket : public ControlPacket {
{
public: public:
using Ptr = std::shared_ptr<NAKPacket>; using Ptr = std::shared_ptr<NAKPacket>;
using LostPair = std::pair<uint32_t,uint32_t>; using LostPair = std::pair<uint32_t, uint32_t>;
NAKPacket() = default; NAKPacket() = default;
~NAKPacket() = default; ~NAKPacket() = default;
std::string dump(); std::string dump();
...@@ -278,11 +274,11 @@ public: ...@@ -278,11 +274,11 @@ public:
bool storeToData() override; bool storeToData() override;
std::list<LostPair> lost_list; std::list<LostPair> lost_list;
private: private:
size_t getCIFSize(); size_t getCIFSize();
}; };
/* /*
0 1 2 3 0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
...@@ -302,9 +298,8 @@ private: ...@@ -302,9 +298,8 @@ private:
Figure 18: Drop Request control packet Figure 18: Drop Request control packet
https://haivision.github.io/srt-rfc/draft-sharabayko-srt.html#name-message-drop-request https://haivision.github.io/srt-rfc/draft-sharabayko-srt.html#name-message-drop-request
*/ */
class MsgDropReqPacket : public ControlPacket class MsgDropReqPacket : public ControlPacket {
{ public:
public:
using Ptr = std::shared_ptr<MsgDropReqPacket>; using Ptr = std::shared_ptr<MsgDropReqPacket>;
MsgDropReqPacket() = default; MsgDropReqPacket() = default;
~MsgDropReqPacket() = default; ~MsgDropReqPacket() = default;
...@@ -332,8 +327,7 @@ class MsgDropReqPacket : public ControlPacket ...@@ -332,8 +327,7 @@ class MsgDropReqPacket : public ControlPacket
https://haivision.github.io/srt-rfc/draft-sharabayko-srt.html#name-shutdown https://haivision.github.io/srt-rfc/draft-sharabayko-srt.html#name-shutdown
*/ */
class ShutDownPacket : public ControlPacket class ShutDownPacket : public ControlPacket {
{
public: public:
using Ptr = std::shared_ptr<ShutDownPacket>; using Ptr = std::shared_ptr<ShutDownPacket>;
ShutDownPacket() = default; ShutDownPacket() = default;
...@@ -360,4 +354,4 @@ public: ...@@ -360,4 +354,4 @@ public:
}; };
} // namespace SRT } // namespace SRT
#endif //ZLMEDIAKIT_SRT_PACKET_H #endif // ZLMEDIAKIT_SRT_PACKET_H
\ No newline at end of file \ No newline at end of file
...@@ -4,69 +4,71 @@ namespace SRT { ...@@ -4,69 +4,71 @@ namespace SRT {
#define MAX_SEQ 0x7fffffff #define MAX_SEQ 0x7fffffff
#define MAX_TS 0xffffffff #define MAX_TS 0xffffffff
inline uint32_t genExpectedSeq(uint32_t seq){ inline uint32_t genExpectedSeq(uint32_t seq) {
return MAX_SEQ & seq; return MAX_SEQ & seq;
} }
inline bool isSeqEdge(uint32_t seq,uint32_t cap){ inline bool isSeqEdge(uint32_t seq, uint32_t cap) {
if(seq >(MAX_SEQ - cap)){ if (seq > (MAX_SEQ - cap)) {
return true; return true;
} }
return false; return false;
} }
inline bool isTSCycle(uint32_t first,uint32_t second){ inline bool isTSCycle(uint32_t first, uint32_t second) {
uint32_t diff; uint32_t diff;
if(first>second){ if (first > second) {
diff = first - second; diff = first - second;
}else{ } else {
diff = second - first; diff = second - first;
} }
if(diff > (MAX_TS>>1)){ if (diff > (MAX_TS >> 1)) {
return true; return true;
}else{ } else {
return false; return false;
} }
} }
PacketQueue::PacketQueue(uint32_t max_size, uint32_t init_seq, uint32_t lantency) PacketQueue::PacketQueue(uint32_t max_size, uint32_t init_seq, uint32_t lantency)
: _pkt_expected_seq(init_seq) : _pkt_expected_seq(init_seq)
, _pkt_cap(max_size) , _pkt_cap(max_size)
, _pkt_lantency(lantency) { , _pkt_lantency(lantency) {}
} void PacketQueue::tryInsertPkt(DataPacket::Ptr pkt) {
void PacketQueue::tryInsertPkt(DataPacket::Ptr pkt){
if (_pkt_expected_seq <= pkt->packet_seq_number) { if (_pkt_expected_seq <= pkt->packet_seq_number) {
auto diff = pkt->packet_seq_number - _pkt_expected_seq; auto diff = pkt->packet_seq_number - _pkt_expected_seq;
if(diff >= (MAX_SEQ>>1)){ if (diff >= (MAX_SEQ >> 1)) {
TraceL << "drop packet too later for cycle "<< "expected seq=" << _pkt_expected_seq << " pkt seq=" << pkt->packet_seq_number; TraceL << "drop packet too later for cycle "
<< "expected seq=" << _pkt_expected_seq << " pkt seq=" << pkt->packet_seq_number;
return; return;
}else{ } else {
_pkt_map.emplace(pkt->packet_seq_number, pkt); _pkt_map.emplace(pkt->packet_seq_number, pkt);
} }
} else { } else {
auto diff = _pkt_expected_seq - pkt->packet_seq_number; auto diff = _pkt_expected_seq - pkt->packet_seq_number;
if(diff >= (MAX_SEQ>>1)){ if (diff >= (MAX_SEQ >> 1)) {
_pkt_map.emplace(pkt->packet_seq_number, pkt); _pkt_map.emplace(pkt->packet_seq_number, pkt);
TraceL<<" cycle packet "<<"expected seq=" << _pkt_expected_seq << " pkt seq=" << pkt->packet_seq_number; TraceL << " cycle packet "
}else{ << "expected seq=" << _pkt_expected_seq << " pkt seq=" << pkt->packet_seq_number;
//TraceL << "drop packet too later "<< "expected seq=" << _pkt_expected_seq << " pkt seq=" << pkt->packet_seq_number; } else {
// TraceL << "drop packet too later "<< "expected seq=" << _pkt_expected_seq << " pkt seq=" <<
// pkt->packet_seq_number;
} }
} }
} }
bool PacketQueue::inputPacket(DataPacket::Ptr pkt,std::list<DataPacket::Ptr>& out) { bool PacketQueue::inputPacket(DataPacket::Ptr pkt, std::list<DataPacket::Ptr> &out) {
tryInsertPkt(pkt); tryInsertPkt(pkt);
auto it = _pkt_map.find(_pkt_expected_seq); auto it = _pkt_map.find(_pkt_expected_seq);
while ( it != _pkt_map.end()) { while (it != _pkt_map.end()) {
out.push_back(it->second); out.push_back(it->second);
_pkt_map.erase(it); _pkt_map.erase(it);
_pkt_expected_seq = genExpectedSeq(_pkt_expected_seq+1); _pkt_expected_seq = genExpectedSeq(_pkt_expected_seq + 1);
it = _pkt_map.find(_pkt_expected_seq); it = _pkt_map.find(_pkt_expected_seq);
} }
while (_pkt_map.size() > _pkt_cap) { while (_pkt_map.size() > _pkt_cap) {
// 防止回环 // 防止回环
it = _pkt_map.find(_pkt_expected_seq); it = _pkt_map.find(_pkt_expected_seq);
if(it != _pkt_map.end()){ if (it != _pkt_map.end()) {
out.push_back(it->second); out.push_back(it->second);
_pkt_map.erase(it); _pkt_map.erase(it);
} }
...@@ -75,7 +77,7 @@ bool PacketQueue::inputPacket(DataPacket::Ptr pkt,std::list<DataPacket::Ptr>& ou ...@@ -75,7 +77,7 @@ bool PacketQueue::inputPacket(DataPacket::Ptr pkt,std::list<DataPacket::Ptr>& ou
while (timeLantency() > _pkt_lantency) { while (timeLantency() > _pkt_lantency) {
it = _pkt_map.find(_pkt_expected_seq); it = _pkt_map.find(_pkt_expected_seq);
if(it != _pkt_map.end()){ if (it != _pkt_map.end()) {
out.push_back(it->second); out.push_back(it->second);
_pkt_map.erase(it); _pkt_map.erase(it);
} }
...@@ -85,16 +87,16 @@ bool PacketQueue::inputPacket(DataPacket::Ptr pkt,std::list<DataPacket::Ptr>& ou ...@@ -85,16 +87,16 @@ bool PacketQueue::inputPacket(DataPacket::Ptr pkt,std::list<DataPacket::Ptr>& ou
return true; return true;
} }
bool PacketQueue::drop(uint32_t first, uint32_t last,std::list<DataPacket::Ptr>& out){ bool PacketQueue::drop(uint32_t first, uint32_t last, std::list<DataPacket::Ptr> &out) {
uint32_t end = genExpectedSeq(last+1); uint32_t end = genExpectedSeq(last + 1);
decltype(_pkt_map.end()) it; decltype(_pkt_map.end()) it;
for(uint32_t i =_pkt_expected_seq;i< end;){ for (uint32_t i = _pkt_expected_seq; i < end;) {
it = _pkt_map.find(i); it = _pkt_map.find(i);
if(it != _pkt_map.end()){ if (it != _pkt_map.end()) {
out.push_back(it->second); out.push_back(it->second);
_pkt_map.erase(it); _pkt_map.erase(it);
} }
i = genExpectedSeq(i+1); i = genExpectedSeq(i + 1);
} }
_pkt_expected_seq = end; _pkt_expected_seq = end;
return true; return true;
...@@ -108,15 +110,15 @@ uint32_t PacketQueue::timeLantency() { ...@@ -108,15 +110,15 @@ uint32_t PacketQueue::timeLantency() {
auto first = _pkt_map.begin()->second->timestamp; auto first = _pkt_map.begin()->second->timestamp;
auto last = _pkt_map.rbegin()->second->timestamp; auto last = _pkt_map.rbegin()->second->timestamp;
uint32_t dur; uint32_t dur;
if(last>first){ if (last > first) {
dur = last - first; dur = last - first;
}else{ } else {
dur = first - last; dur = first - last;
} }
if(dur > 0x80000000){ if (dur > 0x80000000) {
dur = MAX_TS - dur; dur = MAX_TS - dur;
WarnL<<"cycle dur "<<dur; WarnL << "cycle dur " << dur;
} }
return dur; return dur;
...@@ -124,16 +126,16 @@ uint32_t PacketQueue::timeLantency() { ...@@ -124,16 +126,16 @@ uint32_t PacketQueue::timeLantency() {
std::list<PacketQueue::LostPair> PacketQueue::getLostSeq() { std::list<PacketQueue::LostPair> PacketQueue::getLostSeq() {
std::list<PacketQueue::LostPair> re; std::list<PacketQueue::LostPair> re;
if(_pkt_map.empty()){ if (_pkt_map.empty()) {
return re; return re;
} }
if(getExpectedSize() == getSize()){ if (getExpectedSize() == getSize()) {
return re; return re;
} }
uint32_t end = 0; uint32_t end = 0;
uint32_t first,last; uint32_t first, last;
first = _pkt_map.begin()->second->packet_seq_number; first = _pkt_map.begin()->second->packet_seq_number;
last = _pkt_map.rbegin()->second->packet_seq_number; last = _pkt_map.rbegin()->second->packet_seq_number;
...@@ -149,70 +151,73 @@ std::list<PacketQueue::LostPair> PacketQueue::getLostSeq() { ...@@ -149,70 +151,73 @@ std::list<PacketQueue::LostPair> PacketQueue::getLostSeq() {
uint32_t i = _pkt_expected_seq; uint32_t i = _pkt_expected_seq;
bool finish = true; bool finish = true;
for(i = _pkt_expected_seq;i<=end;){ for (i = _pkt_expected_seq; i <= end;) {
if(_pkt_map.find(i) == _pkt_map.end()){ if (_pkt_map.find(i) == _pkt_map.end()) {
if(finish){ if (finish) {
finish = false; finish = false;
lost.first = i; lost.first = i;
lost.second = i+1; lost.second = i + 1;
}else{ } else {
lost.second = i+1; lost.second = i + 1;
} }
}else{ } else {
if(!finish){ if (!finish) {
finish = true; finish = true;
re.push_back(lost); re.push_back(lost);
} }
} }
i = genExpectedSeq(i+1); i = genExpectedSeq(i + 1);
} }
return re; return re;
} }
size_t PacketQueue::getSize(){ size_t PacketQueue::getSize() {
return _pkt_map.size(); return _pkt_map.size();
} }
size_t PacketQueue::getExpectedSize() { size_t PacketQueue::getExpectedSize() {
if(_pkt_map.empty()){ if (_pkt_map.empty()) {
return 0; return 0;
} }
uint32_t max = _pkt_map.rbegin()->first; uint32_t max = _pkt_map.rbegin()->first;
uint32_t min = _pkt_map.begin()->first; uint32_t min = _pkt_map.begin()->first;
if((max-min)>=(MAX_SEQ>>1)){ if ((max - min) >= (MAX_SEQ >> 1)) {
TraceL<<"cycle "<<"expected seq "<<_pkt_expected_seq<<" min "<<min<<" max "<<max<<" size "<<_pkt_map.size(); TraceL << "cycle "
return MAX_SEQ-_pkt_expected_seq+min+1; << "expected seq " << _pkt_expected_seq << " min " << min << " max " << max << " size "
}else{ << _pkt_map.size();
return max-_pkt_expected_seq+1; return MAX_SEQ - _pkt_expected_seq + min + 1;
} else {
return max - _pkt_expected_seq + 1;
} }
} }
size_t PacketQueue::getAvailableBufferSize(){ size_t PacketQueue::getAvailableBufferSize() {
auto size = getExpectedSize(); auto size = getExpectedSize();
if(_pkt_cap > size){ if (_pkt_cap > size) {
return _pkt_cap - size; return _pkt_cap - size;
} }
if(_pkt_cap > _pkt_map.size()){ if (_pkt_cap > _pkt_map.size()) {
return _pkt_cap - _pkt_map.size(); return _pkt_cap - _pkt_map.size();
} }
WarnL<<" cap "<<_pkt_cap<<" expected size "<<size<<" map size "<<_pkt_map.size(); WarnL << " cap " << _pkt_cap << " expected size " << size << " map size " << _pkt_map.size();
return _pkt_cap; return _pkt_cap;
} }
uint32_t PacketQueue::getExpectedSeq(){ uint32_t PacketQueue::getExpectedSeq() {
return _pkt_expected_seq; return _pkt_expected_seq;
} }
std::string PacketQueue::dump(){ std::string PacketQueue::dump() {
_StrPrinter printer; _StrPrinter printer;
if(_pkt_map.empty()){ if (_pkt_map.empty()) {
printer<<" expected seq :"<<_pkt_expected_seq; printer << " expected seq :" << _pkt_expected_seq;
}else{ } else {
printer<<" expected seq :"<<_pkt_expected_seq<<" size:"<<_pkt_map.size()<<" first:"<<_pkt_map.begin()->second->packet_seq_number; printer << " expected seq :" << _pkt_expected_seq << " size:" << _pkt_map.size()
printer<<" last:"<<_pkt_map.rbegin()->second->packet_seq_number; << " first:" << _pkt_map.begin()->second->packet_seq_number;
printer<<" latency:"<<timeLantency()/1e3; printer << " last:" << _pkt_map.rbegin()->second->packet_seq_number;
printer << " latency:" << timeLantency() / 1e3;
} }
return std::move(printer); return std::move(printer);
} }
......
...@@ -3,8 +3,8 @@ ...@@ -3,8 +3,8 @@
#include "Packet.hpp" #include "Packet.hpp"
#include <algorithm> #include <algorithm>
#include <list> #include <list>
#include <memory>
#include <map> #include <map>
#include <memory>
#include <tuple> #include <tuple>
#include <utility> #include <utility>
...@@ -18,7 +18,7 @@ public: ...@@ -18,7 +18,7 @@ public:
PacketQueue(uint32_t max_size, uint32_t init_seq, uint32_t lantency); PacketQueue(uint32_t max_size, uint32_t init_seq, uint32_t lantency);
~PacketQueue() = default; ~PacketQueue() = default;
bool inputPacket(DataPacket::Ptr pkt,std::list<DataPacket::Ptr>& out); bool inputPacket(DataPacket::Ptr pkt, std::list<DataPacket::Ptr> &out);
uint32_t timeLantency(); uint32_t timeLantency();
std::list<LostPair> getLostSeq(); std::list<LostPair> getLostSeq();
...@@ -28,13 +28,14 @@ public: ...@@ -28,13 +28,14 @@ public:
size_t getAvailableBufferSize(); size_t getAvailableBufferSize();
uint32_t getExpectedSeq(); uint32_t getExpectedSeq();
bool drop(uint32_t first, uint32_t last,std::list<DataPacket::Ptr>& out); bool drop(uint32_t first, uint32_t last, std::list<DataPacket::Ptr> &out);
std::string dump(); std::string dump();
private: private:
void tryInsertPkt(DataPacket::Ptr pkt); void tryInsertPkt(DataPacket::Ptr pkt);
private:
private:
std::map<uint32_t, DataPacket::Ptr> _pkt_map; std::map<uint32_t, DataPacket::Ptr> _pkt_map;
uint32_t _pkt_expected_seq = 0; uint32_t _pkt_expected_seq = 0;
......
...@@ -66,7 +66,7 @@ uint32_t PacketSendQueue::timeLantency() { ...@@ -66,7 +66,7 @@ uint32_t PacketSendQueue::timeLantency() {
} else { } else {
dur = first - last; dur = first - last;
} }
if (dur > (0x01 << 31)) { if (dur > ((uint32_t)0x01 << 31)) {
TraceL << "cycle timeLantency " << dur; TraceL << "cycle timeLantency " << dur;
dur = 0xffffffff - dur; dur = 0xffffffff - dur;
} }
......
...@@ -16,9 +16,11 @@ public: ...@@ -16,9 +16,11 @@ public:
~PacketSendQueue() = default; ~PacketSendQueue() = default;
bool drop(uint32_t num); bool drop(uint32_t num);
bool inputPacket(DataPacket::Ptr pkt); bool inputPacket(DataPacket::Ptr pkt);
std::list<DataPacket::Ptr> findPacketBySeq(uint32_t start,uint32_t end); std::list<DataPacket::Ptr> findPacketBySeq(uint32_t start, uint32_t end);
private: private:
uint32_t timeLantency(); uint32_t timeLantency();
private: private:
std::list<DataPacket::Ptr> _pkt_cache; std::list<DataPacket::Ptr> _pkt_cache;
uint32_t _pkt_cap; uint32_t _pkt_cap;
......
...@@ -10,10 +10,10 @@ using namespace mediakit; ...@@ -10,10 +10,10 @@ using namespace mediakit;
SrtSession::SrtSession(const Socket::Ptr &sock) SrtSession::SrtSession(const Socket::Ptr &sock)
: UdpSession(sock) { : UdpSession(sock) {
socklen_t addr_len = sizeof(_peer_addr); socklen_t addr_len = sizeof(_peer_addr);
memset(&_peer_addr,0,addr_len); memset(&_peer_addr, 0, addr_len);
//TraceL<<"before addr len "<<addr_len; // TraceL<<"before addr len "<<addr_len;
getpeername(sock->rawFD(), (struct sockaddr *)&_peer_addr, &addr_len); getpeername(sock->rawFD(), (struct sockaddr *)&_peer_addr, &addr_len);
//TraceL<<"after addr len "<<addr_len<<" family "<<_peer_addr.ss_family; // TraceL<<"after addr len "<<addr_len<<" family "<<_peer_addr.ss_family;
} }
SrtSession::~SrtSession() { SrtSession::~SrtSession() {
...@@ -21,40 +21,40 @@ SrtSession::~SrtSession() { ...@@ -21,40 +21,40 @@ SrtSession::~SrtSession() {
} }
EventPoller::Ptr SrtSession::queryPoller(const Buffer::Ptr &buffer) { EventPoller::Ptr SrtSession::queryPoller(const Buffer::Ptr &buffer) {
uint8_t* data = (uint8_t*)buffer->data(); uint8_t *data = (uint8_t *)buffer->data();
size_t size = buffer->size(); size_t size = buffer->size();
if(DataPacket::isDataPacket(data,size)){ if (DataPacket::isDataPacket(data, size)) {
uint32_t socket_id = DataPacket::getSocketID(data,size); uint32_t socket_id = DataPacket::getSocketID(data, size);
auto trans = SrtTransportManager::Instance().getItem(std::to_string(socket_id)); auto trans = SrtTransportManager::Instance().getItem(std::to_string(socket_id));
return trans ? trans->getPoller() : nullptr; return trans ? trans->getPoller() : nullptr;
} }
if(HandshakePacket::isHandshakePacket(data,size)){ if (HandshakePacket::isHandshakePacket(data, size)) {
auto type = HandshakePacket::getHandshakeType(data,size); auto type = HandshakePacket::getHandshakeType(data, size);
if(type == HandshakePacket::HS_TYPE_INDUCTION){ if (type == HandshakePacket::HS_TYPE_INDUCTION) {
// 握手第一阶段 // 握手第一阶段
return nullptr; return nullptr;
}else if(type == HandshakePacket::HS_TYPE_CONCLUSION){ } else if (type == HandshakePacket::HS_TYPE_CONCLUSION) {
// 握手第二阶段 // 握手第二阶段
uint32_t sync_cookie = HandshakePacket::getSynCookie(data,size); uint32_t sync_cookie = HandshakePacket::getSynCookie(data, size);
auto trans = SrtTransportManager::Instance().getHandshakeItem(std::to_string(sync_cookie)); auto trans = SrtTransportManager::Instance().getHandshakeItem(std::to_string(sync_cookie));
return trans ? trans->getPoller() : nullptr; return trans ? trans->getPoller() : nullptr;
}else{ } else {
WarnL<<" not reach there"; WarnL << " not reach there";
} }
}else{ } else {
uint32_t socket_id = ControlPacket::getSocketID(data,size); uint32_t socket_id = ControlPacket::getSocketID(data, size);
auto trans = SrtTransportManager::Instance().getItem(std::to_string(socket_id)); auto trans = SrtTransportManager::Instance().getItem(std::to_string(socket_id));
return trans ? trans->getPoller() : nullptr; return trans ? trans->getPoller() : nullptr;
} }
return nullptr; return nullptr;
} }
void SrtSession::attachServer(const toolkit::Server &server){ void SrtSession::attachServer(const toolkit::Server &server) {
SockUtil::setRecvBuf(getSock()->rawFD(),1024 * 1024); SockUtil::setRecvBuf(getSock()->rawFD(), 1024 * 1024);
} }
void SrtSession::onRecv(const Buffer::Ptr &buffer) { void SrtSession::onRecv(const Buffer::Ptr &buffer) {
uint8_t* data = (uint8_t*)buffer->data(); uint8_t *data = (uint8_t *)buffer->data();
size_t size = buffer->size(); size_t size = buffer->size();
if (_find_transport) { if (_find_transport) {
...@@ -64,10 +64,10 @@ void SrtSession::onRecv(const Buffer::Ptr &buffer) { ...@@ -64,10 +64,10 @@ void SrtSession::onRecv(const Buffer::Ptr &buffer) {
if (DataPacket::isDataPacket(data, size)) { if (DataPacket::isDataPacket(data, size)) {
uint32_t socket_id = DataPacket::getSocketID(data, size); uint32_t socket_id = DataPacket::getSocketID(data, size);
auto trans = SrtTransportManager::Instance().getItem(std::to_string(socket_id)); auto trans = SrtTransportManager::Instance().getItem(std::to_string(socket_id));
if(trans){ if (trans) {
_transport = std::move(trans); _transport = std::move(trans);
}else{ } else {
WarnL<<" data packet not find transport "; WarnL << " data packet not find transport ";
} }
} }
...@@ -92,24 +92,24 @@ void SrtSession::onRecv(const Buffer::Ptr &buffer) { ...@@ -92,24 +92,24 @@ void SrtSession::onRecv(const Buffer::Ptr &buffer) {
} else { } else {
uint32_t socket_id = ControlPacket::getSocketID(data, size); uint32_t socket_id = ControlPacket::getSocketID(data, size);
auto trans = SrtTransportManager::Instance().getItem(std::to_string(socket_id)); auto trans = SrtTransportManager::Instance().getItem(std::to_string(socket_id));
if(trans){ if (trans) {
_transport = std::move(trans); _transport = std::move(trans);
}else{ } else {
WarnL << " not find transport"; WarnL << " not find transport";
} }
} }
if(_transport){ if (_transport) {
_transport->setSession(shared_from_this()); _transport->setSession(shared_from_this());
} }
InfoP(this); InfoP(this);
} }
_ticker.resetTime(); _ticker.resetTime();
if(_transport){ if (_transport) {
_transport->inputSockData(data,size,&_peer_addr); _transport->inputSockData(data, size, &_peer_addr);
}else{ } else {
//WarnL<< "ingore data"; // WarnL<< "ingore data";
} }
} }
...@@ -125,15 +125,17 @@ void SrtSession::onError(const SockException &err) { ...@@ -125,15 +125,17 @@ void SrtSession::onError(const SockException &err) {
// 防止互相引用导致不释放 // 防止互相引用导致不释放
auto transport = std::move(_transport); auto transport = std::move(_transport);
getPoller()->async([transport,err] { getPoller()->async(
[transport, err] {
//延时减引用,防止使用transport对象时,销毁对象 //延时减引用,防止使用transport对象时,销毁对象
transport->onShutdown(err); transport->onShutdown(err);
}, false); },
false);
} }
void SrtSession::onManager() { void SrtSession::onManager() {
GET_CONFIG(float, timeoutSec, kTimeOutSec); GET_CONFIG(float, timeoutSec, kTimeOutSec);
if (_ticker.elapsedTime() > timeoutSec*1000) { if (_ticker.elapsedTime() > timeoutSec * 1000) {
shutdown(SockException(Err_timeout, "srt connection timeout")); shutdown(SockException(Err_timeout, "srt connection timeout"));
return; return;
} }
......
...@@ -24,8 +24,7 @@ private: ...@@ -24,8 +24,7 @@ private:
Ticker _ticker; Ticker _ticker;
struct sockaddr_storage _peer_addr; struct sockaddr_storage _peer_addr;
SrtTransport::Ptr _transport; SrtTransport::Ptr _transport;
}; };
} // namespace SRT } // namespace SRT
#endif //ZLMEDIAKIT_SRT_SESSION_H #endif // ZLMEDIAKIT_SRT_SESSION_H
\ No newline at end of file \ No newline at end of file
#ifndef ZLMEDIAKIT_SRT_TRANSPORT_H #ifndef ZLMEDIAKIT_SRT_TRANSPORT_H
#define ZLMEDIAKIT_SRT_TRANSPORT_H #define ZLMEDIAKIT_SRT_TRANSPORT_H
#include <mutex> #include <atomic>
#include <chrono> #include <chrono>
#include <memory> #include <memory>
#include <atomic> #include <mutex>
#include "Network/Session.h" #include "Network/Session.h"
#include "Poller/EventPoller.h" #include "Poller/EventPoller.h"
...@@ -46,23 +46,22 @@ public: ...@@ -46,23 +46,22 @@ public:
void unregisterSelfHandshake(); void unregisterSelfHandshake();
void unregisterSelf(); void unregisterSelf();
protected: protected:
virtual void onHandShakeFinished(std::string& streamid,struct sockaddr_storage *addr){}; virtual void onHandShakeFinished(std::string &streamid, struct sockaddr_storage *addr) {};
virtual void onSRTData(DataPacket::Ptr pkt){}; virtual void onSRTData(DataPacket::Ptr pkt) {};
virtual void onShutdown(const SockException &ex); virtual void onShutdown(const SockException &ex);
virtual bool isPusher(){ virtual bool isPusher() { return true; };
return true;
};
private: private:
void registerSelfHandshake(); void registerSelfHandshake();
void registerSelf(); void registerSelf();
void switchToOtherTransport(uint8_t *buf, int len,uint32_t socketid, struct sockaddr_storage *addr); void switchToOtherTransport(uint8_t *buf, int len, uint32_t socketid, struct sockaddr_storage *addr);
void handleHandshake(uint8_t *buf, int len, struct sockaddr_storage *addr); void handleHandshake(uint8_t *buf, int len, struct sockaddr_storage *addr);
void handleHandshakeInduction(HandshakePacket& pkt,struct sockaddr_storage *addr); void handleHandshakeInduction(HandshakePacket &pkt, struct sockaddr_storage *addr);
void handleHandshakeConclusion(HandshakePacket& pkt,struct sockaddr_storage *addr); void handleHandshakeConclusion(HandshakePacket &pkt, struct sockaddr_storage *addr);
void handleKeeplive(uint8_t *buf, int len, struct sockaddr_storage *addr); void handleKeeplive(uint8_t *buf, int len, struct sockaddr_storage *addr);
void handleACK(uint8_t *buf, int len, struct sockaddr_storage *addr); void handleACK(uint8_t *buf, int len, struct sockaddr_storage *addr);
...@@ -75,26 +74,26 @@ private: ...@@ -75,26 +74,26 @@ private:
void handlePeerError(uint8_t *buf, int len, struct sockaddr_storage *addr); void handlePeerError(uint8_t *buf, int len, struct sockaddr_storage *addr);
void handleDataPacket(uint8_t *buf, int len, struct sockaddr_storage *addr); void handleDataPacket(uint8_t *buf, int len, struct sockaddr_storage *addr);
void sendNAKPacket(std::list<PacketQueue::LostPair>& lost_list); void sendNAKPacket(std::list<PacketQueue::LostPair> &lost_list);
void sendACKPacket(); void sendACKPacket();
void sendLightACKPacket(); void sendLightACKPacket();
void sendKeepLivePacket(); void sendKeepLivePacket();
void sendShutDown(); void sendShutDown();
void sendMsgDropReq(uint32_t first ,uint32_t last); void sendMsgDropReq(uint32_t first, uint32_t last);
size_t getPayloadSize(); size_t getPayloadSize();
protected: protected:
void sendDataPacket(DataPacket::Ptr pkt,char* buf,int len,bool flush = false); void sendDataPacket(DataPacket::Ptr pkt, char *buf, int len, bool flush = false);
void sendControlPacket(ControlPacket::Ptr pkt,bool flush = true); void sendControlPacket(ControlPacket::Ptr pkt, bool flush = true);
virtual void sendPacket(Buffer::Ptr pkt,bool flush = true); virtual void sendPacket(Buffer::Ptr pkt, bool flush = true);
virtual int getLantencyMul(){ virtual int getLantencyMul() { return 4; };
return 4;
};
private: private:
//当前选中的udp链接 //当前选中的udp链接
Session::Ptr _selected_session; Session::Ptr _selected_session;
//链接迁移前后使用过的udp链接 //链接迁移前后使用过的udp链接
std::unordered_map<Session *, std::weak_ptr<Session> > _history_sessions; std::unordered_map<Session *, std::weak_ptr<Session>> _history_sessions;
EventPoller::Ptr _poller; EventPoller::Ptr _poller;
...@@ -119,13 +118,13 @@ private: ...@@ -119,13 +118,13 @@ private:
PacketSendQueue::Ptr _send_buf; PacketSendQueue::Ptr _send_buf;
uint32_t _buf_delay = 120; uint32_t _buf_delay = 120;
PacketQueue::Ptr _recv_buf; PacketQueue::Ptr _recv_buf;
uint32_t _rtt = 100*1000; uint32_t _rtt = 100 * 1000;
uint32_t _rtt_variance =50*1000; uint32_t _rtt_variance = 50 * 1000;
uint32_t _light_ack_pkt_count = 0; uint32_t _light_ack_pkt_count = 0;
uint32_t _ack_number_count = 0; uint32_t _ack_number_count = 0;
uint32_t _last_ack_pkt_seq_num = 0; uint32_t _last_ack_pkt_seq_num = 0;
UTicker _ack_ticker; UTicker _ack_ticker;
std::map<uint32_t,TimePoint> _ack_send_timestamp; std::map<uint32_t, TimePoint> _ack_send_timestamp;
std::shared_ptr<PacketRecvRateContext> _pkt_recv_rate_context; std::shared_ptr<PacketRecvRateContext> _pkt_recv_rate_context;
std::shared_ptr<EstimatedLinkCapacityContext> _estimated_link_capacity_context; std::shared_ptr<EstimatedLinkCapacityContext> _estimated_link_capacity_context;
...@@ -137,7 +136,6 @@ private: ...@@ -137,7 +136,6 @@ private:
HandshakePacket::Ptr _handleshake_res; HandshakePacket::Ptr _handleshake_res;
ResourcePool<BufferRaw> _packet_pool; ResourcePool<BufferRaw> _packet_pool;
}; };
class SrtTransportManager { class SrtTransportManager {
...@@ -150,6 +148,7 @@ public: ...@@ -150,6 +148,7 @@ public:
void addHandshakeItem(const std::string &key, const SrtTransport::Ptr &ptr); void addHandshakeItem(const std::string &key, const SrtTransport::Ptr &ptr);
void removeHandshakeItem(const std::string &key); void removeHandshakeItem(const std::string &key);
SrtTransport::Ptr getHandshakeItem(const std::string &key); SrtTransport::Ptr getHandshakeItem(const std::string &key);
private: private:
SrtTransportManager() = default; SrtTransportManager() = default;
......
...@@ -10,60 +10,59 @@ SrtTransportImp::SrtTransportImp(const EventPoller::Ptr &poller) ...@@ -10,60 +10,59 @@ SrtTransportImp::SrtTransportImp(const EventPoller::Ptr &poller)
SrtTransportImp::~SrtTransportImp() { SrtTransportImp::~SrtTransportImp() {
InfoP(this); InfoP(this);
uint64_t duration = _alive_ticker.createdTime() / 1000; uint64_t duration = _alive_ticker.createdTime() / 1000;
WarnP(this) << (_is_pusher ? "srt 推流器(" : "srt 播放器(") WarnP(this) << (_is_pusher ? "srt 推流器(" : "srt 播放器(") << _media_info._vhost << "/" << _media_info._app << "/"
<< _media_info._vhost << "/" << _media_info._streamid << ")断开,耗时(s):" << duration;
<< _media_info._app << "/"
<< _media_info._streamid
<< ")断开,耗时(s):" << duration;
//流量统计事件广播 //流量统计事件广播
GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold); GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold);
if (_total_bytes >= iFlowThreshold * 1024) { if (_total_bytes >= iFlowThreshold * 1024) {
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, false, static_cast<SockInfo &>(*this)); NoticeCenter::Instance().emitEvent(
Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, false,
static_cast<SockInfo &>(*this));
} }
} }
void SrtTransportImp::onHandShakeFinished(std::string &streamid,struct sockaddr_storage *addr) { void SrtTransportImp::onHandShakeFinished(std::string &streamid, struct sockaddr_storage *addr) {
// TODO parse streamid like this zlmediakit.com/live/test?token=1213444&type=push // TODO parse streamid like this zlmediakit.com/live/test?token=1213444&type=push
if(!_addr){ if (!_addr) {
_addr.reset(new sockaddr_storage(*((sockaddr_storage *)addr))); _addr.reset(new sockaddr_storage(*((sockaddr_storage *)addr)));
} }
_is_pusher = false; _is_pusher = false;
TraceL<<" stream id "<<streamid; TraceL << " stream id " << streamid;
if(streamid.empty()){ if (streamid.empty()) {
onShutdown(SockException(Err_shutdown, "streamid not empty")); onShutdown(SockException(Err_shutdown, "streamid not empty"));
return; return;
} }
_media_info.parse("srt://"+streamid); _media_info.parse("srt://" + streamid);
auto params = Parser::parseArgs(_media_info._param_strs); auto params = Parser::parseArgs(_media_info._param_strs);
if(params["type"] == "push"){ if (params["type"] == "push") {
_is_pusher = true; _is_pusher = true;
_decoder = DecoderImp::createDecoder(DecoderImp::decoder_ts, this); _decoder = DecoderImp::createDecoder(DecoderImp::decoder_ts, this);
emitOnPublish(); emitOnPublish();
}else{ } else {
_is_pusher = false; _is_pusher = false;
emitOnPlay(); emitOnPlay();
} }
} }
void SrtTransportImp::onSRTData(DataPacket::Ptr pkt) { void SrtTransportImp::onSRTData(DataPacket::Ptr pkt) {
if(!_is_pusher){ if (!_is_pusher) {
WarnP(this)<<"this is a player data ignore"; WarnP(this) << "this is a player data ignore";
return; return;
} }
if (_decoder) { if (_decoder) {
_decoder->input(reinterpret_cast<const uint8_t *>(pkt->payloadData()), pkt->payloadSize()); _decoder->input(reinterpret_cast<const uint8_t *>(pkt->payloadData()), pkt->payloadSize());
}else{ } else {
WarnP(this)<<" not reach this"; WarnP(this) << " not reach this";
} }
} }
void SrtTransportImp::onShutdown(const SockException &ex) { void SrtTransportImp::onShutdown(const SockException &ex) {
SrtTransport::onShutdown(ex); SrtTransport::onShutdown(ex);
} }
bool SrtTransportImp::close(mediakit::MediaSource &sender, bool force){ bool SrtTransportImp::close(mediakit::MediaSource &sender, bool force) {
if (!force && totalReaderCount(sender)) { if (!force && totalReaderCount(sender)) {
return false; return false;
} }
...@@ -81,19 +80,19 @@ bool SrtTransportImp::close(mediakit::MediaSource &sender, bool force){ ...@@ -81,19 +80,19 @@ bool SrtTransportImp::close(mediakit::MediaSource &sender, bool force){
return true; return true;
} }
// 播放总人数 // 播放总人数
int SrtTransportImp::totalReaderCount(mediakit::MediaSource &sender){ int SrtTransportImp::totalReaderCount(mediakit::MediaSource &sender) {
return _muxer ? _muxer->totalReaderCount() : sender.readerCount(); return _muxer ? _muxer->totalReaderCount() : sender.readerCount();
} }
// 获取媒体源类型 // 获取媒体源类型
mediakit::MediaOriginType SrtTransportImp::getOriginType(mediakit::MediaSource &sender) const{ mediakit::MediaOriginType SrtTransportImp::getOriginType(mediakit::MediaSource &sender) const {
return MediaOriginType::srt_push; return MediaOriginType::srt_push;
} }
// 获取媒体源url或者文件路径 // 获取媒体源url或者文件路径
std::string SrtTransportImp::getOriginUrl(mediakit::MediaSource &sender) const{ std::string SrtTransportImp::getOriginUrl(mediakit::MediaSource &sender) const {
return _media_info._full_url; return _media_info._full_url;
} }
// 获取媒体源客户端相关信息 // 获取媒体源客户端相关信息
std::shared_ptr<SockInfo> SrtTransportImp::getOriginSock(mediakit::MediaSource &sender) const{ std::shared_ptr<SockInfo> SrtTransportImp::getOriginSock(mediakit::MediaSource &sender) const {
return static_pointer_cast<SockInfo>(getSession()); return static_pointer_cast<SockInfo>(getSession());
} }
...@@ -105,50 +104,51 @@ void SrtTransportImp::emitOnPublish() { ...@@ -105,50 +104,51 @@ void SrtTransportImp::emitOnPublish() {
return; return;
} }
if (err.empty()) { if (err.empty()) {
strong_self->_muxer = std::make_shared<MultiMediaSourceMuxer>(strong_self->_media_info._vhost, strong_self->_muxer = std::make_shared<MultiMediaSourceMuxer>(
strong_self->_media_info._app, strong_self->_media_info._vhost, strong_self->_media_info._app, strong_self->_media_info._streamid,
strong_self->_media_info._streamid, 0.0f, 0.0f, option);
option);
strong_self->_muxer->setMediaListener(strong_self); strong_self->_muxer->setMediaListener(strong_self);
strong_self->doCachedFunc(); strong_self->doCachedFunc();
InfoP(strong_self) << "允许 srt 推流"; InfoP(strong_self) << "允许 srt 推流";
} else { } else {
WarnP(strong_self) << "禁止 srt 推流:" << err; WarnP(strong_self) << "禁止 srt 推流:" << err;
strong_self->onShutdown(SockException(Err_refused,err)); strong_self->onShutdown(SockException(Err_refused, err));
} }
}; };
//触发推流鉴权事件 //触发推流鉴权事件
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, MediaOriginType::srt_push, _media_info, invoker, static_cast<SockInfo &>(*this)); auto flag = NoticeCenter::Instance().emitEvent(
Broadcast::kBroadcastMediaPublish, MediaOriginType::srt_push, _media_info, invoker,
static_cast<SockInfo &>(*this));
if (!flag) { if (!flag) {
//该事件无人监听,默认不鉴权 //该事件无人监听,默认不鉴权
invoker("", ProtocolOption()); invoker("", ProtocolOption());
} }
} }
void SrtTransportImp::emitOnPlay() {
void SrtTransportImp::emitOnPlay(){
std::weak_ptr<SrtTransportImp> weak_self = static_pointer_cast<SrtTransportImp>(shared_from_this()); std::weak_ptr<SrtTransportImp> weak_self = static_pointer_cast<SrtTransportImp>(shared_from_this());
Broadcast::AuthInvoker invoker = [weak_self](const string &err){ Broadcast::AuthInvoker invoker = [weak_self](const string &err) {
auto strong_self = weak_self.lock(); auto strong_self = weak_self.lock();
if (!strong_self) { if (!strong_self) {
return; return;
} }
strong_self->getPoller()->async([strong_self,err]{ strong_self->getPoller()->async([strong_self, err] {
if(err != ""){ if (err != "") {
strong_self->onShutdown(SockException(Err_refused,err)); strong_self->onShutdown(SockException(Err_refused, err));
}else{ } else {
strong_self->doPlay(); strong_self->doPlay();
} }
}); });
}; };
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed, _media_info, invoker, static_cast<SockInfo &>(*this)); auto flag = NoticeCenter::Instance().emitEvent(
if(!flag){ Broadcast::kBroadcastMediaPlayed, _media_info, invoker, static_cast<SockInfo &>(*this));
if (!flag) {
doPlay(); doPlay();
} }
} }
void SrtTransportImp::doPlay(){ void SrtTransportImp::doPlay() {
//异步查找直播流 //异步查找直播流
std::weak_ptr<SrtTransportImp> weak_self = static_pointer_cast<SrtTransportImp>(shared_from_this()); std::weak_ptr<SrtTransportImp> weak_self = static_pointer_cast<SrtTransportImp>(shared_from_this());
...@@ -158,15 +158,15 @@ void SrtTransportImp::doPlay(){ ...@@ -158,15 +158,15 @@ void SrtTransportImp::doPlay(){
auto strong_self = weak_self.lock(); auto strong_self = weak_self.lock();
if (!strong_self) { if (!strong_self) {
//本对象已经销毁 //本对象已经销毁
TraceL<<"本对象已经销毁"; TraceL << "本对象已经销毁";
return; return;
} }
if (!src) { if (!src) {
//未找到该流 //未找到该流
TraceL<<"未找到该流"; TraceL << "未找到该流";
strong_self->onShutdown(SockException(Err_shutdown)); strong_self->onShutdown(SockException(Err_shutdown));
} else { } else {
TraceL<<"找到该流"; TraceL << "找到该流";
auto ts_src = dynamic_pointer_cast<TSMediaSource>(src); auto ts_src = dynamic_pointer_cast<TSMediaSource>(src);
assert(ts_src); assert(ts_src);
ts_src->pause(false); ts_src->pause(false);
...@@ -236,9 +236,7 @@ bool SrtTransportImp::inputFrame(const Frame::Ptr &frame) { ...@@ -236,9 +236,7 @@ bool SrtTransportImp::inputFrame(const Frame::Ptr &frame) {
} }
auto frame_cached = Frame::getCacheAbleFrame(frame); auto frame_cached = Frame::getCacheAbleFrame(frame);
lock_guard<recursive_mutex> lck(_func_mtx); lock_guard<recursive_mutex> lck(_func_mtx);
_cached_func.emplace_back([this, frame_cached]() { _cached_func.emplace_back([this, frame_cached]() { _muxer->inputFrame(frame_cached); });
_muxer->inputFrame(frame_cached);
});
return true; return true;
} }
...@@ -248,9 +246,7 @@ bool SrtTransportImp::addTrack(const Track::Ptr &track) { ...@@ -248,9 +246,7 @@ bool SrtTransportImp::addTrack(const Track::Ptr &track) {
} }
lock_guard<recursive_mutex> lck(_func_mtx); lock_guard<recursive_mutex> lck(_func_mtx);
_cached_func.emplace_back([this, track]() { _cached_func.emplace_back([this, track]() { _muxer->addTrack(track); });
_muxer->addTrack(track);
});
return true; return true;
} }
...@@ -259,9 +255,7 @@ void SrtTransportImp::addTrackCompleted() { ...@@ -259,9 +255,7 @@ void SrtTransportImp::addTrackCompleted() {
_muxer->addTrackCompleted(); _muxer->addTrackCompleted();
} else { } else {
lock_guard<recursive_mutex> lck(_func_mtx); lock_guard<recursive_mutex> lck(_func_mtx);
_cached_func.emplace_back([this]() { _cached_func.emplace_back([this]() { _muxer->addTrackCompleted(); });
_muxer->addTrackCompleted();
});
} }
} }
...@@ -273,10 +267,9 @@ void SrtTransportImp::doCachedFunc() { ...@@ -273,10 +267,9 @@ void SrtTransportImp::doCachedFunc() {
_cached_func.clear(); _cached_func.clear();
} }
int SrtTransportImp::getLantencyMul(){ int SrtTransportImp::getLantencyMul() {
GET_CONFIG(int, lantencyMul, kLantencyMul); GET_CONFIG(int, lantencyMul, kLantencyMul);
return lantencyMul; return lantencyMul;
} }
} // namespace SRT } // namespace SRT
\ No newline at end of file
#ifndef ZLMEDIAKIT_SRT_TRANSPORT_IMP_H #ifndef ZLMEDIAKIT_SRT_TRANSPORT_IMP_H
#define ZLMEDIAKIT_SRT_TRANSPORT_IMP_H #define ZLMEDIAKIT_SRT_TRANSPORT_IMP_H
#include <mutex>
#include "Common/MultiMediaSourceMuxer.h" #include "Common/MultiMediaSourceMuxer.h"
#include "Rtp/Decoder.h" #include "Rtp/Decoder.h"
#include "TS/TSMediaSource.h"
#include "SrtTransport.hpp" #include "SrtTransport.hpp"
#include "TS/TSMediaSource.h"
#include <mutex>
namespace SRT { namespace SRT {
using namespace toolkit; using namespace toolkit;
using namespace mediakit; using namespace mediakit;
using namespace std; using namespace std;
class SrtTransportImp class SrtTransportImp
: public SrtTransport : public SrtTransport
, public toolkit::SockInfo , public toolkit::SockInfo
...@@ -19,13 +18,11 @@ class SrtTransportImp ...@@ -19,13 +18,11 @@ class SrtTransportImp
public: public:
SrtTransportImp(const EventPoller::Ptr &poller); SrtTransportImp(const EventPoller::Ptr &poller);
~SrtTransportImp(); ~SrtTransportImp();
void inputSockData(uint8_t *buf, int len, struct sockaddr_storage *addr){ void inputSockData(uint8_t *buf, int len, struct sockaddr_storage *addr) {
SrtTransport::inputSockData(buf,len,addr); SrtTransport::inputSockData(buf, len, addr);
_total_bytes += len; _total_bytes += len;
} }
void onSendTSData(const Buffer::Ptr &buffer, bool flush){ void onSendTSData(const Buffer::Ptr &buffer, bool flush) { SrtTransport::onSendTSData(buffer, flush); }
SrtTransport::onSendTSData(buffer,flush);
}
/// SockInfo override /// SockInfo override
std::string get_local_ip() override; std::string get_local_ip() override;
uint16_t get_local_port() override; uint16_t get_local_port() override;
...@@ -35,19 +32,17 @@ public: ...@@ -35,19 +32,17 @@ public:
protected: protected:
///////SrtTransport override/////// ///////SrtTransport override///////
void onHandShakeFinished(std::string& streamid,struct sockaddr_storage *addr) override; void onHandShakeFinished(std::string &streamid, struct sockaddr_storage *addr) override;
void onSRTData(DataPacket::Ptr pkt) override; void onSRTData(DataPacket::Ptr pkt) override;
void onShutdown(const SockException &ex) override; void onShutdown(const SockException &ex) override;
int getLantencyMul() override; int getLantencyMul() override;
void sendPacket(Buffer::Ptr pkt,bool flush = true) override{ void sendPacket(Buffer::Ptr pkt, bool flush = true) override {
_total_bytes += pkt->size(); _total_bytes += pkt->size();
SrtTransport::sendPacket(pkt,flush); SrtTransport::sendPacket(pkt, flush);
}; };
bool isPusher() override{ bool isPusher() override { return _is_pusher; }
return _is_pusher;
}
///////MediaSourceEvent override/////// ///////MediaSourceEvent override///////
// 关闭 // 关闭
...@@ -62,7 +57,7 @@ protected: ...@@ -62,7 +57,7 @@ protected:
std::shared_ptr<SockInfo> getOriginSock(mediakit::MediaSource &sender) const override; std::shared_ptr<SockInfo> getOriginSock(mediakit::MediaSource &sender) const override;
bool inputFrame(const Frame::Ptr &frame) override; bool inputFrame(const Frame::Ptr &frame) override;
bool addTrack(const Track::Ptr & track) override; bool addTrack(const Track::Ptr &track) override;
void addTrackCompleted() override; void addTrackCompleted() override;
void resetTracks() override {}; void resetTracks() override {};
...@@ -85,7 +80,7 @@ private: ...@@ -85,7 +80,7 @@ private:
MultiMediaSourceMuxer::Ptr _muxer; MultiMediaSourceMuxer::Ptr _muxer;
DecoderImp::Ptr _decoder; DecoderImp::Ptr _decoder;
std::recursive_mutex _func_mtx; std::recursive_mutex _func_mtx;
std::deque<std::function<void()> > _cached_func; std::deque<std::function<void()>> _cached_func;
}; };
} // namespace SRT } // namespace SRT
......
...@@ -2,12 +2,12 @@ ...@@ -2,12 +2,12 @@
#include "Statistic.hpp" #include "Statistic.hpp"
namespace SRT { namespace SRT {
void PacketRecvRateContext::inputPacket(TimePoint& ts) { void PacketRecvRateContext::inputPacket(TimePoint &ts) {
if(_pkt_map.size()>100){ if (_pkt_map.size() > 100) {
_pkt_map.erase(_pkt_map.begin()); _pkt_map.erase(_pkt_map.begin());
} }
auto tmp = DurationCountMicroseconds(ts - _start); auto tmp = DurationCountMicroseconds(ts - _start);
_pkt_map.emplace(tmp,tmp); _pkt_map.emplace(tmp, tmp);
} }
uint32_t PacketRecvRateContext::getPacketRecvRate() { uint32_t PacketRecvRateContext::getPacketRecvRate() {
if (_pkt_map.size() < 2) { if (_pkt_map.size() < 2) {
...@@ -27,13 +27,13 @@ uint32_t PacketRecvRateContext::getPacketRecvRate() { ...@@ -27,13 +27,13 @@ uint32_t PacketRecvRateContext::getPacketRecvRate() {
} }
double rate = 1e6 / (double)dur; double rate = 1e6 / (double)dur;
if(rate <=1000){ if (rate <= 1000) {
return 50000; return 50000;
} }
return rate; return rate;
} }
void EstimatedLinkCapacityContext::inputPacket(TimePoint& ts) { void EstimatedLinkCapacityContext::inputPacket(TimePoint &ts) {
if (_pkt_map.size() > 16) { if (_pkt_map.size() > 16) {
_pkt_map.erase(_pkt_map.begin()); _pkt_map.erase(_pkt_map.begin());
} }
...@@ -44,31 +44,30 @@ uint32_t EstimatedLinkCapacityContext::getEstimatedLinkCapacity() { ...@@ -44,31 +44,30 @@ uint32_t EstimatedLinkCapacityContext::getEstimatedLinkCapacity() {
decltype(_pkt_map.begin()) next; decltype(_pkt_map.begin()) next;
std::vector<int64_t> tmp; std::vector<int64_t> tmp;
for(auto it = _pkt_map.begin();it != _pkt_map.end();++it){ for (auto it = _pkt_map.begin(); it != _pkt_map.end(); ++it) {
next = it; next = it;
++next; ++next;
if(next != _pkt_map.end()){ if (next != _pkt_map.end()) {
tmp.push_back(next->first -it->first); tmp.push_back(next->first - it->first);
}else{ } else {
break; break;
} }
} }
std::sort(tmp.begin(),tmp.end()); std::sort(tmp.begin(), tmp.end());
if(tmp.empty()){ if (tmp.empty()) {
return 1000; return 1000;
} }
if(tmp.size()<16){ if (tmp.size() < 16) {
return 1000; return 1000;
} }
double dur =tmp[0]/1e6; double dur = tmp[0] / 1e6;
return (uint32_t)(1.0/dur);
return (uint32_t)(1.0 / dur);
} }
void RecvRateContext::inputPacket(TimePoint& ts, size_t size ) { void RecvRateContext::inputPacket(TimePoint &ts, size_t size) {
if (_pkt_map.size() > 100) { if (_pkt_map.size() > 100) {
_pkt_map.erase(_pkt_map.begin()); _pkt_map.erase(_pkt_map.begin());
} }
...@@ -77,19 +76,19 @@ void RecvRateContext::inputPacket(TimePoint& ts, size_t size ) { ...@@ -77,19 +76,19 @@ void RecvRateContext::inputPacket(TimePoint& ts, size_t size ) {
_pkt_map.emplace(tmp, tmp); _pkt_map.emplace(tmp, tmp);
} }
uint32_t RecvRateContext::getRecvRate() { uint32_t RecvRateContext::getRecvRate() {
if(_pkt_map.size()<2){ if (_pkt_map.size() < 2) {
return 0; return 0;
} }
auto first = _pkt_map.begin(); auto first = _pkt_map.begin();
auto last = _pkt_map.rbegin(); auto last = _pkt_map.rbegin();
double dur = (last->first - first->first)/1000000.0; double dur = (last->first - first->first) / 1000000.0;
size_t bytes = 0; size_t bytes = 0;
for(auto it : _pkt_map){ for (auto it : _pkt_map) {
bytes += it.second; bytes += it.second;
} }
double rate = (double)bytes/dur; double rate = (double)bytes / dur;
return (uint32_t)rate; return (uint32_t)rate;
} }
......
...@@ -8,39 +8,42 @@ ...@@ -8,39 +8,42 @@
namespace SRT { namespace SRT {
class PacketRecvRateContext { class PacketRecvRateContext {
public: public:
PacketRecvRateContext(TimePoint start):_start(start){}; PacketRecvRateContext(TimePoint start)
: _start(start) {};
~PacketRecvRateContext() = default; ~PacketRecvRateContext() = default;
void inputPacket(TimePoint& ts); void inputPacket(TimePoint &ts);
uint32_t getPacketRecvRate(); uint32_t getPacketRecvRate();
private: private:
std::map<int64_t,int64_t> _pkt_map; std::map<int64_t, int64_t> _pkt_map;
TimePoint _start; TimePoint _start;
}; };
class EstimatedLinkCapacityContext { class EstimatedLinkCapacityContext {
public: public:
EstimatedLinkCapacityContext(TimePoint start):_start(start){}; EstimatedLinkCapacityContext(TimePoint start)
: _start(start) {};
~EstimatedLinkCapacityContext() = default; ~EstimatedLinkCapacityContext() = default;
void inputPacket(TimePoint& ts); void inputPacket(TimePoint &ts);
uint32_t getEstimatedLinkCapacity(); uint32_t getEstimatedLinkCapacity();
private: private:
std::map<int64_t,int64_t> _pkt_map; std::map<int64_t, int64_t> _pkt_map;
TimePoint _start; TimePoint _start;
}; };
class RecvRateContext { class RecvRateContext {
public: public:
RecvRateContext(TimePoint start):_start(start){}; RecvRateContext(TimePoint start)
: _start(start) {};
~RecvRateContext() = default; ~RecvRateContext() = default;
void inputPacket(TimePoint& ts,size_t size); void inputPacket(TimePoint &ts, size_t size);
uint32_t getRecvRate(); uint32_t getRecvRate();
private: private:
std::map<int64_t,size_t> _pkt_map; std::map<int64_t, size_t> _pkt_map;
TimePoint _start; TimePoint _start;
}; };
} // namespace SRT } // namespace SRT
#endif // ZLMEDIAKIT_SRT_STATISTIC_H #endif // ZLMEDIAKIT_SRT_STATISTIC_H
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论