/*
 * MIT License
 *
 * Copyright (c) 2016 xiongziliang <771730766@qq.com>
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
#ifndef SRC_HTTP_HTTPSESSION_H_
#define SRC_HTTP_HTTPSESSION_H_

xiongzilaing committed
29
#include <functional>
xiongziliang committed
30
#include "Common/config.h"
xzl committed
31
#include "Rtsp/Rtsp.h"
32
#include "Network/TcpSession.h"
33
#include "Network/TcpServer.h"
xiongziliang committed
34
#include "Rtmp/RtmpMediaSource.h"
xiongziliang committed
35
#include "Rtmp/FlvMuxer.h"
36
#include "HttpRequestSplitter.h"
37
#include "WebSocketSplitter.h"
xzl committed
38 39

using namespace std;
xiongziliang committed
40
using namespace ZL::Rtmp;
xzl committed
41 42 43 44 45
using namespace ZL::Network;

namespace ZL {
namespace Http {

46 47 48 49
class HttpSession: public TcpSession,
                   public FlvMuxer,
                   public HttpRequestSplitter,
                   public WebSocketSplitter {
xzl committed
50
public:
xiongziliang committed
51
	typedef StrCaseMap KeyValue;
52 53 54 55
	typedef std::function<void(const string &codeOut,
							   const KeyValue &headerOut,
							   const string &contentOut)>  HttpResponseInvoker;

xzl committed
56 57 58
	HttpSession(const std::shared_ptr<ThreadPool> &pTh, const Socket::Ptr &pSock);
	virtual ~HttpSession();

59
	virtual void onRecv(const Buffer::Ptr &) override;
xzl committed
60 61
	virtual void onError(const SockException &err) override;
	virtual void onManager() override;
xiongziliang committed
62 63

	static string urlDecode(const string &str);
xzl committed
64
protected:
65
	//用于HttpsSession调用
xzl committed
66
	void onRecv(const char *data,int size);
xiongziliang committed
67 68 69 70 71
	//FlvMuxer override
	void onWrite(const Buffer::Ptr &data) override ;
	void onWrite(const char *data,int len) override;
	void onDetach() override;
	std::shared_ptr<FlvMuxer> getSharedPtr() override;
72 73
	//HttpRequestSplitter override

74 75
	int64_t onRecvHeader(const char *data,uint64_t len) override;
	void onRecvContent(const char *data,uint64_t len) override;
76

77 78
	/**
	 * 重载之用于处理不定长度的content
xiongziliang committed
79
	 * 这个函数可用于处理大文件上传、http-flv推流
80
	 * @param header http请求头
81 82
	 * @param data content分片数据
	 * @param len content分片数据大小
83 84
	 * @param totalSize content总大小,如果为0则是不限长度content
	 * @param recvedSize 已收数据大小
85
	 */
86 87 88 89 90
	virtual void onRecvUnlimitedContent(const Parser &header,
										const char *data,
										uint64_t len,
										uint64_t totalSize,
										uint64_t recvedSize){
91 92 93 94
        WarnL << "content数据长度过大,无法处理,请重载HttpSession::onRecvUnlimitedContent";
        shutdown();
	}

95 96 97 98
    void onWebSocketDecodeHeader(const WebSocketHeader &packet) override{
        DebugL << "默认关闭WebSocket";
        shutdown();
    };
xiongziliang committed
99

100 101
	void onRecvWebSocketData(const Parser &header,const char *data,uint64_t len){
        WebSocketSplitter::decode((uint8_t *)data,len);
xiongziliang committed
102
    }
103
private:
xzl committed
104 105 106 107
	Parser m_parser;
	string m_strPath;
	Ticker m_ticker;
	uint32_t m_iReqCnt = 0;
xiongziliang committed
108 109
	//消耗的总流量
	uint64_t m_ui64TotalBytes = 0;
xiongziliang committed
110
	//flv over http
111
    MediaInfo m_mediaInfo;
