HttpsSession.h 8.29 KB
Newer Older
xiongziliang committed
1
/*
xiongziliang committed
2
 * MIT License
xzl committed
3
 *
xiongziliang committed
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
 * Copyright (c) 2016 xiongziliang <771730766@qq.com>
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
xzl committed
25 26 27 28 29 30 31 32
 */

#ifndef SRC_HTTP_HTTPSSESSION_H_
#define SRC_HTTP_HTTPSSESSION_H_

#include "HttpSession.h"
#include "Util/SSLBox.h"
#include "Util/TimeTicker.h"
xiongzilaing committed
33

xzl committed
34 35 36 37 38 39 40 41 42 43
using namespace ZL::Util;

namespace ZL {
namespace Http {

class HttpsSession: public HttpSession {
public:
	HttpsSession(const std::shared_ptr<ThreadPool> &pTh, const Socket::Ptr &pSock):
		HttpSession(pTh,pSock){
		m_sslBox.setOnEncData([&](const char *data, uint32_t len){
xiongziliang committed
44
#if defined(__GNUC__) && (__GNUC__ < 5)
xzl committed
45
			public_send(data,len);
xiongziliang committed
46
#else//defined(__GNUC__) && (__GNUC__ < 5)
47
			HttpSession::send(obtainBuffer(data,len));
xiongziliang committed
48
#endif//defined(__GNUC__) && (__GNUC__ < 5)
xzl committed
49 50
		});
		m_sslBox.setOnDecData([&](const char *data, uint32_t len){
xiongziliang committed
51
#if defined(__GNUC__) && (__GNUC__ < 5)
xzl committed
52
			public_onRecv(data,len);
xiongziliang committed
53
#else//defined(__GNUC__) && (__GNUC__ < 5)
xzl committed
54
			HttpSession::onRecv(data,len);
xiongziliang committed
55
#endif//defined(__GNUC__) && (__GNUC__ < 5)
xzl committed
56 57 58 59 60
		});
	}
	virtual ~HttpsSession(){
		//m_sslBox.shutdown();
	}
61
	void onRecv(const Buffer::Ptr &pBuf) override{
xzl committed
62 63 64
		TimeTicker();
		m_sslBox.onRecv(pBuf->data(), pBuf->size());
	}
xiongziliang committed
65
#if defined(__GNUC__) && (__GNUC__ < 5)
xzl committed
66
	int public_send(const char *data, uint32_t len){
67
		return HttpSession::send(obtainBuffer(data,len));
xzl committed
68 69 70 71
	}
	void public_onRecv(const char *data, uint32_t len){
		HttpSession::onRecv(data,len);
	}
xiongziliang committed
72
#endif//defined(__GNUC__) && (__GNUC__ < 5)
73
protected:
74
	virtual int send(const Buffer::Ptr &buf) override{
xzl committed
75
		TimeTicker();
76 77
		m_sslBox.onSend(buf->data(), buf->size());
		return buf->size();
xzl committed
78 79 80 81
	}
	SSL_Box m_sslBox;
};

82 83 84 85 86 87 88 89 90 91 92 93 94 95 96


/**
* 通过该模板类可以透明化WebSocket协议,
* 用户只要实现WebSock协议下的具体业务协议,譬如基于WebSocket协议的Rtmp协议等
* @tparam SessionType 业务协议的TcpSession类
*/
template <class SessionType,class HttpSessionType = HttpSession>
class WebSocketSession : public HttpSessionType {
public:
	WebSocketSession(const std::shared_ptr<ThreadPool> &pTh, const Socket::Ptr &pSock) : HttpSessionType(pTh,pSock){}
	virtual ~WebSocketSession(){}

	//收到eof或其他导致脱离TcpServer事件的回调
	void onError(const SockException &err) override{
xiongziliang committed
97
		HttpSessionType::onError(err);
98 99 100 101 102 103 104 105
		if(_session){
			_session->onError(err);
		}
	}
	//每隔一段时间触发,用来做超时管理
	void onManager() override{
		if(_session){
			_session->onManager();
xiongziliang committed
106 107
		}else{
            HttpSessionType::onManager();
108 109 110 111
		}
	}

	void attachServer(const TcpServer &server) override{
xiongziliang committed
112
		HttpSessionType::attachServer(server);
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
		_weakServer = const_cast<TcpServer &>(server).shared_from_this();
	}
protected:
	/**
	 * 开始收到一个webSocket数据包
	 * @param packet
	 */
	void onWebSocketDecodeHeader(const WebSocketHeader &packet) override{
		//新包,原来的包残余数据清空掉
		_remian_data.clear();

		if(_firstPacket){
			//这是个WebSocket会话而不是普通的Http会话
			_firstPacket = false;
			_session = std::make_shared<SessionImp>(HttpSessionType::getIdentifier(),nullptr,HttpSessionType::_sock);

			auto strongServer = _weakServer.lock();
			if(strongServer){
				_session->attachServer(*strongServer);
			}

			//此处截取数据并进行websocket协议打包
			weak_ptr<WebSocketSession> weakSelf = dynamic_pointer_cast<WebSocketSession>(HttpSessionType::shared_from_this());
			_session->setOnBeforeSendCB([weakSelf](const Buffer::Ptr &buf){
				auto strongSelf = weakSelf.lock();
				if(strongSelf){
139 140 141 142 143 144
                    WebSocketHeader header;
                    header._fin = true;
                    header._reserved = 0;
                    header._opcode = WebSocketHeader::TEXT;
                    header._mask_flag = false;
                    strongSelf->WebSocketSplitter::encode(header,(uint8_t *)buf->data(),buf->size());
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
				}
				return buf->size();
			});
		}
	}

