WebSocketSession.h 8.08 KB
Newer Older
xiongziliang committed
1
/*
xiongziliang committed
2
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
xiongziliang committed
3 4 5
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
xiongziliang committed
6 7 8
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
xiongziliang committed
9 10 11 12 13 14
 */

#ifndef ZLMEDIAKIT_WEBSOCKETSESSION_H
#define ZLMEDIAKIT_WEBSOCKETSESSION_H

#include "HttpSession.h"
xiongziliang committed
15
#include "Network/TcpServer.h"
xiongziliang committed
16 17

/**
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
 * 数据发送拦截器
 */
class SendInterceptor{
public:
    typedef function<int(const Buffer::Ptr &buf)> onBeforeSendCB;
    SendInterceptor() = default;
    virtual ~SendInterceptor() = default;
    virtual void setOnBeforeSendCB(const onBeforeSendCB &cb) = 0;
};

/**
 * 该类实现了TcpSession派生类发送数据的截取
 * 目的是发送业务数据前进行websocket协议的打包
 */
template <typename TcpSessionType>
class TcpSessionTypeImp : public TcpSessionType, public SendInterceptor{
public:
    typedef std::shared_ptr<TcpSessionTypeImp> Ptr;

    TcpSessionTypeImp(const Parser &header, const HttpSession &parent, const Socket::Ptr &pSock) :
            _identifier(parent.getIdentifier()), TcpSessionType(pSock) {}

    ~TcpSessionTypeImp() {}

    /**
     * 设置发送数据截取回调函数
     * @param cb 截取回调函数
     */
    void setOnBeforeSendCB(const onBeforeSendCB &cb) override {
        _beforeSendCB = cb;
    }

protected:
    /**
     * 重载send函数截取数据
     * @param buf 需要截取的数据
     * @return 数据字节数
     */
    int send(const Buffer::Ptr &buf) override {
        if (_beforeSendCB) {
            return _beforeSendCB(buf);
        }
        return TcpSessionType::send(buf);
    }

    string getIdentifier() const override {
        return _identifier;
    }

private:
    onBeforeSendCB _beforeSendCB;
    string _identifier;
};

template <typename TcpSessionType>
class TcpSessionCreator {
public:
    //返回的TcpSession必须派生于SendInterceptor,可以返回null
    TcpSession::Ptr operator()(const Parser &header, const HttpSession &parent, const Socket::Ptr &pSock){
        return std::make_shared<TcpSessionTypeImp<TcpSessionType> >(header,parent,pSock);
    }
};

/**
xiongziliang committed
82 83 84
* 通过该模板类可以透明化WebSocket协议,
* 用户只要实现WebSock协议下的具体业务协议,譬如基于WebSocket协议的Rtmp协议等
*/
85 86
template<typename Creator, typename HttpSessionType = HttpSession, WebSocketHeader::Type DataType = WebSocketHeader::TEXT>
class WebSocketSessionBase : public HttpSessionType {
xiongziliang committed
87
public:
88 89
    WebSocketSessionBase(const Socket::Ptr &pSock) : HttpSessionType(pSock){}
    virtual ~WebSocketSessionBase(){}
xiongziliang committed
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108

    //收到eof或其他导致脱离TcpServer事件的回调
    void onError(const SockException &err) override{
        HttpSessionType::onError(err);
        if(_session){
            _session->onError(err);
        }
    }
    //每隔一段时间触发,用来做超时管理
    void onManager() override{
        if(_session){
            _session->onManager();
        }else{
            HttpSessionType::onManager();
        }
    }

    void attachServer(const TcpServer &server) override{
        HttpSessionType::attachServer(server);
109
        _weak_server = const_cast<TcpServer &>(server).shared_from_this();
xiongziliang committed
110
    }
111

xiongziliang committed
112 113
protected:
    /**
114 115 116 117 118 119
     * websocket客户端连接上事件
     * @param header http头
     * @return true代表允许websocket连接,否则拒绝
     */
    bool onWebSocketConnect(const Parser &header) override{
        //创建websocket session类
120
        _session = _creator(header, *this,HttpSessionType::getSock());
121 122 123 124
        if(!_session){
            //此url不允许创建websocket连接
            return false;
        }
125
        auto strongServer = _weak_server.lock();
126 127 128 129 130
        if(strongServer){
            _session->attachServer(*strongServer);
        }

        //此处截取数据并进行websocket协议打包
131 132
        weak_ptr<WebSocketSessionBase> weakSelf = dynamic_pointer_cast<WebSocketSessionBase>(HttpSessionType::shared_from_this());
        dynamic_pointer_cast<SendInterceptor>(_session)->setOnBeforeSendCB([weakSelf](const Buffer::Ptr &buf) {
133
            auto strongSelf = weakSelf.lock();
134
            if (strongSelf) {
135 136 137 138 139
                WebSocketHeader header;
                header._fin = true;
                header._reserved = 0;
                header._opcode = DataType;
                header._mask_flag = false;
140
                strongSelf->WebSocketSplitter::encode(header, buf);
141 142 143 144 145 146 147
            }
            return buf->size();
        });

        //允许websocket客户端
        return true;
    }
148

149
    /**
xiongziliang committed
150 151 152 153
     * 开始收到一个webSocket数据包
     */
    void onWebSocketDecodeHeader(const WebSocketHeader &packet) override{
        //新包,原来的包残余数据清空掉
154
        _payload_section.clear();
xiongziliang committed
155 156 157 158 159
    }

