Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
Z
ZLMediaKit
概览
Overview
Details
Activity
Cycle Analytics
版本库
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
问题
0
Issues
0
列表
Board
标记
里程碑
合并请求
0
Merge Requests
0
CI / CD
CI / CD
流水线
作业
日程表
图表
维基
Wiki
代码片段
Snippets
成员
Collapse sidebar
Close sidebar
活动
图像
聊天
创建新问题
作业
提交
Issue Boards
Open sidebar
张翔宇
ZLMediaKit
Commits
644905a3
Commit
644905a3
authored
5 years ago
by
xiongziliang
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
实现websocket客户端模板
parent
30950757
隐藏空白字符变更
内嵌
并排
正在显示
4 个修改的文件
包含
377 行增加
和
460 行删除
+377
-460
3rdpart/ZLToolKit
+1
-1
src/Http/WebSocketClient.cpp
+0
-288
src/Http/WebSocketClient.h
+292
-171
tests/test_wsClient.cpp
+84
-0
没有找到文件。
ZLToolKit
@
91246bb0
Subproject commit
0b406073125080ab8edd13ee7c14e573e54baa3
5
Subproject commit
91246bb01475c7336040a4b7ec35d0584887f36
5
This diff is collapsed.
Click to expand it.
src/Http/WebSocketClient.cpp
deleted
100644 → 0
查看文件 @
30950757
#include "WebSocketClient.h"
int
mediakit
::
WebSocketClient
::
send
(
const
string
&
buf
)
{
if
(
_sock
)
{
if
(
_WSClientStatus
==
WORKING
)
{
_session
->
send
(
buf
);
return
0
;
}
else
{
return
-
1
;
}
}
}
void
mediakit
::
WebSocketClient
::
clear
()
{
_method
.
clear
();
_path
.
clear
();
_parser
.
Clear
();
_recvedBodySize
=
0
;
_totalBodySize
=
0
;
_aliveTicker
.
resetTime
();
_chunkedSplitter
.
reset
();
HttpRequestSplitter
::
reset
();
}
const
std
::
string
&
mediakit
::
WebSocketClient
::
responseStatus
()
const
{
return
_parser
.
Url
();
}
const
mediakit
::
WebSocketClient
::
HttpHeader
&
mediakit
::
WebSocketClient
::
responseHeader
()
const
{
return
_parser
.
getValues
();
}
const
mediakit
::
Parser
&
mediakit
::
WebSocketClient
::
response
()
const
{
return
_parser
;
}
const
std
::
string
&
mediakit
::
WebSocketClient
::
getUrl
()
const
{
return
_url
;
}
int64_t
mediakit
::
WebSocketClient
::
onResponseHeader
(
const
string
&
status
,
const
HttpHeader
&
headers
)
{
DebugL
<<
status
;
//无Content-Length字段时默认后面全是content
return
-
1
;
}
void
mediakit
::
WebSocketClient
::
onResponseBody
(
const
char
*
buf
,
int64_t
size
,
int64_t
recvedSize
,
int64_t
totalSize
)
{
DebugL
<<
size
<<
" "
<<
recvedSize
<<
" "
<<
totalSize
;
}
void
mediakit
::
WebSocketClient
::
onResponseCompleted
()
{
DebugL
;
}
int64_t
mediakit
::
WebSocketClient
::
onRecvHeader
(
const
char
*
data
,
uint64_t
len
)
{
_parser
.
Parse
(
data
);
if
(
_parser
.
Url
()
==
"101"
)
{
switch
(
_WSClientStatus
)
{
case
HANDSHAKING
:
{
StrCaseMap
&
valueMap
=
_parser
.
getValues
();
auto
key
=
valueMap
.
find
(
"Sec-WebSocket-Accept"
);
if
(
key
!=
valueMap
.
end
()
&&
key
->
second
.
length
()
>
0
)
{
onConnect
(
SockException
());
}
break
;
}
}
return
-
1
;
}
else
{
shutdown
(
SockException
(
Err_shutdown
,
_parser
.
Url
().
c_str
()));
return
0
;
}
return
-
1
;
}
void
mediakit
::
WebSocketClient
::
onRecvContent
(
const
char
*
data
,
uint64_t
len
)
{
if
(
_chunkedSplitter
)
{
_chunkedSplitter
->
input
(
data
,
len
);
return
;
}
auto
recvedBodySize
=
_recvedBodySize
+
len
;
if
(
_totalBodySize
<
0
)
{
//不限长度的content,最大支持INT64_MAX个字节
onResponseBody
(
data
,
len
,
recvedBodySize
,
INT64_MAX
);
_recvedBodySize
=
recvedBodySize
;
return
;
}
//固定长度的content
if
(
recvedBodySize
<
_totalBodySize
)
{
//content还未接收完毕
onResponseBody
(
data
,
len
,
recvedBodySize
,
_totalBodySize
);
_recvedBodySize
=
recvedBodySize
;
return
;
}
//content接收完毕
onResponseBody
(
data
,
_totalBodySize
-
_recvedBodySize
,
_totalBodySize
,
_totalBodySize
);
bool
biggerThanExpected
=
recvedBodySize
>
_totalBodySize
;
onResponseCompleted_l
();
if
(
biggerThanExpected
)
{
//声明的content数据比真实的小,那么我们只截取前面部分的并断开链接
shutdown
(
SockException
(
Err_shutdown
,
"http response content size bigger than expected"
));
}
}
void
mediakit
::
WebSocketClient
::
onConnect
(
const
SockException
&
ex
)
{
_aliveTicker
.
resetTime
();
if
(
ex
)
{
onDisconnect
(
ex
);
return
;
}
//先假设http客户端只会接收一点点数据(只接受http头,节省内存)
_sock
->
setReadBuffer
(
std
::
make_shared
<
BufferRaw
>
(
1
*
1024
));
_totalBodySize
=
0
;
_recvedBodySize
=
0
;
HttpRequestSplitter
::
reset
();
_chunkedSplitter
.
reset
();
if
(
_WSClientStatus
==
WSCONNECT
)
{
//Websocket握手
string
random
=
get_random
(
16
);
auto
Sec_WebSocket_Key
=
encodeBase64
(
SHA1
::
encode_bin
(
random
));
_key
=
Sec_WebSocket_Key
;
string
p
=
generate_websocket_client_handshake
(
_ip
.
c_str
(),
_port
,
_url
.
c_str
(),
_key
.
c_str
());
TcpClient
::
send
(
p
);
_WSClientStatus
=
HANDSHAKING
;
}
else
if
(
_WSClientStatus
==
HANDSHAKING
)
{
_WSClientStatus
=
WORKING
;
}
onFlush
();
}
void
mediakit
::
WebSocketClient
::
onRecv
(
const
Buffer
::
Ptr
&
pBuf
)
{
_aliveTicker
.
resetTime
();
if
(
_WSClientStatus
==
HANDSHAKING
||
_WSClientStatus
==
WSCONNECT
)
HttpRequestSplitter
::
input
(
pBuf
->
data
(),
pBuf
->
size
());
else
if
(
_WSClientStatus
==
WORKING
)
{
WebSocketSplitter
::
decode
((
uint8_t
*
)
pBuf
->
data
(),
pBuf
->
size
());
}
}
void
mediakit
::
WebSocketClient
::
onErr
(
const
SockException
&
ex
)
{
_session
->
onError
(
ex
);
onDisconnect
(
ex
);
}
void
mediakit
::
WebSocketClient
::
onManager
()
{
if
(
_WSClientStatus
!=
WORKING
)
{
if
(
_fTimeOutSec
>
0
&&
_aliveTicker
.
elapsedTime
()
>
_fTimeOutSec
*
1000
)
{
//超时
shutdown
(
SockException
(
Err_timeout
,
"ws server respone timeout"
));
}
}
else
_session
->
onManager
();
}
std
::
string
mediakit
::
WebSocketClient
::
generate_websocket_client_handshake
(
const
char
*
ip
,
uint16_t
port
,
const
char
*
path
,
const
char
*
key
)
{
/**
* @brief 业务数据被分片的单片最大大小, 等于 65535 - 14 - 1
*/
#define DATA_FRAME_MAX_LEN 65520
#define HANDSHAKE_SIZE 1024
char
buf
[
HANDSHAKE_SIZE
]
=
{
0
};
snprintf
(
buf
,
HANDSHAKE_SIZE
,
"GET %s HTTP/1.1
\r\n
"
"Host: %s:%d
\r\n
"
"Upgrade: websocket
\r\n
"
"Connection: Upgrade
\r\n
"
"Sec-WebSocket-Key: %s
\r\n
"
"Sec-WebSocket-Version: 13
\r\n
"
"
\r\n
"
,
path
,
ip
,
port
,
key
);
string
temBuf
(
buf
);
return
temBuf
;
}
std
::
string
mediakit
::
WebSocketClient
::
get_random
(
size_t
n
)
{
random_device
rd
;
_StrPrinter
printer
;
for
(
int
i
=
0
;
i
<
n
;
i
++
)
{
unsigned
int
rnd
=
rd
();
printer
<<
rnd
%
9
;
}
return
string
(
printer
);
}
void
mediakit
::
WebSocketClient
::
onWebSocketDecodeHeader
(
const
WebSocketHeader
&
packet
)
{
//新包,原来的包残余数据清空掉
_remian_data
.
clear
();
if
(
_firstPacket
)
{
//这是个WebSocket会话而不是普通的Http会话
_firstPacket
=
false
;
//此处截取数据并进行websocket协议打包
}
}
void
mediakit
::
WebSocketClient
::
onWebSocketDecodePlayload
(
const
WebSocketHeader
&
packet
,
const
uint8_t
*
ptr
,
uint64_t
len
,
uint64_t
recved
)
{
_remian_data
.
append
((
char
*
)
ptr
,
len
);
}
void
mediakit
::
WebSocketClient
::
onWebSocketDecodeComplete
(
const
WebSocketHeader
&
header_in
)
{
WebSocketHeader
&
header
=
const_cast
<
WebSocketHeader
&>
(
header_in
);
auto
flag
=
header
.
_mask_flag
;
header
.
_mask_flag
=
false
;
switch
(
header
.
_opcode
)
{
case
WebSocketHeader
:
:
CLOSE
:
{
shutdown
(
SockException
(
Err_timeout
,
"session timeouted"
));
}
break
;
case
WebSocketHeader
:
:
PING
:
{
const_cast
<
WebSocketHeader
&>
(
header
).
_opcode
=
WebSocketHeader
::
PONG
;
WebSocketSplitter
::
encode
(
header
,
(
uint8_t
*
)
_remian_data
.
data
(),
_remian_data
.
size
());
}
break
;
case
WebSocketHeader
:
:
CONTINUATION
:
{
}
break
;
case
WebSocketHeader
:
:
TEXT
:
case
WebSocketHeader
:
:
BINARY
:
{
BufferString
::
Ptr
buffer
=
std
::
make_shared
<
BufferString
>
(
_remian_data
);
_session
->
onRecv
(
buffer
);
}
break
;
default
:
break
;
}
_remian_data
.
clear
();
header
.
_mask_flag
=
flag
;
}
void
mediakit
::
WebSocketClient
::
onWebSocketEncodeData
(
const
uint8_t
*
ptr
,
uint64_t
len
)
{
TcpClient
::
send
(
string
((
char
*
)
ptr
,
len
));
}
void
mediakit
::
WebSocketClient
::
onResponseCompleted_l
()
{
_totalBodySize
=
0
;
_recvedBodySize
=
0
;
onResponseCompleted
();
}
This diff is collapsed.
Click to expand it.
src/Http/WebSocketClient.h
查看文件 @
644905a3
#
ifndef
Http_WebSocketClient_h
/*
#define Http_WebSocketClient_h
* MIT License
*
* Copyright (c) 2016-2019 xiongziliang <771730766@qq.com>
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#ifndef ZLMEDIAKIT_WebSocketClient_H
#define ZLMEDIAKIT_WebSocketClient_H
#include <stdio.h>
#include <string.h>
#include <functional>
#include <memory>
#include "Util/util.h"
#include "Util/util.h"
#include "Util/mini.h"
#include "Network/TcpClient.h"
#include "Common/Parser.h"
#include "HttpRequestSplitter.h"
#include "HttpCookie.h"
#include "HttpChunkedSplitter.h"
#include "strCoding.h"
#include "Http/HttpClient.h"
#include "Http/WebSocketSplitter.h"
#include "Http/WebSocketSession.h"
#include <cstdlib>
#include <random>
#include "Common/config.h"
#include "Util/SHA1.h"
#include "Util/base64.h"
#include "Util/base64.h"
#include "Util/SHA1.h"
#include "Network/TcpClient.h"
#include "HttpClientImp.h"
using
namespace
std
;
#include "WebSocketSplitter.h"
using
namespace
toolkit
;
using
namespace
toolkit
;
namespace
mediakit
{
namespace
mediakit
{
/**
template
<
typename
ClientType
,
WebSocketHeader
::
Type
DataType
>
* @brief 客户端的状态
class
HttpWsClient
;
*/
typedef
enum
WSClientStatus
{
WSCONNECT
,
HANDSHAKING
,
///握手中
WORKING
,
///工作中
}
WSClientStatus
;
class
WebSocketClient
:
public
TcpClient
,
public
HttpRequestSplitter
,
public
WebSocketSplitter
template
<
typename
ClientType
,
WebSocketHeader
::
Type
DataType
>
{
class
ClientTypeImp
:
public
ClientType
{
public
:
public
:
typedef
StrCaseMap
HttpHeader
;
typedef
function
<
int
(
const
Buffer
::
Ptr
&
buf
)
>
onBeforeSendCB
;
typedef
std
::
shared_ptr
<
WebSocketClient
>
Ptr
;
friend
class
HttpWsClient
<
ClientType
,
DataType
>
;
WebSocketClient
()
:
_WSClientStatus
(
WSCONNECT
)
{}
virtual
~
WebSocketClient
()
{}
template
<
typename
SessionType
>
void
startConnect
(
const
string
&
strUrl
,
uint16_t
iPort
,
float
fTimeOutSec
=
3
)
{
_ip
=
strUrl
;
_port
=
iPort
;
TcpClient
::
startConnect
(
strUrl
,
iPort
,
fTimeOutSec
);
typedef
function
<
int
(
const
Buffer
::
Ptr
&
buf
)
>
onBeforeSendCB
;
/**
* 该类实现了TcpSession派生类发送数据的截取
* 目的是发送业务数据前进行websocket协议的打包
*/
class
SessionImp
:
public
SessionType
{
public
:
SessionImp
(
const
Socket
::
Ptr
&
pSock
)
:
SessionType
(
pSock
)
{}
~
SessionImp
()
{}
/**
* 设置发送数据截取回调函数
* @param cb 截取回调函数
*/
void
setOnBeforeSendCB
(
const
onBeforeSendCB
&
cb
)
{
_beforeSendCB
=
cb
;
}
protected
:
/**
* 重载send函数截取数据
* @param buf 需要截取的数据
* @return 数据字节数
*/
int
send
(
const
Buffer
::
Ptr
&
buf
)
override
{
if
(
_beforeSendCB
)
{
return
_beforeSendCB
(
buf
);
}
return
SessionType
::
send
(
buf
);
}
private
:
onBeforeSendCB
_beforeSendCB
;
};
std
::
shared_ptr
<
SessionImp
>
temSession
=
std
::
make_shared
<
SessionImp
>
(
_sock
);
//此处截取数据并进行websocket协议打包
weak_ptr
<
WebSocketClient
>
weakSelf
=
dynamic_pointer_cast
<
WebSocketClient
>
(
WebSocketClient
::
shared_from_this
());
_sock
->
setOnErr
([
weakSelf
](
const
SockException
&
ex
)
{
auto
strongSelf
=
weakSelf
.
lock
();
if
(
!
strongSelf
)
{
return
;
}
strongSelf
->
onErr
(
ex
);
});
temSession
->
setOnBeforeSendCB
([
weakSelf
](
const
Buffer
::
Ptr
&
buf
)
{
auto
strongSelf
=
weakSelf
.
lock
();
if
(
strongSelf
)
{
WebSocketHeader
header
;
header
.
_fin
=
true
;
header
.
_reserved
=
0
;
header
.
_opcode
=
WebSocketHeader
::
BINARY
;
header
.
_mask_flag
=
false
;
strongSelf
->
WebSocketSplitter
::
encode
(
header
,
(
uint8_t
*
)
buf
->
data
(),
buf
->
size
());
}
return
buf
->
size
();
});
_session
=
temSession
;
_session
->
onManager
();
}
virtual
int
send
(
const
string
&
buf
);
virtual
void
clear
();
const
string
&
responseStatus
()
const
;
const
HttpHeader
&
responseHeader
()
const
;
const
Parser
&
response
()
const
;
const
string
&
getUrl
()
const
;
template
<
typename
...
ArgsType
>
ClientTypeImp
(
ArgsType
&&
...
args
)
:
ClientType
(
std
::
forward
<
ArgsType
>
(
args
)...){}
~
ClientTypeImp
()
override
{};
protected
:
protected
:
virtual
int64_t
onResponseHeader
(
const
string
&
status
,
const
HttpHeader
&
headers
);;
virtual
void
onResponseBody
(
const
char
*
buf
,
int64_t
size
,
int64_t
recvedSize
,
int64_t
totalSize
);;
/**
/**
* 接收http回复完毕,
* 发送前拦截并打包为websocket协议
* @param buf
* @return
*/
*/
virtual
void
onResponseCompleted
();
int
send
(
const
Buffer
::
Ptr
&
buf
)
override
{
if
(
_beforeSendCB
){
return
_beforeSendCB
(
buf
);
}
return
ClientType
::
send
(
buf
);
}
/**
/**
*
http链接断开回调
*
设置发送数据截取回调函数
* @param
ex 断开原因
* @param
cb 截取回调函数
*/
*/
virtual
void
onDisconnect
(
const
SockException
&
ex
){}
void
setOnBeforeSendCB
(
const
onBeforeSendCB
&
cb
){
_beforeSendCB
=
cb
;
//HttpRequestSplitter override
}
int64_t
onRecvHeader
(
const
char
*
data
,
uint64_t
len
)
override
;
private
:
onBeforeSendCB
_beforeSendCB
;
};
void
onRecvContent
(
const
char
*
data
,
uint64_t
len
)
override
;
template
<
typename
ClientType
,
WebSocketHeader
::
Type
DataType
=
WebSocketHeader
::
TEXT
>
class
HttpWsClient
:
public
HttpClientImp
,
public
WebSocketSplitter
{
public
:
typedef
shared_ptr
<
HttpWsClient
>
Ptr
;
HttpWsClient
(
ClientTypeImp
<
ClientType
,
DataType
>
&
delegate
)
:
_delegate
(
delegate
){
_Sec_WebSocket_Key
=
encodeBase64
(
SHA1
::
encode_bin
(
makeRandStr
(
16
,
false
)));
}
~
HttpWsClient
(){}
void
startWsClient
(
const
string
&
ws_url
,
float
fTimeOutSec
){
string
http_url
=
ws_url
;
replace
(
http_url
,
"ws://"
,
"http://"
);
replace
(
http_url
,
"wss://"
,
"https://"
);
setMethod
(
"GET"
);
addHeader
(
"Upgrade"
,
"websocket"
);
addHeader
(
"Connection"
,
"Upgrade"
);
addHeader
(
"Sec-WebSocket-Version"
,
"13"
);
addHeader
(
"Sec-WebSocket-Key"
,
_Sec_WebSocket_Key
);
_onRecv
=
nullptr
;
sendRequest
(
http_url
,
fTimeOutSec
);
}
protected
:
protected
:
virtual
void
onConnect
(
const
SockException
&
ex
)
override
;
//HttpClientImp override
virtual
void
onRecv
(
const
Buffer
::
Ptr
&
pBuf
)
override
;
/**
* 收到http回复头
* @param status 状态码,譬如:200 OK
* @param headers http头
* @return 返回后续content的长度;-1:后续数据全是content;>=0:固定长度content
* 需要指出的是,在http头中带有Content-Length字段时,该返回值无效
*/
int64_t
onResponseHeader
(
const
string
&
status
,
const
HttpHeader
&
headers
)
override
{
if
(
status
==
"101"
){
auto
Sec_WebSocket_Accept
=
encodeBase64
(
SHA1
::
encode_bin
(
_Sec_WebSocket_Key
+
"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
));
if
(
Sec_WebSocket_Accept
==
const_cast
<
HttpHeader
&>
(
headers
)[
"Sec-WebSocket-Accept"
]){
//success
onWebSocketException
(
SockException
());
return
0
;
}
shutdown
(
SockException
(
Err_shutdown
,
StrPrinter
<<
"Sec-WebSocket-Accept mismatch"
));
return
0
;
}
shutdown
(
SockException
(
Err_shutdown
,
StrPrinter
<<
"bad http status code:"
<<
status
));
return
0
;
};
virtual
void
onErr
(
const
SockException
&
ex
)
override
;
/**
* 接收http回复完毕,
*/
void
onResponseCompleted
()
override
{}
//TcpClient override
void
onManager
()
override
{
if
(
_onRecv
){
//websocket连接成功了
_delegate
.
onManager
();
}
else
{
//websocket连接中...
HttpClientImp
::
onManager
();
}
}
//数据全部发送完毕后回调
void
onFlush
()
override
{
if
(
_onRecv
){
//websocket连接成功了
_delegate
.
onFlush
();
}
else
{
//websocket连接中...
HttpClientImp
::
onFlush
();
}
}
virtual
void
onFlush
()
override
{};
/**
* tcp连接结果
* @param ex
*/
void
onConnect
(
const
SockException
&
ex
)
override
{
if
(
ex
){
//tcp连接失败,直接返回失败
onWebSocketException
(
ex
);
return
;
}
//开始websocket握手
HttpClientImp
::
onConnect
(
ex
);
}
virtual
void
onManager
()
override
;
/**
* tcp收到数据
* @param pBuf
*/
void
onRecv
(
const
Buffer
::
Ptr
&
pBuf
)
override
{
if
(
_onRecv
){
//完成websocket握手后,拦截websocket数据
_onRecv
(
pBuf
);
}
else
{
//websocket握手数据
HttpClientImp
::
onRecv
(
pBuf
);
}
}
protected
:
//tcp连接断开
string
generate_websocket_client_handshake
(
const
char
*
ip
,
uint16_t
port
,
const
char
*
path
,
const
char
*
key
);
void
onErr
(
const
SockException
&
ex
)
override
{
//tcp断开或者shutdown导致的断开
onWebSocketException
(
ex
);
}
string
get_random
(
size_t
n
);
//WebSocketSplitter override
/**
* 收到一个webSocket数据包包头,后续将继续触发onWebSocketDecodePlayload回调
* @param header 数据包头
*/
void
onWebSocketDecodeHeader
(
const
WebSocketHeader
&
header
)
override
{
_payload
.
clear
();
}
void
onWebSocketDecodeHeader
(
const
WebSocketHeader
&
packet
)
override
;
/**
* 收到webSocket数据包负载
* @param header 数据包包头
* @param ptr 负载数据指针
* @param len 负载数据长度
* @param recved 已接收数据长度(包含本次数据长度),等于header._playload_len时则接受完毕
*/
void
onWebSocketDecodePlayload
(
const
WebSocketHeader
&
header
,
const
uint8_t
*
ptr
,
uint64_t
len
,
uint64_t
recved
)
override
{
_payload
.
append
((
char
*
)
ptr
,
len
);
}
void
onWebSocketDecodePlayload
(
const
WebSocketHeader
&
packet
,
const
uint8_t
*
ptr
,
uint64_t
len
,
uint64_t
recved
)
override
;
void
onWebSocketDecodeComplete
(
const
WebSocketHeader
&
header_in
)
override
;
/**
* 接收到完整的一个webSocket数据包后回调
* @param header 数据包包头
*/
void
onWebSocketDecodeComplete
(
const
WebSocketHeader
&
header_in
)
override
{
WebSocketHeader
&
header
=
const_cast
<
WebSocketHeader
&>
(
header_in
);
auto
flag
=
header
.
_mask_flag
;
//websocket客户端发送数据需要加密
header
.
_mask_flag
=
true
;
switch
(
header
.
_opcode
){
case
WebSocketHeader
:
:
CLOSE
:
{
//服务器主动关闭
WebSocketSplitter
::
encode
(
header
,
nullptr
);
shutdown
(
SockException
(
Err_eof
,
"websocket server close the connection"
));
}
break
;
case
WebSocketHeader
:
:
PING
:
{
//心跳包
header
.
_opcode
=
WebSocketHeader
::
PONG
;
WebSocketSplitter
::
encode
(
header
,
std
::
make_shared
<
BufferString
>
(
std
::
move
(
_payload
)));
}
break
;
case
WebSocketHeader
:
:
CONTINUATION
:
{
virtual
void
onWebSocketEncodeData
(
const
uint8_t
*
ptr
,
uint64_t
len
);
}
break
;
case
WebSocketHeader
:
:
TEXT
:
case
WebSocketHeader
:
:
BINARY
:
{
//接收完毕websocket数据包,触发onRecv事件
_delegate
.
onRecv
(
std
::
make_shared
<
BufferString
>
(
std
::
move
(
_payload
)));
}
break
;
default
:
break
;
}
_payload
.
clear
();
header
.
_mask_flag
=
flag
;
}
/**
* websocket数据编码回调
* @param ptr 数据指针
* @param len 数据指针长度
*/
void
onWebSocketEncodeData
(
const
Buffer
::
Ptr
&
buffer
)
override
{
HttpClientImp
::
send
(
buffer
);
}
private
:
private
:
void
onResponseCompleted_l
();
void
onWebSocketException
(
const
SockException
&
ex
){
if
(
!
ex
){
//websocket握手成功
//此处截取TcpClient派生类发送的数据并进行websocket协议打包
weak_ptr
<
HttpWsClient
>
weakSelf
=
dynamic_pointer_cast
<
HttpWsClient
>
(
shared_from_this
());
_delegate
.
setOnBeforeSendCB
([
weakSelf
](
const
Buffer
::
Ptr
&
buf
){
auto
strongSelf
=
weakSelf
.
lock
();
if
(
strongSelf
){
WebSocketHeader
header
;
header
.
_fin
=
true
;
header
.
_reserved
=
0
;
header
.
_opcode
=
DataType
;
//客户端需要加密
header
.
_mask_flag
=
true
;
strongSelf
->
WebSocketSplitter
::
encode
(
header
,
buf
);
}
return
buf
->
size
();
});
//触发连接成功事件
_delegate
.
_sock
=
HttpClientImp
::
_sock
;
_delegate
.
onConnect
(
ex
);
//拦截websocket数据接收
_onRecv
=
[
this
](
const
Buffer
::
Ptr
&
pBuf
){
WebSocketSplitter
::
decode
((
uint8_t
*
)
pBuf
->
data
(),
pBuf
->
size
());
};
return
;
}
//websocket握手失败或者tcp连接失败或者中途断开
if
(
_onRecv
){
//握手成功之后的中途断开
_onRecv
=
nullptr
;
_delegate
.
onErr
(
ex
);
return
;
}
//websocket握手失败或者tcp连接失败
_delegate
.
onConnect
(
ex
);
}
protected
:
bool
_isHttps
;
private
:
private
:
string
_ip
;
string
_Sec_WebSocket_Key
;
int
_port
;
function
<
void
(
const
Buffer
::
Ptr
&
pBuf
)
>
_onRecv
;
string
_url
;
ClientTypeImp
<
ClientType
,
DataType
>
&
_delegate
;
string
_method
;
string
_payload
;
string
_path
;
//recv
int64_t
_recvedBodySize
;
int64_t
_totalBodySize
;
Parser
_parser
;
string
_lastHost
;
Ticker
_aliveTicker
;
float
_fTimeOutSec
=
0
;
std
::
shared_ptr
<
HttpChunkedSplitter
>
_chunkedSplitter
;
std
::
string
_key
;
///客户端的key
WSClientStatus
_WSClientStatus
;
///客户端状态
bool
_firstPacket
=
true
;
string
_remian_data
;
std
::
shared_ptr
<
TcpSession
>
_session
;
};
};
}
/* namespace mediakit */
template
<
typename
ClientType
,
WebSocketHeader
::
Type
DataType
=
WebSocketHeader
::
TEXT
,
bool
userWSS
=
false
>
class
WebSocketClient
:
public
ClientTypeImp
<
ClientType
,
DataType
>
{
public
:
typedef
std
::
shared_ptr
<
WebSocketClient
>
Ptr
;
template
<
typename
...
ArgsType
>
WebSocketClient
(
ArgsType
&&
...
args
)
:
ClientTypeImp
<
ClientType
,
DataType
>
(
std
::
forward
<
ArgsType
>
(
args
)...){
_wsClient
.
reset
(
new
HttpWsClient
<
ClientType
,
DataType
>
(
*
this
));
}
~
WebSocketClient
()
override
{}
void
startConnect
(
const
string
&
strUrl
,
uint16_t
iPort
,
float
fTimeOutSec
=
3
)
override
{
string
ws_url
;
if
(
userWSS
){
ws_url
=
StrPrinter
<<
"wss://"
+
strUrl
<<
":"
<<
iPort
<<
"/"
;
}
else
{
ws_url
=
StrPrinter
<<
"ws://"
+
strUrl
<<
":"
<<
iPort
<<
"/"
;
}
_wsClient
->
startWsClient
(
ws_url
,
fTimeOutSec
);
}
private
:
typename
HttpWsClient
<
ClientType
,
DataType
>::
Ptr
_wsClient
;
};
#endif
/* Http_HttpClient_h */
}
//namespace mediakit
#endif //ZLMEDIAKIT_WebSocketClient_H
This diff is collapsed.
Click to expand it.
tests/test_wsClient.cpp
0 → 100644
查看文件 @
644905a3
/*
* MIT License
*
* Copyright (c) 2016-2019 xiongziliang <771730766@qq.com>
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#include <signal.h>
#include <string>
#include <iostream>
#include "Util/MD5.h"
#include "Util/logger.h"
#include "Http/WebSocketClient.h"
using
namespace
std
;
using
namespace
toolkit
;
using
namespace
mediakit
;
class
EchoTcpClient
:
public
TcpClient
{
public
:
EchoTcpClient
(
const
EventPoller
::
Ptr
&
poller
=
nullptr
){
InfoL
;
}
~
EchoTcpClient
()
override
{
InfoL
;
}
protected
:
void
onRecv
(
const
Buffer
::
Ptr
&
pBuf
)
override
{
DebugL
<<
pBuf
->
toString
();
}
//被动断开连接回调
void
onErr
(
const
SockException
&
ex
)
override
{
WarnL
<<
ex
.
what
();
}
//tcp连接成功后每2秒触发一次该事件
void
onManager
()
override
{
send
(
"echo test!"
);
DebugL
<<
"send echo test"
;
}
//连接服务器结果回调
void
onConnect
(
const
SockException
&
ex
)
override
{
DebugL
<<
ex
.
what
();
}
//数据全部发送完毕后回调
void
onFlush
()
override
{
DebugL
;
}
};
int
main
(
int
argc
,
char
*
argv
[])
{
//设置退出信号处理函数
static
semaphore
sem
;
signal
(
SIGINT
,
[](
int
)
{
sem
.
post
();
});
// 设置退出信号
//设置日志
Logger
::
Instance
().
add
(
std
::
make_shared
<
ConsoleChannel
>
());
Logger
::
Instance
().
setWriter
(
std
::
make_shared
<
AsyncLogWriter
>
());
WebSocketClient
<
EchoTcpClient
>::
Ptr
client
=
std
::
make_shared
<
WebSocketClient
<
EchoTcpClient
>
>
();
client
->
startConnect
(
"127.0.0.1"
,
80
);
sem
.
wait
();
return
0
;
}
This diff is collapsed.
Click to expand it.
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论