Commit f9f487bf by xiongziliang

删除过多的线程切换

parent 7539042d
...@@ -128,53 +128,49 @@ static inline void addHttpListener(){ ...@@ -128,53 +128,49 @@ static inline void addHttpListener(){
} }
//该api已被消费 //该api已被消费
consumed = true; consumed = true;
AsyncHttpApi api = it->second; //执行API
//异步执行该api,防止阻塞NoticeCenter Json::Value val;
EventPollerPool::Instance().getExecutor()->async([api, parser, invoker]() { val["code"] = API::Success;
//执行API HttpSession::KeyValue headerOut;
Json::Value val; auto allArgs = getAllArgs(parser);
val["code"] = API::Success; HttpSession::KeyValue &headerIn = parser.getValues();
HttpSession::KeyValue &headerIn = parser.getValues(); headerOut["Content-Type"] = "application/json; charset=utf-8";
HttpSession::KeyValue headerOut; if(api_debug){
auto allArgs = getAllArgs(parser); auto newInvoker = [invoker,parser,allArgs](const string &codeOut,
headerOut["Content-Type"] = "application/json; charset=utf-8"; const HttpSession::KeyValue &headerOut,
if(api_debug){ const string &contentOut){
auto newInvoker = [invoker,parser,allArgs](const string &codeOut, stringstream ss;
const HttpSession::KeyValue &headerOut, for(auto &pr : allArgs ){
const string &contentOut){ ss << pr.first << " : " << pr.second << "\r\n";
stringstream ss; }
for(auto &pr : allArgs ){
ss << pr.first << " : " << pr.second << "\r\n"; DebugL << "\r\n# request:\r\n" << parser.Method() << " " << parser.FullUrl() << "\r\n"
} << "# content:\r\n" << parser.Content() << "\r\n"
<< "# args:\r\n" << ss.str()
DebugL << "\r\n# request:\r\n" << parser.Method() << " " << parser.FullUrl() << "\r\n" << "# response:\r\n"
<< "# content:\r\n" << parser.Content() << "\r\n" << contentOut << "\r\n";
<< "# args:\r\n" << ss.str()
<< "# response:\r\n" invoker(codeOut,headerOut,contentOut);
<< contentOut << "\r\n"; };
((HttpSession::HttpResponseInvoker &)invoker) = newInvoker;
invoker(codeOut,headerOut,contentOut); }
};
((HttpSession::HttpResponseInvoker &)invoker) = newInvoker;
}
try { try {
api(headerIn, headerOut, allArgs, val, invoker); it->second(headerIn, headerOut, allArgs, val, invoker);
} catch(ApiRetException &ex){ } catch(ApiRetException &ex){
val["code"] = ex.code(); val["code"] = ex.code();
val["msg"] = ex.what(); val["msg"] = ex.what();
invoker("200 OK", headerOut, val.toStyledString()); invoker("200 OK", headerOut, val.toStyledString());
} catch(SqlException &ex){ } catch(SqlException &ex){
val["code"] = API::SqlFailed; val["code"] = API::SqlFailed;
val["msg"] = StrPrinter << "操作数据库失败:" << ex.what() << ":" << ex.getSql(); val["msg"] = StrPrinter << "操作数据库失败:" << ex.what() << ":" << ex.getSql();
WarnL << ex.what() << ":" << ex.getSql(); WarnL << ex.what() << ":" << ex.getSql();
invoker("200 OK", headerOut, val.toStyledString()); invoker("200 OK", headerOut, val.toStyledString());
} catch (std::exception &ex) { } catch (std::exception &ex) {
val["code"] = API::OtherFailed; val["code"] = API::OtherFailed;
val["msg"] = ex.what(); val["msg"] = ex.what();
invoker("200 OK", headerOut, val.toStyledString()); invoker("200 OK", headerOut, val.toStyledString());
} }
});
}); });
} }
......
...@@ -168,11 +168,9 @@ void installWebHook(){ ...@@ -168,11 +168,9 @@ void installWebHook(){
body["ip"] = sender.get_peer_ip(); body["ip"] = sender.get_peer_ip();
body["port"] = sender.get_peer_port(); body["port"] = sender.get_peer_port();
body["id"] = sender.getIdentifier(); body["id"] = sender.getIdentifier();
EventPollerPool::Instance().getExecutor()->async([body,invoker](){ //执行hook
//执行hook do_http_hook(hook_publish,body,[invoker](const Value &obj,const string &err){
do_http_hook(hook_publish,body,[invoker](const Value &obj,const string &err){ invoker(err);
invoker(err);
});
}); });
}); });
...@@ -185,12 +183,9 @@ void installWebHook(){ ...@@ -185,12 +183,9 @@ void installWebHook(){
body["ip"] = sender.get_peer_ip(); body["ip"] = sender.get_peer_ip();
body["port"] = sender.get_peer_port(); body["port"] = sender.get_peer_port();
body["id"] = sender.getIdentifier(); body["id"] = sender.getIdentifier();
//异步执行该hook api,防止阻塞NoticeCenter //执行hook
EventPollerPool::Instance().getExecutor()->async([body,invoker](){ do_http_hook(hook_play,body,[invoker](const Value &obj,const string &err){
//执行hook invoker(err);
do_http_hook(hook_play,body,[invoker](const Value &obj,const string &err){
invoker(err);
});
}); });
}); });
...@@ -204,12 +199,8 @@ void installWebHook(){ ...@@ -204,12 +199,8 @@ void installWebHook(){
body["id"] = sender.getIdentifier(); body["id"] = sender.getIdentifier();
body["totalBytes"] = (Json::UInt64)totalBytes; body["totalBytes"] = (Json::UInt64)totalBytes;
body["duration"] = (Json::UInt64)totalDuration; body["duration"] = (Json::UInt64)totalDuration;
//执行hook
//流量统计事件 do_http_hook(hook_flowreport,body, nullptr);
EventPollerPool::Instance().getExecutor()->async([body,totalBytes](){
//执行hook
do_http_hook(hook_flowreport,body, nullptr);
});
}); });
...@@ -226,17 +217,14 @@ void installWebHook(){ ...@@ -226,17 +217,14 @@ void installWebHook(){
body["ip"] = sender.get_peer_ip(); body["ip"] = sender.get_peer_ip();
body["port"] = sender.get_peer_port(); body["port"] = sender.get_peer_port();
body["id"] = sender.getIdentifier(); body["id"] = sender.getIdentifier();
//执行hook
EventPollerPool::Instance().getExecutor()->async([body,invoker](){ do_http_hook(hook_rtsp_realm,body, [invoker](const Value &obj,const string &err){
//执行hook if(!err.empty()){
do_http_hook(hook_rtsp_realm,body, [invoker](const Value &obj,const string &err){ //如果接口访问失败,那么该rtsp流认证失败
if(!err.empty()){ invoker(unAuthedRealm);
//如果接口访问失败,那么该rtsp流认证失败 return;
invoker(unAuthedRealm); }
return; invoker(obj["realm"].asString());
}
invoker(obj["realm"].asString());
});
}); });
}); });
...@@ -254,17 +242,14 @@ void installWebHook(){ ...@@ -254,17 +242,14 @@ void installWebHook(){
body["user_name"] = user_name; body["user_name"] = user_name;
body["must_no_encrypt"] = must_no_encrypt; body["must_no_encrypt"] = must_no_encrypt;
body["realm"] = realm; body["realm"] = realm;
//执行hook
EventPollerPool::Instance().getExecutor()->async([body,invoker](){ do_http_hook(hook_rtsp_auth,body, [invoker](const Value &obj,const string &err){
//执行hook if(!err.empty()){
do_http_hook(hook_rtsp_auth,body, [invoker](const Value &obj,const string &err){ //认证失败
if(!err.empty()){ invoker(false,makeRandStr(12));
//认证失败 return;
invoker(false,makeRandStr(12)); }
return; invoker(obj["encrypted"].asBool(),obj["passwd"].asString());
}
invoker(obj["encrypted"].asBool(),obj["passwd"].asString());
});
}); });
}); });
...@@ -280,11 +265,8 @@ void installWebHook(){ ...@@ -280,11 +265,8 @@ void installWebHook(){
body["vhost"] = vhost; body["vhost"] = vhost;
body["app"] = app; body["app"] = app;
body["stream"] = stream; body["stream"] = stream;
//执行hook
EventPollerPool::Instance().getExecutor()->async([body](){ do_http_hook(hook_stream_chaned,body, nullptr);
//执行hook
do_http_hook(hook_stream_chaned,body, nullptr);
});
}); });
//监听播放失败(未找到特定的流)事件 //监听播放失败(未找到特定的流)事件
...@@ -296,11 +278,8 @@ void installWebHook(){ ...@@ -296,11 +278,8 @@ void installWebHook(){
body["ip"] = sender.get_peer_ip(); body["ip"] = sender.get_peer_ip();
body["port"] = sender.get_peer_port(); body["port"] = sender.get_peer_port();
body["id"] = sender.getIdentifier(); body["id"] = sender.getIdentifier();
//执行hook
EventPollerPool::Instance().getExecutor()->async([body](){ do_http_hook(hook_stream_not_found,body, nullptr);
//执行hook
do_http_hook(hook_stream_not_found,body, nullptr);
});
}); });
#ifdef ENABLE_MP4V2 #ifdef ENABLE_MP4V2
...@@ -320,11 +299,8 @@ void installWebHook(){ ...@@ -320,11 +299,8 @@ void installWebHook(){
body["app"] = info.strAppName; body["app"] = info.strAppName;
body["stream"] = info.strStreamId; body["stream"] = info.strStreamId;
body["vhost"] = info.strVhost; body["vhost"] = info.strVhost;
//执行hook
EventPollerPool::Instance().getExecutor()->async([body](){ do_http_hook(hook_record_mp4,body, nullptr);
//执行hook
do_http_hook(hook_record_mp4,body, nullptr);
});
}); });
#endif //ENABLE_MP4V2 #endif //ENABLE_MP4V2
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论