112
    //处理content数据的callback
113
	function<bool (const char *data,uint64_t len) > m_contentCallBack;
114 115 116
private:
	inline bool Handle_Req_GET(int64_t &content_len);
	inline bool Handle_Req_POST(int64_t &content_len);
xiongziliang committed
117
	inline bool checkLiveFlvStream();
xiongziliang committed
118
	inline bool checkWebSocket();
xiongziliang committed
119 120
	inline bool emitHttpEvent(bool doInvoke);
	inline void urlDecode(Parser &parser);
121
	inline bool makeMeun(const string &strFullPath,const string &vhost, string &strRet);
xzl committed
122 123
	inline void sendNotFound(bool bClose);
	inline void sendResponse(const char *pcStatus,const KeyValue &header,const string &strContent);
124
	inline static KeyValue makeHttpHeader(bool bClose=false,int64_t iContentSize=-1,const char *pcContentType="text/html");
xiongziliang committed
125 126 127
	void responseDelay(const string &Origin,bool bClose,
					   const string &codeOut,const KeyValue &headerOut,
					   const string &contentOut);
xzl committed
128 129
};

130

xiongziliang committed
131
/**
132 133 134
 * 通过该模板类可以透明化WebSocket协议,
 * 用户只要实现WebSock协议下的具体业务协议,譬如基于WebSocket协议的Rtmp协议等
 * @tparam SessionType 业务协议的TcpSession类
xiongziliang committed
135
 */
136 137
template <typename SessionType>
class WebSocketSession : public HttpSession {
xiongziliang committed
138
public:
139 140
    WebSocketSession(const std::shared_ptr<ThreadPool> &pTh, const Socket::Ptr &pSock) : HttpSession(pTh,pSock){}
    virtual ~WebSocketSession(){}
141 142 143 144

    //收到eof或其他导致脱离TcpServer事件的回调
    void onError(const SockException &err) override{
        HttpSession::onError(err);
145 146 147
        if(_session){
            _session->onError(err);
        }
148 149 150 151
    }
    //每隔一段时间触发,用来做超时管理
    void onManager() override{
        HttpSession::onManager();
152 153 154
        if(_session){
            _session->onManager();
        }
155
    }
156 157 158 159 160

    void attachServer(const TcpServer &server) override{
        HttpSession::attachServer(server);
        _weakServer = const_cast<TcpServer &>(server).shared_from_this();
    }
xiongziliang committed
161
protected:
162 163 164 165
    /**
     * 开始收到一个webSocket数据包
     * @param packet
     */
166
    void onWebSocketDecodeHeader(const WebSocketHeader &packet) override{
167 168
        //新包,原来的包残余数据清空掉
        _remian_data.clear();
169 170 171 172 173 174 175 176 177 178 179

        if(!_firstPacket){
            return;
        }
        //这是个WebSocket会话而不是普通的Http会话
        _firstPacket = false;
        _session = std::make_shared<SessionImp>(nullptr,_sock);

        auto strongServer = _weakServer.lock();
        if(strongServer){
            _session->attachServer(*strongServer);
180
        }
181 182 183 184 185 186 187 188 189 190 191 192 193 194

        //此处截取数据并进行websocket协议打包
        weak_ptr<WebSocketSession> weakSelf = dynamic_pointer_cast<WebSocketSession>(shared_from_this());
        _session->setOnBeforeSendCB([weakSelf](const Buffer::Ptr &buf){
            auto strongSelf = weakSelf.lock();
            if(strongSelf){
                bool mask_flag = strongSelf->_mask_flag;
                strongSelf->_mask_flag = false;
                strongSelf->WebSocketSplitter::encode((uint8_t *)buf->data(),buf->size());
                strongSelf->_mask_flag = mask_flag;
            }
            return buf->size();
        });

195 196 197 198 199 200 201 202 203
    }

