Commit e5ca3aa0 by ziyue

格式化代码

parents e415230e 020b8b85
#ifndef ZLMEDIAKIT_SRT_COMMON_H #ifndef ZLMEDIAKIT_SRT_COMMON_H
#define ZLMEDIAKIT_SRT_COMMON_H #define ZLMEDIAKIT_SRT_COMMON_H
#if defined(_WIN32)
#include <winsock2.h>
#include <ws2tcpip.h>
#include <Iphlpapi.h>
#pragma comment (lib, "Ws2_32.lib")
#pragma comment(lib,"Iphlpapi.lib")
#else
#include <netdb.h>
#include <sys/socket.h>
#endif // defined(_WIN32)
#include <chrono> #include <chrono>
namespace SRT { namespace SRT {
using SteadyClock = std::chrono::steady_clock; using SteadyClock = std::chrono::steady_clock;
using TimePoint = std::chrono::time_point<SteadyClock>; using TimePoint = std::chrono::time_point<SteadyClock>;
......

#if defined(_WIN32)
#include <winsock2.h>
#include <ws2tcpip.h>
#include <Iphlpapi.h>
#pragma comment (lib, "Ws2_32.lib")
#pragma comment(lib,"Iphlpapi.lib")
#else
#include <sys/socket.h>
#include <netdb.h>
#endif // defined(_WIN32)
#include <atomic>
#include "Util/logger.h"
#include "Util/MD5.h" #include "Util/MD5.h"
#include "Util/logger.h"
#include <atomic>
#include "Packet.hpp" #include "Packet.hpp"
namespace SRT { namespace SRT {
...@@ -342,7 +331,6 @@ size_t HandshakePacket::getExtSize() { ...@@ -342,7 +331,6 @@ size_t HandshakePacket::getExtSize() {
} }
return size; return size;
} }
bool HandshakePacket::storeToData() { bool HandshakePacket::storeToData() {
_data = BufferRaw::create(); _data = BufferRaw::create();
for (auto ex : ext_list) { for (auto ex : ext_list) {
...@@ -445,15 +433,11 @@ uint32_t HandshakePacket::generateSynCookie( ...@@ -445,15 +433,11 @@ uint32_t HandshakePacket::generateSynCookie(
// SYN cookie // SYN cookie
char clienthost[NI_MAXHOST]; char clienthost[NI_MAXHOST];
char clientport[NI_MAXSERV]; char clientport[NI_MAXSERV];
getnameinfo((struct sockaddr*)addr, getnameinfo(
sizeof(struct sockaddr_storage), (struct sockaddr *)addr, sizeof(struct sockaddr_storage), clienthost, sizeof(clienthost), clientport,
clienthost, sizeof(clientport), NI_NUMERICHOST | NI_NUMERICSERV);
sizeof(clienthost), int64_t timestamp = (DurationCountMicroseconds(SteadyClock::now() - ts) / 60000000) + distractor.load()
clientport, + correction; // secret changes every one minute
sizeof(clientport),
NI_NUMERICHOST | NI_NUMERICSERV);
int64_t timestamp = (DurationCountMicroseconds(SteadyClock::now() - ts) / 60000000) + distractor.load() +
correction; // secret changes every one minute
std::stringstream cookiestr; std::stringstream cookiestr;
cookiestr << clienthost << ":" << clientport << ":" << timestamp; cookiestr << clienthost << ":" << clientport << ":" << timestamp;
union { union {
......
...@@ -63,8 +63,6 @@ public: ...@@ -63,8 +63,6 @@ public:
uint32_t timestamp; uint32_t timestamp;
uint32_t dst_socket_id; uint32_t dst_socket_id;
TimePoint get_ts; // recv or send time
private: private:
BufferRaw::Ptr _data; BufferRaw::Ptr _data;
}; };
...@@ -189,7 +187,8 @@ public: ...@@ -189,7 +187,8 @@ public:
static bool isHandshakePacket(uint8_t *buf, size_t len); static bool isHandshakePacket(uint8_t *buf, size_t len);
static uint32_t getHandshakeType(uint8_t *buf, size_t len); static uint32_t getHandshakeType(uint8_t *buf, size_t len);
static uint32_t getSynCookie(uint8_t *buf, size_t len); static uint32_t getSynCookie(uint8_t *buf, size_t len);
static uint32_t generateSynCookie(struct sockaddr_storage *addr, TimePoint ts, uint32_t current_cookie = 0, int correction = 0); static uint32_t
generateSynCookie(struct sockaddr_storage *addr, TimePoint ts, uint32_t current_cookie = 0, int correction = 0);
void assignPeerIP(struct sockaddr_storage *addr); void assignPeerIP(struct sockaddr_storage *addr);
///////ControlPacket override/////// ///////ControlPacket override///////
...@@ -358,4 +357,4 @@ public: ...@@ -358,4 +357,4 @@ public:
} // namespace SRT } // namespace SRT
#endif //ZLMEDIAKIT_SRT_PACKET_H #endif // ZLMEDIAKIT_SRT_PACKET_H
\ No newline at end of file \ No newline at end of file
...@@ -68,7 +68,7 @@ uint32_t PacketSendQueue::timeLatency() { ...@@ -68,7 +68,7 @@ uint32_t PacketSendQueue::timeLatency() {
} else { } else {
dur = first - last; dur = first - last;
} }
if (dur > (0x01 << 31)) { if (dur > ((uint32_t)0x01 << 31)) {
TraceL << "cycle timeLatency " << dur; TraceL << "cycle timeLatency " << dur;
dur = 0xffffffff - dur; dur = 0xffffffff - dur;
} }
......
...@@ -25,7 +25,6 @@ public: ...@@ -25,7 +25,6 @@ public:
private: private:
uint32_t timeLatency(); uint32_t timeLatency();
private: private:
uint32_t _pkt_cap; uint32_t _pkt_cap;
uint32_t _pkt_latency; uint32_t _pkt_latency;
......
#include <stdlib.h> #include <stdlib.h>
#include "Util/onceToken.h"
#include "Ack.hpp" #include "Ack.hpp"
#include "Packet.hpp" #include "Packet.hpp"
#include "SrtTransport.hpp" #include "SrtTransport.hpp"
#include "Util/onceToken.h"
namespace SRT { namespace SRT {
#define SRT_FIELD "srt." #define SRT_FIELD "srt."
...@@ -13,7 +14,6 @@ const std::string kPort = SRT_FIELD "port"; ...@@ -13,7 +14,6 @@ const std::string kPort = SRT_FIELD "port";
const std::string kLatencyMul = SRT_FIELD "latencyMul"; const std::string kLatencyMul = SRT_FIELD "latencyMul";
static std::atomic<uint32_t> s_srt_socket_id_generate { 125 }; static std::atomic<uint32_t> s_srt_socket_id_generate { 125 };
//////////// SrtTransport ////////////////////////// //////////// SrtTransport //////////////////////////
SrtTransport::SrtTransport(const EventPoller::Ptr &poller) SrtTransport::SrtTransport(const EventPoller::Ptr &poller)
: _poller(poller) { : _poller(poller) {
...@@ -432,7 +432,6 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora ...@@ -432,7 +432,6 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora
DataPacket::Ptr pkt = std::make_shared<DataPacket>(); DataPacket::Ptr pkt = std::make_shared<DataPacket>();
pkt->loadFromData(buf, len); pkt->loadFromData(buf, len);
pkt->get_ts = _now;
std::list<DataPacket::Ptr> list; std::list<DataPacket::Ptr> list;
//TraceL<<" seq="<< pkt->packet_seq_number<<" ts="<<pkt->timestamp<<" size="<<pkt->payloadSize()<<\ //TraceL<<" seq="<< pkt->packet_seq_number<<" ts="<<pkt->timestamp<<" size="<<pkt->payloadSize()<<\
//" PP="<<(int)pkt->PP<<" O="<<(int)pkt->O<<" kK="<<(int)pkt->KK<<" R="<<(int)pkt->R; //" PP="<<(int)pkt->PP<<" O="<<(int)pkt->O<<" kK="<<(int)pkt->KK<<" R="<<(int)pkt->R;
......
...@@ -45,7 +45,7 @@ public: ...@@ -45,7 +45,7 @@ public:
virtual void onSendTSData(const Buffer::Ptr &buffer, bool flush); virtual void onSendTSData(const Buffer::Ptr &buffer, bool flush);
std::string getIdentifier(); std::string getIdentifier();
void unregisterSelf(); void unregisterSelf();
void unregisterSelfHandshake(); void unregisterSelfHandshake();
protected: protected:
......
...@@ -9,16 +9,15 @@ SrtTransportImp::SrtTransportImp(const EventPoller::Ptr &poller) : SrtTransport( ...@@ -9,16 +9,15 @@ SrtTransportImp::SrtTransportImp(const EventPoller::Ptr &poller) : SrtTransport(
SrtTransportImp::~SrtTransportImp() { SrtTransportImp::~SrtTransportImp() {
InfoP(this); InfoP(this);
uint64_t duration = _alive_ticker.createdTime() / 1000; uint64_t duration = _alive_ticker.createdTime() / 1000;
WarnP(this) << (_is_pusher ? "srt 推流器(" : "srt 播放器(") WarnP(this) << (_is_pusher ? "srt 推流器(" : "srt 播放器(") << _media_info._vhost << "/" << _media_info._app << "/"
<< _media_info._vhost << "/" << _media_info._streamid << ")断开,耗时(s):" << duration;
<< _media_info._app << "/"
<< _media_info._streamid
<< ")断开,耗时(s):" << duration;
//流量统计事件广播 //流量统计事件广播
GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold); GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold);
if (_total_bytes >= iFlowThreshold * 1024) { if (_total_bytes >= iFlowThreshold * 1024) {
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, false, static_cast<SockInfo &>(*this)); NoticeCenter::Instance().emitEvent(
Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, false,
static_cast<SockInfo &>(*this));
} }
} }
...@@ -71,7 +70,6 @@ bool SrtTransportImp::close(mediakit::MediaSource &sender, bool force) { ...@@ -71,7 +70,6 @@ bool SrtTransportImp::close(mediakit::MediaSource &sender, bool force) {
<< sender.getVhost() << "/" << sender.getVhost() << "/"
<< sender.getApp() << "/" << sender.getApp() << "/"
<< sender.getId() << " " << force; << sender.getId() << " " << force;
weak_ptr<SrtTransportImp> weak_self = static_pointer_cast<SrtTransportImp>(shared_from_this()); weak_ptr<SrtTransportImp> weak_self = static_pointer_cast<SrtTransportImp>(shared_from_this());
getPoller()->async([weak_self, err]() { getPoller()->async([weak_self, err]() {
auto strong_self = weak_self.lock(); auto strong_self = weak_self.lock();
...@@ -112,10 +110,9 @@ void SrtTransportImp::emitOnPublish() { ...@@ -112,10 +110,9 @@ void SrtTransportImp::emitOnPublish() {
return; return;
} }
if (err.empty()) { if (err.empty()) {
strong_self->_muxer = std::make_shared<MultiMediaSourceMuxer>(strong_self->_media_info._vhost, strong_self->_muxer = std::make_shared<MultiMediaSourceMuxer>(
strong_self->_media_info._app, strong_self->_media_info._vhost, strong_self->_media_info._app, strong_self->_media_info._streamid,
strong_self->_media_info._streamid, 0.0f, 0.0f, option);
option);
strong_self->_muxer->setMediaListener(strong_self); strong_self->_muxer->setMediaListener(strong_self);
strong_self->doCachedFunc(); strong_self->doCachedFunc();
InfoP(strong_self) << "允许 srt 推流"; InfoP(strong_self) << "允许 srt 推流";
...@@ -126,7 +123,9 @@ void SrtTransportImp::emitOnPublish() { ...@@ -126,7 +123,9 @@ void SrtTransportImp::emitOnPublish() {
}; };
//触发推流鉴权事件 //触发推流鉴权事件
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, MediaOriginType::srt_push, _media_info, invoker, static_cast<SockInfo &>(*this)); auto flag = NoticeCenter::Instance().emitEvent(
Broadcast::kBroadcastMediaPublish, MediaOriginType::srt_push, _media_info, invoker,
static_cast<SockInfo &>(*this));
if (!flag) { if (!flag) {
//该事件无人监听,默认不鉴权 //该事件无人监听,默认不鉴权
invoker("", ProtocolOption()); invoker("", ProtocolOption());
...@@ -149,7 +148,8 @@ void SrtTransportImp::emitOnPlay() { ...@@ -149,7 +148,8 @@ void SrtTransportImp::emitOnPlay() {
}); });
}; };
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed, _media_info, invoker, static_cast<SockInfo &>(*this)); auto flag = NoticeCenter::Instance().emitEvent(
Broadcast::kBroadcastMediaPlayed, _media_info, invoker, static_cast<SockInfo &>(*this));
if (!flag) { if (!flag) {
doPlay(); doPlay();
} }
......
...@@ -12,7 +12,6 @@ namespace SRT { ...@@ -12,7 +12,6 @@ namespace SRT {
using namespace std; using namespace std;
using namespace toolkit; using namespace toolkit;
using namespace mediakit; using namespace mediakit;
class SrtTransportImp class SrtTransportImp
: public SrtTransport : public SrtTransport
, public toolkit::SockInfo , public toolkit::SockInfo
...@@ -27,7 +26,6 @@ public: ...@@ -27,7 +26,6 @@ public:
_total_bytes += len; _total_bytes += len;
} }
void onSendTSData(const Buffer::Ptr &buffer, bool flush) override { SrtTransport::onSendTSData(buffer, flush); } void onSendTSData(const Buffer::Ptr &buffer, bool flush) override { SrtTransport::onSendTSData(buffer, flush); }
/// SockInfo override /// SockInfo override
std::string get_local_ip() override; std::string get_local_ip() override;
uint16_t get_local_port() override; uint16_t get_local_port() override;
...@@ -86,7 +84,7 @@ private: ...@@ -86,7 +84,7 @@ private:
MultiMediaSourceMuxer::Ptr _muxer; MultiMediaSourceMuxer::Ptr _muxer;
DecoderImp::Ptr _decoder; DecoderImp::Ptr _decoder;
std::recursive_mutex _func_mtx; std::recursive_mutex _func_mtx;
std::deque<std::function<void()> > _cached_func; std::deque<std::function<void()>> _cached_func;
}; };
} // namespace SRT } // namespace SRT
......
...@@ -9,7 +9,8 @@ namespace SRT { ...@@ -9,7 +9,8 @@ namespace SRT {
class PacketRecvRateContext { class PacketRecvRateContext {
public: public:
PacketRecvRateContext(TimePoint start): _start(start) {}; PacketRecvRateContext(TimePoint start)
: _start(start) {};
~PacketRecvRateContext() = default; ~PacketRecvRateContext() = default;
void inputPacket(TimePoint &ts); void inputPacket(TimePoint &ts);
uint32_t getPacketRecvRate(); uint32_t getPacketRecvRate();
...@@ -33,7 +34,8 @@ private: ...@@ -33,7 +34,8 @@ private:
class RecvRateContext { class RecvRateContext {
public: public:
RecvRateContext(TimePoint start): _start(start) {}; RecvRateContext(TimePoint start)
: _start(start) {};
~RecvRateContext() = default; ~RecvRateContext() = default;
void inputPacket(TimePoint &ts, size_t size); void inputPacket(TimePoint &ts, size_t size);
uint32_t getRecvRate(); uint32_t getRecvRate();
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论