Commit e445c7e1 by xiongziliang

避免内存拷贝,大幅提高rtmp服务器的性能

parent 2b4d8a12
...@@ -31,6 +31,7 @@ ...@@ -31,6 +31,7 @@
#include <cstdlib> #include <cstdlib>
#include "Util/util.h" #include "Util/util.h"
#include "Util/logger.h" #include "Util/logger.h"
#include "Network/Buffer.h"
#include "Network/sockutil.h" #include "Network/sockutil.h"
using namespace toolkit; using namespace toolkit;
...@@ -127,7 +128,7 @@ public: ...@@ -127,7 +128,7 @@ public:
#pragma pack(pop) #pragma pack(pop)
#endif // defined(_WIN32) #endif // defined(_WIN32)
class RtmpPacket { class RtmpPacket : public Buffer{
public: public:
typedef std::shared_ptr<RtmpPacket> Ptr; typedef std::shared_ptr<RtmpPacket> Ptr;
uint8_t typeId; uint8_t typeId;
...@@ -139,7 +140,13 @@ public: ...@@ -139,7 +140,13 @@ public:
uint32_t streamId; uint32_t streamId;
uint32_t chunkId; uint32_t chunkId;
std::string strBuf; std::string strBuf;
public:
char *data() const override{
return (char*)strBuf.data();
}
uint32_t size() const override {
return strBuf.size();
};
public: public:
RtmpPacket() = default; RtmpPacket() = default;
RtmpPacket(const RtmpPacket &that) = default; RtmpPacket(const RtmpPacket &that) = default;
......
...@@ -188,60 +188,83 @@ void RtmpProtocol::sendRequest(int iCmd, const string& str) { ...@@ -188,60 +188,83 @@ void RtmpProtocol::sendRequest(int iCmd, const string& str) {
sendRtmp(iCmd, _ui32StreamId, str, 0, CHUNK_SERVER_REQUEST); sendRtmp(iCmd, _ui32StreamId, str, 0, CHUNK_SERVER_REQUEST);
} }
class BufferPartial : public Buffer {
public:
BufferPartial(const Buffer::Ptr &buffer,uint32_t offset,uint32_t size){
_buffer = buffer;
_data = buffer->data() + offset;
_size = size;
}
~BufferPartial(){}
char *data() const override {
return _data;
}
uint32_t size() const override{
return _size;
}
private:
Buffer::Ptr _buffer;
char *_data;
uint32_t _size;
};
void RtmpProtocol::sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId, void RtmpProtocol::sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId,
const std::string& strBuf, uint32_t ui32TimeStamp, int iChunkId) { const std::string& strBuf, uint32_t ui32TimeStamp, int iChunkId) {
if (iChunkId < 2 || iChunkId > 63) { sendRtmp(ui8Type,ui32StreamId,std::make_shared<BufferString>(strBuf),ui32TimeStamp,iChunkId);
auto strErr = StrPrinter << "不支持发送该类型的块流 ID:" << iChunkId << endl; }
throw std::runtime_error(strErr);
}
bool bExtStamp = ui32TimeStamp >= 0xFFFFFF; void RtmpProtocol::sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId,
RtmpHeader header; const Buffer::Ptr &buf, uint32_t ui32TimeStamp, int iChunkId){
header.flags = (iChunkId & 0x3f) | (0 << 6); if (iChunkId < 2 || iChunkId > 63) {
header.typeId = ui8Type; auto strErr = StrPrinter << "不支持发送该类型的块流 ID:" << iChunkId << endl;
set_be24(header.timeStamp, bExtStamp ? 0xFFFFFF : ui32TimeStamp); throw std::runtime_error(strErr);
set_be24(header.bodySize, strBuf.size()); }
set_le32(header.streamId, ui32StreamId);
bool bExtStamp = ui32TimeStamp >= 0xFFFFFF;
//估算rtmp包数据大小 RtmpHeader header;
uint32_t capacity = ((bExtStamp ? 5 : 1) * (1 + (strBuf.size() / _iChunkLenOut))) + strBuf.size() + sizeof(header); header.flags = (iChunkId & 0x3f) | (0 << 6);
uint32_t totalSize = 0; header.typeId = ui8Type;
BufferRaw::Ptr buffer = obtainBuffer(); set_be24(header.timeStamp, bExtStamp ? 0xFFFFFF : ui32TimeStamp);
buffer->setCapacity(capacity); set_be24(header.bodySize, buf->size());
memcpy(buffer->data() + totalSize,(char *) &header, sizeof(header)); set_le32(header.streamId, ui32StreamId);
totalSize += sizeof(header);
//估算rtmp包数据大小
char acExtStamp[4]; uint32_t totalSize = 0;
if (bExtStamp) { onSendRawData(obtainBuffer((char *) &header, sizeof(header)));
//扩展时间戳 totalSize += sizeof(header);
set_be32(acExtStamp, ui32TimeStamp);
} char acExtStamp[4];
size_t pos = 0; if (bExtStamp) {
while (pos < strBuf.size()) { //扩展时间戳
if (pos) { set_be32(acExtStamp, ui32TimeStamp);
uint8_t flags = (iChunkId & 0x3f) | (3 << 6); }
memcpy(buffer->data() + totalSize,&flags, 1); size_t pos = 0;
totalSize += 1; while (pos < buf->size()) {
} if (pos) {
if (bExtStamp) { uint8_t flags = (iChunkId & 0x3f) | (3 << 6);
//扩展时间戳 onSendRawData(obtainBuffer((char *) &flags, 1));
memcpy(buffer->data() + totalSize,acExtStamp, 4); totalSize += 1;
totalSize += 4; }
} if (bExtStamp) {
size_t chunk = min(_iChunkLenOut, strBuf.size() - pos); //扩展时间戳
memcpy(buffer->data() + totalSize,strBuf.data() + pos, chunk); onSendRawData(obtainBuffer(acExtStamp, 4));
totalSize += chunk; totalSize += 4;
pos += chunk; }
} size_t chunk = min(_iChunkLenOut, buf->size() - pos);
buffer->setSize(totalSize); onSendRawData(std::make_shared<BufferPartial>(buf,pos,chunk));
onSendRawData(buffer); totalSize += chunk;
_ui32ByteSent += totalSize; pos += chunk;
if (_ui32WinSize > 0 && _ui32ByteSent - _ui32LastSent >= _ui32WinSize) { }
_ui32LastSent = _ui32ByteSent; _ui32ByteSent += totalSize;
sendAcknowledgement(_ui32ByteSent); if (_ui32WinSize > 0 && _ui32ByteSent - _ui32LastSent >= _ui32WinSize) {
} _ui32LastSent = _ui32ByteSent;
sendAcknowledgement(_ui32ByteSent);
}
} }
void RtmpProtocol::onParseRtmp(const char *pcRawData, int iSize) { void RtmpProtocol::onParseRtmp(const char *pcRawData, int iSize) {
_strRcvBuf.append(pcRawData, iSize); _strRcvBuf.append(pcRawData, iSize);
auto cb = _nextHandle; auto cb = _nextHandle;
......
...@@ -74,6 +74,7 @@ protected: ...@@ -74,6 +74,7 @@ protected:
void sendRequest(int iCmd, const string &str); void sendRequest(int iCmd, const string &str);
void sendResponse(int iType, const string &str); void sendResponse(int iType, const string &str);
void sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId, const std::string &strBuf, uint32_t ui32TimeStamp, int iChunkID); void sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId, const std::string &strBuf, uint32_t ui32TimeStamp, int iChunkID);
void sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId, const Buffer::Ptr &buffer, uint32_t ui32TimeStamp, int iChunkID);
protected: protected:
int _iReqID = 0; int _iReqID = 0;
uint32_t _ui32StreamId = STREAM_CONTROL; uint32_t _ui32StreamId = STREAM_CONTROL;
......
...@@ -200,7 +200,7 @@ inline void RtmpPusher::send_metaData(){ ...@@ -200,7 +200,7 @@ inline void RtmpPusher::send_metaData(){
sendRequest(MSG_DATA, enc.data()); sendRequest(MSG_DATA, enc.data());
src->getConfigFrame([&](const RtmpPacket::Ptr &pkt){ src->getConfigFrame([&](const RtmpPacket::Ptr &pkt){
sendRtmp(pkt->typeId, _ui32StreamId, pkt->strBuf, pkt->timeStamp, pkt->chunkId ); sendRtmp(pkt->typeId, _ui32StreamId, pkt, pkt->timeStamp, pkt->chunkId );
}); });
_pRtmpReader = src->getRing()->attach(getPoller()); _pRtmpReader = src->getRing()->attach(getPoller());
...@@ -210,7 +210,7 @@ inline void RtmpPusher::send_metaData(){ ...@@ -210,7 +210,7 @@ inline void RtmpPusher::send_metaData(){
if(!strongSelf) { if(!strongSelf) {
return; return;
} }
strongSelf->sendRtmp(pkt->typeId, strongSelf->_ui32StreamId, pkt->strBuf, pkt->timeStamp, pkt->chunkId); strongSelf->sendRtmp(pkt->typeId, strongSelf->_ui32StreamId, pkt, pkt->timeStamp, pkt->chunkId);
}); });
_pRtmpReader->setDetachCB([weakSelf](){ _pRtmpReader->setDetachCB([weakSelf](){
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
......
...@@ -547,7 +547,7 @@ void RtmpSession::onSendMedia(const RtmpPacket::Ptr &pkt) { ...@@ -547,7 +547,7 @@ void RtmpSession::onSendMedia(const RtmpPacket::Ptr &pkt) {
CLEAR_ARR(_aui32FirstStamp); CLEAR_ARR(_aui32FirstStamp);
modifiedStamp = 0; modifiedStamp = 0;
} }
sendRtmp(pkt->typeId, pkt->streamId, pkt->strBuf, modifiedStamp, pkt->chunkId); sendRtmp(pkt->typeId, pkt->streamId, pkt, modifiedStamp, pkt->chunkId);
} }
void RtmpSession::doDelay(int delaySec, const std::function<void()> &fun) { void RtmpSession::doDelay(int delaySec, const std::function<void()> &fun) {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论