Commit e91c26c0 by xiongziliang

实现hls的流量汇报事件

parent e951efc6
...@@ -166,8 +166,7 @@ typedef struct { ...@@ -166,8 +166,7 @@ typedef struct {
void (API_CALL *on_mk_flow_report)(const mk_media_info url_info, void (API_CALL *on_mk_flow_report)(const mk_media_info url_info,
uint64_t total_bytes, uint64_t total_bytes,
uint64_t total_seconds, uint64_t total_seconds,
int is_player, int is_player);
const mk_tcp_session sender);
} mk_events; } mk_events;
......
...@@ -151,8 +151,7 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){ ...@@ -151,8 +151,7 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){
s_events.on_mk_flow_report((mk_media_info) &args, s_events.on_mk_flow_report((mk_media_info) &args,
totalBytes, totalBytes,
totalDuration, totalDuration,
isPlayer, isPlayer);
(mk_tcp_session) &sender);
} }
}); });
......
...@@ -376,14 +376,14 @@ void API_CALL on_mk_shell_login(const char *user_name, ...@@ -376,14 +376,14 @@ void API_CALL on_mk_shell_login(const char *user_name,
void API_CALL on_mk_flow_report(const mk_media_info url_info, void API_CALL on_mk_flow_report(const mk_media_info url_info,
uint64_t total_bytes, uint64_t total_bytes,
uint64_t total_seconds, uint64_t total_seconds,
int is_player, int is_player) {
const mk_tcp_session sender) { log_printf(LOG_LEV,"%s/%s/%s/%s, url params: %s,"
log_printf(LOG_LEV,"client info, local: %s:%d, peer: %s:%d\n"
"total_bytes: %d, total_seconds: %d, is_player: %d", "total_bytes: %d, total_seconds: %d, is_player: %d",
mk_tcp_session_local_ip(sender), mk_media_info_get_schema(url_info),
mk_tcp_session_local_port(sender), mk_media_info_get_vhost(url_info),
mk_tcp_session_peer_ip(sender), mk_media_info_get_app(url_info),
mk_tcp_session_peer_port(sender), mk_media_info_get_stream(url_info),
mk_media_info_get_params(url_info),
(int)total_bytes, (int)total_seconds, (int)is_player); (int)total_bytes, (int)total_seconds, (int)is_player);
} }
......
...@@ -269,16 +269,20 @@ void installWebHook(){ ...@@ -269,16 +269,20 @@ void installWebHook(){
}); });
NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastFlowReport,[](BroadcastFlowReportArgs){ NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastFlowReport,[](BroadcastFlowReportArgs){
if(!hook_enable || args._param_strs == hook_adminparams || hook_flowreport.empty() || sender.get_peer_ip() == "127.0.0.1"){ if(!hook_enable || args._param_strs == hook_adminparams || hook_flowreport.empty()){
return; return;
} }
auto body = make_json(args); auto body = make_json(args);
body["ip"] = sender.get_peer_ip();
body["port"] = sender.get_peer_port();
body["id"] = sender.getIdentifier();
body["totalBytes"] = (Json::UInt64)totalBytes; body["totalBytes"] = (Json::UInt64)totalBytes;
body["duration"] = (Json::UInt64)totalDuration; body["duration"] = (Json::UInt64)totalDuration;
body["player"] = isPlayer; body["player"] = isPlayer;
body["schema"] = args._schema;
body["vhost"] = args._vhost;
body["app"] = args._app;
body["stream"] = args._streamid;
body["params"] = args._param_strs;
//执行hook //执行hook
do_http_hook(hook_flowreport,body, nullptr); do_http_hook(hook_flowreport,body, nullptr);
}); });
......
...@@ -127,7 +127,7 @@ extern const string kBroadcastShellLogin; ...@@ -127,7 +127,7 @@ extern const string kBroadcastShellLogin;
//停止rtsp/rtmp/http-flv会话后流量汇报事件广播 //停止rtsp/rtmp/http-flv会话后流量汇报事件广播
extern const string kBroadcastFlowReport; extern const string kBroadcastFlowReport;
#define BroadcastFlowReportArgs const MediaInfo &args,const uint64_t &totalBytes,const uint64_t &totalDuration,const bool &isPlayer,TcpSession &sender #define BroadcastFlowReportArgs const MediaInfo &args,const uint64_t &totalBytes,const uint64_t &totalDuration,const bool &isPlayer
//未找到流后会广播该事件,请在监听该事件后去拉流或其他方式产生流,这样就能按需拉流了 //未找到流后会广播该事件,请在监听该事件后去拉流或其他方式产生流,这样就能按需拉流了
extern const string kBroadcastNotFoundStream; extern const string kBroadcastNotFoundStream;
......
...@@ -46,6 +46,7 @@ static const string kAccessErrKey = "kAccessErrKey"; ...@@ -46,6 +46,7 @@ static const string kAccessErrKey = "kAccessErrKey";
static const string kAccessHls = "kAccessHls"; static const string kAccessHls = "kAccessHls";
static const string kHlsSuffix = "/hls.m3u8"; static const string kHlsSuffix = "/hls.m3u8";
static const string kHlsData = "kHlsData"; static const string kHlsData = "kHlsData";
static const string kHlsHaveFindMediaSource = "kHlsHaveFindMediaSource";
static const string &getContentType(const char *name) { static const string &getContentType(const char *name) {
const char *dot; const char *dot;
...@@ -306,6 +307,8 @@ static void canAccessPath(TcpSession &sender, const Parser &parser, const MediaI ...@@ -306,6 +307,8 @@ static void canAccessPath(TcpSession &sender, const Parser &parser, const MediaI
if(is_hls){ if(is_hls){
//hls相关信息 //hls相关信息
(*cookie)[kHlsData].set<HlsCookieData>(mediaInfo); (*cookie)[kHlsData].set<HlsCookieData>(mediaInfo);
//hls未查找MediaSource
(*cookie)[kHlsHaveFindMediaSource].set<bool>(false);
} }
callback(errMsg, cookie); callback(errMsg, cookie);
}else{ }else{
...@@ -370,49 +373,64 @@ static void accessFile(TcpSession &sender, const Parser &parser, const MediaInfo ...@@ -370,49 +373,64 @@ static void accessFile(TcpSession &sender, const Parser &parser, const MediaInfo
weak_ptr<TcpSession> weakSession = sender.shared_from_this(); weak_ptr<TcpSession> weakSession = sender.shared_from_this();
//判断是否有权限访问该文件 //判断是否有权限访问该文件
canAccessPath(sender, parser, mediaInfo, false, [cb, strFile, parser, is_hls, mediaInfo, weakSession , file_exist](const string &errMsg, const HttpServerCookie::Ptr &cookie) { canAccessPath(sender, parser, mediaInfo, false, [cb, strFile, parser, is_hls, mediaInfo, weakSession , file_exist](const string &errMsg, const HttpServerCookie::Ptr &cookie) {
if (!errMsg.empty()) { auto strongSession = weakSession.lock();
//文件鉴权失败 if(!strongSession){
StrCaseMap headerOut; //http客户端已经断开,不需要回复
if (cookie) { return;
headerOut["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get<string>()); }
} if (!errMsg.empty()) {
cb("401 Unauthorized", "text/html", headerOut, std::make_shared<HttpStringBody>(errMsg)); //文件鉴权失败
return; StrCaseMap headerOut;
if (cookie) {
headerOut["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get<string>());
} }
cb("401 Unauthorized", "text/html", headerOut, std::make_shared<HttpStringBody>(errMsg));
return;
}
auto response_file = [](const HttpServerCookie::Ptr &cookie, const HttpFileManager::invoker &cb, const string &strFile, const Parser &parser) { auto response_file = [](const HttpServerCookie::Ptr &cookie, const HttpFileManager::invoker &cb, const string &strFile, const Parser &parser) {
StrCaseMap httpHeader; StrCaseMap httpHeader;
if (cookie) { if (cookie) {
httpHeader["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get<string>()); httpHeader["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get<string>());
}
HttpSession::HttpResponseInvoker invoker = [&](const string &codeOut, const StrCaseMap &headerOut, const HttpBody::Ptr &body) {
if (cookie) {
cookie->getLock();
auto is_hls = (*cookie)[kAccessHls].get<bool>();
if (is_hls) {
(*cookie)[kHlsData].get<HlsCookieData>().addByteUsage(body->remainSize());
} }
HttpSession::HttpResponseInvoker invoker = [&](const string &codeOut, const StrCaseMap &headerOut, const HttpBody::Ptr &body) { }
if (cookie) { cb(codeOut.data(), getContentType(strFile.data()), headerOut, body);
cookie->getLock();
auto is_hls = (*cookie)[kAccessHls].get<bool>();
if (is_hls) {
(*cookie)[kHlsData].get<HlsCookieData>().addByteUsage(body->remainSize());
}
}
cb(codeOut.data(), getContentType(strFile.data()), headerOut, body);
};
invoker.responseFile(parser.getValues(), httpHeader, strFile);
}; };
invoker.responseFile(parser.getValues(), httpHeader, strFile);
if (file_exist || !is_hls) { };
//不是hls或者文件存在,直接回复文件或404
response_file(cookie, cb, strFile, parser); //如果程序未正常退出,会残余上次的hls文件,所以判断hls直播是否存在的关键不是文件存在与否
} else { //而是应该判断HlsMediaSource是否已注册,但是这样会每次获取m3u8文件时都会用MediaSource::findAsync判断一次
//hls文件不存在,我们等待其生成并延后回复 //会导致程序性能低下,所以我们应该在cookie声明周期的第一次判断HlsMediaSource是否已经注册,后续通过文件存在与否判断
auto strongSession = weakSession.lock(); if (!is_hls) {
if(!strongSession){ //不是hls,直接回复文件或404
//http客户端已经断开,不需要回复 response_file(cookie, cb, strFile, parser);
return; } else {
bool have_find_media_src = false;
if(cookie){
have_find_media_src = (*cookie)[kHlsHaveFindMediaSource].get<bool>();
if(!have_find_media_src){
(*cookie)[kHlsHaveFindMediaSource].set<bool>(true);
} }
MediaSource::findAsync(mediaInfo, strongSession, [response_file, cookie, cb, strFile, parser](const MediaSource::Ptr &src) {
//hls已经生成或者超时后仍未生成,那么不管怎么样都返回客户端
response_file(cookie, cb, strFile, parser);
});
} }
if(have_find_media_src){
//之前该cookie已经通过MediaSource::findAsync查找过了,所以现在只以文件系统查找结果为准
response_file(cookie, cb, strFile, parser);
return;
}
//hls文件不存在,我们等待其生成并延后回复
MediaSource::findAsync(mediaInfo, strongSession, [response_file, cookie, cb, strFile, parser](const MediaSource::Ptr &src) {
//hls已经生成或者超时后仍未生成,那么不管怎么样都返回客户端
response_file(cookie, cb, strFile, parser);
});
}
}); });
} }
......
...@@ -104,7 +104,7 @@ void HttpSession::onRecv(const Buffer::Ptr &pBuf) { ...@@ -104,7 +104,7 @@ void HttpSession::onRecv(const Buffer::Ptr &pBuf) {
void HttpSession::onError(const SockException& err) { void HttpSession::onError(const SockException& err) {
if(_is_flv_stream){ if(_is_flv_stream){
//flv播放器 //flv播放器
WarnP(this) << "播放器(" WarnP(this) << "FLV播放器("
<< _mediaInfo._vhost << "/" << _mediaInfo._vhost << "/"
<< _mediaInfo._app << "/" << _mediaInfo._app << "/"
<< _mediaInfo._streamid << _mediaInfo._streamid
...@@ -112,12 +112,7 @@ void HttpSession::onError(const SockException& err) { ...@@ -112,12 +112,7 @@ void HttpSession::onError(const SockException& err) {
GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold);
if(_ui64TotalBytes > iFlowThreshold * 1024){ if(_ui64TotalBytes > iFlowThreshold * 1024){
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, _ticker.createdTime()/1000, true);
_mediaInfo,
_ui64TotalBytes,
_ticker.createdTime()/1000,
true,
*this);
} }
return; return;
} }
......
...@@ -42,21 +42,27 @@ void HlsCookieData::addReaderCount(){ ...@@ -42,21 +42,27 @@ void HlsCookieData::addReaderCount(){
_src = src; _src = src;
} }
} }
} }
HlsCookieData::~HlsCookieData() { HlsCookieData::~HlsCookieData() {
if(_added){ if (_added) {
auto src = _src.lock(); auto src = _src.lock();
if(src){ if (src) {
src->modifyReaderCount(false); src->modifyReaderCount(false);
} }
auto duration = (_ticker.createdTime() - _ticker.elapsedTime()) / 1000;
WarnL << "HLS播放器(" << _info._vhost << "/" << _info._app << "/" << _info._streamid << ")断开,播放时间:" << duration;
GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold);
if (_bytes > iFlowThreshold * 1024) {
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _info, _bytes, duration, true);
}
} }
} }
void HlsCookieData::addByteUsage(uint64_t bytes) { void HlsCookieData::addByteUsage(uint64_t bytes) {
addReaderCount(); addReaderCount();
_bytes += bytes; _bytes += bytes;
_ticker.resetTime();
} }
......
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
#define ZLMEDIAKIT_HLSMEDIASOURCE_H #define ZLMEDIAKIT_HLSMEDIASOURCE_H
#include <atomic> #include <atomic>
#include "Util/TimeTicker.h"
#include "Common/MediaSource.h" #include "Common/MediaSource.h"
namespace mediakit{ namespace mediakit{
...@@ -43,6 +44,7 @@ private: ...@@ -43,6 +44,7 @@ private:
MediaInfo _info; MediaInfo _info;
bool _added = false; bool _added = false;
weak_ptr<HlsMediaSource> _src; weak_ptr<HlsMediaSource> _src;
Ticker _ticker;
}; };
class HlsMediaSource : public MediaSource { class HlsMediaSource : public MediaSource {
......
...@@ -45,7 +45,7 @@ RtmpSession::~RtmpSession() { ...@@ -45,7 +45,7 @@ RtmpSession::~RtmpSession() {
void RtmpSession::onError(const SockException& err) { void RtmpSession::onError(const SockException& err) {
bool isPlayer = !_pPublisherSrc; bool isPlayer = !_pPublisherSrc;
WarnP(this) << (isPlayer ? "播放器(" : "推流器(") WarnP(this) << (isPlayer ? "RTMP播放器(" : "RTMP推流器(")
<< _mediaInfo._vhost << "/" << _mediaInfo._vhost << "/"
<< _mediaInfo._app << "/" << _mediaInfo._app << "/"
<< _mediaInfo._streamid << _mediaInfo._streamid
...@@ -55,12 +55,7 @@ void RtmpSession::onError(const SockException& err) { ...@@ -55,12 +55,7 @@ void RtmpSession::onError(const SockException& err) {
GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold);
if(_ui64TotalBytes > iFlowThreshold * 1024){ if(_ui64TotalBytes > iFlowThreshold * 1024){
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, _ticker.createdTime()/1000, isPlayer);
_mediaInfo,
_ui64TotalBytes,
_ticker.createdTime()/1000,
isPlayer,
*this);
} }
} }
......
...@@ -86,7 +86,7 @@ RtspSession::~RtspSession() { ...@@ -86,7 +86,7 @@ RtspSession::~RtspSession() {
void RtspSession::onError(const SockException& err) { void RtspSession::onError(const SockException& err) {
bool isPlayer = !_pushSrc; bool isPlayer = !_pushSrc;
WarnP(this) << (isPlayer ? "播放器(" : "推流器(") WarnP(this) << (isPlayer ? "RTSP播放器(" : "RTSP推流器(")
<< _mediaInfo._vhost << "/" << _mediaInfo._vhost << "/"
<< _mediaInfo._app << "/" << _mediaInfo._app << "/"
<< _mediaInfo._streamid << _mediaInfo._streamid
...@@ -106,12 +106,7 @@ void RtspSession::onError(const SockException& err) { ...@@ -106,12 +106,7 @@ void RtspSession::onError(const SockException& err) {
//流量统计事件广播 //流量统计事件广播
GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold);
if(_ui64TotalBytes > iFlowThreshold * 1024){ if(_ui64TotalBytes > iFlowThreshold * 1024){
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, _ticker.createdTime()/1000, isPlayer);
_mediaInfo,
_ui64TotalBytes,
_ticker.createdTime()/1000,
isPlayer,
*this);
} }
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论