	/**
	 * 收到websocket数据包负载
	 * @param packet
	 * @param ptr
	 * @param len
	 * @param recved
	 */
	void onWebSocketDecodePlayload(const WebSocketHeader &packet,const uint8_t *ptr,uint64_t len,uint64_t recved) override {
		_remian_data.append((char *)ptr,len);
	}

	/**
	 * 接收到完整的一个webSocket数据包后回调
	 * @param header 数据包包头
	 */
166 167 168 169 170
	void onWebSocketDecodeComplete(const WebSocketHeader &header_in) override {
        WebSocketHeader& header = const_cast<WebSocketHeader&>(header_in);
        auto  flag = header._mask_flag;
		header._mask_flag = false;

171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
		switch (header._opcode){
            case WebSocketHeader::CLOSE:{
                HttpSessionType::encode(header,nullptr,0);
			}
				break;
			case WebSocketHeader::PING:{
				const_cast<WebSocketHeader&>(header)._opcode = WebSocketHeader::PONG;
                HttpSessionType::encode(header,(uint8_t *)_remian_data.data(),_remian_data.size());
			}
				break;
			case WebSocketHeader::CONTINUATION:{

			}
				break;
			case WebSocketHeader::TEXT:
			case WebSocketHeader::BINARY:{
				BufferString::Ptr buffer = std::make_shared<BufferString>(_remian_data);
				_session->onRecv(buffer);
			}
				break;
			default:
				break;
		}
		_remian_data.clear();
195
		header._mask_flag = flag;
196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282
	}

	/**
	* 发送数据进行websocket协议打包后回调
	* @param ptr
	* @param len
	*/
	void onWebSocketEncodeData(const uint8_t *ptr,uint64_t len) override{
        SocketHelper::send((char *)ptr,len);
	}
private:
	typedef function<int(const Buffer::Ptr &buf)> onBeforeSendCB;
	/**
	 * 该类实现了TcpSession派生类发送数据的截取
	 * 目的是发送业务数据前进行websocket协议的打包
	 */
	class SessionImp : public SessionType{
	public:
		SessionImp(const string &identifier,
				   const std::shared_ptr<ThreadPool> &pTh,
				   const Socket::Ptr &pSock) :
				_identifier(identifier),SessionType(pTh,pSock){}

		~SessionImp(){}

		/**
		 * 设置发送数据截取回调函数
		 * @param cb 截取回调函数
		 */
		void setOnBeforeSendCB(const onBeforeSendCB &cb){
			_beforeSendCB = cb;
		}
	protected:
		/**
		 * 重载send函数截取数据
		 * @param buf 需要截取的数据
		 * @return 数据字节数
		 */
		int send(const Buffer::Ptr &buf) override {
			if(_beforeSendCB){
				return _beforeSendCB(buf);
			}
			return SessionType::send(buf);
		}
		string getIdentifier() const override{
			return _identifier;
		}
	private:
		onBeforeSendCB _beforeSendCB;
		string _identifier;
	};
private:
	bool _firstPacket = true;
	string _remian_data;
	weak_ptr<TcpServer> _weakServer;
	std::shared_ptr<SessionImp> _session;
};

/**
* 回显会话
*/
class EchoSession : public TcpSession {
public:
	EchoSession(const std::shared_ptr<ThreadPool> &pTh, const Socket::Ptr &pSock) :
			TcpSession(pTh,pSock){
		DebugL;
	}
	virtual ~EchoSession(){
		DebugL;
	}

	void attachServer(const TcpServer &server) override{
		DebugL << getIdentifier() << " " << TcpSession::getIdentifier();
	}
	void onRecv(const Buffer::Ptr &buffer) override {
		send(buffer);
	}
	void onError(const SockException &err) override{
		WarnL << err.what();
	}
	//每隔一段时间触发,用来做超时管理
	void onManager() override{
		DebugL;
	}
};


xiongziliang committed
283 284 285
typedef WebSocketSession<EchoSession,HttpSession> EchoWebSocketSession;
typedef WebSocketSession<EchoSession,HttpsSession> SSLEchoWebSocketSession;

286

xzl committed
287 288 289 290
} /* namespace Http */
} /* namespace ZL */

#endif /* SRC_HTTP_HTTPSSESSION_H_ */