WebSocketClient.h 12.8 KB
Newer Older
xiongziliang committed
1
/*
xiongziliang committed
2
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
3 4 5
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
xiongziliang committed
6 7 8
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
9 10 11 12
 */

#ifndef ZLMEDIAKIT_WebSocketClient_H
#define ZLMEDIAKIT_WebSocketClient_H
baiyfcu committed
13 14 15

#include "Util/util.h"
#include "Util/base64.h"
16 17 18 19
#include "Util/SHA1.h"
#include "Network/TcpClient.h"
#include "HttpClientImp.h"
#include "WebSocketSplitter.h"
baiyfcu committed
20 21
using namespace toolkit;

22
namespace mediakit{
baiyfcu committed
23

24 25
// Forward declaration: ClientTypeImp (below) names HttpWsClient as a friend
// before the full class template is defined further down in this header.
template <typename ClientType,WebSocketHeader::Type DataType>
class HttpWsClient;
baiyfcu committed
26

27 28 29 30 31
/**
 * 辅助类,用于拦截TcpClient数据发送前的拦截
 * @tparam ClientType TcpClient派生类
 * @tparam DataType 这里无用,为了声明友元用
 */
32 33
template <typename ClientType,WebSocketHeader::Type DataType>
class ClientTypeImp : public ClientType {
baiyfcu committed
34
public:
35 36
    typedef function<int(const Buffer::Ptr &buf)> onBeforeSendCB;
    friend class HttpWsClient<ClientType,DataType>;
baiyfcu committed
37

38 39 40
    template<typename ...ArgsType>
    ClientTypeImp(ArgsType &&...args): ClientType(std::forward<ArgsType>(args)...){}
    ~ClientTypeImp() override {};
41

baiyfcu committed
42 43
protected:
    /**
44
     * 发送前拦截并打包为websocket协议
baiyfcu committed
45
     */
46 47 48 49 50 51
    int send(const Buffer::Ptr &buf) override{
        if(_beforeSendCB){
            return _beforeSendCB(buf);
        }
        return ClientType::send(buf);
    }
52

baiyfcu committed
53
    /**
54 55
     * 设置发送数据截取回调函数
     * @param cb 截取回调函数
baiyfcu committed
56
     */
57 58 59
    void setOnBeforeSendCB(const onBeforeSendCB &cb){
        _beforeSendCB = cb;
    }
60

61 62 63
private:
    onBeforeSendCB _beforeSendCB;
};
baiyfcu committed
64

65 66 67 68 69
/**
 * 此对象完成了weksocket 客户端握手协议,以及到TcpClient派生类事件的桥接
 * @tparam ClientType TcpClient派生类
 * @tparam DataType websocket负载类型,是TEXT还是BINARY类型
 */
70 71 72 73
template <typename ClientType,WebSocketHeader::Type DataType = WebSocketHeader::TEXT>
class HttpWsClient : public HttpClientImp , public WebSocketSplitter{
public:
    typedef shared_ptr<HttpWsClient> Ptr;
baiyfcu committed
74

75 76
    HttpWsClient(ClientTypeImp<ClientType,DataType> &delegate) : _delegate(delegate){
        _Sec_WebSocket_Key = encodeBase64(SHA1::encode_bin(makeRandStr(16, false)));
77
        setPoller(delegate.getPoller());
78 79 80
    }
    ~HttpWsClient(){}

81 82 83 84 85
    /**
     * 发起ws握手
     * @param ws_url ws连接url
     * @param fTimeOutSec 超时时间
     */
86 87 88 89 90 91 92 93 94 95 96 97
    void startWsClient(const string &ws_url,float fTimeOutSec){
        string http_url = ws_url;
        replace(http_url,"ws://","http://");
        replace(http_url,"wss://","https://");
        setMethod("GET");
        addHeader("Upgrade","websocket");
        addHeader("Connection","Upgrade");
        addHeader("Sec-WebSocket-Version","13");
        addHeader("Sec-WebSocket-Key",_Sec_WebSocket_Key);
        _onRecv = nullptr;
        sendRequest(http_url,fTimeOutSec);
    }
98 99 100 101 102 103 104 105 106 107 108 109 110 111