    /**
     * 收到websocket数据包负载
     * @param packet
     * @param ptr
     * @param len
     * @param recved
     */
204
    void onWebSocketDecodePlayload(const WebSocketHeader &packet,const uint8_t *ptr,uint64_t len,uint64_t recved) override {
205 206 207 208 209 210 211 212 213 214 215 216 217 218
        if(packet._playload_len == recved){
            //收到完整的包
            if(_remian_data.empty()){
                onRecvWholePacket((char *)ptr,len);
            }else{
                _remian_data.append((char *)ptr,len);
                onRecvWholePacket(_remian_data);
                _remian_data.clear();
            }
        } else {
            //部分数据
            _remian_data.append((char *)ptr,len);
        }
    }
219

220 221 222 223 224 225 226 227
    /**
     * 发送数据进行websocket协议打包后回调
     * @param ptr
     * @param len
     */
    void onWebSocketEncodeData(const uint8_t *ptr,uint64_t len) override{
        _session->realSend(_session->obtainBuffer((char *)ptr,len));
    }
228

229 230 231 232 233 234 235 236 237
    /**
     * 收到一个完整的websock数据包
     * @param data
     * @param len
     */
    void onRecvWholePacket(const char *data,uint64_t len){
        BufferRaw::Ptr buffer = _session->obtainBuffer(data,len);
        _session->onRecv(buffer);
    }
238

239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289
    /**
     * 收到一个完整的websock数据包
     * @param str
     */
    void onRecvWholePacket(const string &str){
        BufferString::Ptr buffer = std::make_shared<BufferString>(str);
        _session->onRecv(buffer);
    }

private:
    typedef function<int(const Buffer::Ptr &buf)> onBeforeSendCB;
    /**
     * 该类实现了TcpSession派生类发送数据的截取
     * 目的是发送业务数据前进行websocket协议的打包
     */
    class SessionImp : public SessionType{
    public:
        SessionImp(const std::shared_ptr<ThreadPool> &pTh, const Socket::Ptr &pSock) : SessionType(pTh,pSock){};
        ~SessionImp(){}

        /**
         * 截取到数据后,再进行webSocket协议打包
         * 然后真正的发送数据到socket
         * @param buf 数据
         * @return 数据字节数
         */
        int realSend(const Buffer::Ptr &buf){
            return SessionType::send(buf);
        }

        /**
         * 设置发送数据截取回调函数
         * @param cb 截取回调函数
         */
        void setOnBeforeSendCB(const onBeforeSendCB &cb){
            _beforeSendCB = cb;
        }
    protected:
        /**
         * 重载send函数截取数据
         * @param buf 需要截取的数据
         * @return 数据字节数
         */
        int send(const Buffer::Ptr &buf) override {
            if(_beforeSendCB){
                return _beforeSendCB(buf);
            }
            return SessionType::send(buf);
        }
    private:
        onBeforeSendCB _beforeSendCB;
290
    };
291 292 293
private:
    std::shared_ptr<SessionImp> _session;
    string _remian_data;
294
    bool _firstPacket = true;
295
    weak_ptr<TcpServer> _weakServer;
296
};
297

298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315
/**
 * Echo session: every received buffer is sent back as-is.
 */
class EchoSession : public TcpSession {
public:
    EchoSession(const std::shared_ptr<ThreadPool> &pTh, const Socket::Ptr &pSock) : TcpSession(pTh,pSock){}
    virtual ~EchoSession(){}

    // Send the received buffer straight back
    void onRecv(const Buffer::Ptr &buffer) override {
        send(buffer);
    }
    // Only logs the error text
    void onError(const SockException &err) override{
        WarnL << err.what();
    }
    // Triggered periodically, meant for timeout management; here it only evaluates DebugL
    void onManager() override{
        DebugL;
    }
};

318 319 320

// EchoSession carried over WebSocket
using EchoWebSocketSession = WebSocketSession<EchoSession>;

xzl committed
321 322 323 324
} /* namespace Http */
} /* namespace ZL */

#endif /* SRC_HTTP_HTTPSESSION_H_ */