Commit b8729dc2 by xiongguangjie

Merge branch 'master' into dev

parents 43d7479c a64383a2
......@@ -50,8 +50,8 @@ BraceWrapping:
BreakConstructorInitializers: BeforeComma
# 继承过长需要换行时也在 `,` 前
BreakInheritanceList: BeforeComma
# 列宽 120
ColumnLimit: 120
# 列宽 160
ColumnLimit: 160
# c++11 括号内起始/结束无空格, false 会加上
Cpp11BracedListStyle: false
# 命名空间后的注释会修正为: // namespace_name
......
......@@ -153,7 +153,7 @@ file(GLOB ToolKit_SRC_LIST
${ToolKit_ROOT}/src/*/*.c)
if(IOS)
list(APPEND ToolKit_SRC_LIST
${ToolKit_ROOT}/Network/Socket_ios.mm)
${ToolKit_ROOT}/src/Network/Socket_ios.mm)
endif()
###################################################################
......
ZLToolKit @ 7e40c751
Subproject commit ca26e43a5f62986bb8a007226e0bad148d154abc
Subproject commit 7e40c751659d5c1ec623699732284c12e0a4feb8
media-server @ cdbb3d6b
Subproject commit 5aa9884660df1c193d730a90835af36ee411668c
Subproject commit cdbb3d6b9ea254f454c6e466c5962af5ace01199
......@@ -308,7 +308,7 @@ API_EXPORT void API_CALL mk_webrtc_get_answer_sdp2(void *user_data, on_user_data
WebRtcPluginManager::Instance().getAnswerSdp(*session, type, WebRtcArgsUrl(url),
[offer_str, session, ptr, cb](const WebRtcInterface &exchanger) mutable {
try {
auto sdp_answer = const_cast<WebRtcInterface &>(exchanger).getAnswerSdp(offer_str);
auto sdp_answer = exchangeSdp(exchanger, offer_str);
cb(ptr.get(), sdp_answer.data(), nullptr);
} catch (std::exception &ex) {
cb(ptr.get(), nullptr, ex.what());
......
......@@ -16,7 +16,7 @@ using namespace std;
using namespace toolkit;
using namespace mediakit;
class MediaHelper : public MediaSourceEvent , public std::enable_shared_from_this<MediaHelper> {
class MediaHelper: public MediaSourceEvent, public std::enable_shared_from_this<MediaHelper> {
public:
using Ptr = std::shared_ptr<MediaHelper>;
MediaHelper(const char *vhost, const char *app, const char *stream, float duration, const ProtocolOption &option) {
......@@ -27,20 +27,16 @@ public:
~MediaHelper() = default;
void attachEvent(){
_channel->setMediaListener(shared_from_this());
}
void attachEvent() { _channel->setMediaListener(shared_from_this()); }
DevChannel::Ptr &getChannel(){
return _channel;
}
DevChannel::Ptr &getChannel() { return _channel; }
void setOnClose(on_mk_media_close cb, std::shared_ptr<void> user_data){
void setOnClose(on_mk_media_close cb, std::shared_ptr<void> user_data) {
_on_close = cb;
_on_close_data = std::move(user_data);
}
void setOnSeek(on_mk_media_seek cb, std::shared_ptr<void> user_data){
void setOnSeek(on_mk_media_seek cb, std::shared_ptr<void> user_data) {
_on_seek = cb;
_on_seek_data = std::move(user_data);
}
......@@ -55,7 +51,7 @@ public:
_on_speed_data = std::move(user_data);
}
void setOnRegist(on_mk_media_source_regist cb, std::shared_ptr<void> user_data){
void setOnRegist(on_mk_media_source_regist cb, std::shared_ptr<void> user_data) {
_on_regist = cb;
_on_regist_data = std::move(user_data);
}
......@@ -97,15 +93,13 @@ protected:
return _on_speed(_on_speed_data.get(), speed);
}
void onRegist(MediaSource &sender, bool regist) override{
void onRegist(MediaSource &sender, bool regist) override {
if (_on_regist) {
_on_regist(_on_regist_data.get(), (mk_media_source)&sender, regist);
}
}
toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) {
return _poller;
}
toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override { return _poller; }
private:
EventPoller::Ptr _poller;
......@@ -122,7 +116,7 @@ private:
std::shared_ptr<void> _on_regist_data;
};
API_EXPORT void API_CALL mk_media_set_on_close(mk_media ctx, on_mk_media_close cb, void *user_data){
API_EXPORT void API_CALL mk_media_set_on_close(mk_media ctx, on_mk_media_close cb, void *user_data) {
mk_media_set_on_close2(ctx, cb, user_data, nullptr);
}
......@@ -159,14 +153,14 @@ API_EXPORT void API_CALL mk_media_set_on_speed(mk_media ctx, on_mk_media_speed c
mk_media_set_on_speed2(ctx, cb, user_data, nullptr);
}
API_EXPORT void API_CALL mk_media_set_on_speed2(mk_media ctx, on_mk_media_speed cb, void *user_data, on_user_data_free user_data_free){
API_EXPORT void API_CALL mk_media_set_on_speed2(mk_media ctx, on_mk_media_speed cb, void *user_data, on_user_data_free user_data_free) {
assert(ctx);
MediaHelper::Ptr *obj = (MediaHelper::Ptr *) ctx;
std::shared_ptr<void> ptr(user_data, user_data_free ? user_data_free : [](void *) {});
(*obj)->setOnSpeed(cb, std::move(ptr));
}
API_EXPORT void API_CALL mk_media_set_on_regist(mk_media ctx, on_mk_media_source_regist cb, void *user_data){
API_EXPORT void API_CALL mk_media_set_on_regist(mk_media ctx, on_mk_media_source_regist cb, void *user_data) {
mk_media_set_on_regist2(ctx, cb, user_data, nullptr);
}
......@@ -177,7 +171,7 @@ API_EXPORT void API_CALL mk_media_set_on_regist2(mk_media ctx, on_mk_media_sourc
(*obj)->setOnRegist(cb, std::move(ptr));
}
API_EXPORT int API_CALL mk_media_total_reader_count(mk_media ctx){
API_EXPORT int API_CALL mk_media_total_reader_count(mk_media ctx) {
assert(ctx);
MediaHelper::Ptr *obj = (MediaHelper::Ptr *) ctx;
return (*obj)->getChannel()->totalReaderCount();
......@@ -209,7 +203,7 @@ API_EXPORT void API_CALL mk_media_release(mk_media ctx) {
delete obj;
}
API_EXPORT int API_CALL mk_media_init_video(mk_media ctx, int codec_id, int width, int height, float fps, int bit_rate){
API_EXPORT int API_CALL mk_media_init_video(mk_media ctx, int codec_id, int width, int height, float fps, int bit_rate) {
assert(ctx);
MediaHelper::Ptr *obj = (MediaHelper::Ptr *) ctx;
VideoInfo info;
......@@ -221,7 +215,7 @@ API_EXPORT int API_CALL mk_media_init_video(mk_media ctx, int codec_id, int widt
return (*obj)->getChannel()->initVideo(info);
}
API_EXPORT int API_CALL mk_media_init_audio(mk_media ctx, int codec_id, int sample_rate, int channels, int sample_bit){
API_EXPORT int API_CALL mk_media_init_audio(mk_media ctx, int codec_id, int sample_rate, int channels, int sample_bit) {
assert(ctx);
MediaHelper::Ptr *obj = (MediaHelper::Ptr *) ctx;
AudioInfo info;
......@@ -232,19 +226,19 @@ API_EXPORT int API_CALL mk_media_init_audio(mk_media ctx, int codec_id, int samp
return (*obj)->getChannel()->initAudio(info);
}
API_EXPORT void API_CALL mk_media_init_track(mk_media ctx, mk_track track){
API_EXPORT void API_CALL mk_media_init_track(mk_media ctx, mk_track track) {
assert(ctx && track);
MediaHelper::Ptr *obj = (MediaHelper::Ptr *) ctx;
(*obj)->getChannel()->addTrack(*((Track::Ptr *) track));
}
API_EXPORT void API_CALL mk_media_init_complete(mk_media ctx){
API_EXPORT void API_CALL mk_media_init_complete(mk_media ctx) {
assert(ctx);
MediaHelper::Ptr *obj = (MediaHelper::Ptr *) ctx;
(*obj)->getChannel()->addTrackCompleted();
}
API_EXPORT int API_CALL mk_media_input_frame(mk_media ctx, mk_frame frame){
API_EXPORT int API_CALL mk_media_input_frame(mk_media ctx, mk_frame frame) {
assert(ctx && frame);
MediaHelper::Ptr *obj = (MediaHelper::Ptr *) ctx;
return (*obj)->getChannel()->inputFrame(*((Frame::Ptr *) frame));
......@@ -274,13 +268,13 @@ API_EXPORT int API_CALL mk_media_input_aac(mk_media ctx, const void *data, int l
return (*obj)->getChannel()->inputAAC((const char *) data, len, dts, (char *) adts);
}
API_EXPORT int API_CALL mk_media_input_pcm(mk_media ctx, void *data , int len, uint64_t pts){
API_EXPORT int API_CALL mk_media_input_pcm(mk_media ctx, void *data, int len, uint64_t pts) {
assert(ctx && data && len > 0);
MediaHelper::Ptr* obj = (MediaHelper::Ptr*) ctx;
return (*obj)->getChannel()->inputPCM((char*)data, len, pts);
}
API_EXPORT int API_CALL mk_media_input_audio(mk_media ctx, const void* data, int len, uint64_t dts){
API_EXPORT int API_CALL mk_media_input_audio(mk_media ctx, const void *data, int len, uint64_t dts) {
assert(ctx && data && len > 0);
MediaHelper::Ptr* obj = (MediaHelper::Ptr*) ctx;
return (*obj)->getChannel()->inputAudio((const char*)data, len, dts);
......@@ -290,7 +284,8 @@ API_EXPORT void API_CALL mk_media_start_send_rtp(mk_media ctx, const char *dst_u
mk_media_start_send_rtp2(ctx, dst_url, dst_port, ssrc, is_udp, cb, user_data, nullptr);
}
API_EXPORT void API_CALL mk_media_start_send_rtp2(mk_media ctx, const char *dst_url, uint16_t dst_port, const char *ssrc, int is_udp, on_mk_media_send_rtp_result cb, void *user_data, on_user_data_free user_data_free){
API_EXPORT void API_CALL mk_media_start_send_rtp2(mk_media ctx, const char *dst_url, uint16_t dst_port, const char *ssrc, int is_udp, on_mk_media_send_rtp_result cb, void *user_data,
on_user_data_free user_data_free) {
assert(ctx && dst_url && ssrc);
MediaHelper::Ptr* obj = (MediaHelper::Ptr*) ctx;
......@@ -312,7 +307,7 @@ API_EXPORT void API_CALL mk_media_start_send_rtp2(mk_media ctx, const char *dst_
});
}
API_EXPORT void API_CALL mk_media_stop_send_rtp(mk_media ctx, const char *ssrc){
API_EXPORT void API_CALL mk_media_stop_send_rtp(mk_media ctx, const char *ssrc) {
assert(ctx);
MediaHelper::Ptr *obj = (MediaHelper::Ptr *)ctx;
// sender参数无用
......
......@@ -24,7 +24,7 @@ public:
MediaPlayerForC(){
_player = std::make_shared<MediaPlayer>();
}
~MediaPlayerForC(){}
~MediaPlayerForC() = default;
MediaPlayer *operator->(){
return _player.get();
......
......@@ -112,10 +112,10 @@ API_EXPORT uint16_t API_CALL mk_sock_info_local_port(const mk_sock_info ctx){
}
////////////////////////////////////////////////////////////////////////////////////////
API_EXPORT mk_sock_info API_CALL mk_tcp_session_get_sock_info(const mk_tcp_session ctx){
API_EXPORT mk_sock_info API_CALL mk_tcp_session_get_sock_info(const mk_tcp_session ctx) {
assert(ctx);
SessionForC *session = (SessionForC *)ctx;
return (mk_sock_info)session;
return reinterpret_cast<mk_sock_info>(static_cast<SockInfo *>(session));
}
API_EXPORT void API_CALL mk_tcp_session_shutdown(const mk_tcp_session ctx,int err,const char *err_msg){
......@@ -320,7 +320,7 @@ TcpClientForC::Ptr *mk_tcp_client_create_l(mk_tcp_client_events *events, mk_tcp_
API_EXPORT mk_sock_info API_CALL mk_tcp_client_get_sock_info(const mk_tcp_client ctx){
assert(ctx);
TcpClientForC::Ptr *client = (TcpClientForC::Ptr *)ctx;
return (mk_sock_info)(SockInfo *)client->get();
return reinterpret_cast<mk_sock_info>(static_cast<SockInfo *>(client->get()));
}
API_EXPORT mk_tcp_client API_CALL mk_tcp_client_create(mk_tcp_client_events *events, mk_tcp_type type){
......
......@@ -8,6 +8,7 @@ EXPOSE 443/tcp
EXPOSE 10000/udp
EXPOSE 10000/tcp
EXPOSE 8000/udp
EXPOSE 8000/tcp
EXPOSE 9000/udp
# ADD sources.list /etc/apt/sources.list
......@@ -82,4 +83,4 @@ COPY --from=build /opt/media/ZLMediaKit/release/linux/${MODEL}/MediaServer /opt/
COPY --from=build /opt/media/ZLMediaKit/release/linux/${MODEL}/config.ini /opt/media/conf/
COPY --from=build /opt/media/ZLMediaKit/www/ /opt/media/bin/www/
ENV PATH /opt/media/bin:$PATH
CMD ["sh","-c","./MediaServer -s default.pem -c ../conf/config.ini"]
CMD ["sh","-c","./MediaServer -s default.pem -c ../conf/config.ini -l 0"]
......@@ -1489,6 +1489,42 @@
"response": []
},
{
"name": "更新RTP服务器过滤SSRC(updateRtpServerSSRC)",
"request": {
"method": "GET",
"header": [],
"url": {
"raw": "{{ZLMediaKit_URL}}/index/api/updateRtpServerSSRC?secret={{ZLMediaKit_secret}}&stream_id=test&ssrc=123456",
"host": [
"{{ZLMediaKit_URL}}"
],
"path": [
"index",
"api",
"updateRtpServerSSRC"
],
"query": [
{
"key": "secret",
"value": "{{ZLMediaKit_secret}}",
"description": "api操作密钥(配置文件配置),如果操作ip是127.0.0.1,则不需要此参数"
},
{
"key": "stream_id",
"value": "test",
"description": "该端口绑定的流id"
},
{
"key": "ssrc",
"value": "123456",
"description": "十进制ssrc"
}
]
}
},
"response": []
},
{
"name": "暂停RTP超时检查(pauseRtpCheck)",
"request": {
"method": "GET",
......
......@@ -247,7 +247,7 @@ static inline void addHttpListener(){
size = body->remainSize();
}
LogContextCapture log(getLogger(), LDebug, __FILE__, "http api debug", __LINE__);
LogContextCapture log(getLogger(), toolkit::LTrace, __FILE__, "http api debug", __LINE__);
log << "\r\n# request:\r\n" << parser.Method() << " " << parser.FullUrl() << "\r\n";
log << "# header:\r\n";
......@@ -966,7 +966,7 @@ void installWebApi() {
//开始推流,如果推流失败或者推流中止,将会自动重试若干次,默认一直重试
pusher->setPushCallbackOnce([cb, key, url](const SockException &ex) {
if (ex) {
WarnL << "Push " << url << " failed, key: " << key << ", err: " << ex.what();
WarnL << "Push " << url << " failed, key: " << key << ", err: " << ex;
lock_guard<recursive_mutex> lck(s_proxyPusherMapMtx);
s_proxyPusherMap.erase(key);
}
......@@ -975,7 +975,7 @@ void installWebApi() {
//被主动关闭推流
pusher->setOnClose([key, url](const SockException &ex) {
WarnL << "Push " << url << " failed, key: " << key << ", err: " << ex.what();
WarnL << "Push " << url << " failed, key: " << key << ", err: " << ex;
lock_guard<recursive_mutex> lck(s_proxyPusherMapMtx);
s_proxyPusherMap.erase(key);
});
......@@ -1182,6 +1182,18 @@ void installWebApi() {
val["hit"] = 1;
});
api_regist("/index/api/updateRtpServerSSRC",[](API_ARGS_MAP){
CHECK_SECRET();
CHECK_ARGS("stream_id", "ssrc");
lock_guard<recursive_mutex> lck(s_rtpServerMapMtx);
auto it = s_rtpServerMap.find(allArgs["stream_id"]);
if (it == s_rtpServerMap.end()) {
throw ApiRetException("RtpServer not found by stream_id", API::NotFound);
}
it->second->updateSSRC(allArgs["ssrc"]);
});
api_regist("/index/api/listRtpServer",[](API_ARGS_MAP){
CHECK_SECRET();
......@@ -1595,7 +1607,7 @@ void installWebApi() {
auto offer = allArgs.getArgs();
CHECK(!offer.empty(), "http body(webrtc offer sdp) is empty");
WebRtcPluginManager::Instance().getAnswerSdp(*(static_cast<Session *>(&sender)), type,
WebRtcPluginManager::Instance().getAnswerSdp(static_cast<Session&>(sender), type,
WebRtcArgsImp(allArgs, sender.getIdentifier()),
[invoker, val, offer, headerOut](const WebRtcInterface &exchanger) mutable {
//设置返回类型
......@@ -1604,7 +1616,7 @@ void installWebApi() {
headerOut["Access-Control-Allow-Origin"] = "*";
try {
val["sdp"] = const_cast<WebRtcInterface &>(exchanger).getAnswerSdp(offer);
val["sdp"] = exchangeSdp(exchanger, offer);
val["id"] = exchanger.getIdentifier();
val["type"] = "answer";
invoker(200, headerOut, val.toStyledString());
......@@ -1620,7 +1632,7 @@ void installWebApi() {
auto offer = allArgs.getArgs();
CHECK(!offer.empty(), "http body(webrtc offer sdp) is empty");
WebRtcPluginManager::Instance().getAnswerSdp(*(static_cast<Session *>(&sender)), type,
WebRtcPluginManager::Instance().getAnswerSdp(static_cast<Session&>(sender), type,
WebRtcArgsImp(allArgs, sender.getIdentifier()),
[invoker, offer, headerOut](const WebRtcInterface &exchanger) mutable {
// 设置跨域
......@@ -1628,7 +1640,7 @@ void installWebApi() {
try {
// 设置返回类型
headerOut["Content-Type"] = "application/sdp";
invoker(201, headerOut, const_cast<WebRtcInterface &>(exchanger).getAnswerSdp(offer));
invoker(201, headerOut, exchangeSdp(exchanger, offer));
} catch (std::exception &ex) {
headerOut["Content-Type"] = "text/plain";
invoker(406, headerOut, ex.what());
......
......@@ -14,10 +14,10 @@
#include "Util/NoticeCenter.h"
#include "Common/config.h"
#include "Common/MediaSource.h"
#include "Http/HttpSession.h"
#include "Http/HttpRequester.h"
#include "Network/Session.h"
#include "Rtsp/RtspSession.h"
#include "Http/HttpSession.h"
#include "WebHook.h"
#include "WebApi.h"
......@@ -29,33 +29,33 @@ using namespace mediakit;
namespace Hook {
#define HOOK_FIELD "hook."
const string kEnable = HOOK_FIELD"enable";
const string kTimeoutSec = HOOK_FIELD"timeoutSec";
const string kOnPublish = HOOK_FIELD"on_publish";
const string kOnPlay = HOOK_FIELD"on_play";
const string kOnFlowReport = HOOK_FIELD"on_flow_report";
const string kOnRtspRealm = HOOK_FIELD"on_rtsp_realm";
const string kOnRtspAuth = HOOK_FIELD"on_rtsp_auth";
const string kOnStreamChanged = HOOK_FIELD"on_stream_changed";
const string kOnStreamNotFound = HOOK_FIELD"on_stream_not_found";
const string kOnRecordMp4 = HOOK_FIELD"on_record_mp4";
const string kOnRecordTs = HOOK_FIELD"on_record_ts";
const string kOnShellLogin = HOOK_FIELD"on_shell_login";
const string kOnStreamNoneReader = HOOK_FIELD"on_stream_none_reader";
const string kOnHttpAccess = HOOK_FIELD"on_http_access";
const string kOnServerStarted = HOOK_FIELD"on_server_started";
const string kOnServerKeepalive = HOOK_FIELD"on_server_keepalive";
const string kOnSendRtpStopped = HOOK_FIELD"on_send_rtp_stopped";
const string kOnRtpServerTimeout = HOOK_FIELD"on_rtp_server_timeout";
const string kAdminParams = HOOK_FIELD"admin_params";
const string kAliveInterval = HOOK_FIELD"alive_interval";
const string kRetry = HOOK_FIELD"retry";
const string kRetryDelay = HOOK_FIELD"retry_delay";
onceToken token([](){
const string kEnable = HOOK_FIELD "enable";
const string kTimeoutSec = HOOK_FIELD "timeoutSec";
const string kOnPublish = HOOK_FIELD "on_publish";
const string kOnPlay = HOOK_FIELD "on_play";
const string kOnFlowReport = HOOK_FIELD "on_flow_report";
const string kOnRtspRealm = HOOK_FIELD "on_rtsp_realm";
const string kOnRtspAuth = HOOK_FIELD "on_rtsp_auth";
const string kOnStreamChanged = HOOK_FIELD "on_stream_changed";
const string kOnStreamNotFound = HOOK_FIELD "on_stream_not_found";
const string kOnRecordMp4 = HOOK_FIELD "on_record_mp4";
const string kOnRecordTs = HOOK_FIELD "on_record_ts";
const string kOnShellLogin = HOOK_FIELD "on_shell_login";
const string kOnStreamNoneReader = HOOK_FIELD "on_stream_none_reader";
const string kOnHttpAccess = HOOK_FIELD "on_http_access";
const string kOnServerStarted = HOOK_FIELD "on_server_started";
const string kOnServerKeepalive = HOOK_FIELD "on_server_keepalive";
const string kOnSendRtpStopped = HOOK_FIELD "on_send_rtp_stopped";
const string kOnRtpServerTimeout = HOOK_FIELD "on_rtp_server_timeout";
const string kAdminParams = HOOK_FIELD "admin_params";
const string kAliveInterval = HOOK_FIELD "alive_interval";
const string kRetry = HOOK_FIELD "retry";
const string kRetryDelay = HOOK_FIELD "retry_delay";
static onceToken token([]() {
mINI::Instance()[kEnable] = false;
mINI::Instance()[kTimeoutSec] = 10;
//默认hook地址设置为空,采用默认行为(例如不鉴权)
// 默认hook地址设置为空,采用默认行为(例如不鉴权)
mINI::Instance()[kOnPublish] = "";
mINI::Instance()[kOnPlay] = "";
mINI::Instance()[kOnFlowReport] = "";
......@@ -76,8 +76,8 @@ onceToken token([](){
mINI::Instance()[kAliveInterval] = 30.0;
mINI::Instance()[kRetry] = 1;
mINI::Instance()[kRetryDelay] = 3.0;
},nullptr);
}//namespace Hook
});
} // namespace Hook
namespace Cluster {
#define CLUSTER_FIELD "cluster."
......@@ -91,19 +91,18 @@ static onceToken token([]() {
mINI::Instance()[kRetryCount] = 3;
});
}//namespace Cluster
} // namespace Cluster
static void parse_http_response(const SockException &ex, const Parser &res,
const function<void(const Value &,const string &,const bool &)> &fun){
static void parse_http_response(const SockException &ex, const Parser &res, const function<void(const Value &, const string &, bool)> &fun) {
bool should_retry = true;
if (ex) {
auto errStr = StrPrinter << "[network err]:" << ex.what() << endl;
fun(Json::nullValue, errStr,should_retry);
auto errStr = StrPrinter << "[network err]:" << ex << endl;
fun(Json::nullValue, errStr, should_retry);
return;
}
if (res.Url() != "200") {
auto errStr = StrPrinter << "[bad http status code]:" << res.Url() << endl;
fun(Json::nullValue, errStr,should_retry);
fun(Json::nullValue, errStr, should_retry);
return;
}
Value result;
......@@ -112,45 +111,45 @@ static void parse_http_response(const SockException &ex, const Parser &res,
ss >> result;
} catch (std::exception &ex) {
auto errStr = StrPrinter << "[parse json failed]:" << ex.what() << endl;
fun(Json::nullValue, errStr,should_retry);
fun(Json::nullValue, errStr, should_retry);
return;
}
auto code = result["code"];
if (!code.isInt64()) {
auto errStr = StrPrinter << "[json code]:" << "code not int :"<<code<< endl;
fun(Json::nullValue, errStr,should_retry);
auto errStr = StrPrinter << "[json code]:" << "code not int:" << code << endl;
fun(Json::nullValue, errStr, should_retry);
return;
}
should_retry = false;
if(code.asInt64() != 0){
auto errStr = StrPrinter << "[auth failed]: code:" <<code<<" msg:"<<result["msg"]<<endl;
fun(Json::nullValue, errStr,should_retry);
if (code.asInt64() != 0) {
auto errStr = StrPrinter << "[auth failed]: code:" << code << " msg:" << result["msg"] << endl;
fun(Json::nullValue, errStr, should_retry);
return;
}
try {
fun(result, "",should_retry);
fun(result, "", should_retry);
} catch (std::exception &ex) {
auto errStr = StrPrinter << "[do hook invoker failed]:" << ex.what() << endl;
//如果还是抛异常,那么再上抛异常
fun(Json::nullValue, errStr,should_retry);
// 如果还是抛异常,那么再上抛异常
fun(Json::nullValue, errStr, should_retry);
}
}
string to_string(const Value &value){
string to_string(const Value &value) {
return value.toStyledString();
}
string to_string(const HttpArgs &value){
string to_string(const HttpArgs &value) {
return value.make();
}
const char *getContentType(const Value &value){
const char *getContentType(const Value &value) {
return "application/json";
}
const char *getContentType(const HttpArgs &value){
const char *getContentType(const HttpArgs &value) {
return "application/x-www-form-urlencoded";
}
......@@ -182,8 +181,8 @@ void do_http_hook(const string &url, const ArgsType &body, const function<void(c
}
Ticker ticker;
requester->startRequester(url, [url, func, bodyStr, body, requester, ticker, retry](const SockException &ex, const Parser &res) mutable {
onceToken token(nullptr, [&]() mutable { requester.reset(); });
parse_http_response(ex, res, [&](const Value &obj, const string &err,const bool &should_retry) {
onceToken token(nullptr, [&]() mutable { requester.reset(); });
parse_http_response(ex, res, [&](const Value &obj, const string &err, bool should_retry) {
if (!err.empty()) {
// hook失败
WarnL << "hook " << url << " " << ticker.elapsedTime() << "ms,failed" << err << ":" << bodyStr;
......@@ -193,12 +192,12 @@ void do_http_hook(const string &url, const ArgsType &body, const function<void(c
do_http_hook(url, body, func, retry);
return 0;
});
//重试不需要触发回调
// 重试不需要触发回调
return;
}
} else if (ticker.elapsedTime() > 500) {
//hook成功,但是hook响应超过500ms,打印警告日志
// hook成功,但是hook响应超过500ms,打印警告日志
DebugL << "hook " << url << " " << ticker.elapsedTime() << "ms,success:" << bodyStr;
}
......@@ -214,7 +213,7 @@ void do_http_hook(const string &url, const ArgsType &body, const function<void(c
do_http_hook(url, body, func, hook_retry);
}
static ArgsType make_json(const MediaInfo &args){
static ArgsType make_json(const MediaInfo &args) {
ArgsType body;
body["schema"] = args._schema;
body[VHOST_KEY] = args._vhost;
......@@ -224,19 +223,19 @@ static ArgsType make_json(const MediaInfo &args){
return body;
}
static void reportServerStarted(){
GET_CONFIG(bool,hook_enable,Hook::kEnable);
GET_CONFIG(string,hook_server_started,Hook::kOnServerStarted);
if(!hook_enable || hook_server_started.empty()){
static void reportServerStarted() {
GET_CONFIG(bool, hook_enable, Hook::kEnable);
GET_CONFIG(string, hook_server_started, Hook::kOnServerStarted);
if (!hook_enable || hook_server_started.empty()) {
return;
}
ArgsType body;
for (auto &pr : mINI::Instance()) {
body[pr.first] = (string &) pr.second;
body[pr.first] = (string &)pr.second;
}
//执行hook
do_http_hook(hook_server_started,body, nullptr);
// 执行hook
do_http_hook(hook_server_started, body, nullptr);
}
// 服务器定时保活定时器
......@@ -249,11 +248,11 @@ static void reportServerKeepalive() {
}
GET_CONFIG(float, alive_interval, Hook::kAliveInterval);
g_keepalive_timer = std::make_shared<Timer>(alive_interval, []() {
g_keepalive_timer = std::make_shared<Timer>(alive_interval,[]() {
getStatisticJson([](const Value &data) mutable {
ArgsType body;
body["data"] = data;
//执行hook
// 执行hook
do_http_hook(hook_server_keepalive, body, nullptr);
});
return true;
......@@ -268,13 +267,11 @@ static string getPullUrl(const string &origin_fmt, const MediaInfo &info) {
WarnL << "get origin url failed, origin_fmt:" << origin_fmt;
return "";
}
//告知源站这是来自边沿站的拉流请求,如果未找到流请立即返回拉流失败
// 告知源站这是来自边沿站的拉流请求,如果未找到流请立即返回拉流失败
return string(url) + '?' + kEdgeServerParam + '&' + VHOST_KEY + '=' + info._vhost + '&' + info._param_strs;
}
static void pullStreamFromOrigin(const vector<string>& urls, size_t index, size_t failed_cnt, const MediaInfo &args,
const function<void()> &closePlayer) {
static void pullStreamFromOrigin(const vector<string> &urls, size_t index, size_t failed_cnt, const MediaInfo &args, const function<void()> &closePlayer) {
GET_CONFIG(float, cluster_timeout_sec, Cluster::kTimeoutSec);
GET_CONFIG(int, retry_count, Cluster::kRetryCount);
......@@ -283,17 +280,16 @@ static void pullStreamFromOrigin(const vector<string>& urls, size_t index, size_
InfoL << "pull stream from origin, failed_cnt: " << failed_cnt << ", timeout_sec: " << timeout_sec << ", url: " << url;
ProtocolOption option;
option.enable_hls = option.enable_hls || (args._schema == HLS_SCHEMA);
option.enable_hls = option.enable_hls || (args._schema == HLS_SCHEMA);
option.enable_mp4 = false;
addStreamProxy(args._vhost, args._app, args._streamid, url, retry_count, option, Rtsp::RTP_TCP, timeout_sec,
[=](const SockException &ex, const string &key) mutable {
addStreamProxy(args._vhost, args._app, args._streamid, url, retry_count, option, Rtsp::RTP_TCP, timeout_sec, [=](const SockException &ex, const string &key) mutable {
if (!ex) {
return;
}
//拉流失败
// 拉流失败
if (++failed_cnt == urls.size()) {
//已经重试所有源站了
// 已经重试所有源站了
WarnL << "pull stream from origin final failed: " << url;
closePlayer();
return;
......@@ -319,38 +315,38 @@ static mINI jsonToMini(const Value &obj) {
return ret;
}
void installWebHook(){
GET_CONFIG(bool,hook_enable,Hook::kEnable);
GET_CONFIG(string,hook_adminparams,Hook::kAdminParams);
void installWebHook() {
GET_CONFIG(bool, hook_enable, Hook::kEnable);
GET_CONFIG(string, hook_adminparams, Hook::kAdminParams);
NoticeCenter::Instance().addListener(&web_hook_tag, Broadcast::kBroadcastMediaPublish, [](BroadcastMediaPublishArgs) {
GET_CONFIG(string,hook_publish,Hook::kOnPublish);
GET_CONFIG(string, hook_publish, Hook::kOnPublish);
if (!hook_enable || args._param_strs == hook_adminparams || hook_publish.empty() || sender.get_peer_ip() == "127.0.0.1") {
invoker("", ProtocolOption());
return;
}
//异步执行该hook api,防止阻塞NoticeCenter
// 异步执行该hook api,防止阻塞NoticeCenter
auto body = make_json(args);
body["ip"] = sender.get_peer_ip();
body["port"] = sender.get_peer_port();
body["id"] = sender.getIdentifier();
body["originType"] = (int) type;
body["originType"] = (int)type;
body["originTypeStr"] = getOriginTypeString(type);
//执行hook
// 执行hook
do_http_hook(hook_publish, body, [invoker](const Value &obj, const string &err) mutable {
if (err.empty()) {
//推流鉴权成功
// 推流鉴权成功
invoker(err, ProtocolOption(jsonToMini(obj)));
} else {
//推流鉴权失败
// 推流鉴权失败
invoker(err, ProtocolOption());
}
});
});
NoticeCenter::Instance().addListener(&web_hook_tag,Broadcast::kBroadcastMediaPlayed,[](BroadcastMediaPlayedArgs){
GET_CONFIG(string,hook_play,Hook::kOnPlay);
if(!hook_enable || args._param_strs == hook_adminparams || hook_play.empty() || sender.get_peer_ip() == "127.0.0.1"){
NoticeCenter::Instance().addListener(&web_hook_tag, Broadcast::kBroadcastMediaPlayed, [](BroadcastMediaPlayedArgs) {
GET_CONFIG(string, hook_play, Hook::kOnPlay);
if (!hook_enable || args._param_strs == hook_adminparams || hook_play.empty() || sender.get_peer_ip() == "127.0.0.1") {
invoker("");
return;
}
......@@ -358,15 +354,13 @@ void installWebHook(){
body["ip"] = sender.get_peer_ip();
body["port"] = sender.get_peer_port();
body["id"] = sender.getIdentifier();
//执行hook
do_http_hook(hook_play,body,[invoker](const Value &obj,const string &err){
invoker(err);
});
// 执行hook
do_http_hook(hook_play, body, [invoker](const Value &obj, const string &err) { invoker(err); });
});
NoticeCenter::Instance().addListener(&web_hook_tag,Broadcast::kBroadcastFlowReport,[](BroadcastFlowReportArgs){
GET_CONFIG(string,hook_flowreport,Hook::kOnFlowReport);
if(!hook_enable || args._param_strs == hook_adminparams || hook_flowreport.empty() || sender.get_peer_ip() == "127.0.0.1"){
NoticeCenter::Instance().addListener(&web_hook_tag, Broadcast::kBroadcastFlowReport, [](BroadcastFlowReportArgs) {
GET_CONFIG(string, hook_flowreport, Hook::kOnFlowReport);
if (!hook_enable || args._param_strs == hook_adminparams || hook_flowreport.empty() || sender.get_peer_ip() == "127.0.0.1") {
return;
}
auto body = make_json(args);
......@@ -376,18 +370,17 @@ void installWebHook(){
body["ip"] = sender.get_peer_ip();
body["port"] = sender.get_peer_port();
body["id"] = sender.getIdentifier();
//执行hook
do_http_hook(hook_flowreport,body, nullptr);
// 执行hook
do_http_hook(hook_flowreport, body, nullptr);
});
static const string unAuthedRealm = "unAuthedRealm";
//监听kBroadcastOnGetRtspRealm事件决定rtsp链接是否需要鉴权(传统的rtsp鉴权方案)才能访问
NoticeCenter::Instance().addListener(&web_hook_tag,Broadcast::kBroadcastOnGetRtspRealm,[](BroadcastOnGetRtspRealmArgs){
GET_CONFIG(string,hook_rtsp_realm,Hook::kOnRtspRealm);
if(!hook_enable || args._param_strs == hook_adminparams || hook_rtsp_realm.empty() || sender.get_peer_ip() == "127.0.0.1"){
//无需认证
// 监听kBroadcastOnGetRtspRealm事件决定rtsp链接是否需要鉴权(传统的rtsp鉴权方案)才能访问
NoticeCenter::Instance().addListener(&web_hook_tag, Broadcast::kBroadcastOnGetRtspRealm, [](BroadcastOnGetRtspRealmArgs) {
GET_CONFIG(string, hook_rtsp_realm, Hook::kOnRtspRealm);
if (!hook_enable || args._param_strs == hook_adminparams || hook_rtsp_realm.empty() || sender.get_peer_ip() == "127.0.0.1") {
// 无需认证
invoker("");
return;
}
......@@ -395,10 +388,10 @@ void installWebHook(){
body["ip"] = sender.get_peer_ip();
body["port"] = sender.get_peer_port();
body["id"] = sender.getIdentifier();
//执行hook
do_http_hook(hook_rtsp_realm,body, [invoker](const Value &obj,const string &err){
if(!err.empty()){
//如果接口访问失败,那么该rtsp流认证失败
// 执行hook
do_http_hook(hook_rtsp_realm, body, [invoker](const Value &obj, const string &err) {
if (!err.empty()) {
// 如果接口访问失败,那么该rtsp流认证失败
invoker(unAuthedRealm);
return;
}
......@@ -406,12 +399,12 @@ void installWebHook(){
});
});
//监听kBroadcastOnRtspAuth事件返回正确的rtsp鉴权用户密码
NoticeCenter::Instance().addListener(&web_hook_tag,Broadcast::kBroadcastOnRtspAuth,[](BroadcastOnRtspAuthArgs){
GET_CONFIG(string,hook_rtsp_auth,Hook::kOnRtspAuth);
if(unAuthedRealm == realm || !hook_enable || hook_rtsp_auth.empty()){
//认证失败
invoker(false,makeRandStr(12));
// 监听kBroadcastOnRtspAuth事件返回正确的rtsp鉴权用户密码
NoticeCenter::Instance().addListener(&web_hook_tag, Broadcast::kBroadcastOnRtspAuth, [](BroadcastOnRtspAuthArgs) {
GET_CONFIG(string, hook_rtsp_auth, Hook::kOnRtspAuth);
if (unAuthedRealm == realm || !hook_enable || hook_rtsp_auth.empty()) {
// 认证失败
invoker(false, makeRandStr(12));
return;
}
auto body = make_json(args);
......@@ -421,22 +414,21 @@ void installWebHook(){
body["user_name"] = user_name;
body["must_no_encrypt"] = must_no_encrypt;
body["realm"] = realm;
//执行hook
do_http_hook(hook_rtsp_auth,body, [invoker](const Value &obj,const string &err){
if(!err.empty()){
//认证失败
invoker(false,makeRandStr(12));
// 执行hook
do_http_hook(hook_rtsp_auth, body, [invoker](const Value &obj, const string &err) {
if (!err.empty()) {
// 认证失败
invoker(false, makeRandStr(12));
return;
}
invoker(obj["encrypted"].asBool(),obj["passwd"].asString());
invoker(obj["encrypted"].asBool(), obj["passwd"].asString());
});
});
//监听rtsp、rtmp源注册或注销事件
NoticeCenter::Instance().addListener(&web_hook_tag,Broadcast::kBroadcastMediaChanged,[](BroadcastMediaChangedArgs){
GET_CONFIG(string,hook_stream_chaned,Hook::kOnStreamChanged);
if(!hook_enable || hook_stream_chaned.empty()){
// 监听rtsp、rtmp源注册或注销事件
NoticeCenter::Instance().addListener(&web_hook_tag, Broadcast::kBroadcastMediaChanged, [](BroadcastMediaChangedArgs) {
GET_CONFIG(string, hook_stream_chaned, Hook::kOnStreamChanged);
if (!hook_enable || hook_stream_chaned.empty()) {
return;
}
ArgsType body;
......@@ -450,8 +442,8 @@ void installWebHook(){
body["stream"] = sender.getId();
body["regist"] = bRegist;
}
//执行hook
do_http_hook(hook_stream_chaned,body, nullptr);
// 执行hook
do_http_hook(hook_stream_chaned, body, nullptr);
});
GET_CONFIG_FUNC(vector<string>, origin_urls, Cluster::kOriginUrl, [](const string &str) {
......@@ -465,10 +457,10 @@ void installWebHook(){
return ret;
});
//监听播放失败(未找到特定的流)事件
// 监听播放失败(未找到特定的流)事件
NoticeCenter::Instance().addListener(&web_hook_tag, Broadcast::kBroadcastNotFoundStream, [](BroadcastNotFoundStreamArgs) {
if (!origin_urls.empty()) {
//设置了源站,那么尝试溯源
// 设置了源站,那么尝试溯源
static atomic<uint8_t> s_index { 0 };
pullStreamFromOrigin(origin_urls, s_index.load(), 0, args, closePlayer);
++s_index;
......@@ -476,7 +468,7 @@ void installWebHook(){
}
if (start_with(args._param_strs, kEdgeServerParam)) {
//源站收到来自边沿站的溯源请求,流不存在时立即返回拉流失败
// 源站收到来自边沿站的溯源请求,流不存在时立即返回拉流失败
closePlayer();
return;
}
......@@ -498,14 +490,14 @@ void installWebHook(){
}
};
//执行hook
// 执行hook
do_http_hook(hook_stream_not_found, body, res_cb);
});
static auto getRecordInfo = [](const RecordInfo &info) {
ArgsType body;
body["start_time"] = (Json::UInt64) info.start_time;
body["file_size"] = (Json::UInt64) info.file_size;
body["start_time"] = (Json::UInt64)info.start_time;
body["file_size"] = (Json::UInt64)info.file_size;
body["time_len"] = info.time_len;
body["file_path"] = info.file_path;
body["file_name"] = info.file_name;
......@@ -518,19 +510,19 @@ void installWebHook(){
};
#ifdef ENABLE_MP4
//录制mp4文件成功后广播
NoticeCenter::Instance().addListener(&web_hook_tag,Broadcast::kBroadcastRecordMP4,[](BroadcastRecordMP4Args){
GET_CONFIG(string,hook_record_mp4,Hook::kOnRecordMp4);
// 录制mp4文件成功后广播
NoticeCenter::Instance().addListener(&web_hook_tag, Broadcast::kBroadcastRecordMP4, [](BroadcastRecordMP4Args) {
GET_CONFIG(string, hook_record_mp4, Hook::kOnRecordMp4);
if (!hook_enable || hook_record_mp4.empty()) {
return;
}
//执行hook
// 执行hook
do_http_hook(hook_record_mp4, getRecordInfo(info), nullptr);
});
#endif //ENABLE_MP4
#endif // ENABLE_MP4
NoticeCenter::Instance().addListener(&web_hook_tag, Broadcast::kBroadcastRecordTs, [](BroadcastRecordTsArgs) {
GET_CONFIG(string,hook_record_ts,Hook::kOnRecordTs);
GET_CONFIG(string, hook_record_ts, Hook::kOnRecordTs);
if (!hook_enable || hook_record_ts.empty()) {
return;
}
......@@ -538,9 +530,9 @@ void installWebHook(){
do_http_hook(hook_record_ts, getRecordInfo(info), nullptr);
});
NoticeCenter::Instance().addListener(&web_hook_tag,Broadcast::kBroadcastShellLogin,[](BroadcastShellLoginArgs){
GET_CONFIG(string,hook_shell_login,Hook::kOnShellLogin);
if(!hook_enable || hook_shell_login.empty() || sender.get_peer_ip() == "127.0.0.1"){
NoticeCenter::Instance().addListener(&web_hook_tag, Broadcast::kBroadcastShellLogin, [](BroadcastShellLoginArgs) {
GET_CONFIG(string, hook_shell_login, Hook::kOnShellLogin);
if (!hook_enable || hook_shell_login.empty() || sender.get_peer_ip() == "127.0.0.1") {
invoker("");
return;
}
......@@ -551,21 +543,19 @@ void installWebHook(){
body["user_name"] = user_name;
body["passwd"] = passwd;
//执行hook
do_http_hook(hook_shell_login,body, [invoker](const Value &,const string &err){
invoker(err);
});
// 执行hook
do_http_hook(hook_shell_login, body, [invoker](const Value &, const string &err) { invoker(err); });
});
NoticeCenter::Instance().addListener(&web_hook_tag,Broadcast::kBroadcastStreamNoneReader,[](BroadcastStreamNoneReaderArgs) {
NoticeCenter::Instance().addListener(&web_hook_tag, Broadcast::kBroadcastStreamNoneReader, [](BroadcastStreamNoneReaderArgs) {
if (!origin_urls.empty()) {
//边沿站无人观看时立即停止溯源
// 边沿站无人观看时立即停止溯源
sender.close(false);
WarnL << "无人观看主动关闭流:" << sender.getOriginUrl();
return;
}
GET_CONFIG(string,hook_stream_none_reader,Hook::kOnStreamNoneReader);
if(!hook_enable || hook_stream_none_reader.empty()){
GET_CONFIG(string, hook_stream_none_reader, Hook::kOnStreamNoneReader);
if (!hook_enable || hook_stream_none_reader.empty()) {
return;
}
......@@ -575,11 +565,11 @@ void installWebHook(){
body["app"] = sender.getApp();
body["stream"] = sender.getId();
weak_ptr<MediaSource> weakSrc = sender.shared_from_this();
//执行hook
do_http_hook(hook_stream_none_reader,body, [weakSrc](const Value &obj,const string &err){
// 执行hook
do_http_hook(hook_stream_none_reader, body, [weakSrc](const Value &obj, const string &err) {
bool flag = obj["close"].asBool();
auto strongSrc = weakSrc.lock();
if(!flag || !err.empty() || !strongSrc){
if (!flag || !err.empty() || !strongSrc) {
return;
}
strongSrc->close(false);
......@@ -603,7 +593,7 @@ void installWebHook(){
body["originUrl"] = sender.getOriginUrl(MediaSource::NullMediaSource());
body["msg"] = ex.what();
body["err"] = ex.getErrCode();
//执行hook
// 执行hook
do_http_hook(hook_send_rtp_stopped, body, nullptr);
});
......@@ -615,24 +605,24 @@ void installWebHook(){
* 4、cookie中记录的url参数是否跟本次url参数一致,如果一致直接返回客户端错误码
* 5、触发kBroadcastHttpAccess事件
*/
//开发者应该通过该事件判定http客户端是否有权限访问http服务器上的特定文件
//ZLMediaKit会记录本次鉴权的结果至cookie
//如果鉴权成功,在cookie有效期内,那么下次客户端再访问授权目录时,ZLMediaKit会直接返回文件
//如果鉴权失败,在cookie有效期内,如果http url参数不变(否则会立即再次触发鉴权事件),ZLMediaKit会直接返回错误码
//如果用户客户端不支持cookie,那么ZLMediaKit会根据url参数查找cookie并追踪用户,
//如果没有url参数,客户端又不支持cookie,那么会根据ip和端口追踪用户
//追踪用户的目的是为了缓存上次鉴权结果,减少鉴权次数,提高性能
NoticeCenter::Instance().addListener(&web_hook_tag,Broadcast::kBroadcastHttpAccess,[](BroadcastHttpAccessArgs){
GET_CONFIG(string,hook_http_access,Hook::kOnHttpAccess);
if(sender.get_peer_ip() == "127.0.0.1" || parser.Params() == hook_adminparams){
//如果是本机或超级管理员访问,那么不做访问鉴权;权限有效期1个小时
invoker("","",60 * 60);
// 开发者应该通过该事件判定http客户端是否有权限访问http服务器上的特定文件
// ZLMediaKit会记录本次鉴权的结果至cookie
// 如果鉴权成功,在cookie有效期内,那么下次客户端再访问授权目录时,ZLMediaKit会直接返回文件
// 如果鉴权失败,在cookie有效期内,如果http url参数不变(否则会立即再次触发鉴权事件),ZLMediaKit会直接返回错误码
// 如果用户客户端不支持cookie,那么ZLMediaKit会根据url参数查找cookie并追踪用户,
// 如果没有url参数,客户端又不支持cookie,那么会根据ip和端口追踪用户
// 追踪用户的目的是为了缓存上次鉴权结果,减少鉴权次数,提高性能
NoticeCenter::Instance().addListener(&web_hook_tag, Broadcast::kBroadcastHttpAccess, [](BroadcastHttpAccessArgs) {
GET_CONFIG(string, hook_http_access, Hook::kOnHttpAccess);
if (sender.get_peer_ip() == "127.0.0.1" || parser.Params() == hook_adminparams) {
// 如果是本机或超级管理员访问,那么不做访问鉴权;权限有效期1个小时
invoker("", "", 60 * 60);
return;
}
if(!hook_enable || hook_http_access.empty()){
//未开启http文件访问鉴权,那么允许访问,但是每次访问都要鉴权;
//因为后续随时都可能开启鉴权(重载配置文件后可能重新开启鉴权)
invoker("","",0);
if (!hook_enable || hook_http_access.empty()) {
// 未开启http文件访问鉴权,那么允许访问,但是每次访问都要鉴权;
// 因为后续随时都可能开启鉴权(重载配置文件后可能重新开启鉴权)
invoker("", "", 0);
return;
}
......@@ -643,20 +633,20 @@ void installWebHook(){
body["path"] = path;
body["is_dir"] = is_dir;
body["params"] = parser.Params();
for(auto &pr : parser.getHeader()){
for (auto &pr : parser.getHeader()) {
body[string("header.") + pr.first] = pr.second;
}
//执行hook
do_http_hook(hook_http_access,body, [invoker](const Value &obj,const string &err){
if(!err.empty()){
//如果接口访问失败,那么仅限本次没有访问http服务器的权限
invoker(err,"",0);
// 执行hook
do_http_hook(hook_http_access, body, [invoker](const Value &obj, const string &err) {
if (!err.empty()) {
// 如果接口访问失败,那么仅限本次没有访问http服务器的权限
invoker(err, "", 0);
return;
}
//err参数代表不能访问的原因,空则代表可以访问
//path参数是该客户端能访问或被禁止的顶端目录,如果path为空字符串,则表述为当前目录
//second参数规定该cookie超时时间,如果second为0,本次鉴权结果不缓存
invoker(obj["err"].asString(),obj["path"].asString(),obj["second"].asInt());
// err参数代表不能访问的原因,空则代表可以访问
// path参数是该客户端能访问或被禁止的顶端目录,如果path为空字符串,则表述为当前目录
// second参数规定该cookie超时时间,如果second为0,本次鉴权结果不缓存
invoker(obj["err"].asString(), obj["path"].asString(), obj["second"].asInt());
});
});
......@@ -675,14 +665,14 @@ void installWebHook(){
do_http_hook(rtp_server_timeout, body);
});
//汇报服务器重新启动
// 汇报服务器重新启动
reportServerStarted();
//定时上报保活
// 定时上报保活
reportServerKeepalive();
}
void unInstallWebHook(){
void unInstallWebHook() {
g_keepalive_timer.reset();
NoticeCenter::Instance().delListener(&web_hook_tag);
}
......@@ -123,7 +123,7 @@ public:
(*_parser) << Option('l',/*该选项简称,如果是\x00则说明无简称*/
"level",/*该选项全称,每个选项必须有全称;不得为null或空字符串*/
Option::ArgRequired,/*该选项后面必须跟值*/
to_string(LTrace).data(),/*该选项默认值*/
to_string(LDebug).data(),/*该选项默认值*/
false,/*该选项是否必须赋值,如果没有默认值且为ArgRequired时用户必须提供该参数否则将抛异常*/
"日志等级,LTrace~LError(0~4)",/*该选项说明文字*/
nullptr);
......@@ -160,6 +160,14 @@ public:
"启动事件触发线程数",/*该选项说明文字*/
nullptr);
(*_parser) << Option(0,/*该选项简称,如果是\x00则说明无简称*/
"affinity",/*该选项全称,每个选项必须有全称;不得为null或空字符串*/
Option::ArgRequired,/*该选项后面必须跟值*/
to_string(1).data(),/*该选项默认值*/
false,/*该选项是否必须赋值,如果没有默认值且为ArgRequired时用户必须提供该参数否则将抛异常*/
"是否启动cpu亲和性设置",/*该选项说明文字*/
nullptr);
#if defined(ENABLE_VERSION)
(*_parser) << Option('v', "version", Option::ArgNone, nullptr, false, "显示版本号",
[](const std::shared_ptr<ostream> &stream, const string &arg) -> bool {
......@@ -200,15 +208,16 @@ int start_main(int argc,char *argv[]) {
g_ini_file = cmd_main["config"];
string ssl_file = cmd_main["ssl"];
int threads = cmd_main["threads"];
bool affinity = cmd_main["affinity"];
//设置日志
Logger::Instance().add(std::make_shared<ConsoleChannel>("ConsoleChannel", logLevel));
#ifndef ANDROID
#if !defined(ANDROID)
auto fileChannel = std::make_shared<FileChannel>("FileChannel", exeDir() + "log/", logLevel);
//日志最多保存天数
// 日志最多保存天数
fileChannel->setMaxDay(cmd_main["max_day"]);
Logger::Instance().add(fileChannel);
#endif//
#endif // !defined(ANDROID)
#if !defined(_WIN32)
pid_t pid = getpid();
......@@ -252,24 +261,27 @@ int start_main(int argc,char *argv[]) {
uint16_t httpsPort = mINI::Instance()[Http::kSSLPort];
uint16_t rtpPort = mINI::Instance()[RtpProxy::kPort];
//设置poller线程数,该函数必须在使用ZLToolKit网络相关对象之前调用才能生效
//设置poller线程数和cpu亲和性,该函数必须在使用ZLToolKit网络相关对象之前调用才能生效
//如果需要调用getSnap和addFFmpegSource接口,可以关闭cpu亲和性
EventPollerPool::setPoolSize(threads);
EventPollerPool::enableCpuAffinity(affinity);
//简单的telnet服务器,可用于服务器调试,但是不能使用23端口,否则telnet上了莫名其妙的现象
//测试方法:telnet 127.0.0.1 9000
auto shellSrv = std::make_shared<TcpServer>();
//rtsp[s]服务器, 可用于诸如亚马逊echo show这样的设备访问
auto rtspSrv = std::make_shared<TcpServer>();;
auto rtspSSLSrv = std::make_shared<TcpServer>();;
auto rtspSrv = std::make_shared<TcpServer>();
auto rtspSSLSrv = std::make_shared<TcpServer>();
//rtmp[s]服务器
auto rtmpSrv = std::make_shared<TcpServer>();;
auto rtmpsSrv = std::make_shared<TcpServer>();;
auto rtmpSrv = std::make_shared<TcpServer>();
auto rtmpsSrv = std::make_shared<TcpServer>();
//http[s]服务器
auto httpSrv = std::make_shared<TcpServer>();;
auto httpsSrv = std::make_shared<TcpServer>();;
auto httpSrv = std::make_shared<TcpServer>();
auto httpsSrv = std::make_shared<TcpServer>();
#if defined(ENABLE_RTPPROXY)
//GB28181 rtp推流端口,支持UDP/TCP
......
......@@ -270,7 +270,7 @@ void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const MediaSourceE
if (auto strong_self = weak_self.lock()) {
// 可能归属线程发生变更
strong_self->getOwnerPoller(MediaSource::NullMediaSource())->async([=]() {
WarnL << "stream:" << strong_self->shortUrl() << " stop send rtp:" << ssrc << ", reason:" << ex.what();
WarnL << "stream:" << strong_self->shortUrl() << " stop send rtp:" << ssrc << ", reason:" << ex;
strong_self->_rtp_sender.erase(ssrc);
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastSendRtpStopped, *strong_self, ssrc, ex);
});
......@@ -393,12 +393,11 @@ void MultiMediaSourceMuxer::createGopCacheIfNeed() {
return;
}
weak_ptr<MultiMediaSourceMuxer> weak_self = shared_from_this();
_ring = std::make_shared<RingType>(1024, [weak_self](int size) {
auto strong_self = weak_self.lock();
if (strong_self) {
auto src = std::make_shared<MediaSourceForMuxer>(weak_self.lock());
_ring = std::make_shared<RingType>(1024, [weak_self, src](int size) {
if (auto strong_self = weak_self.lock()) {
// 切换到归属线程
strong_self->getOwnerPoller(MediaSource::NullMediaSource())->async([=]() {
auto src = std::make_shared<MediaSourceForMuxer>(strong_self);
strong_self->onReaderChanged(*src, strong_self->totalReaderCount());
});
}
......
......@@ -24,9 +24,8 @@ namespace mediakit{
class Track : public FrameDispatcher , public CodecInfo{
public:
using Ptr = std::shared_ptr<Track>;
Track(){}
virtual ~Track(){}
Track() = default;
virtual ~Track() = default;
/**
* 是否准备好,准备好才能获取譬如sps pps等信息
......@@ -177,8 +176,8 @@ private:
class TrackSource{
public:
TrackSource(){}
virtual ~TrackSource(){}
TrackSource() = default;
virtual ~TrackSource() = default;
/**
* 获取全部的Track
......
......@@ -118,7 +118,7 @@ void HlsPlayer::fetchSegment() {
return;
}
if (err) {
WarnL << "Download ts segment " << url << " failed:" << err.what();
WarnL << "Download ts segment " << url << " failed:" << err;
if (err.getErrCode() == Err_timeout) {
strong_self->_timeout_multiple = MAX(strong_self->_timeout_multiple + 1, MAX_TIMEOUT_MULTIPLE);
}else{
......
......@@ -189,7 +189,7 @@ public:
_data = map_addr.get() + offset;
_size = size;
}
~BufferMmap() override {};
~BufferMmap() override = default;
//返回数据长度
char *data() const override { return _data; }
size_t size() const override { return _size; }
......
......@@ -30,9 +30,9 @@ namespace mediakit {
class HttpBody : public std::enable_shared_from_this<HttpBody>{
public:
using Ptr = std::shared_ptr<HttpBody>;
HttpBody(){}
HttpBody() = default;
virtual ~HttpBody(){}
virtual ~HttpBody() = default;
/**
* 剩余数据大小,如果返回-1, 那么就不设置content-length
......@@ -151,7 +151,7 @@ public:
* @param boundary boundary字符串
*/
HttpMultiFormBody(const HttpArgs &args,const std::string &filePath,const std::string &boundary = "0xKhTmLbOuNdArY");
virtual ~HttpMultiFormBody(){}
virtual ~HttpMultiFormBody() = default;
int64_t remainSize() override ;
toolkit::Buffer::Ptr readData(size_t size) override;
......
......@@ -27,8 +27,8 @@ class HttpCookie {
public:
using Ptr = std::shared_ptr<HttpCookie>;
friend class HttpCookieStorage;
HttpCookie(){}
~HttpCookie(){}
HttpCookie() = default;
~HttpCookie() = default;
void setPath(const std::string &path);
void setHost(const std::string &host);
......@@ -52,12 +52,14 @@ private:
*/
class HttpCookieStorage{
public:
~HttpCookieStorage(){}
~HttpCookieStorage() = default;
static HttpCookieStorage &Instance();
void set(const HttpCookie::Ptr &cookie);
std::vector<HttpCookie::Ptr> get(const std::string &host,const std::string &path);
private:
HttpCookieStorage(){};
HttpCookieStorage() = default;
private:
std::unordered_map<std::string/*host*/, std::map<std::string/*cookie path*/,std::map<std::string/*cookie_key*/, HttpCookie::Ptr> > > _all_cookie;
std::mutex _mtx_cookie;
......
......@@ -24,8 +24,8 @@ public:
typedef std::function<void(int code, const StrCaseMap &headerOut, const HttpBody::Ptr &body)> HttpResponseInvokerLambda0;
typedef std::function<void(int code, const StrCaseMap &headerOut, const std::string &body)> HttpResponseInvokerLambda1;
HttpResponseInvokerImp(){}
~HttpResponseInvokerImp(){}
HttpResponseInvokerImp() = default;
~HttpResponseInvokerImp() = default;
template<typename C>
HttpResponseInvokerImp(const C &c):HttpResponseInvokerImp(typename toolkit::function_traits<C>::stl_function_type(c)) {}
HttpResponseInvokerImp(const HttpResponseInvokerLambda0 &lambda);
......
......@@ -18,8 +18,8 @@ namespace mediakit {
class HttpRequestSplitter {
public:
HttpRequestSplitter(){};
virtual ~HttpRequestSplitter(){};
HttpRequestSplitter() = default;
virtual ~HttpRequestSplitter() = default;
/**
* 添加数据
......
......@@ -24,14 +24,11 @@ using namespace toolkit;
namespace mediakit {
HttpSession::HttpSession(const Socket::Ptr &pSock) : Session(pSock) {
TraceP(this);
GET_CONFIG(uint32_t,keep_alive_sec,Http::kKeepAliveSecond);
pSock->setSendTimeOutSecond(keep_alive_sec);
}
HttpSession::~HttpSession() {
TraceP(this);
}
HttpSession::~HttpSession() = default;
void HttpSession::Handle_Req_HEAD(ssize_t &content_len){
//暂时全部返回200 OK,因为HTTP GET存在按需生成流的操作,所以不能按照HTTP GET的流程返回
......@@ -104,7 +101,7 @@ void HttpSession::onError(const SockException& err) {
uint64_t duration = _ticker.createdTime() / 1000;
WarnP(this) << "FLV/TS/FMP4播放器("
<< _mediaInfo.shortUrl()
<< ")断开:" << err.what()
<< ")断开:" << err
<< ",耗时(s):" << duration;
GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold);
......@@ -114,9 +111,6 @@ void HttpSession::onError(const SockException& err) {
}
return;
}
//http客户端
TraceP(this) << err.what();
}
void HttpSession::onManager() {
......
......@@ -37,7 +37,7 @@ public:
template <typename... ArgsType>
ClientTypeImp(ArgsType &&...args) : ClientType(std::forward<ArgsType>(args)...) {}
~ClientTypeImp() override {};
~ClientTypeImp() override = default;
protected:
/**
......
......@@ -82,7 +82,7 @@ template<typename Creator, typename HttpSessionType = mediakit::HttpSession, med
class WebSocketSessionBase : public HttpSessionType {
public:
WebSocketSessionBase(const toolkit::Socket::Ptr &pSock) : HttpSessionType(pSock){}
virtual ~WebSocketSessionBase(){}
virtual ~WebSocketSessionBase() = default;
//收到eof或其他导致脱离TcpServer事件的回调
void onError(const toolkit::SockException &err) override{
......@@ -248,7 +248,7 @@ template<typename SessionType,typename HttpSessionType = mediakit::HttpSession,
class WebSocketSession : public WebSocketSessionBase<SessionCreator<SessionType>,HttpSessionType,DataType>{
public:
WebSocketSession(const toolkit::Socket::Ptr &pSock) : WebSocketSessionBase<SessionCreator<SessionType>,HttpSessionType,DataType>(pSock){}
virtual ~WebSocketSession(){}
virtual ~WebSocketSession() = default;
};
#endif //ZLMEDIAKIT_WEBSOCKETSESSION_H
......@@ -51,7 +51,7 @@ public:
//根据内存地址设置掩码随机数
_mask.assign((uint8_t*)(&ptr), (uint8_t*)(&ptr) + 4);
}
virtual ~WebSocketHeader(){}
virtual ~WebSocketHeader() = default;
public:
bool _fin;
......@@ -71,7 +71,7 @@ public:
WebSocketBuffer(WebSocketHeader::Type headType, bool fin, ARGS &&...args)
: toolkit::BufferString(std::forward<ARGS>(args)...), _fin(fin), _head_type(headType){}
~WebSocketBuffer() override {}
~WebSocketBuffer() override = default;
WebSocketHeader::Type headType() const { return _head_type; }
......@@ -84,8 +84,8 @@ private:
class WebSocketSplitter : public WebSocketHeader{
public:
WebSocketSplitter(){}
virtual ~WebSocketSplitter(){}
WebSocketSplitter() = default;
virtual ~WebSocketSplitter() = default;
/**
* 输入数据以便解包webSocket数据以及处理粘包问题
......
......@@ -100,7 +100,13 @@ void PlayerProxy::play(const string &strUrlTmp) {
strongSelf->_on_close(err);
}
});
MediaPlayer::play(strUrlTmp);
try {
MediaPlayer::play(strUrlTmp);
} catch (std::exception &ex) {
ErrorL << ex.what();
_on_play_result(SockException(Err_other, ex.what()));
return;
}
_pull_url = strUrlTmp;
setDirectProxy();
}
......
......@@ -121,6 +121,18 @@ bool MP4MuxerInterface::inputFrame(const Frame::Ptr &frame) {
});
break;
}
case CodecJPEG:{
int64_t dts_out, pts_out;
track_info.stamp.revise(frame->dts(), frame->pts(), dts_out, pts_out);
mp4_writer_write(_mov_writter.get(),
track_info.track_id,
frame->data(),
frame->size(),
pts_out,
dts_out,
frame->keyFrame() ? MOV_AV_FLAG_KEYFREAME : 0);
break;
}
default: {
int64_t dts_out, pts_out;
......
......@@ -86,7 +86,7 @@ namespace mediakit {
class MpegMuxer : public MediaSinkInterface {
public:
MpegMuxer(bool is_ps) {};
MpegMuxer(bool is_ps) = default;
~MpegMuxer() override = default;
bool addTrack(const Track::Ptr &track) override { return false; }
void resetTracks() override {}
......
......@@ -242,11 +242,9 @@ vector<RtcpHeader *> RtcpHeader::loadFromBytes(char *data, size_t len) {
class BufferRtcp : public Buffer {
public:
BufferRtcp(std::shared_ptr<RtcpHeader> rtcp) { _rtcp = std::move(rtcp); }
~BufferRtcp() override {}
~BufferRtcp() override = default;
char *data() const override { return (char *)_rtcp.get(); }
size_t size() const override { return _rtcp->getSize(); }
private:
......@@ -551,7 +549,7 @@ const void *RtcpFB::getFciPtr() const {
size_t RtcpFB::getFciSize() const {
auto fci_len = (ssize_t)getSize() - getPaddingSize() - sizeof(RtcpFB);
CHECK(fci_len >= 0);
CHECK(getSize() >= getPaddingSize() + sizeof(RtcpFB));
return fci_len;
}
......
......@@ -16,7 +16,7 @@ TitleMeta::TitleMeta(float dur_sec, size_t fileSize, const std::map<std::string,
{
_metadata.set("duration", dur_sec);
_metadata.set("fileSize", (int)fileSize);
_metadata.set("server", kServerName);
_metadata.set("title", std::string("Streamed by ") + kServerName);
for (auto &pr : header) {
_metadata.set(pr.first, pr.second);
}
......
......@@ -207,8 +207,8 @@ class Metadata : public CodecInfo{
public:
using Ptr = std::shared_ptr<Metadata>;
Metadata():_metadata(AMF_OBJECT){}
virtual ~Metadata(){}
Metadata(): _metadata(AMF_OBJECT) {}
virtual ~Metadata() = default;
const AMFValue &getMetadata() const{
return _metadata;
}
......@@ -239,7 +239,7 @@ public:
using Ptr = std::shared_ptr<VideoMeta>;
VideoMeta(const VideoTrack::Ptr &video);
virtual ~VideoMeta(){}
virtual ~VideoMeta() = default;
CodecId getCodecId() const override{
return _codecId;
......@@ -253,8 +253,7 @@ public:
using Ptr = std::shared_ptr<AudioMeta>;
AudioMeta(const AudioTrack::Ptr &audio);
virtual ~AudioMeta(){}
virtual ~AudioMeta() = default;
CodecId getCodecId() const override{
return _codecId;
......
......@@ -99,7 +99,7 @@ public:
*/
virtual void setMetaData(const AMFValue &metadata) {
_metadata = metadata;
_metadata.set("server", kServerName);
_metadata.set("title", std::string("Streamed by ") + kServerName);
_have_video = _metadata["videocodecid"];
_have_audio = _metadata["audiocodecid"];
if (_ring) {
......
......@@ -101,7 +101,7 @@ void RtmpPlayer::onPlayResult_l(const SockException &ex, bool handshake_done) {
return;
}
WarnL << ex.getErrCode() << " " << ex.what();
WarnL << ex.getErrCode() << " " << ex;
if (!handshake_done) {
//开始播放阶段
_play_timer.reset();
......
......@@ -181,13 +181,13 @@ void RtmpProtocol::sendRequest(int cmd, const string& str) {
class BufferPartial : public Buffer {
public:
BufferPartial(const Buffer::Ptr &buffer, size_t offset, size_t size){
BufferPartial(const Buffer::Ptr &buffer, size_t offset, size_t size) {
_buffer = buffer;
_data = buffer->data() + offset;
_size = size;
}
~BufferPartial() override{}
~BufferPartial() override = default;
char *data() const override {
return _data;
......
......@@ -19,14 +19,11 @@ using namespace toolkit;
namespace mediakit {
RtmpSession::RtmpSession(const Socket::Ptr &sock) : Session(sock) {
DebugP(this);
GET_CONFIG(uint32_t,keep_alive_sec,Rtmp::kKeepAliveSecond);
sock->setSendTimeOutSecond(keep_alive_sec);
}
RtmpSession::~RtmpSession() {
DebugP(this);
}
RtmpSession::~RtmpSession() = default;
void RtmpSession::onError(const SockException& err) {
bool is_player = !_push_src_ownership;
......
......@@ -336,7 +336,7 @@ void RtpSender::onFlushRtpList(shared_ptr<List<Buffer::Ptr> > rtp_list) {
void RtpSender::onErr(const SockException &ex) {
_is_connect = false;
WarnL << "send rtp connection lost: " << ex.what();
WarnL << "send rtp connection lost: " << ex;
onClose(ex);
}
......
......@@ -102,7 +102,8 @@ public:
process->setOnDetach(std::move(strong_self->_on_detach));
}
if (!process) { // process 未创建,触发rtp server 超时事件
NoticeCenter::Instance().emitEvent(Broadcast::KBroadcastRtpServerTimeout,strong_self->_local_port,strong_self->_stream_id,(int)strong_self->_tcp_mode,strong_self->_re_use_port,strong_self->_ssrc);
NoticeCenter::Instance().emitEvent(Broadcast::KBroadcastRtpServerTimeout, strong_self->_local_port, strong_self->_stream_id,
(int)strong_self->_tcp_mode, strong_self->_re_use_port, strong_self->_ssrc);
}
}
return 0;
......@@ -198,11 +199,14 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_
helper->startRtcp();
helper->setRtpServerInfo(local_port, tcp_mode, re_use_port, ssrc, only_audio);
bool bind_peer_addr = false;
rtp_socket->setOnRead([rtp_socket, helper, ssrc, bind_peer_addr](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) mutable {
auto ssrc_ptr = std::make_shared<uint32_t>(ssrc);
_ssrc = ssrc_ptr;
rtp_socket->setOnRead([rtp_socket, helper, ssrc_ptr, bind_peer_addr](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) mutable {
RtpHeader *header = (RtpHeader *)buf->data();
auto rtp_ssrc = ntohl(header->ssrc);
auto ssrc = *ssrc_ptr;
if (ssrc && rtp_ssrc != ssrc) {
WarnL << "ssrc不匹配,rtp已丢弃:" << rtp_ssrc << " != " << ssrc;
WarnL << "ssrc mismatched, rtp dropped: " << rtp_ssrc << " != " << ssrc;
} else {
if (!bind_peer_addr) {
//绑定对方ip+端口,防止多个设备或一个设备多次推流从而日志报ssrc不匹配问题
......@@ -213,19 +217,11 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_
}
});
} else {
#if 1
//单端口多线程接收多个流,根据ssrc区分流
udp_server = std::make_shared<UdpServer>(rtp_socket->getPoller());
(*udp_server)[RtpSession::kOnlyAudio] = only_audio;
udp_server->start<RtpSession>(local_port, local_ip);
rtp_socket = nullptr;
#else
//单端口单线程接收多个流
auto &ref = RtpSelector::Instance();
rtp_socket->setOnRead([&ref, rtp_socket](const Buffer::Ptr &buf, struct sockaddr *addr, int) {
ref.inputRtp(rtp_socket, buf->data(), buf->size(), addr);
});
#endif
}
_on_cleanup = [rtp_socket, stream_id]() {
......@@ -264,7 +260,7 @@ void RtpServer::connectToServer(const std::string &url, uint16_t port, const fun
return;
}
if (err) {
WarnL << "连接到服务器 " << url << ":" << port << " 失败 " << err.what();
WarnL << "连接到服务器 " << url << ":" << port << " 失败 " << err;
} else {
InfoL << "连接到服务器 " << url << ":" << port << " 成功";
strong_self->onConnect();
......@@ -288,5 +284,15 @@ void RtpServer::onConnect() {
});
}
void RtpServer::updateSSRC(uint32_t ssrc) {
if (_ssrc) {
*_ssrc = ssrc;
}
if (_tcp_server) {
(*_tcp_server)[RtpSession::kSSRC] = ssrc;
}
}
}//namespace mediakit
#endif//defined(ENABLE_RTPPROXY)
......@@ -64,6 +64,11 @@ public:
*/
void setOnDetach(std::function<void()> cb);
/**
* 更新ssrc
*/
void updateSSRC(uint32_t ssrc);
private:
// tcp主动模式连接服务器成功回调
void onConnect();
......@@ -72,6 +77,7 @@ protected:
toolkit::Socket::Ptr _rtp_socket;
toolkit::UdpServer::Ptr _udp_server;
toolkit::TcpServer::Ptr _tcp_server;
std::shared_ptr<uint32_t> _ssrc;
std::shared_ptr<RtcpHelper> _rtcp_helper;
std::function<void()> _on_cleanup;
......
......@@ -36,7 +36,6 @@ void RtpSession::setParams(mINI &ini) {
}
RtpSession::RtpSession(const Socket::Ptr &sock) : Session(sock) {
DebugP(this);
socklen_t addr_len = sizeof(_addr);
getpeername(sock->rawFD(), (struct sockaddr *)&_addr, &addr_len);
_is_udp = sock->sockType() == SockNum::Sock_UDP;
......@@ -47,7 +46,6 @@ RtpSession::RtpSession(const Socket::Ptr &sock) : Session(sock) {
}
RtpSession::~RtpSession() {
DebugP(this);
if(_process){
RtpSelector::Instance().delProcess(_stream_id,_process.get());
}
......@@ -62,7 +60,7 @@ void RtpSession::onRecv(const Buffer::Ptr &data) {
}
void RtpSession::onError(const SockException &err) {
WarnP(this) << _stream_id << " " << err.what();
WarnP(this) << _stream_id << " " << err;
}
void RtpSession::onManager() {
......@@ -130,7 +128,7 @@ void RtpSession::onRtpPacket(const char *data, size_t len) {
uint32_t rtp_ssrc = 0;
RtpSelector::getSSRC(data, len, rtp_ssrc);
if (rtp_ssrc != _ssrc) {
WarnP(this) << "ssrc不匹配,rtp已丢弃:" << rtp_ssrc << " != " << _ssrc;
WarnP(this) << "ssrc mismatched, rtp dropped: " << rtp_ssrc << " != " << _ssrc;
return;
}
_process->inputRtp(false, getSock(), data, len, (struct sockaddr *)&_addr);
......
......@@ -13,12 +13,8 @@
#include "RtpSplitter.h"
namespace mediakit{
static const char kEHOME_MAGIC[] = "\x01\x00\x01\x00";
static const int kEHOME_OFFSET = 256;
RtpSplitter::RtpSplitter() {}
RtpSplitter::~RtpSplitter() {}
ssize_t RtpSplitter::onRecvHeader(const char *data,size_t len){
//忽略偏移量
data += _offset;
......@@ -38,11 +34,10 @@ static bool isEhome(const char *data, size_t len){
if (len < 4) {
return false;
}
if((data[0] == 0x01) && (data[1] == 0x00) && (data[2] >=0x01)){
if ((data[0] == 0x01) && (data[1] == 0x00) && (data[2] >= 0x01)) {
return true;
}
return false;
//return memcmp(data, kEHOME_MAGIC, sizeof(kEHOME_MAGIC) - 1) == 0;
}
const char *RtpSplitter::onSearchPacketTail(const char *data, size_t len) {
......
......@@ -18,8 +18,8 @@ namespace mediakit{
class RtpSplitter : public HttpRequestSplitter{
public:
RtpSplitter();
~RtpSplitter() override;
RtpSplitter() = default;
~RtpSplitter() override = default;
protected:
/**
......
......@@ -25,7 +25,7 @@ class TSSegment : public HttpRequestSplitter {
public:
typedef std::function<void(const char *data,size_t len)> onSegment;
TSSegment(size_t size = TS_PACKET_SIZE) : _size(size){}
~TSSegment(){}
~TSSegment() = default;
void setOnSegment(onSegment cb);
static bool isTSPacket(const char *data, size_t len);
......
......@@ -74,7 +74,7 @@ public:
_interleaved = interleaved;
}
virtual ~RtpInfo() {}
virtual ~RtpInfo() = default;
//返回rtp负载最大长度
size_t getMaxSize() const {
......
......@@ -237,9 +237,9 @@ class SdpParser {
public:
using Ptr = std::shared_ptr<SdpParser>;
SdpParser() {}
SdpParser() = default;
SdpParser(const std::string &sdp) { load(sdp); }
~SdpParser() {}
~SdpParser() = default;
void load(const std::string &sdp);
bool available() const;
......@@ -268,7 +268,7 @@ public:
_payload_type = payload_type;
}
virtual ~Sdp(){}
virtual ~Sdp() = default;
/**
* 获取sdp字符串
......
......@@ -53,14 +53,11 @@ static unordered_map<string, weak_ptr<RtspSession> > g_mapGetter;
static recursive_mutex g_mtxGetter;
RtspSession::RtspSession(const Socket::Ptr &sock) : Session(sock) {
DebugP(this);
GET_CONFIG(uint32_t,keep_alive_sec,Rtsp::kKeepAliveSecond);
sock->setSendTimeOutSecond(keep_alive_sec);
}
RtspSession::~RtspSession() {
DebugP(this);
}
RtspSession::~RtspSession() = default;
void RtspSession::onError(const SockException &err) {
bool is_player = !_push_src_ownership;
......@@ -454,7 +451,6 @@ void RtspSession::handleReq_Describe(const Parser &parser) {
}
void RtspSession::onAuthSuccess() {
TraceP(this);
weak_ptr<RtspSession> weak_self = dynamic_pointer_cast<RtspSession>(shared_from_this());
MediaSource::findAsync(_media_info, weak_self.lock(), [weak_self](const MediaSource::Ptr &src){
auto strong_self = weak_self.lock();
......
......@@ -18,8 +18,8 @@ namespace mediakit{
class RtspSplitter : public HttpRequestSplitter{
public:
RtspSplitter(){}
virtual ~RtspSplitter(){}
RtspSplitter() = default;
virtual ~RtspSplitter() = default;
/**
* 是否允许接收rtp包
......
......@@ -25,13 +25,10 @@ static onceToken s_token([]() {
}, nullptr);
ShellSession::ShellSession(const Socket::Ptr &_sock) : Session(_sock) {
DebugP(this);
pleaseInputUser();
}
ShellSession::~ShellSession() {
DebugP(this);
}
ShellSession::~ShellSession() = default;
void ShellSession::onRecv(const Buffer::Ptr&buf) {
//DebugL << hexdump(buf->data(), buf->size());
......@@ -60,7 +57,7 @@ void ShellSession::onRecv(const Buffer::Ptr&buf) {
}
void ShellSession::onError(const SockException &err){
WarnP(this) << err.what();
WarnP(this) << err;
}
void ShellSession::onManager() {
......
......@@ -16,9 +16,7 @@ SrtSession::SrtSession(const Socket::Ptr &sock)
// TraceL<<"after addr len "<<addr_len<<" family "<<_peer_addr.ss_family;
}
SrtSession::~SrtSession() {
InfoP(this);
}
SrtSession::~SrtSession() = default;
EventPoller::Ptr SrtSession::queryPoller(const Buffer::Ptr &buffer) {
uint8_t *data = (uint8_t *)buffer->data();
......@@ -119,7 +117,7 @@ void SrtSession::onError(const SockException &err) {
// udp链接超时,但是srt链接不一定超时,因为可能存在udp链接迁移的情况
//在udp链接迁移时,新的SrtSession对象将接管SrtSession对象的生命周期
//本SrtSession对象将在超时后自动销毁
WarnP(this) << err.what();
WarnP(this) << err;
if (!_transport) {
return;
......
......@@ -6,10 +6,10 @@
namespace SRT {
SrtTransportImp::SrtTransportImp(const EventPoller::Ptr &poller)
: SrtTransport(poller) {}
: SrtTransport(poller) {
}
SrtTransportImp::~SrtTransportImp() {
InfoP(this);
uint64_t duration = _alive_ticker.createdTime() / 1000;
WarnP(this) << (_is_pusher ? "srt 推流器(" : "srt 播放器(") << _media_info.shortUrl() << ")断开,耗时(s):" << duration;
......
......@@ -33,7 +33,7 @@ protected:
}
//被动断开连接回调
void onErr(const SockException &ex) override {
WarnL << ex.what();
WarnL << ex;
}
//tcp连接成功后每2秒触发一次该事件
void onManager() override {
......@@ -42,7 +42,7 @@ protected:
}
//连接服务器结果回调
void onConnect(const SockException &ex) override{
DebugL << ex.what();
DebugL << ex;
}
//数据全部发送完毕后回调
......
......@@ -51,9 +51,7 @@ WebRtcSession::WebRtcSession(const Socket::Ptr &sock) : Session(sock) {
_over_tcp = sock->sockType() == SockNum::Sock_TCP;
}
WebRtcSession::~WebRtcSession() {
InfoP(this);
}
WebRtcSession::~WebRtcSession() = default;
void WebRtcSession::attachServer(const Server &server) {
_server = std::dynamic_pointer_cast<toolkit::TcpServer>(const_cast<Server &>(server).shared_from_this());
......@@ -71,7 +69,7 @@ void WebRtcSession::onRecv_l(const char *data, size_t len) {
if (!transport->getPoller()->isCurrentThread()) {
auto sock = Socket::createSocket(transport->getPoller(), false);
//1、克隆socket(fd不变),切换poller线程到WebRtcTransport所在线程
sock->cloneFromPeerSocket(*(getSock()));
sock->cloneSocket(*(getSock()));
auto server = _server;
std::string str(data, len);
sock->getPoller()->async([sock, server, str](){
......@@ -105,7 +103,7 @@ void WebRtcSession::onError(const SockException &err) {
//udp链接超时,但是rtc链接不一定超时,因为可能存在链接迁移的情况
//在udp链接迁移时,新的WebRtcSession对象将接管WebRtcTransport对象的生命周期
//本WebRtcSession对象将在超时后自动销毁
WarnP(this) << err.what();
WarnP(this) << err;
if (!_transport) {
return;
......
......@@ -1054,7 +1054,7 @@ void WebRtcTransportImp::onBeforeEncryptRtp(const char *buf, int &len, void *ctx
}
void WebRtcTransportImp::onShutdown(const SockException &ex) {
WarnL << ex.what();
WarnL << ex;
unrefSelf();
for (auto &tuple : _ice_server->GetTuples()) {
tuple->shutdown(ex);
......@@ -1131,6 +1131,10 @@ void WebRtcPluginManager::registerPlugin(const string &type, Plugin cb) {
_map_creator[type] = std::move(cb);
}
std::string exchangeSdp(const WebRtcInterface &exchanger, const std::string& offer) {
return const_cast<WebRtcInterface &>(exchanger).getAnswerSdp(offer);
}
void WebRtcPluginManager::getAnswerSdp(Session &sender, const string &type, const WebRtcArgs &args, const onCreateRtc &cb) {
lock_guard<mutex> lck(_mtx_creator);
auto it = _map_creator.find(type);
......
......@@ -43,6 +43,8 @@ public:
virtual const std::string &getIdentifier() const = 0;
};
std::string exchangeSdp(const WebRtcInterface &exchanger, const std::string& offer);
class WebRtcException : public WebRtcInterface {
public:
WebRtcException(const SockException &ex) : _ex(ex) {};
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论