Commit 67644a7b by xiongziliang

优化http分包器性能

parent 10ef7582
...@@ -7,48 +7,68 @@ ...@@ -7,48 +7,68 @@
#include "Util/util.h" #include "Util/util.h"
using namespace ZL::Util; using namespace ZL::Util;
void HttpRequestSplitter::input(const string &data) { void HttpRequestSplitter::input(const char *data,uint64_t len) {
if(_remain_data.empty()){ const char *ptr = data;
_remain_data = data; if(!_remain_data.empty()){
}else{ _remain_data.append(data,len);
_remain_data.append(data); data = ptr = _remain_data.data();
len = _remain_data.size();
} }
splitPacket: splitPacket:
//数据按照请求头处理 //数据按照请求头处理
size_t index; char *index = nullptr;
while (_content_len == 0 && (index = _remain_data.find("\r\n\r\n")) != std::string::npos ) { while (_content_len == 0 && (index = strstr(ptr,"\r\n\r\n")) != nullptr) {
//_content_len == 0,这是请求头 //_content_len == 0,这是请求头
_content_len = onRecvHeader(_remain_data.substr(0, index + 4)); _content_len = onRecvHeader(ptr, index - ptr + 4);
_remain_data.erase(0, index + 4); ptr = index + 4;
} }
if(_remain_data.empty()){ uint64_t remain = len - (ptr - data);
if(remain <= 0){
//没有剩余数据,清空缓存
_remain_data.clear();
return; return;
} }
if(_content_len == 0){
//尚未找到http头,缓存定位到剩余数据部分
_remain_data.assign(ptr,remain);
return;
}
//已经找到http头了
if(_content_len > 0){ if(_content_len > 0){
//数据按照固定长度content处理 //数据按照固定长度content处理
if(_remain_data.size() < _content_len){ if(remain < _content_len){
//数据不够 //数据不够,缓存定位到剩余数据部分
_remain_data.assign(ptr,remain);
return; return;
} }
//收到content数据,并且接受content完毕 //收到content数据,并且接受content完毕
onRecvContent(_remain_data.substr(0,_content_len)); onRecvContent(ptr,_content_len);
_remain_data.erase(0,_content_len);
remain -= _content_len;
ptr += _content_len;
//content处理完毕,后面数据当做请求头处理 //content处理完毕,后面数据当做请求头处理
_content_len = 0; _content_len = 0;
if(!_remain_data.empty()){ if(remain > 0){
//还有数据没有处理完毕 //还有数据没有处理完毕
_remain_data.assign(ptr,remain);
data = ptr = (char *)_remain_data.data();
len = _remain_data.size();
goto splitPacket; goto splitPacket;
} }
}else{ return;
//数据按照不固定长度content处理
onRecvContent(_remain_data);
_remain_data.clear();
} }
//_content_len < 0;数据按照不固定长度content处理
onRecvContent(ptr,remain);//消费掉所有剩余数据
_remain_data.clear();
} }
void HttpRequestSplitter::setContentLen(int64_t content_len) { void HttpRequestSplitter::setContentLen(int64_t content_len) {
......
...@@ -16,25 +16,29 @@ public: ...@@ -16,25 +16,29 @@ public:
/** /**
* 添加数据 * 添加数据
* @param data 需要添加的数据 * @param data 需要添加的数据
* @param len 数据长度
*/ */
void input(const string &data); void input(const char *data,uint64_t len);
protected: protected:
/** /**
* 收到请求头 * 收到请求头
* @param header 请求头 * @param data 请求头数据
* @param len 请求头长度
*
* @return 请求头后的content长度, * @return 请求头后的content长度,
* <0 : 代表后面所有数据都是content * <0 : 代表后面所有数据都是content
* 0 : 代表为后面数据还是请求头, * 0 : 代表为后面数据还是请求头,
* >0 : 代表后面数据为固定长度content, * >0 : 代表后面数据为固定长度content,
*/ */
virtual int64_t onRecvHeader(const string &header) = 0; virtual int64_t onRecvHeader(const char *data,uint64_t len) = 0;
/** /**
* 收到content分片或全部数据 * 收到content分片或全部数据
* onRecvHeader函数返回>0,则为全部数据 * onRecvHeader函数返回>0,则为全部数据
* @param content * @param data content分片或全部数据
* @param len 数据长度
*/ */
virtual void onRecvContent(const string &content) = 0; virtual void onRecvContent(const char *data,uint64_t len) {};
/** /**
* 设置content len * 设置content len
......
...@@ -117,7 +117,7 @@ HttpSession::~HttpSession() { ...@@ -117,7 +117,7 @@ HttpSession::~HttpSession() {
//DebugL; //DebugL;
} }
int64_t HttpSession::onRecvHeader(const string &header) { int64_t HttpSession::onRecvHeader(const char *header,uint64_t len) {
typedef bool (HttpSession::*HttpCMDHandle)(int64_t &); typedef bool (HttpSession::*HttpCMDHandle)(int64_t &);
static unordered_map<string, HttpCMDHandle> g_mapCmdIndex; static unordered_map<string, HttpCMDHandle> g_mapCmdIndex;
static onceToken token([]() { static onceToken token([]() {
...@@ -125,7 +125,7 @@ int64_t HttpSession::onRecvHeader(const string &header) { ...@@ -125,7 +125,7 @@ int64_t HttpSession::onRecvHeader(const string &header) {
g_mapCmdIndex.emplace("POST",&HttpSession::Handle_Req_POST); g_mapCmdIndex.emplace("POST",&HttpSession::Handle_Req_POST);
}, nullptr); }, nullptr);
m_parser.Parse(header.data()); m_parser.Parse(header);
urlDecode(m_parser); urlDecode(m_parser);
string cmd = m_parser.Method(); string cmd = m_parser.Method();
auto it = g_mapCmdIndex.find(cmd); auto it = g_mapCmdIndex.find(cmd);
...@@ -148,9 +148,9 @@ int64_t HttpSession::onRecvHeader(const string &header) { ...@@ -148,9 +148,9 @@ int64_t HttpSession::onRecvHeader(const string &header) {
return content_len; return content_len;
} }
void HttpSession::onRecvContent(const string &content) { void HttpSession::onRecvContent(const char *data,uint64_t len) {
if(m_contentCallBack){ if(m_contentCallBack){
if(!m_contentCallBack(content)){ if(!m_contentCallBack(data,len)){
m_contentCallBack = nullptr; m_contentCallBack = nullptr;
} }
} }
...@@ -161,7 +161,7 @@ void HttpSession::onRecv(const Buffer::Ptr &pBuf) { ...@@ -161,7 +161,7 @@ void HttpSession::onRecv(const Buffer::Ptr &pBuf) {
} }
void HttpSession::onRecv(const char *data,int size){ void HttpSession::onRecv(const char *data,int size){
m_ticker.resetTime(); m_ticker.resetTime();
input(string(data,size)); input(data,size);
} }
void HttpSession::onError(const SockException& err) { void HttpSession::onError(const SockException& err) {
...@@ -271,8 +271,8 @@ inline bool HttpSession::Handle_Req_GET(int64_t &content_len) { ...@@ -271,8 +271,8 @@ inline bool HttpSession::Handle_Req_GET(int64_t &content_len) {
if(checkWebSocket()){ if(checkWebSocket()){
content_len = -1; content_len = -1;
auto parserCopy = m_parser; auto parserCopy = m_parser;
m_contentCallBack = [this,parserCopy](const string &data){ m_contentCallBack = [this,parserCopy](const char *data,uint64_t len){
onRecvWebSocketData(parserCopy,data); onRecvWebSocketData(parserCopy,data,len);
//m_contentCallBack是可持续的,后面还要处理后续数据 //m_contentCallBack是可持续的,后面还要处理后续数据
return true; return true;
}; };
...@@ -638,11 +638,11 @@ inline bool HttpSession::Handle_Req_POST(int64_t &content_len) { ...@@ -638,11 +638,11 @@ inline bool HttpSession::Handle_Req_POST(int64_t &content_len) {
//返回固定长度的content //返回固定长度的content
content_len = totalContentLen; content_len = totalContentLen;
auto parserCopy = m_parser; auto parserCopy = m_parser;
m_contentCallBack = [this,parserCopy](const string &content){ m_contentCallBack = [this,parserCopy](const char *data,uint64_t len){
//恢复http头 //恢复http头
m_parser = parserCopy; m_parser = parserCopy;
//设置content //设置content
m_parser.setContent(content); m_parser.setContent(string(data,len));
//触发http事件,emitHttpEvent内部会选择是否关闭连接 //触发http事件,emitHttpEvent内部会选择是否关闭连接
emitHttpEvent(true); emitHttpEvent(true);
//清空数据,节省内存 //清空数据,节省内存
...@@ -654,13 +654,13 @@ inline bool HttpSession::Handle_Req_POST(int64_t &content_len) { ...@@ -654,13 +654,13 @@ inline bool HttpSession::Handle_Req_POST(int64_t &content_len) {
//返回不固定长度的content //返回不固定长度的content
content_len = -1; content_len = -1;
auto parserCopy = m_parser; auto parserCopy = m_parser;
std::shared_ptr<int64_t> recvedContentLen = std::make_shared<int64_t>(0); std::shared_ptr<uint64_t> recvedContentLen = std::make_shared<uint64_t>(0);
bool bClose = (strcasecmp(m_parser["Connection"].data(),"close") == 0) || ( ++m_iReqCnt > maxReqCnt); bool bClose = (strcasecmp(m_parser["Connection"].data(),"close") == 0) || ( ++m_iReqCnt > maxReqCnt);
m_contentCallBack = [this,parserCopy,totalContentLen,recvedContentLen,bClose](const string &content){ m_contentCallBack = [this,parserCopy,totalContentLen,recvedContentLen,bClose](const char *data,uint64_t len){
*(recvedContentLen) += content.size(); *(recvedContentLen) += len;
onRecvUnlimitedContent(parserCopy,content,totalContentLen,*(recvedContentLen)); onRecvUnlimitedContent(parserCopy,data,len,totalContentLen,*(recvedContentLen));
if(*(recvedContentLen) < totalContentLen){ if(*(recvedContentLen) < totalContentLen){
//数据还没接收完毕 //数据还没接收完毕
......
...@@ -70,44 +70,34 @@ protected: ...@@ -70,44 +70,34 @@ protected:
std::shared_ptr<FlvMuxer> getSharedPtr() override; std::shared_ptr<FlvMuxer> getSharedPtr() override;
//HttpRequestSplitter override //HttpRequestSplitter override
/** int64_t onRecvHeader(const char *data,uint64_t len) override;
* 收到请求头 void onRecvContent(const char *data,uint64_t len) override;
* @param header 请求头
* @return 请求头后的content长度,
* <0 : 代表后面所有数据都是content
* 0 : 代表为后面数据还是请求头,
* >0 : 代表后面数据为固定长度content,
*/
int64_t onRecvHeader(const string &header) override;
/**
* 收到content分片或全部数据
* onRecvHeader函数返回>0,则为全部数据
* @param content
*/
void onRecvContent(const string &content) override;
/** /**
* 重载之用于处理不定长度的content * 重载之用于处理不定长度的content
* 这个函数可用于处理大文件上传、http-flv推流 * 这个函数可用于处理大文件上传、http-flv推流
* @param header http请求头 * @param header http请求头
* @param content content分片数据 * @param data content分片数据
* @param len content分片数据大小
* @param totalSize content总大小,如果为0则是不限长度content * @param totalSize content总大小,如果为0则是不限长度content
* @param recvedSize 已收数据大小 * @param recvedSize 已收数据大小
*/ */
virtual void onRecvUnlimitedContent(const Parser &header,const string &content,int64_t totalSize,int64_t recvedSize){ virtual void onRecvUnlimitedContent(const Parser &header,
const char *data,
uint64_t len,
uint64_t totalSize,
uint64_t recvedSize){
WarnL << "content数据长度过大,无法处理,请重载HttpSession::onRecvUnlimitedContent"; WarnL << "content数据长度过大,无法处理,请重载HttpSession::onRecvUnlimitedContent";
shutdown(); shutdown();
} }
void onWebSocketDecodeHeader(const WebSocketHeader &packet) override{
DebugL << "默认关闭WebSocket";
shutdown();
};
/** void onRecvWebSocketData(const Parser &header,const char *data,uint64_t len){
* 重载之用于处理websocket数据 WebSocketSplitter::decode((uint8_t *)data,len);
* @param header http请求头
* @param data websocket数据
*/
virtual void onRecvWebSocketData(const Parser &header,const string &data){
WebSocketSplitter::decode((uint8_t *)data.data(),data.size());
} }
private: private:
Parser m_parser; Parser m_parser;
...@@ -119,7 +109,7 @@ private: ...@@ -119,7 +109,7 @@ private:
//flv over http //flv over http
MediaInfo m_mediaInfo; MediaInfo m_mediaInfo;
//处理content数据的callback //处理content数据的callback
function<bool (const string &content) > m_contentCallBack; function<bool (const char *data,uint64_t len) > m_contentCallBack;
private: private:
inline bool Handle_Req_GET(int64_t &content_len); inline bool Handle_Req_GET(int64_t &content_len);
inline bool Handle_Req_POST(int64_t &content_len); inline bool Handle_Req_POST(int64_t &content_len);
......
...@@ -90,6 +90,8 @@ begin_decode: ...@@ -90,6 +90,8 @@ begin_decode:
onWebSocketDecodeHeader(*this); onWebSocketDecodeHeader(*this);
} }
//进入后面逻辑代表已经获取到了webSocket协议头,
uint64_t remain = len - (ptr - data); uint64_t remain = len - (ptr - data);
if(remain > 0){ if(remain > 0){
uint64_t playload_slice_len = remain; uint64_t playload_slice_len = remain;
...@@ -101,16 +103,17 @@ begin_decode: ...@@ -101,16 +103,17 @@ begin_decode:
if(_playload_offset == _playload_len){ if(_playload_offset == _playload_len){
//这是下一个包 //这是下一个包
if(remain - playload_slice_len > 0){ remain -= playload_slice_len;
string nextPacket((char *)ptr + playload_slice_len,remain - playload_slice_len); ptr += playload_slice_len;
_got_header = false; _got_header = false;
_remain_data = nextPacket;
if(remain > 0){
//剩余数据是下一个包,把它的数据放置在缓存中
_remain_data.assign((char *)ptr,remain);
data = ptr = (uint8_t *)_remain_data.data(); data = ptr = (uint8_t *)_remain_data.data();
len = _remain_data.size(); len = _remain_data.size();
goto begin_decode; goto begin_decode;
} else{
_got_header = false;
} }
} }
} }
......
...@@ -140,15 +140,11 @@ void RtspSession::onManager() { ...@@ -140,15 +140,11 @@ void RtspSession::onManager() {
} }
void RtspSession::onRecvContent(const string &content){ int64_t RtspSession::onRecvHeader(const char *header,uint64_t len) {
}
int64_t RtspSession::onRecvHeader(const string &header) {
char tmp[2 * 1024]; char tmp[2 * 1024];
m_pcBuf = tmp; m_pcBuf = tmp;
m_parser.Parse(header.data()); //rtsp请求解析 m_parser.Parse(header); //rtsp请求解析
string strCmd = m_parser.Method(); //提取出请求命令字 string strCmd = m_parser.Method(); //提取出请求命令字
m_iCseq = atoi(m_parser["CSeq"].data()); m_iCseq = atoi(m_parser["CSeq"].data());
...@@ -188,18 +184,19 @@ void RtspSession::onRecv(const Buffer::Ptr &pBuf) { ...@@ -188,18 +184,19 @@ void RtspSession::onRecv(const Buffer::Ptr &pBuf) {
m_ui64TotalBytes += pBuf->size(); m_ui64TotalBytes += pBuf->size();
if (m_bBase64need) { if (m_bBase64need) {
//quicktime 加密后的rtsp请求,需要解密 //quicktime 加密后的rtsp请求,需要解密
inputRtspOrRtcp(decodeBase64(string(pBuf->data(),pBuf->size()))); auto str = decodeBase64(string(pBuf->data(),pBuf->size()));
inputRtspOrRtcp(str.data(),str.size());
} else { } else {
inputRtspOrRtcp(string(pBuf->data(),pBuf->size())); inputRtspOrRtcp(pBuf->data(),pBuf->size());
} }
} }
void RtspSession::inputRtspOrRtcp(const string &str) { void RtspSession::inputRtspOrRtcp(const char *data,uint64_t len) {
if(str[0] == '$' && m_rtpType == PlayerBase::RTP_TCP){ if(data[0] == '$' && m_rtpType == PlayerBase::RTP_TCP){
//这是rtcp //这是rtcp
return; return;
} }
input(str); input(data,len);
} }
bool RtspSession::handleReq_Options() { bool RtspSession::handleReq_Options() {
......
/* /*
* MIT License * MIT License
* *
* Copyright (c) 2016 xiongziliang <771730766@qq.com> * Copyright (c) 2016 xiongziliang <771730766@qq.com>
...@@ -84,24 +84,9 @@ public: ...@@ -84,24 +84,9 @@ public:
protected: protected:
//HttpRequestSplitter override //HttpRequestSplitter override
/** int64_t onRecvHeader(const char *data,uint64_t len) override ;
* 收到请求头
* @param header 请求头
* @return 请求头后的content长度,
* <0 : 代表后面所有数据都是content
* 0 : 代表为后面数据还是请求头,
* >0 : 代表后面数据为固定长度content,
*/
int64_t onRecvHeader(const string &header) override ;
/**
* 收到content分片或全部数据
* onRecvHeader函数返回>0,则为全部数据
* @param content
*/
void onRecvContent(const string &content) override;
private: private:
void inputRtspOrRtcp(const string &str); void inputRtspOrRtcp(const char *data,uint64_t len);
int send(const string &strBuf) override { int send(const string &strBuf) override {
m_ui64TotalBytes += strBuf.size(); m_ui64TotalBytes += strBuf.size();
return m_pSender->send(strBuf); return m_pSender->send(strBuf);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论