Commit e2ce16c7 by xzl

Http服务改成异步回复模式

parent f71a32bc
...@@ -106,14 +106,14 @@ void HttpSession::onRecv(const Socket::Buffer::Ptr&pBuf) { ...@@ -106,14 +106,14 @@ void HttpSession::onRecv(const Socket::Buffer::Ptr&pBuf) {
onePkt = m_strRcvBuf.substr(0, index + 4); onePkt = m_strRcvBuf.substr(0, index + 4);
m_strRcvBuf.erase(0, index + 4); m_strRcvBuf.erase(0, index + 4);
switch (parserHttpReq(onePkt)) { switch (parserHttpReq(onePkt)) {
case 0: case Http_failed:
//失败 //失败
shutdown(); shutdown();
return; return;
case 1: case Http_success:
//成功 //成功
break; break;
case 2: case Http_moreData:
//需要更多数据,恢复数据并退出 //需要更多数据,恢复数据并退出
m_strRcvBuf = onePkt + m_strRcvBuf; m_strRcvBuf = onePkt + m_strRcvBuf;
m_parser.Clear(); m_parser.Clear();
...@@ -122,14 +122,14 @@ void HttpSession::onRecv(const Socket::Buffer::Ptr&pBuf) { ...@@ -122,14 +122,14 @@ void HttpSession::onRecv(const Socket::Buffer::Ptr&pBuf) {
} }
m_parser.Clear(); m_parser.Clear();
} }
inline int HttpSession::parserHttpReq(const string &str) { inline HttpSession::HttpCode HttpSession::parserHttpReq(const string &str) {
m_parser.Parse(str.data()); m_parser.Parse(str.data());
string cmd = m_parser.Method(); string cmd = m_parser.Method();
auto it = g_mapCmdIndex.find(cmd); auto it = g_mapCmdIndex.find(cmd);
if (it == g_mapCmdIndex.end()) { if (it == g_mapCmdIndex.end()) {
WarnL << cmd; WarnL << cmd;
sendResponse("403 Forbidden", makeHttpHeader(true), ""); sendResponse("403 Forbidden", makeHttpHeader(true), "");
return false; return Http_failed;
} }
auto fun = it->second; auto fun = it->second;
return (this->*fun)(); return (this->*fun)();
...@@ -147,34 +147,35 @@ void HttpSession::onManager() { ...@@ -147,34 +147,35 @@ void HttpSession::onManager() {
} }
} }
inline int HttpSession::Handle_Req_GET() { inline HttpSession::HttpCode HttpSession::Handle_Req_GET() {
string strUrl = strCoding::UrlUTF8Decode(m_parser.Url()); string strUrl = strCoding::UrlUTF8Decode(m_parser.Url());
string strFile = m_strPath + strUrl; string strFile = m_strPath + strUrl;
string strConType = m_parser["Connection"]; string strConType = m_parser["Connection"];
static uint32_t reqCnt = mINI::Instance()[Config::Http::kMaxReqCount].as<uint32_t>(); static uint32_t reqCnt = mINI::Instance()[Config::Http::kMaxReqCount].as<uint32_t>();
bool bClose = (strcasecmp(strConType.data(),"close") == 0) && ( ++m_iReqCnt < reqCnt); bool bClose = (strcasecmp(strConType.data(),"close") == 0) && ( ++m_iReqCnt < reqCnt);
HttpCode eHttpCode = bClose ? Http_failed : Http_success;
if (strFile.back() == '/') { if (strFile.back() == '/') {
//index the folder //index the folder
string strMeun; string strMeun;
if (!makeMeun(strFile, strMeun)) { if (!makeMeun(strFile, strMeun)) {
sendNotFound(bClose); sendNotFound(bClose);
return !bClose; return eHttpCode;
} }
sendResponse("200 OK", makeHttpHeader(bClose,strMeun.size() ), strMeun); sendResponse("200 OK", makeHttpHeader(bClose,strMeun.size() ), strMeun);
return !bClose; return eHttpCode;
} }
//download the file //download the file
struct stat tFileStat; struct stat tFileStat;
if (0 != stat(strFile.data(), &tFileStat)) { if (0 != stat(strFile.data(), &tFileStat)) {
sendNotFound(bClose); sendNotFound(bClose);
return !bClose; return eHttpCode;
} }
TimeTicker(); TimeTicker();
FILE *pFile = fopen(strFile.data(), "rb"); FILE *pFile = fopen(strFile.data(), "rb");
if (pFile == NULL) { if (pFile == NULL) {
sendNotFound(bClose); sendNotFound(bClose);
return !bClose; return eHttpCode;
} }
auto &strRange = m_parser["Range"]; auto &strRange = m_parser["Range"];
...@@ -200,7 +201,7 @@ inline int HttpSession::Handle_Req_GET() { ...@@ -200,7 +201,7 @@ inline int HttpSession::Handle_Req_GET() {
sendResponse(pcHttpResult, httpHeader, ""); sendResponse(pcHttpResult, httpHeader, "");
if (iRangeEnd - iRangeStart < 0) { if (iRangeEnd - iRangeStart < 0) {
//file is empty! //file is empty!
return !bClose; return eHttpCode;
} }
//send the file //send the file
...@@ -251,7 +252,7 @@ inline int HttpSession::Handle_Req_GET() { ...@@ -251,7 +252,7 @@ inline int HttpSession::Handle_Req_GET() {
}; };
onFlush(); onFlush();
sock->setOnFlush(onFlush); sock->setOnFlush(onFlush);
return true; return Http_success;
} }
inline bool HttpSession::makeMeun(const string &strFullPath, string &strRet) { inline bool HttpSession::makeMeun(const string &strFullPath, string &strRet) {
...@@ -365,45 +366,63 @@ inline HttpSession::KeyValue HttpSession::makeHttpHeader(bool bClose, int64_t iC ...@@ -365,45 +366,63 @@ inline HttpSession::KeyValue HttpSession::makeHttpHeader(bool bClose, int64_t iC
} }
return headerOut; return headerOut;
} }
inline int HttpSession::Handle_Req_POST() { inline HttpSession::HttpCode HttpSession::Handle_Req_POST() {
int iContentLen = atoi(m_parser["Content-Length"].data()); int iContentLen = atoi(m_parser["Content-Length"].data());
if (!iContentLen) { if (!iContentLen) {
return false; return Http_failed;
} }
if ((int) m_strRcvBuf.size() < iContentLen) { if ((int) m_strRcvBuf.size() < iContentLen) {
return 2; //需要更多数据 return Http_moreData; //需要更多数据
} }
auto strContent = m_strRcvBuf.substr(0, iContentLen); auto strContent = m_strRcvBuf.substr(0, iContentLen);
m_strRcvBuf.erase(0, iContentLen); m_strRcvBuf.erase(0, iContentLen);
string strUrl = strCoding::UrlUTF8Decode(m_parser.Url()); string strUrl = strCoding::UrlUTF8Decode(m_parser.Url());
string strConType = m_parser["Connection"]; string strConType = m_parser["Connection"];
static string charSet = mINI::Instance()[Config::Http::kCharSet];
static uint32_t reqCnt = mINI::Instance()[Config::Http::kMaxReqCount].as<uint32_t>(); static uint32_t reqCnt = mINI::Instance()[Config::Http::kMaxReqCount].as<uint32_t>();
bool bClose = (strcasecmp(strConType.data(),"close") == 0) && ( ++m_iReqCnt < reqCnt); bool bClose = (strcasecmp(strConType.data(),"close") == 0) && ( ++m_iReqCnt < reqCnt);
m_parser.setUrl(strUrl); m_parser.setUrl(strUrl);
m_parser.setContent(strContent); m_parser.setContent(strContent);
auto headerOut=makeHttpHeader(bClose);
static string notFound = mINI::Instance()[Config::Http::kNotFound]; weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this());
string strContentOut = notFound; HttpResponseInvoker invoker = [weakSelf,bClose](const string &codeOut,
string strCodeOut = "404 Not Found"; const KeyValue &headerOut,
NoticeCenter::Instance().emitEvent( const string &contentOut){
Config::Broadcast::kBroadcastHttpRequest, auto strongSelf = weakSelf.lock();
m_parser,strCodeOut,headerOut,strContentOut); if(!strongSelf) {
return;
if(strContentOut.size()){ }
headerOut.emplace("Content-Type",StrPrinter<<"text/json; charset=" << charSet <<endl); strongSelf->async([weakSelf,bClose,codeOut,headerOut,contentOut]() {
headerOut.emplace("Content-Length",StrPrinter<<strContentOut.size()<<endl); auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
}
strongSelf->responseDelay(bClose,
const_cast<string &>(codeOut),
const_cast<KeyValue &>(headerOut),
const_cast<string &>(contentOut));
if(bClose){
strongSelf->shutdown();
}
});
};
if(!NoticeCenter::Instance().emitEvent(Config::Broadcast::kBroadcastHttpRequest,m_parser,invoker)){
invoker("404 Not Found",KeyValue(),"");
} }
sendResponse(strCodeOut.data(), headerOut, strContentOut); return Http_success;
return !bClose; }
void HttpSession::responseDelay(bool bClose,string &codeOut,KeyValue &headerOut, string &contentOut){
if(codeOut.empty()){
sendNotFound(bClose);
return;
}
auto headerOther=makeHttpHeader(bClose,contentOut.size(),"text/json");
headerOut.insert(headerOther.begin(), headerOther.end());
sendResponse(codeOut.data(), headerOut, contentOut);
} }
inline void HttpSession::sendNotFound(bool bClose) { inline void HttpSession::sendNotFound(bool bClose) {
static string notFound = mINI::Instance()[Config::Http::kNotFound]; static string notFound = mINI::Instance()[Config::Http::kNotFound];
sendResponse("404 Not Found", makeHttpHeader(bClose, notFound.size()), notFound.data()); sendResponse("404 Not Found", makeHttpHeader(bClose, notFound.size()), notFound);
} }
} /* namespace Http */ } /* namespace Http */
......
...@@ -11,16 +11,23 @@ ...@@ -11,16 +11,23 @@
#include "config.h" #include "config.h"
#include "Rtsp/Rtsp.h" #include "Rtsp/Rtsp.h"
#include "Network/TcpLimitedSession.h" #include "Network/TcpLimitedSession.h"
#include <functional>
using namespace std; using namespace std;
using namespace ZL::Network; using namespace ZL::Network;
using namespace ZL::Network;
namespace ZL { namespace ZL {
namespace Http { namespace Http {
class HttpSession: public TcpLimitedSession<MAX_TCP_SESSION> { class HttpSession: public TcpLimitedSession<MAX_TCP_SESSION> {
public: public:
typedef map<string,string> KeyValue; typedef map<string,string> KeyValue;
typedef std::function<void(const string &codeOut,
const KeyValue &headerOut,
const string &contentOut)> HttpResponseInvoker;
HttpSession(const std::shared_ptr<ThreadPool> &pTh, const Socket::Ptr &pSock); HttpSession(const std::shared_ptr<ThreadPool> &pTh, const Socket::Ptr &pSock);
virtual ~HttpSession(); virtual ~HttpSession();
...@@ -28,21 +35,30 @@ public: ...@@ -28,21 +35,30 @@ public:
void onError(const SockException &err) override; void onError(const SockException &err) override;
void onManager() override; void onManager() override;
private: private:
typedef int (HttpSession::*HttpCMDHandle)(); typedef enum
{
Http_success = 0,
Http_failed = 1,
Http_moreData = 2,
} HttpCode;
typedef HttpSession::HttpCode (HttpSession::*HttpCMDHandle)();
static unordered_map<string, HttpCMDHandle> g_mapCmdIndex; static unordered_map<string, HttpCMDHandle> g_mapCmdIndex;
Parser m_parser; Parser m_parser;
string m_strPath; string m_strPath;
string m_strRcvBuf; string m_strRcvBuf;
Ticker m_ticker; Ticker m_ticker;
uint32_t m_iReqCnt = 0; uint32_t m_iReqCnt = 0;
inline int parserHttpReq(const string &);
inline int Handle_Req_GET(); inline HttpCode parserHttpReq(const string &);
inline int Handle_Req_POST(); inline HttpCode Handle_Req_GET();
inline HttpCode Handle_Req_POST();
inline bool makeMeun(const string &strFullPath, string &strRet); inline bool makeMeun(const string &strFullPath, string &strRet);
inline void sendNotFound(bool bClose); inline void sendNotFound(bool bClose);
inline void sendResponse(const char *pcStatus,const KeyValue &header,const string &strContent); inline void sendResponse(const char *pcStatus,const KeyValue &header,const string &strContent);
inline KeyValue makeHttpHeader(bool bClose=false,int64_t iContentSize=-1,const char *pcContentType="text/html"); inline static KeyValue makeHttpHeader(bool bClose=false,int64_t iContentSize=-1,const char *pcContentType="text/html");
void responseDelay(bool bClose,string &codeOut,KeyValue &headerOut, string &contentOut);
}; };
} /* namespace Http */ } /* namespace Http */
......
...@@ -13,7 +13,7 @@ using namespace ZL::Thread; ...@@ -13,7 +13,7 @@ using namespace ZL::Thread;
class MediaSender { class MediaSender {
public: public:
static ThreadPool & sendThread() { static ThreadPool & sendThread() {
static ThreadPool pool(1, ThreadPool::PRIORITY_HIGHEST); static ThreadPool pool(1);
return pool; return pool;
} }
private: private:
......
...@@ -38,10 +38,20 @@ namespace Config { ...@@ -38,10 +38,20 @@ namespace Config {
////////////广播名称/////////// ////////////广播名称///////////
namespace Broadcast { namespace Broadcast {
extern const char kBroadcastRtspSessionPlay[]; extern const char kBroadcastRtspSessionPlay[];
#define BroadcastRtspSessionPlayArgs const char *app,const char *stream
extern const char kBroadcastRtspSrcRegisted[]; extern const char kBroadcastRtspSrcRegisted[];
#define BroadcastRtspSrcRegistedArgs const char *app,const char *stream
extern const char kBroadcastRtmpSrcRegisted[]; extern const char kBroadcastRtmpSrcRegisted[];
#define BroadcastRtmpSrcRegistedArgs const char *app,const char *stream
extern const char kBroadcastRecordMP4[]; extern const char kBroadcastRecordMP4[];
#define BroadcastRecordMP4Args const Mp4Info &info
extern const char kBroadcastHttpRequest[]; extern const char kBroadcastHttpRequest[];
#define BroadcastHttpRequestArgs const Parser &parser,HttpSession::HttpResponseInvoker &invoker
} //namespace Broadcast } //namespace Broadcast
//代理失败最大重试次数 //代理失败最大重试次数
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论