/*
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
 */

#ifndef ZLMEDIAKIT_WebSocketClient_H
#define ZLMEDIAKIT_WebSocketClient_H
baiyfcu committed
13 14 15

#include "Util/util.h"
#include "Util/base64.h"
16 17 18 19
#include "Util/SHA1.h"
#include "Network/TcpClient.h"
#include "HttpClientImp.h"
#include "WebSocketSplitter.h"
baiyfcu committed
20 21
using namespace toolkit;

22
namespace mediakit{
baiyfcu committed
23

24 25
// Forward declaration: ClientTypeImp names HttpWsClient as a friend before
// HttpWsClient itself is defined further down in this header.
template <typename ClientType,WebSocketHeader::Type DataType>
class HttpWsClient;
baiyfcu committed
26

27 28 29 30 31
/**
 * 辅助类,用于拦截TcpClient数据发送前的拦截
 * @tparam ClientType TcpClient派生类
 * @tparam DataType 这里无用,为了声明友元用
 */
32 33
template <typename ClientType,WebSocketHeader::Type DataType>
class ClientTypeImp : public ClientType {
baiyfcu committed
34
public:
35 36
    typedef function<int(const Buffer::Ptr &buf)> onBeforeSendCB;
    friend class HttpWsClient<ClientType,DataType>;
baiyfcu committed
37

38 39 40
    template<typename ...ArgsType>
    ClientTypeImp(ArgsType &&...args): ClientType(std::forward<ArgsType>(args)...){}
    ~ClientTypeImp() override {};
baiyfcu committed
41 42
protected:
    /**
43 44 45
     * 发送前拦截并打包为websocket协议
     * @param buf
     * @return
baiyfcu committed
46
     */
47 48 49 50 51 52
    int send(const Buffer::Ptr &buf) override{
        if(_beforeSendCB){
            return _beforeSendCB(buf);
        }
        return ClientType::send(buf);
    }
baiyfcu committed
53
    /**
54 55
     * 设置发送数据截取回调函数
     * @param cb 截取回调函数
baiyfcu committed
56
     */
57 58 59 60 61 62
    void setOnBeforeSendCB(const onBeforeSendCB &cb){
        _beforeSendCB = cb;
    }
private:
    onBeforeSendCB _beforeSendCB;
};
baiyfcu committed
63

64 65 66 67 68
/**
 * 此对象完成了weksocket 客户端握手协议,以及到TcpClient派生类事件的桥接
 * @tparam ClientType TcpClient派生类
 * @tparam DataType websocket负载类型,是TEXT还是BINARY类型
 */
69 70 71 72
template <typename ClientType,WebSocketHeader::Type DataType = WebSocketHeader::TEXT>
class HttpWsClient : public HttpClientImp , public WebSocketSplitter{
public:
    typedef shared_ptr<HttpWsClient> Ptr;
baiyfcu committed
73

74 75
    HttpWsClient(ClientTypeImp<ClientType,DataType> &delegate) : _delegate(delegate){
        _Sec_WebSocket_Key = encodeBase64(SHA1::encode_bin(makeRandStr(16, false)));
xiongziliang committed
76
        _poller = delegate.getPoller();
77 78 79
    }
    ~HttpWsClient(){}

80 81 82 83 84
    /**
     * 发起ws握手
     * @param ws_url ws连接url
     * @param fTimeOutSec 超时时间
     */
85 86 87 88 89 90 91 92 93 94 95 96
    void startWsClient(const string &ws_url,float fTimeOutSec){
        string http_url = ws_url;
        replace(http_url,"ws://","http://");
        replace(http_url,"wss://","https://");
        setMethod("GET");
        addHeader("Upgrade","websocket");
        addHeader("Connection","Upgrade");
        addHeader("Sec-WebSocket-Version","13");
        addHeader("Sec-WebSocket-Key",_Sec_WebSocket_Key);
        _onRecv = nullptr;
        sendRequest(http_url,fTimeOutSec);
    }
97 98 99 100 101 102 103 104 105 106 107 108 109 110

