Commit 453994f2 by xiongziliang

减少内存拷贝,提高websocket服务器性能

parent 0931d82a
...@@ -954,12 +954,12 @@ void HttpSession::onWrite(const Buffer::Ptr &buffer) { ...@@ -954,12 +954,12 @@ void HttpSession::onWrite(const Buffer::Ptr &buffer) {
header._reserved = 0; header._reserved = 0;
header._opcode = WebSocketHeader::BINARY; header._opcode = WebSocketHeader::BINARY;
header._mask_flag = false; header._mask_flag = false;
WebSocketSplitter::encode(header,(uint8_t *)buffer->data(),buffer->size()); WebSocketSplitter::encode(header,buffer);
} }
void HttpSession::onWebSocketEncodeData(const uint8_t *ptr,uint64_t len){ void HttpSession::onWebSocketEncodeData(const Buffer::Ptr &buffer){
_ui64TotalBytes += len; _ui64TotalBytes += buffer->size();
send((char *)ptr,len); send(buffer);
} }
void HttpSession::onDetach() { void HttpSession::onDetach() {
......
...@@ -104,10 +104,9 @@ protected: ...@@ -104,10 +104,9 @@ protected:
/** /**
* 发送数据进行websocket协议打包后回调 * 发送数据进行websocket协议打包后回调
* @param ptr * @param buffer
* @param len
*/ */
void onWebSocketEncodeData(const uint8_t *ptr,uint64_t len) override; void onWebSocketEncodeData(const Buffer::Ptr &buffer) override;
private: private:
inline void Handle_Req_GET(int64_t &content_len); inline void Handle_Req_GET(int64_t &content_len);
inline void Handle_Req_POST(int64_t &content_len); inline void Handle_Req_POST(int64_t &content_len);
......
...@@ -89,7 +89,7 @@ protected: ...@@ -89,7 +89,7 @@ protected:
header._reserved = 0; header._reserved = 0;
header._opcode = WebSocketHeader::TEXT; header._opcode = WebSocketHeader::TEXT;
header._mask_flag = false; header._mask_flag = false;
strongSelf->WebSocketSplitter::encode(header,(uint8_t *)buf->data(),buf->size()); strongSelf->WebSocketSplitter::encode(header,buf);
} }
return buf->size(); return buf->size();
}); });
...@@ -118,12 +118,12 @@ protected: ...@@ -118,12 +118,12 @@ protected:
switch (header._opcode){ switch (header._opcode){
case WebSocketHeader::CLOSE:{ case WebSocketHeader::CLOSE:{
HttpSessionType::encode(header,nullptr,0); HttpSessionType::encode(header,nullptr);
} }
break; break;
case WebSocketHeader::PING:{ case WebSocketHeader::PING:{
const_cast<WebSocketHeader&>(header)._opcode = WebSocketHeader::PONG; const_cast<WebSocketHeader&>(header)._opcode = WebSocketHeader::PONG;
HttpSessionType::encode(header,(uint8_t *)_remian_data.data(),_remian_data.size()); HttpSessionType::encode(header,std::make_shared<BufferString>(_remian_data));
} }
break; break;
case WebSocketHeader::CONTINUATION:{ case WebSocketHeader::CONTINUATION:{
...@@ -132,8 +132,7 @@ protected: ...@@ -132,8 +132,7 @@ protected:
break; break;
case WebSocketHeader::TEXT: case WebSocketHeader::TEXT:
case WebSocketHeader::BINARY:{ case WebSocketHeader::BINARY:{
BufferString::Ptr buffer = std::make_shared<BufferString>(_remian_data); _session->onRecv(std::make_shared<BufferString>(_remian_data));
_session->onRecv(buffer);
} }
break; break;
default: default:
...@@ -145,11 +144,10 @@ protected: ...@@ -145,11 +144,10 @@ protected:
/** /**
* 发送数据进行websocket协议打包后回调 * 发送数据进行websocket协议打包后回调
* @param ptr * @param buffer
* @param len
*/ */
void onWebSocketEncodeData(const uint8_t *ptr,uint64_t len) override{ void onWebSocketEncodeData(const Buffer::Ptr &buffer) override{
SocketHelper::send((char *)ptr,len); SocketHelper::send(buffer);
} }
private: private:
typedef function<int(const Buffer::Ptr &buf)> onBeforeSendCB; typedef function<int(const Buffer::Ptr &buf)> onBeforeSendCB;
......
...@@ -164,9 +164,9 @@ void WebSocketSplitter::onPlayloadData(uint8_t *ptr, uint64_t len) { ...@@ -164,9 +164,9 @@ void WebSocketSplitter::onPlayloadData(uint8_t *ptr, uint64_t len) {
onWebSocketDecodePlayload(*this, _mask_flag ? ptr - len : ptr, len, _playload_offset); onWebSocketDecodePlayload(*this, _mask_flag ? ptr - len : ptr, len, _playload_offset);
} }
void WebSocketSplitter::encode(const WebSocketHeader &header,uint8_t *data, const uint64_t len) { void WebSocketSplitter::encode(const WebSocketHeader &header,const Buffer::Ptr &buffer) {
string ret; string ret;
auto len = buffer ? buffer->size() : 0;
uint8_t byte = header._fin << 7 | ((header._reserved & 0x07) << 4) | (header._opcode & 0x0F) ; uint8_t byte = header._fin << 7 | ((header._reserved & 0x07) << 4) | (header._opcode & 0x0F) ;
ret.push_back(byte); ret.push_back(byte);
...@@ -195,16 +195,16 @@ void WebSocketSplitter::encode(const WebSocketHeader &header,uint8_t *data, cons ...@@ -195,16 +195,16 @@ void WebSocketSplitter::encode(const WebSocketHeader &header,uint8_t *data, cons
ret.append((char *)header._mask.data(),4); ret.append((char *)header._mask.data(),4);
} }
onWebSocketEncodeData((uint8_t*)ret.data(),ret.size()); onWebSocketEncodeData(std::make_shared<BufferString>(std::move(ret)));
if(len > 0){ if(len > 0){
if(mask_flag){ if(mask_flag){
uint8_t *ptr = data; uint8_t *ptr = (uint8_t*)buffer->data();
for(int i = 0; i < len ; ++i,++ptr){ for(int i = 0; i < len ; ++i,++ptr){
*(ptr) ^= header._mask[i % 4]; *(ptr) ^= header._mask[i % 4];
} }
} }
onWebSocketEncodeData(data,len); onWebSocketEncodeData(buffer);
} }
} }
......
...@@ -31,8 +31,10 @@ ...@@ -31,8 +31,10 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include <memory> #include <memory>
using namespace std; #include "Network/Buffer.h"
using namespace std;
using namespace toolkit;
namespace mediakit { namespace mediakit {
...@@ -85,12 +87,10 @@ public: ...@@ -85,12 +87,10 @@ public:
/** /**
* 编码一个数据包 * 编码一个数据包
* 将触发2次onWebSocketEncodeData回调 * 将触发2次onWebSocketEncodeData回调
* 第一次是数据头,第二次是负载数据
* @param header 数据头 * @param header 数据头
* @param data 负载数据 * @param buffer 负载数据
* @param len 负载数据长度
*/ */
void encode(const WebSocketHeader &header,uint8_t *data,const uint64_t len); void encode(const WebSocketHeader &header,const Buffer::Ptr &buffer);
protected: protected:
/** /**
* 收到一个webSocket数据包包头,后续将继续触发onWebSocketDecodePlayload回调 * 收到一个webSocket数据包包头,后续将继续触发onWebSocketDecodePlayload回调
...@@ -119,7 +119,7 @@ protected: ...@@ -119,7 +119,7 @@ protected:
* @param ptr 数据指针 * @param ptr 数据指针
* @param len 数据指针长度 * @param len 数据指针长度
*/ */
virtual void onWebSocketEncodeData(const uint8_t *ptr,uint64_t len){}; virtual void onWebSocketEncodeData(const Buffer::Ptr &buffer){};
private: private:
void onPlayloadData(uint8_t *data,uint64_t len); void onPlayloadData(uint8_t *data,uint64_t len);
private: private:
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论