Commit 8f8e4abc by xiongziliang

支持http-flv直播

parent 50f93cdc
......@@ -85,6 +85,13 @@ int HttpClientImp::send(const string& str) {
}
return HttpClient::send(str);
}
int HttpClientImp::send(string &&str){
if(_sslBox){
_sslBox->onSend(str.data(),str.size());
return str.size();
}
return HttpClient::send(std::move(str));
}
int HttpClientImp::send(const char* str, int len) {
if(_sslBox){
......
......@@ -55,6 +55,7 @@ private:
#ifdef ENABLE_OPENSSL
virtual void onRecvBytes(const char *data,int size) override;
virtual int send(const string &str) override;
virtual int send(string &&str) override;
virtual int send(const char *str, int len) override;
std::shared_ptr<SSL_Box> _sslBox;
#endif //ENABLE_OPENSSL
......
......@@ -40,7 +40,7 @@
#include "Util/onceToken.h"
#include "Util/mini.h"
#include "Util/NoticeCenter.h"
#include "Rtmp/utils.h"
using namespace ZL::Util;
......@@ -58,7 +58,7 @@ static const char*
get_mime_type(const char* name) {
const char* dot;
dot = strrchr(name, '.');
static unordered_map<string, string> mapType;
static HttpSession::KeyValue mapType;
static onceToken token([&]() {
mapType.emplace(".html","text/html");
mapType.emplace(".htm","text/html");
......@@ -85,13 +85,12 @@ get_mime_type(const char* name) {
mapType.emplace(".mp3","audio/mpeg");
mapType.emplace(".ogg","application/ogg");
mapType.emplace(".pac","application/x-ns-proxy-autoconfig");
mapType.emplace(".flv","video/x-flv");
}, nullptr);
if(!dot){
return "text/plain";
}
string strDot(dot);
transform(strDot.begin(), strDot.end(), strDot.begin(), (int (*)(int))tolower);
auto it = mapType.find(strDot);
auto it = mapType.find(dot);
if (it == mapType.end()) {
return "text/plain";
}
......@@ -173,13 +172,108 @@ void HttpSession::onManager() {
}
}
inline bool HttpSession::checkLiveFlvStream(){
auto pos = strrchr(m_parser.Url().data(),'.');
if(!pos){
//未找到".flv"后缀
return false;
}
if(strcasecmp(pos,".flv") != 0){
//未找到".flv"后缀
return false;
}
auto app = FindField(m_parser.Url().data(),"/","/");
auto stream = FindField(m_parser.Url().data(),(string("/") + app + "/").data(),".");
if(app.empty() || stream.empty()){
//不能拆分成2级url
return false;
}
//TO-DO
auto mediaSrc = RtmpMediaSource::find(app,stream);
if(!mediaSrc){
//该rtmp源不存在
return false;
}
if(!mediaSrc->ready()){
//未准备好
return false;
}
//找到rtmp源,发送http头,负载后续发送
sendResponse("200 OK", makeHttpHeader(false,0,get_mime_type(m_parser.Url().data())), "");
//发送flv文件头
char flv_file_header[] = "FLV\x1\x5\x0\x0\x0\x9"; // have audio and have video
bool is_have_audio = false,is_have_video = false;
mediaSrc->getConfigFrame([&](const RtmpPacket::Ptr &pkt){
if(pkt->typeId == MSG_VIDEO){
is_have_video = true;
}
if(pkt->typeId == MSG_AUDIO){
is_have_audio = true;
}
});
if (is_have_audio && is_have_video) {
flv_file_header[4] = 0x05;
} else if (is_have_audio && !is_have_video) {
flv_file_header[4] = 0x04;
} else if (!is_have_audio && is_have_video) {
flv_file_header[4] = 0x01;
} else {
flv_file_header[4] = 0x00;
}
//send flv header
send(flv_file_header, sizeof(flv_file_header) - 1);
//send metadata
AMFEncoder invoke;
invoke << "onMetaData" << mediaSrc->getMetaData();
sendRtmp(MSG_DATA, invoke.data(), 0);
//send config frame
mediaSrc->getConfigFrame([&](const RtmpPacket::Ptr &pkt){
onSendMedia(pkt);
});
//开始发送rtmp负载
m_pRingReader = mediaSrc->getRing()->attach();
weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this());
m_pRingReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt){
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
}
strongSelf->async([pkt,weakSelf](){
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
}
strongSelf->onSendMedia(pkt);
});
});
m_pRingReader->setDetachCB([weakSelf](){
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
}
strongSelf->async_first([weakSelf](){
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
}
strongSelf->shutdown();
});
});
return true;
}
inline HttpSession::HttpCode HttpSession::Handle_Req_GET() {
//先看看该http事件是否被拦截
if(emitHttpEvent(false)){
return Http_success;
}
if(checkLiveFlvStream()){
return Http_success;
}
//事件未被拦截,则认为是http下载请求
string strFile = m_strPath + m_parser.Url();
/////////////HTTP连接是否需要被关闭////////////////
static uint32_t reqCnt = mINI::Instance()[Config::Http::kMaxReqCount].as<uint32_t>();
......@@ -235,7 +329,7 @@ inline HttpSession::HttpCode HttpSession::Handle_Req_GET() {
//先回复HTTP头部分
sendResponse(pcHttpResult, httpHeader, "");
if (iRangeEnd - iRangeStart < 0) {
//文件空的!
//文件空的!
return eHttpCode;
}
//回复Content部分
......@@ -388,15 +482,17 @@ inline HttpSession::KeyValue HttpSession::makeHttpHeader(bool bClose, int64_t iC
static uint32_t keepAliveSec = mINI::Instance()[Config::Http::kKeepAliveSecond].as<uint32_t>();
static uint32_t reqCnt = mINI::Instance()[Config::Http::kMaxReqCount].as<uint32_t>();
headerOut.emplace("Date", dateStr());
headerOut.emplace("Server", serverName);
headerOut.emplace("Connection", bClose ? "close" : "keep-alive");
if(!bClose){
headerOut.emplace("Keep-Alive",StrPrinter << "timeout=" << keepAliveSec << ", max=" << reqCnt << endl);
}
headerOut.emplace("Date", dateStr());
if(iContentSize >=0 && pcContentType !=nullptr){
if(pcContentType){
auto strContentType = StrPrinter << pcContentType << "; charset=" << charSet << endl;
headerOut.emplace("Content-Type",strContentType.data());
headerOut.emplace("Content-Type",strContentType);
}
if(iContentSize > 0){
headerOut.emplace("Content-Length", StrPrinter<<iContentSize<<endl);
}
return headerOut;
......@@ -484,5 +580,48 @@ inline void HttpSession::sendNotFound(bool bClose) {
sendResponse("404 Not Found", makeHttpHeader(bClose, notFound.size()), notFound);
}
void HttpSession::onSendMedia(const RtmpPacket::Ptr &pkt) {
auto modifiedStamp = pkt->timeStamp;
auto &firstStamp = m_aui32FirstStamp[pkt->typeId % 2];
if(!firstStamp){
firstStamp = modifiedStamp;
}
if(modifiedStamp >= firstStamp){
//计算时间戳增量
modifiedStamp -= firstStamp;
}else{
//发生回环,重新计算时间戳增量
CLEAR_ARR(m_aui32FirstStamp);
modifiedStamp = 0;
}
sendRtmp(pkt->typeId, pkt->strBuf, modifiedStamp);
}
class RtmpTagHeader {
public:
uint8_t type = 0;
uint8_t data_size[3] = {0};
uint8_t timestamp[3] = {0};
uint8_t timestamp_ex = 0;
uint8_t streamid[3] = {0}; /* Always 0. */
}PACKED;
void HttpSession::sendRtmp(uint8_t ui8Type, const std::string& strBuf, uint32_t ui32TimeStamp) {
auto size = htonl(m_previousTagSize);
send((char *)&size,4);//send PreviousTagSize
RtmpTagHeader header;
header.type = ui8Type;
set_be24(header.data_size, strBuf.size());
header.timestamp_ex = (uint8_t) ((ui32TimeStamp >> 24) & 0xff);
set_be24(header.timestamp,ui32TimeStamp & 0xFFFFFF);
send((char *)&header, sizeof(header));//send tag header
send(strBuf);//send tag data
m_previousTagSize += (strBuf.size() + sizeof(header) + 4);
m_ticker.resetTime();
}
} /* namespace Http */
} /* namespace ZL */
......@@ -30,8 +30,10 @@
#include "Common/config.h"
#include "Rtsp/Rtsp.h"
#include "Network/TcpLimitedSession.h"
#include "Rtmp/RtmpMediaSource.h"
using namespace std;
using namespace ZL::Rtmp;
using namespace ZL::Network;
namespace ZL {
......@@ -71,10 +73,17 @@ private:
Ticker m_ticker;
uint32_t m_iReqCnt = 0;
//flv over http
uint32_t m_aui32FirstStamp[2] = {0};
uint32_t m_previousTagSize = 0;
RingBuffer<RtmpPacket::Ptr>::RingReader::Ptr m_pRingReader;
void onSendMedia(const RtmpPacket::Ptr &pkt);
void sendRtmp(uint8_t ui8Type, const std::string& strBuf, uint32_t ui32TimeStamp);
inline HttpCode parserHttpReq(const string &);
inline HttpCode Handle_Req_GET();
inline HttpCode Handle_Req_POST();
inline bool checkLiveFlvStream();
inline bool emitHttpEvent(bool doInvoke);
inline void urlDecode(Parser &parser);
inline bool makeMeun(const string &strFullPath, string &strRet);
......
......@@ -76,6 +76,11 @@ private:
m_sslBox.onSend(buf.data(), buf.size());
return buf.size();
}
virtual int send(string &&buf) override{
TimeTicker();
m_sslBox.onSend(buf.data(), buf.size());
return buf.size();
}
virtual int send(const char *buf, int size) override{
TimeTicker();
m_sslBox.onSend(buf, size);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论