Commit 82da99ee by xiongguangjie

add shutdown packet and flow report

parent b9f66ca1
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
#include <vector> #include <vector>
#include "Network/Buffer.h" #include "Network/Buffer.h"
#include "Util/logger.h"
#include "Common.hpp" #include "Common.hpp"
#include "HSExt.hpp" #include "HSExt.hpp"
...@@ -312,6 +313,32 @@ class MsgDropReqPacket : public ControlPacket ...@@ -312,6 +313,32 @@ class MsgDropReqPacket : public ControlPacket
uint32_t last_pkt_seq_num; uint32_t last_pkt_seq_num;
}; };
class ShutDownPacket : public ControlPacket
{
public:
using Ptr = std::shared_ptr<ShutDownPacket>;
ShutDownPacket() = default;
~ShutDownPacket() = default;
///////ControlPacket override///////
bool loadFromData(uint8_t *buf, size_t len) override {
if (len < HEADER_SIZE) {
WarnL << "data size" << len << " less " << HEADER_SIZE;
return false;
}
_data = BufferRaw::create();
_data->assign((char *)buf, len);
return loadHeader();
};
bool storeToData() override {
control_type = ControlPacket::SHUTDOWN;
sub_type = 0;
_data = BufferRaw::create();
_data->setCapacity(HEADER_SIZE);
_data->setSize(HEADER_SIZE);
return storeToHeader();
};
};
} // namespace SRT } // namespace SRT
#endif //ZLMEDIAKIT_SRT_PACKET_H #endif //ZLMEDIAKIT_SRT_PACKET_H
\ No newline at end of file
...@@ -329,6 +329,14 @@ void SrtTransport::sendNAKPacket(std::list<PacketQueue::LostPair>& lost_list){ ...@@ -329,6 +329,14 @@ void SrtTransport::sendNAKPacket(std::list<PacketQueue::LostPair>& lost_list){
TraceL<<"send NAK "<<pkt->dump(); TraceL<<"send NAK "<<pkt->dump();
sendControlPacket(pkt,true); sendControlPacket(pkt,true);
} }
void SrtTransport::sendShutDown(){
ShutDownPacket::Ptr pkt = std::make_shared<ShutDownPacket>();
pkt->dst_socket_id = _peer_socket_id;
pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp);
pkt->storeToData();
sendControlPacket(pkt,true);
}
void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_storage *addr){ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_storage *addr){
DataPacket::Ptr pkt = std::make_shared<DataPacket>(); DataPacket::Ptr pkt = std::make_shared<DataPacket>();
pkt->loadFromData(buf,len); pkt->loadFromData(buf,len);
...@@ -423,6 +431,7 @@ void SrtTransport::unregisterSelf() { ...@@ -423,6 +431,7 @@ void SrtTransport::unregisterSelf() {
} }
void SrtTransport::onShutdown(const SockException &ex){ void SrtTransport::onShutdown(const SockException &ex){
sendShutDown();
WarnL << ex.what(); WarnL << ex.what();
unregisterSelfHandshake(); unregisterSelfHandshake();
unregisterSelf(); unregisterSelf();
......
...@@ -36,7 +36,7 @@ public: ...@@ -36,7 +36,7 @@ public:
* @param len 数据长度 * @param len 数据长度
* @param addr 数据来源地址 * @param addr 数据来源地址
*/ */
void inputSockData(uint8_t *buf, int len, struct sockaddr_storage *addr); virtual void inputSockData(uint8_t *buf, int len, struct sockaddr_storage *addr);
std::string getIdentifier(); std::string getIdentifier();
...@@ -72,6 +72,7 @@ private: ...@@ -72,6 +72,7 @@ private:
void sendACKPacket(); void sendACKPacket();
void sendLightACKPacket(); void sendLightACKPacket();
void sendKeepLivePacket(); void sendKeepLivePacket();
void sendShutDown();
protected: protected:
void sendDataPacket(DataPacket::Ptr pkt,char* buf,int len,bool flush = false); void sendDataPacket(DataPacket::Ptr pkt,char* buf,int len,bool flush = false);
void sendControlPacket(ControlPacket::Ptr pkt,bool flush = true); void sendControlPacket(ControlPacket::Ptr pkt,bool flush = true);
......
...@@ -9,6 +9,18 @@ SrtTransportImp::SrtTransportImp(const EventPoller::Ptr &poller) ...@@ -9,6 +9,18 @@ SrtTransportImp::SrtTransportImp(const EventPoller::Ptr &poller)
SrtTransportImp::~SrtTransportImp() { SrtTransportImp::~SrtTransportImp() {
InfoP(this); InfoP(this);
uint64_t duration = _alive_ticker.createdTime() / 1000;
WarnP(this) << "srt 推流器("
<< _media_info._vhost << "/"
<< _media_info._app << "/"
<< _media_info._streamid
<< ")断开,耗时(s):" << duration;
//流量统计事件广播
GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold);
if (_total_bytes >= iFlowThreshold * 1024) {
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, false, static_cast<SockInfo &>(*this));
}
} }
void SrtTransportImp::onHandShakeFinished(std::string &streamid,struct sockaddr_storage *addr) { void SrtTransportImp::onHandShakeFinished(std::string &streamid,struct sockaddr_storage *addr) {
......
...@@ -17,7 +17,10 @@ class SrtTransportImp ...@@ -17,7 +17,10 @@ class SrtTransportImp
public: public:
SrtTransportImp(const EventPoller::Ptr &poller); SrtTransportImp(const EventPoller::Ptr &poller);
~SrtTransportImp(); ~SrtTransportImp();
void inputSockData(uint8_t *buf, int len, struct sockaddr_storage *addr){
SrtTransport::inputSockData(buf,len,addr);
_total_bytes += len;
}
/// SockInfo override /// SockInfo override
std::string get_local_ip() override; std::string get_local_ip() override;
uint16_t get_local_port() override; uint16_t get_local_port() override;
...@@ -58,7 +61,8 @@ private: ...@@ -58,7 +61,8 @@ private:
private: private:
bool _is_pusher = true; bool _is_pusher = true;
MediaInfo _media_info; MediaInfo _media_info;
uint64_t _total_bytes = 0;
Ticker _alive_ticker;
std::unique_ptr<sockaddr_storage> _addr; std::unique_ptr<sockaddr_storage> _addr;
// for pusher // for pusher
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论