    /**
     * 收到websocket数据包负载
     */
xiongziliang committed
160
    void onWebSocketDecodePayload(const WebSocketHeader &packet,const uint8_t *ptr,uint64_t len,uint64_t recved) override {
161
        _payload_section.append((char *)ptr,len);
xiongziliang committed
162 163 164 165 166 167 168 169 170 171 172 173 174
    }

    /**
     * 接收到完整的一个webSocket数据包后回调
     * @param header 数据包包头
     */
    void onWebSocketDecodeComplete(const WebSocketHeader &header_in) override {
        WebSocketHeader& header = const_cast<WebSocketHeader&>(header_in);
        auto  flag = header._mask_flag;
        header._mask_flag = false;

        switch (header._opcode){
            case WebSocketHeader::CLOSE:{
175
                HttpSessionType::encode(header,nullptr);
176
                HttpSessionType::shutdown(SockException(Err_shutdown, "recv close request from client"));
xiongziliang committed
177
                break;
178 179
            }
            
xiongziliang committed
180
            case WebSocketHeader::PING:{
181
                header._opcode = WebSocketHeader::PONG;
182
                HttpSessionType::encode(header,std::make_shared<BufferString>(_payload_section));
xiongziliang committed
183 184
                break;
            }
185 186
            
            case WebSocketHeader::CONTINUATION:
xiongziliang committed
187 188
            case WebSocketHeader::TEXT:
            case WebSocketHeader::BINARY:{
189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
                if (!header._fin) {
                    //还有后续分片数据, 我们先缓存数据,所有分片收集完成才一次性输出
                    _payload_cache.append(std::move(_payload_section));
                    if (_payload_cache.size() < MAX_WS_PACKET) {
                        //还有内存容量缓存分片数据
                        break;
                    }
                    //分片缓存太大,需要清空
                }

                //最后一个包
                if (_payload_cache.empty()) {
                    //这个包是唯一个分片
                    _session->onRecv(std::make_shared<WebSocketBuffer>(header._opcode, header._fin, std::move(_payload_section)));
                    break;
                }

                //这个包由多个分片组成
                _payload_cache.append(std::move(_payload_section));
                _session->onRecv(std::make_shared<WebSocketBuffer>(header._opcode, header._fin, std::move(_payload_cache)));
                _payload_cache.clear();
xiongziliang committed
210
                break;
211 212 213
            }
            
            default: break;
xiongziliang committed
214
        }
215
        _payload_section.clear();
xiongziliang committed
216 217 218 219
        header._mask_flag = flag;
    }

    /**
220
     * 发送数据进行websocket协议打包后回调
xiongziliang committed
221
    */
222
    void onWebSocketEncodeData(const Buffer::Ptr &buffer) override{
223
        HttpSessionType::send(buffer);
xiongziliang committed
224
    }
225

xiongziliang committed
226
private:
227 228 229
    string _payload_cache;
    string _payload_section;
    weak_ptr<TcpServer> _weak_server;
230 231
    TcpSession::Ptr _session;
    Creator _creator;
xiongziliang committed
232 233 234
};


235 236 237 238 239 240 241
template<typename TcpSessionType,typename HttpSessionType = HttpSession,WebSocketHeader::Type DataType = WebSocketHeader::TEXT>
class WebSocketSession : public WebSocketSessionBase<TcpSessionCreator<TcpSessionType>,HttpSessionType,DataType>{
public:
    WebSocketSession(const Socket::Ptr &pSock) : WebSocketSessionBase<TcpSessionCreator<TcpSessionType>,HttpSessionType,DataType>(pSock){}
    virtual ~WebSocketSession(){}
};

xiongziliang committed
242
#endif //ZLMEDIAKIT_WEBSOCKETSESSION_H