WebSocketClient.h 12.1 KB
Newer Older
xiongziliang committed
1
/*
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
 * MIT License
 *
 * Copyright (c) 2016-2019 xiongziliang <771730766@qq.com>
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#ifndef ZLMEDIAKIT_WebSocketClient_H
#define ZLMEDIAKIT_WebSocketClient_H
baiyfcu committed
29 30 31

#include "Util/util.h"
#include "Util/base64.h"
32 33 34 35
#include "Util/SHA1.h"
#include "Network/TcpClient.h"
#include "HttpClientImp.h"
#include "WebSocketSplitter.h"
baiyfcu committed
36 37
using namespace toolkit;

38
namespace mediakit{
baiyfcu committed
39

40 41
template <typename ClientType,WebSocketHeader::Type DataType>
class HttpWsClient;
baiyfcu committed
42

43 44 45 46 47
/**
 * 辅助类,用于拦截TcpClient数据发送前的拦截
 * @tparam ClientType TcpClient派生类
 * @tparam DataType 这里无用,为了声明友元用
 */
48 49
template <typename ClientType,WebSocketHeader::Type DataType>
class ClientTypeImp : public ClientType {
baiyfcu committed
50
public:
51 52
    typedef function<int(const Buffer::Ptr &buf)> onBeforeSendCB;
    friend class HttpWsClient<ClientType,DataType>;
baiyfcu committed
53

54 55 56
    template<typename ...ArgsType>
    ClientTypeImp(ArgsType &&...args): ClientType(std::forward<ArgsType>(args)...){}
    ~ClientTypeImp() override {};
baiyfcu committed
57 58
protected:
    /**
59 60 61
     * 发送前拦截并打包为websocket协议
     * @param buf
     * @return
baiyfcu committed
62
     */
63 64 65 66 67 68
    int send(const Buffer::Ptr &buf) override{
        if(_beforeSendCB){
            return _beforeSendCB(buf);
        }
        return ClientType::send(buf);
    }
baiyfcu committed
69
    /**
70 71
     * 设置发送数据截取回调函数
     * @param cb 截取回调函数
baiyfcu committed
72
     */
73 74 75 76 77 78
    void setOnBeforeSendCB(const onBeforeSendCB &cb){
        _beforeSendCB = cb;
    }
private:
    onBeforeSendCB _beforeSendCB;
};
baiyfcu committed
79

80 81 82 83 84
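/**
 * Performs the websocket client handshake and bridges the resulting events
 * to the wrapped TcpClient-derived class.
 * @tparam ClientType the TcpClient-derived class
 * @tparam DataType websocket payload type used for outgoing frames (TEXT or BINARY)
 */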
85 86 87 88
template <typename ClientType,WebSocketHeader::Type DataType = WebSocketHeader::TEXT>
class HttpWsClient : public HttpClientImp , public WebSocketSplitter{
public:
    typedef shared_ptr<HttpWsClient> Ptr;
baiyfcu committed
89

90 91 92 93 94
    /**
     * Binds this handshake helper to the client it serves and picks the
     * Sec-WebSocket-Key used for the handshake.
     * @param delegate wrapped client; stored by reference, so it must outlive this object
     */
    HttpWsClient(ClientTypeImp<ClientType,DataType> &delegate) : _delegate(delegate){
        // RFC 6455 (section 4.1) requires Sec-WebSocket-Key to be the base64 encoding
        // of a 16-byte nonce, and strict servers reject keys of any other decoded length.
        // A SHA1 digest is 20 bytes, so only its first 16 bytes are kept; resize() also
        // guarantees exactly 16 bytes whatever length the helper returned.
        string nonce = SHA1::encode_bin(makeRandStr(16, false));
        nonce.resize(16);
        _Sec_WebSocket_Key = encodeBase64(nonce);
    }
    // Empty destructor body: this class performs no explicit cleanup of its own.
    ~HttpWsClient(){}

95 96 97 98 99
    /**
     * 发起ws握手
     * @param ws_url ws连接url
     * @param fTimeOutSec 超时时间
     */
100 101 102 103 104 105 106 107 108 109 110 111
    void startWsClient(const string &ws_url,float fTimeOutSec){
        string http_url = ws_url;
        replace(http_url,"ws://","http://");
        replace(http_url,"wss://","https://");
        setMethod("GET");
        addHeader("Upgrade","websocket");
        addHeader("Connection","Upgrade");
        addHeader("Sec-WebSocket-Version","13");
        addHeader("Sec-WebSocket-Key",_Sec_WebSocket_Key);
        _onRecv = nullptr;
        sendRequest(http_url,fTimeOutSec);
    }
baiyfcu committed
112
protected:
113
    //HttpClientImp override
baiyfcu committed
114

115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
    /**
     * 收到http回复头
     * @param status 状态码,譬如:200 OK
     * @param headers http头
     * @return 返回后续content的长度;-1:后续数据全是content;>=0:固定长度content
     *          需要指出的是,在http头中带有Content-Length字段时,该返回值无效
     */
    int64_t onResponseHeader(const string &status,const HttpHeader &headers) override {
        if(status == "101"){
            auto Sec_WebSocket_Accept = encodeBase64(SHA1::encode_bin(_Sec_WebSocket_Key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"));
            if(Sec_WebSocket_Accept == const_cast<HttpHeader &>(headers)["Sec-WebSocket-Accept"]){
                //success
                onWebSocketException(SockException());
                return 0;
            }
            shutdown(SockException(Err_shutdown,StrPrinter << "Sec-WebSocket-Accept mismatch"));
            return 0;
        }

        shutdown(SockException(Err_shutdown,StrPrinter << "bad http status code:" << status));
        return 0;
    };
baiyfcu committed
137

138 139 140 141 142 143 144
    /**
     * Called when the HTTP response has been received completely.
     * Intentionally empty: the handshake result is already handled in onResponseHeader().
     */
    void onResponseCompleted() override {}

    //TcpClient override

145 146 147
    /**
     * 定时触发
     */
148 149 150 151 152 153 154 155 156
    void onManager() override {
        if(_onRecv){
            //websocket连接成功了
            _delegate.onManager();
        } else{
            //websocket连接中...
            HttpClientImp::onManager();
        }
    }
157 158 159 160

    /**
     * 数据全部发送完毕后回调
     */
161 162 163 164 165 166 167 168 169
    void onFlush() override{
        if(_onRecv){
            //websocket连接成功了
            _delegate.onFlush();
        } else{
            //websocket连接中...
            HttpClientImp::onFlush();
        }
    }
baiyfcu committed
170

171 172 173 174 175 176 177 178 179 180 181 182 183
    /**
     * Result of the tcp connection attempt.
     * @param ex connection result; evaluates to false on success
     */
    void onConnect(const SockException &ex) override{
        if(!ex){
            // tcp is up: let the http client start the websocket handshake
            HttpClientImp::onConnect(ex);
            return;
        }
        // tcp connection failed: report the failure right away
        onWebSocketException(ex);
    }
baiyfcu committed
184

185 186 187 188 189 190
    /**
     * Data received from the tcp connection.
     * @param pBuf received bytes
     */
    void onRecv(const Buffer::Ptr &pBuf) override{
        if(!_onRecv){
            // handshake not finished: this is http handshake data
            HttpClientImp::onRecv(pBuf);
            return;
        }
        // handshake finished: intercept the data and parse it as websocket frames
        _onRecv(pBuf);
    }
baiyfcu committed
198

199 200 201 202
    /**
     * tcp连接断开
     * @param ex
     */
203 204 205 206
    void onErr(const SockException &ex) override{
        // Reached when the tcp connection drops or was closed through shutdown();
        // onWebSocketException() decides which delegate callback reports it.
        onWebSocketException(ex);
    }
baiyfcu committed
207

208
    //WebSocketSplitter override
209

210 211 212 213 214 215 216
    /**
     * Called when the header of a websocket frame has been decoded; payload
     * callbacks (onWebSocketDecodePlayload) follow.
     * Drops whatever was accumulated in _payload so the new frame starts empty.
     * @param header decoded frame header (not inspected here)
     */
    void onWebSocketDecodeHeader(const WebSocketHeader &header) override{
        _payload.clear();
    }
baiyfcu committed
217

218 219 220 221 222 223 224 225 226 227
    /**
     * Called with a piece of the payload of the current websocket frame.
     * The bytes are appended to _payload until the frame is complete.
     * @param header header of the frame being received
     * @param ptr pointer to the payload bytes
     * @param len number of payload bytes at ptr
     * @param recved bytes received so far for this frame, including this call
     */
    void onWebSocketDecodePlayload(const WebSocketHeader &header, const uint8_t *ptr, uint64_t len, uint64_t recved) override{
        _payload.append(reinterpret_cast<const char *>(ptr), len);
    }
baiyfcu committed
228 229


230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253
    /**
     * Called once a complete websocket frame has been received.
     * - CLOSE: echoes the frame back and shuts the connection down.
     * - PING: answers with a PONG that carries the same payload.
     * - TEXT / BINARY: hands the accumulated payload to the delegate's onRecv().
     * - CONTINUATION and unknown opcodes: ignored; their payload is discarded.
     * @param header_in header of the completed frame
     */
    void onWebSocketDecodeComplete(const WebSocketHeader &header_in) override{
        // The header is reused for the reply frames, which requires modifying it.
        // Every field touched here is saved first and restored before returning,
        // so the caller's header is left exactly as it was handed in.
        WebSocketHeader& header = const_cast<WebSocketHeader&>(header_in);
        const auto saved_mask_flag = header._mask_flag;
        const auto saved_opcode = header._opcode;
        // frames sent by a websocket client must be masked
        header._mask_flag = true;

        switch (header._opcode){
            case WebSocketHeader::CLOSE:{
                // the server closed the connection: echo the close frame, then shut down
                WebSocketSplitter::encode(header,nullptr);
                shutdown(SockException(Err_eof,"websocket server close the connection"));
            }
                break;
            case WebSocketHeader::PING:{
                // heartbeat: reply with a PONG carrying the received payload
                header._opcode = WebSocketHeader::PONG;
                WebSocketSplitter::encode(header,std::make_shared<BufferString>(std::move(_payload)));
            }
                break;
            case WebSocketHeader::CONTINUATION:{
                // continuation frames are not handled
            }
                break;
            case WebSocketHeader::TEXT:
            case WebSocketHeader::BINARY:{
                // a complete data frame: raise the delegate's onRecv event
                _delegate.onRecv(std::make_shared<BufferString>(std::move(_payload)));
            }
                break;
            default:
                break;
        }
        _payload.clear();
        // undo the temporary header modifications (the PING branch changes the opcode)
        header._opcode = saved_opcode;
        header._mask_flag = saved_mask_flag;
    }
baiyfcu committed
269

270 271 272 273 274 275 276 277
    /**
     * Receives the data produced by WebSocketSplitter::encode() and writes it
     * to the connection through the http client's send().
     * @param buffer encoded websocket data to send
     */
    void onWebSocketEncodeData(const Buffer::Ptr &buffer) override{
        HttpClientImp::send(buffer);
    }
baiyfcu committed
278
private:
279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296
    void onWebSocketException(const SockException &ex){
        if(!ex){
            //websocket握手成功
            //此处截取TcpClient派生类发送的数据并进行websocket协议打包
            weak_ptr<HttpWsClient> weakSelf = dynamic_pointer_cast<HttpWsClient>(shared_from_this());
            _delegate.setOnBeforeSendCB([weakSelf](const Buffer::Ptr &buf){
                auto strongSelf = weakSelf.lock();
                if(strongSelf){
                    WebSocketHeader header;
                    header._fin = true;
                    header._reserved = 0;
                    header._opcode = DataType;
                    //客户端需要加密
                    header._mask_flag = true;
                    strongSelf->WebSocketSplitter::encode(header,buf);
                }
                return buf->size();
            });
297 298 299

            //设置sock,否则shutdown等接口都无效
            _delegate.setSock(HttpClientImp::_sock);
300 301 302 303
            //触发连接成功事件
            _delegate.onConnect(ex);
            //拦截websocket数据接收
            _onRecv = [this](const Buffer::Ptr &pBuf){
304
                //解析websocket数据包
305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320
                WebSocketSplitter::decode((uint8_t*)pBuf->data(),pBuf->size());
            };
            return;
        }

        //websocket握手失败或者tcp连接失败或者中途断开
        if(_onRecv){
            //握手成功之后的中途断开
            _onRecv = nullptr;
            _delegate.onErr(ex);
            return;
        }

        //websocket握手失败或者tcp连接失败
        _delegate.onConnect(ex);
    }
baiyfcu committed
321 322

private:
323 324 325 326
    // Key sent in the Sec-WebSocket-Key request header; also used to verify Sec-WebSocket-Accept.
    string _Sec_WebSocket_Key;
    // Receive hook: empty while handshaking, set once the websocket handshake succeeded.
    function<void(const Buffer::Ptr &pBuf)> _onRecv;
    // The wrapped client that receives the bridged events; held by reference.
    ClientTypeImp<ClientType,DataType> &_delegate;
    // Payload bytes accumulated for the websocket frame currently being received.
    string _payload;
baiyfcu committed
327 328
};

329 330 331 332 333 334 335 336 337

/**
 * Tcp客户端转WebSocket客户端模板,
 * 通过该模板,开发者再不修改TcpClient派生类任何代码的情况下快速实现WebSocket协议的包装
 * @tparam ClientType TcpClient派生类
 * @tparam DataType websocket负载类型,是TEXT还是BINARY类型
 * @tparam useWSS 是否使用ws还是wss连接
 */
template <typename ClientType,WebSocketHeader::Type DataType = WebSocketHeader::TEXT,bool useWSS = false >
338 339 340 341 342 343 344 345 346 347
class WebSocketClient : public ClientTypeImp<ClientType,DataType>{
public:
    typedef std::shared_ptr<WebSocketClient> Ptr;

    template<typename ...ArgsType>
    WebSocketClient(ArgsType &&...args) : ClientTypeImp<ClientType,DataType>(std::forward<ArgsType>(args)...){
        _wsClient.reset(new HttpWsClient<ClientType,DataType>(*this));
    }
    ~WebSocketClient() override {}

348 349 350 351 352 353 354
    /**
     * 重载startConnect方法,
     * 目的是替换TcpClient的连接服务器行为,使之先完成WebSocket握手
     * @param strUrl websocket服务器ip或域名
     * @param iPort websocket服务器端口
     * @param fTimeOutSec 超时时间
     */
355 356
    void startConnect(const string &strUrl, uint16_t iPort, float fTimeOutSec = 3) override {
        string ws_url;
357 358
        if(useWSS){
            //加密的ws
359 360
            ws_url = StrPrinter << "wss://" + strUrl << ":" << iPort << "/" ;
        }else{
361
            //明文ws
362 363 364 365 366 367 368
            ws_url = StrPrinter << "ws://" + strUrl << ":" << iPort << "/" ;
        }
        _wsClient->startWsClient(ws_url,fTimeOutSec);
    }
private:
    typename HttpWsClient<ClientType,DataType>::Ptr _wsClient;
};
baiyfcu committed
369

370 371
}//namespace mediakit
#endif //ZLMEDIAKIT_WebSocketClient_H