    void closeWsClient(){
        if(!_onRecv){
            //未连接
            return;
        }
        WebSocketHeader header;
        header._fin = true;
        header._reserved = 0;
        header._opcode = CLOSE;
        //客户端需要加密
        header._mask_flag = true;
        WebSocketSplitter::encode(header, nullptr);
    }
112

baiyfcu committed
113
protected:
114
    //HttpClientImp override
baiyfcu committed
115

116 117 118 119 120 121 122 123 124 125 126 127 128
    /**
     * 收到http回复头
     * @param status 状态码,譬如:200 OK
     * @param headers http头
     * @return 返回后续content的长度;-1:后续数据全是content;>=0:固定长度content
     *          需要指出的是,在http头中带有Content-Length字段时,该返回值无效
     */
    int64_t onResponseHeader(const string &status,const HttpHeader &headers) override {
        if(status == "101"){
            auto Sec_WebSocket_Accept = encodeBase64(SHA1::encode_bin(_Sec_WebSocket_Key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"));
            if(Sec_WebSocket_Accept == const_cast<HttpHeader &>(headers)["Sec-WebSocket-Accept"]){
                //success
                onWebSocketException(SockException());
129 130
                //防止ws服务器返回Content-Length
                const_cast<HttpHeader &>(headers).erase("Content-Length");
131 132
                //后续全是websocket负载数据
                return -1;
133 134 135 136 137 138 139 140
            }
            shutdown(SockException(Err_shutdown,StrPrinter << "Sec-WebSocket-Accept mismatch"));
            return 0;
        }

        shutdown(SockException(Err_shutdown,StrPrinter << "bad http status code:" << status));
        return 0;
    };
baiyfcu committed
141

142 143 144 145 146
    /**
     * 接收http回复完毕,
     */
    void onResponseCompleted() override {}

147 148 149 150 151 152 153 154 155 156
    /**
     * 接收websocket负载数据
     */
    void onResponseBody(const char *buf,int64_t size,int64_t recvedSize,int64_t totalSize) override{
        if(_onRecv){
            //完成websocket握手后,拦截websocket数据并解析
            _onRecv(buf, size);
        }
    };

157 158
    //TcpClient override

159 160 161
    /**
     * 定时触发
     */
162 163 164 165 166 167 168 169 170
    void onManager() override {
        if(_onRecv){
            //websocket连接成功了
            _delegate.onManager();
        } else{
            //websocket连接中...
            HttpClientImp::onManager();
        }
    }
171 172 173 174

    /**
     * 数据全部发送完毕后回调
     */
175 176 177 178 179 180 181 182 183
    void onFlush() override{
        if(_onRecv){
            //websocket连接成功了
            _delegate.onFlush();
        } else{
            //websocket连接中...
            HttpClientImp::onFlush();
        }
    }
baiyfcu committed
184

185 186 187 188 189 190 191 192 193 194 195 196
    /**
     * tcp连接结果
     */
    void onConnect(const SockException &ex) override{
        if(ex){
            //tcp连接失败,直接返回失败
            onWebSocketException(ex);
            return;
        }
        //开始websocket握手
        HttpClientImp::onConnect(ex);
    }
baiyfcu committed
197

198
    /**
199 200
     * tcp连接断开
     */
201 202 203 204
    void onErr(const SockException &ex) override{
        //tcp断开或者shutdown导致的断开
        onWebSocketException(ex);
    }
baiyfcu committed
205

206
    //WebSocketSplitter override
207

208
    /**
xiongziliang committed
209
     * 收到一个webSocket数据包包头,后续将继续触发onWebSocketDecodePayload回调
210 211 212
     * @param header 数据包头
     */
    void onWebSocketDecodeHeader(const WebSocketHeader &header) override{
213
        _payload_section.clear();
214
    }
baiyfcu committed
215

216 217 218 219 220
    /**
     * 收到webSocket数据包负载
     * @param header 数据包包头
     * @param ptr 负载数据指针
     * @param len 负载数据长度
xiongziliang committed
221
     * @param recved 已接收数据长度(包含本次数据长度),等于header._payload_len时则接受完毕
222
     */
xiongziliang committed
223
    void onWebSocketDecodePayload(const WebSocketHeader &header, const uint8_t *ptr, uint64_t len, uint64_t recved) override{
224
        _payload_section.append((char *)ptr,len);
225
    }
baiyfcu committed
226

227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242
    /**
     * 接收到完整的一个webSocket数据包后回调
     * @param header 数据包包头
     */
    void onWebSocketDecodeComplete(const WebSocketHeader &header_in) override{
        WebSocketHeader& header = const_cast<WebSocketHeader&>(header_in);
        auto  flag = header._mask_flag;
        //websocket客户端发送数据需要加密
        header._mask_flag = true;

        switch (header._opcode){
            case WebSocketHeader::CLOSE:{
                //服务器主动关闭
                WebSocketSplitter::encode(header,nullptr);
                shutdown(SockException(Err_eof,"websocket server close the connection"));
                break;
243 244
            }

245 246 247
            case WebSocketHeader::PING:{
                //心跳包
                header._opcode = WebSocketHeader::PONG;
248
                WebSocketSplitter::encode(header,std::make_shared<BufferString>(std::move(_payload_section)));
249 250
                break;
            }
251 252

            case WebSocketHeader::CONTINUATION:
253 254
            case WebSocketHeader::TEXT:
            case WebSocketHeader::BINARY:{
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275
                if (!header._fin) {
                    //还有后续分片数据, 我们先缓存数据,所有分片收集完成才一次性输出
                    _payload_cache.append(std::move(_payload_section));
                    if (_payload_cache.size() < MAX_WS_PACKET) {
                        //还有内存容量缓存分片数据
                        break;
                    }
                    //分片缓存太大,需要清空
                }

                //最后一个包
                if (_payload_cache.empty()) {
                    //这个包是唯一个分片
                    _delegate.onRecv(std::make_shared<WebSocketBuffer>(header._opcode, header._fin, std::move(_payload_section)));
                    break;
                }

                //这个包由多个分片组成
                _payload_cache.append(std::move(_payload_section));
                _delegate.onRecv(std::make_shared<WebSocketBuffer>(header._opcode, header._fin, std::move(_payload_cache)));
                _payload_cache.clear();
276
                break;
277 278 279
            }

            default: break;
280
        }
281
        _payload_section.clear();
282 283
        header._mask_flag = flag;
    }
baiyfcu committed
284

285 286 287 288 289 290 291 292
    /**
     * websocket数据编码回调
     * @param ptr 数据指针
     * @param len 数据指针长度
     */
    void onWebSocketEncodeData(const Buffer::Ptr &buffer) override{
        HttpClientImp::send(buffer);
    }
293

baiyfcu committed
294
private:
295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312
    void onWebSocketException(const SockException &ex){
        if(!ex){
            //websocket握手成功
            //此处截取TcpClient派生类发送的数据并进行websocket协议打包
            weak_ptr<HttpWsClient> weakSelf = dynamic_pointer_cast<HttpWsClient>(shared_from_this());
            _delegate.setOnBeforeSendCB([weakSelf](const Buffer::Ptr &buf){
                auto strongSelf = weakSelf.lock();
                if(strongSelf){
                    WebSocketHeader header;
                    header._fin = true;
                    header._reserved = 0;
                    header._opcode = DataType;
                    //客户端需要加密
                    header._mask_flag = true;
                    strongSelf->WebSocketSplitter::encode(header,buf);
                }
                return buf->size();
            });
313 314

            //设置sock,否则shutdown等接口都无效
315
            _delegate.setSock(HttpClientImp::getSock());
316 317 318
            //触发连接成功事件
            _delegate.onConnect(ex);
            //拦截websocket数据接收
319
            _onRecv = [this](const char *data, int len){
320
                //解析websocket数据包
321
                this->WebSocketSplitter::decode((uint8_t *)data, len);
322 323 324 325 326 327 328 329 330 331 332 333 334 335 336
            };
            return;
        }

        //websocket握手失败或者tcp连接失败或者中途断开
        if(_onRecv){
            //握手成功之后的中途断开
            _onRecv = nullptr;
            _delegate.onErr(ex);
            return;
        }

        //websocket握手失败或者tcp连接失败
        _delegate.onConnect(ex);
    }
baiyfcu committed
337 338

private:
339
    string _Sec_WebSocket_Key;
340
    function<void(const char *data, int len)> _onRecv;
341
    ClientTypeImp<ClientType,DataType> &_delegate;
342 343
    string _payload_section;
    string _payload_cache;
baiyfcu committed
344 345
};

346 347 348 349 350 351 352 353
/**
 * Tcp客户端转WebSocket客户端模板,
 * 通过该模板,开发者再不修改TcpClient派生类任何代码的情况下快速实现WebSocket协议的包装
 * @tparam ClientType TcpClient派生类
 * @tparam DataType websocket负载类型,是TEXT还是BINARY类型
 * @tparam useWSS 是否使用ws还是wss连接
 */
template <typename ClientType,WebSocketHeader::Type DataType = WebSocketHeader::TEXT,bool useWSS = false >
354 355 356 357 358 359 360 361
class WebSocketClient : public ClientTypeImp<ClientType,DataType>{
public:
    typedef std::shared_ptr<WebSocketClient> Ptr;

    template<typename ...ArgsType>
    WebSocketClient(ArgsType &&...args) : ClientTypeImp<ClientType,DataType>(std::forward<ArgsType>(args)...){
        _wsClient.reset(new HttpWsClient<ClientType,DataType>(*this));
    }
362 363 364
    ~WebSocketClient() override {
        _wsClient->closeWsClient();
    }
365

366 367 368
    /**
     * 重载startConnect方法,
     * 目的是替换TcpClient的连接服务器行为,使之先完成WebSocket握手
369
     * @param host websocket服务器ip或域名
370 371 372
     * @param iPort websocket服务器端口
     * @param fTimeOutSec 超时时间
     */
373
    void startConnect(const string &host, uint16_t iPort, float fTimeOutSec = 3) override {
374
        string ws_url;
375 376
        if(useWSS){
            //加密的ws
377
            ws_url = StrPrinter << "wss://" + host << ":" << iPort << "/" ;
378
        }else{
379
            //明文ws
380
            ws_url = StrPrinter << "ws://" + host << ":" << iPort << "/" ;
381 382 383
        }
        _wsClient->startWsClient(ws_url,fTimeOutSec);
    }
384 385 386 387

    void startWebSocket(const string &ws_url,float fTimeOutSec = 3){
        _wsClient->startWsClient(ws_url,fTimeOutSec);
    }
388

389 390 391
private:
    typename HttpWsClient<ClientType,DataType>::Ptr _wsClient;
};
baiyfcu committed
392

393 394
}//namespace mediakit
#endif //ZLMEDIAKIT_WebSocketClient_H