    void closeWsClient(){
        if(!_onRecv){
            //未连接
            return;
        }
        WebSocketHeader header;
        header._fin = true;
        header._reserved = 0;
        header._opcode = CLOSE;
        //客户端需要加密
        header._mask_flag = true;
        WebSocketSplitter::encode(header, nullptr);
    }
baiyfcu committed
111
protected:
112
    //HttpClientImp override
baiyfcu committed
113

114 115 116 117 118 119 120 121 122 123 124 125 126
    /**
     * 收到http回复头
     * @param status 状态码,譬如:200 OK
     * @param headers http头
     * @return 返回后续content的长度;-1:后续数据全是content;>=0:固定长度content
     *          需要指出的是,在http头中带有Content-Length字段时,该返回值无效
     */
    int64_t onResponseHeader(const string &status,const HttpHeader &headers) override {
        if(status == "101"){
            auto Sec_WebSocket_Accept = encodeBase64(SHA1::encode_bin(_Sec_WebSocket_Key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"));
            if(Sec_WebSocket_Accept == const_cast<HttpHeader &>(headers)["Sec-WebSocket-Accept"]){
                //success
                onWebSocketException(SockException());
127 128
                //后续全是websocket负载数据
                return -1;
129 130 131 132 133 134 135 136
            }
            shutdown(SockException(Err_shutdown,StrPrinter << "Sec-WebSocket-Accept mismatch"));
            return 0;
        }

        shutdown(SockException(Err_shutdown,StrPrinter << "bad http status code:" << status));
        return 0;
    };
baiyfcu committed
137

138 139 140 141 142
    /**
     * 接收http回复完毕,
     */
    void onResponseCompleted() override {}

143 144 145 146 147 148 149 150 151 152
    /**
     * 接收websocket负载数据
     */
    void onResponseBody(const char *buf,int64_t size,int64_t recvedSize,int64_t totalSize) override{
        if(_onRecv){
            //完成websocket握手后,拦截websocket数据并解析
            _onRecv(buf, size);
        }
    };

153 154
    //TcpClient override

155 156 157
    /**
     * 定时触发
     */
158 159 160 161 162 163 164 165 166
    void onManager() override {
        if(_onRecv){
            //websocket连接成功了
            _delegate.onManager();
        } else{
            //websocket连接中...
            HttpClientImp::onManager();
        }
    }
167 168 169 170

    /**
     * 数据全部发送完毕后回调
     */
171 172 173 174 175 176 177 178 179
    void onFlush() override{
        if(_onRecv){
            //websocket连接成功了
            _delegate.onFlush();
        } else{
            //websocket连接中...
            HttpClientImp::onFlush();
        }
    }
baiyfcu committed
180

181 182 183 184 185 186 187 188 189 190 191 192 193
    /**
     * tcp连接结果
     * @param ex
     */
    void onConnect(const SockException &ex) override{
        if(ex){
            //tcp连接失败,直接返回失败
            onWebSocketException(ex);
            return;
        }
        //开始websocket握手
        HttpClientImp::onConnect(ex);
    }
baiyfcu committed
194

195
    /**
196 197 198
     * tcp连接断开
     * @param ex
     */
199 200 201 202
    void onErr(const SockException &ex) override{
        //tcp断开或者shutdown导致的断开
        onWebSocketException(ex);
    }
baiyfcu committed
203

204
    //WebSocketSplitter override
205

206
    /**
xiongziliang committed
207
     * 收到一个webSocket数据包包头,后续将继续触发onWebSocketDecodePayload回调
208 209 210 211 212
     * @param header 数据包头
     */
    void onWebSocketDecodeHeader(const WebSocketHeader &header) override{
        _payload.clear();
    }
baiyfcu committed
213

214 215 216 217 218
    /**
     * 收到webSocket数据包负载
     * @param header 数据包包头
     * @param ptr 负载数据指针
     * @param len 负载数据长度
xiongziliang committed
219
     * @param recved 已接收数据长度(包含本次数据长度),等于header._payload_len时则接受完毕
220
     */
xiongziliang committed
221
    void onWebSocketDecodePayload(const WebSocketHeader &header, const uint8_t *ptr, uint64_t len, uint64_t recved) override{
222 223
        _payload.append((char *)ptr,len);
    }
baiyfcu committed
224 225


226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249
    /**
     * 接收到完整的一个webSocket数据包后回调
     * @param header 数据包包头
     */
    void onWebSocketDecodeComplete(const WebSocketHeader &header_in) override{
        WebSocketHeader& header = const_cast<WebSocketHeader&>(header_in);
        auto  flag = header._mask_flag;
        //websocket客户端发送数据需要加密
        header._mask_flag = true;

        switch (header._opcode){
            case WebSocketHeader::CLOSE:{
                //服务器主动关闭
                WebSocketSplitter::encode(header,nullptr);
                shutdown(SockException(Err_eof,"websocket server close the connection"));
            }
                break;
            case WebSocketHeader::PING:{
                //心跳包
                header._opcode = WebSocketHeader::PONG;
                WebSocketSplitter::encode(header,std::make_shared<BufferString>(std::move(_payload)));
            }
                break;
            case WebSocketHeader::CONTINUATION:{
baiyfcu committed
250

251 252 253 254 255 256 257 258 259 260 261 262 263 264
            }
                break;
            case WebSocketHeader::TEXT:
            case WebSocketHeader::BINARY:{
                //接收完毕websocket数据包,触发onRecv事件
                _delegate.onRecv(std::make_shared<BufferString>(std::move(_payload)));
            }
                break;
            default:
                break;
        }
        _payload.clear();
        header._mask_flag = flag;
    }
baiyfcu committed
265

266 267 268 269 270 271 272 273
    /**
     * websocket数据编码回调
     * @param ptr 数据指针
     * @param len 数据指针长度
     */
    void onWebSocketEncodeData(const Buffer::Ptr &buffer) override{
        HttpClientImp::send(buffer);
    }
baiyfcu committed
274
private:
275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292
    void onWebSocketException(const SockException &ex){
        if(!ex){
            //websocket握手成功
            //此处截取TcpClient派生类发送的数据并进行websocket协议打包
            weak_ptr<HttpWsClient> weakSelf = dynamic_pointer_cast<HttpWsClient>(shared_from_this());
            _delegate.setOnBeforeSendCB([weakSelf](const Buffer::Ptr &buf){
                auto strongSelf = weakSelf.lock();
                if(strongSelf){
                    WebSocketHeader header;
                    header._fin = true;
                    header._reserved = 0;
                    header._opcode = DataType;
                    //客户端需要加密
                    header._mask_flag = true;
                    strongSelf->WebSocketSplitter::encode(header,buf);
                }
                return buf->size();
            });
293 294 295

            //设置sock,否则shutdown等接口都无效
            _delegate.setSock(HttpClientImp::_sock);
296 297 298
            //触发连接成功事件
            _delegate.onConnect(ex);
            //拦截websocket数据接收
299
            _onRecv = [this](const char *data, int len){
300
                //解析websocket数据包
301
                this->WebSocketSplitter::decode((uint8_t *)data, len);
302 303 304 305 306 307 308 309 310 311 312 313 314 315 316
            };
            return;
        }

        //websocket握手失败或者tcp连接失败或者中途断开
        if(_onRecv){
            //握手成功之后的中途断开
            _onRecv = nullptr;
            _delegate.onErr(ex);
            return;
        }

        //websocket握手失败或者tcp连接失败
        _delegate.onConnect(ex);
    }
baiyfcu committed
317 318

private:
319
    string _Sec_WebSocket_Key;
320
    function<void(const char *data, int len)> _onRecv;
321 322
    ClientTypeImp<ClientType,DataType> &_delegate;
    string _payload;
baiyfcu committed
323 324
};

325 326 327 328 329 330 331 332 333

/**
 * Tcp客户端转WebSocket客户端模板,
 * 通过该模板,开发者再不修改TcpClient派生类任何代码的情况下快速实现WebSocket协议的包装
 * @tparam ClientType TcpClient派生类
 * @tparam DataType websocket负载类型,是TEXT还是BINARY类型
 * @tparam useWSS 是否使用ws还是wss连接
 */
template <typename ClientType,WebSocketHeader::Type DataType = WebSocketHeader::TEXT,bool useWSS = false >
334 335 336 337 338 339 340 341
class WebSocketClient : public ClientTypeImp<ClientType,DataType>{
public:
    typedef std::shared_ptr<WebSocketClient> Ptr;

    template<typename ...ArgsType>
    WebSocketClient(ArgsType &&...args) : ClientTypeImp<ClientType,DataType>(std::forward<ArgsType>(args)...){
        _wsClient.reset(new HttpWsClient<ClientType,DataType>(*this));
    }
342 343 344
    ~WebSocketClient() override {
        _wsClient->closeWsClient();
    }
345

346 347 348
    /**
     * 重载startConnect方法,
     * 目的是替换TcpClient的连接服务器行为,使之先完成WebSocket握手
349
     * @param host websocket服务器ip或域名
350 351 352
     * @param iPort websocket服务器端口
     * @param fTimeOutSec 超时时间
     */
353
    void startConnect(const string &host, uint16_t iPort, float fTimeOutSec = 3) override {
354
        string ws_url;
355 356
        if(useWSS){
            //加密的ws
357
            ws_url = StrPrinter << "wss://" + host << ":" << iPort << "/" ;
358
        }else{
359
            //明文ws
360
            ws_url = StrPrinter << "ws://" + host << ":" << iPort << "/" ;
361 362 363
        }
        _wsClient->startWsClient(ws_url,fTimeOutSec);
    }
364 365 366 367

    void startWebSocket(const string &ws_url,float fTimeOutSec = 3){
        _wsClient->startWsClient(ws_url,fTimeOutSec);
    }
368 369 370
private:
    typename HttpWsClient<ClientType,DataType>::Ptr _wsClient;
};
baiyfcu committed
371

372 373
}//namespace mediakit
#endif //ZLMEDIAKIT_WebSocketClient_H