Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
Z
ZLMediaKit
概览
Overview
Details
Activity
Cycle Analytics
版本库
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
问题
0
Issues
0
列表
Board
标记
里程碑
合并请求
0
Merge Requests
0
CI / CD
CI / CD
流水线
作业
日程表
图表
维基
Wiki
代码片段
Snippets
成员
Collapse sidebar
Close sidebar
活动
图像
聊天
创建新问题
作业
提交
Issue Boards
Open sidebar
张翔宇
ZLMediaKit
Commits
b1951cc3
Commit
b1951cc3
authored
6 years ago
by
xiongziliang
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
支持加密型websocket
支持更多websocket命令
parent
6b0e7883
隐藏空白字符变更
内嵌
并排
正在显示
4 个修改的文件
包含
234 行增加
和
231 行删除
+234
-231
src/Http/HttpSession.h
+0
-206
src/Http/HttpsSession.h
+196
-1
src/Http/WebSocketSplitter.cpp
+24
-18
src/Http/WebSocketSplitter.h
+14
-6
没有找到文件。
src/Http/HttpSession.h
查看文件 @
b1951cc3
...
...
@@ -128,212 +128,6 @@ private:
};
/**
* 通过该模板类可以透明化WebSocket协议,
* 用户只要实现WebSock协议下的具体业务协议,譬如基于WebSocket协议的Rtmp协议等
* @tparam SessionType 业务协议的TcpSession类
*/
template
<
typename
SessionType
>
class
WebSocketSession
:
public
HttpSession
{
public
:
WebSocketSession
(
const
std
::
shared_ptr
<
ThreadPool
>
&
pTh
,
const
Socket
::
Ptr
&
pSock
)
:
HttpSession
(
pTh
,
pSock
){}
virtual
~
WebSocketSession
(){}
//收到eof或其他导致脱离TcpServer事件的回调
void
onError
(
const
SockException
&
err
)
override
{
HttpSession
::
onError
(
err
);
if
(
_session
){
_session
->
onError
(
err
);
}
}
//每隔一段时间触发,用来做超时管理
void
onManager
()
override
{
HttpSession
::
onManager
();
if
(
_session
){
_session
->
onManager
();
}
}
void
attachServer
(
const
TcpServer
&
server
)
override
{
HttpSession
::
attachServer
(
server
);
_weakServer
=
const_cast
<
TcpServer
&>
(
server
).
shared_from_this
();
}
protected
:
/**
* 开始收到一个webSocket数据包
* @param packet
*/
void
onWebSocketDecodeHeader
(
const
WebSocketHeader
&
packet
)
override
{
//新包,原来的包残余数据清空掉
_remian_data
.
clear
();
if
(
!
_firstPacket
){
return
;
}
//这是个WebSocket会话而不是普通的Http会话
_firstPacket
=
false
;
_session
=
std
::
make_shared
<
SessionImp
>
(
getIdentifier
(),
nullptr
,
_sock
);
auto
strongServer
=
_weakServer
.
lock
();
if
(
strongServer
){
_session
->
attachServer
(
*
strongServer
);
}
//此处截取数据并进行websocket协议打包
weak_ptr
<
WebSocketSession
>
weakSelf
=
dynamic_pointer_cast
<
WebSocketSession
>
(
shared_from_this
());
_session
->
setOnBeforeSendCB
([
weakSelf
](
const
Buffer
::
Ptr
&
buf
){
auto
strongSelf
=
weakSelf
.
lock
();
if
(
strongSelf
){
bool
mask_flag
=
strongSelf
->
_mask_flag
;
strongSelf
->
_mask_flag
=
false
;
strongSelf
->
WebSocketSplitter
::
encode
((
uint8_t
*
)
buf
->
data
(),
buf
->
size
());
strongSelf
->
_mask_flag
=
mask_flag
;
}
return
buf
->
size
();
});
}
/**
* 收到websocket数据包负载
* @param packet
* @param ptr
* @param len
* @param recved
*/
void
onWebSocketDecodePlayload
(
const
WebSocketHeader
&
packet
,
const
uint8_t
*
ptr
,
uint64_t
len
,
uint64_t
recved
)
override
{
if
(
packet
.
_playload_len
==
recved
){
//收到完整的包
if
(
_remian_data
.
empty
()){
onRecvWholePacket
((
char
*
)
ptr
,
len
);
}
else
{
_remian_data
.
append
((
char
*
)
ptr
,
len
);
onRecvWholePacket
(
_remian_data
);
_remian_data
.
clear
();
}
}
else
{
//部分数据
_remian_data
.
append
((
char
*
)
ptr
,
len
);
}
}
/**
* 发送数据进行websocket协议打包后回调
* @param ptr
* @param len
*/
void
onWebSocketEncodeData
(
const
uint8_t
*
ptr
,
uint64_t
len
)
override
{
_session
->
realSend
(
_session
->
obtainBuffer
((
char
*
)
ptr
,
len
));
}
/**
* 收到一个完整的websock数据包
* @param data
* @param len
*/
void
onRecvWholePacket
(
const
char
*
data
,
uint64_t
len
){
BufferRaw
::
Ptr
buffer
=
_session
->
obtainBuffer
(
data
,
len
);
_session
->
onRecv
(
buffer
);
}
/**
* 收到一个完整的websock数据包
* @param str
*/
void
onRecvWholePacket
(
const
string
&
str
){
BufferString
::
Ptr
buffer
=
std
::
make_shared
<
BufferString
>
(
str
);
_session
->
onRecv
(
buffer
);
}
private
:
typedef
function
<
int
(
const
Buffer
::
Ptr
&
buf
)
>
onBeforeSendCB
;
/**
* 该类实现了TcpSession派生类发送数据的截取
* 目的是发送业务数据前进行websocket协议的打包
*/
class
SessionImp
:
public
SessionType
{
public
:
SessionImp
(
const
string
&
identifier
,
const
std
::
shared_ptr
<
ThreadPool
>
&
pTh
,
const
Socket
::
Ptr
&
pSock
)
:
_identifier
(
identifier
),
SessionType
(
pTh
,
pSock
){}
~
SessionImp
(){}
/**
* 截取到数据后,再进行webSocket协议打包
* 然后真正的发送数据到socket
* @param buf 数据
* @return 数据字节数
*/
int
realSend
(
const
Buffer
::
Ptr
&
buf
){
return
SessionType
::
send
(
buf
);
}
/**
* 设置发送数据截取回调函数
* @param cb 截取回调函数
*/
void
setOnBeforeSendCB
(
const
onBeforeSendCB
&
cb
){
_beforeSendCB
=
cb
;
}
protected
:
/**
* 重载send函数截取数据
* @param buf 需要截取的数据
* @return 数据字节数
*/
int
send
(
const
Buffer
::
Ptr
&
buf
)
override
{
if
(
_beforeSendCB
){
return
_beforeSendCB
(
buf
);
}
return
SessionType
::
send
(
buf
);
}
string
getIdentifier
()
const
override
{
return
_identifier
;
}
private
:
onBeforeSendCB
_beforeSendCB
;
string
_identifier
;
};
private
:
bool
_firstPacket
=
true
;
string
_remian_data
;
weak_ptr
<
TcpServer
>
_weakServer
;
std
::
shared_ptr
<
SessionImp
>
_session
;
};
/**
* 回显会话
*/
class
EchoSession
:
public
TcpSession
{
public
:
EchoSession
(
const
std
::
shared_ptr
<
ThreadPool
>
&
pTh
,
const
Socket
::
Ptr
&
pSock
)
:
TcpSession
(
pTh
,
pSock
){
DebugL
;
}
virtual
~
EchoSession
(){
DebugL
;
}
void
attachServer
(
const
TcpServer
&
server
)
override
{
DebugL
<<
getIdentifier
()
<<
" "
<<
TcpSession
::
getIdentifier
();
}
void
onRecv
(
const
Buffer
::
Ptr
&
buffer
)
override
{
send
(
buffer
);
}
void
onError
(
const
SockException
&
err
)
override
{
WarnL
<<
err
.
what
();
}
//每隔一段时间触发,用来做超时管理
void
onManager
()
override
{
DebugL
;
}
};
typedef
WebSocketSession
<
EchoSession
>
EchoWebSocketSession
;
}
/* namespace Http */
}
/* namespace ZL */
...
...
This diff is collapsed.
Click to expand it.
src/Http/HttpsSession.h
查看文件 @
b1951cc3
...
...
@@ -70,7 +70,7 @@ public:
HttpSession
::
onRecv
(
data
,
len
);
}
#endif//defined(__GNUC__) && (__GNUC__ < 5)
pr
ivate
:
pr
otected
:
virtual
int
send
(
const
Buffer
::
Ptr
&
buf
)
override
{
TimeTicker
();
m_sslBox
.
onSend
(
buf
->
data
(),
buf
->
size
());
...
...
@@ -79,6 +79,201 @@ private:
SSL_Box
m_sslBox
;
};
/**
* 通过该模板类可以透明化WebSocket协议,
* 用户只要实现WebSock协议下的具体业务协议,譬如基于WebSocket协议的Rtmp协议等
* @tparam SessionType 业务协议的TcpSession类
*/
template
<
class
SessionType
,
class
HttpSessionType
=
HttpSession
>
class
WebSocketSession
:
public
HttpSessionType
{
public
:
WebSocketSession
(
const
std
::
shared_ptr
<
ThreadPool
>
&
pTh
,
const
Socket
::
Ptr
&
pSock
)
:
HttpSessionType
(
pTh
,
pSock
){}
virtual
~
WebSocketSession
(){}
//收到eof或其他导致脱离TcpServer事件的回调
void
onError
(
const
SockException
&
err
)
override
{
HttpSession
::
onError
(
err
);
if
(
_session
){
_session
->
onError
(
err
);
}
}
//每隔一段时间触发,用来做超时管理
void
onManager
()
override
{
HttpSession
::
onManager
();
if
(
_session
){
_session
->
onManager
();
}
}
void
attachServer
(
const
TcpServer
&
server
)
override
{
HttpSession
::
attachServer
(
server
);
_weakServer
=
const_cast
<
TcpServer
&>
(
server
).
shared_from_this
();
}
protected
:
/**
* 开始收到一个webSocket数据包
* @param packet
*/
void
onWebSocketDecodeHeader
(
const
WebSocketHeader
&
packet
)
override
{
//新包,原来的包残余数据清空掉
_remian_data
.
clear
();
if
(
_firstPacket
){
//这是个WebSocket会话而不是普通的Http会话
_firstPacket
=
false
;
_session
=
std
::
make_shared
<
SessionImp
>
(
HttpSessionType
::
getIdentifier
(),
nullptr
,
HttpSessionType
::
_sock
);
auto
strongServer
=
_weakServer
.
lock
();
if
(
strongServer
){
_session
->
attachServer
(
*
strongServer
);
}
//此处截取数据并进行websocket协议打包
weak_ptr
<
WebSocketSession
>
weakSelf
=
dynamic_pointer_cast
<
WebSocketSession
>
(
HttpSessionType
::
shared_from_this
());
_session
->
setOnBeforeSendCB
([
weakSelf
](
const
Buffer
::
Ptr
&
buf
){
auto
strongSelf
=
weakSelf
.
lock
();
if
(
strongSelf
){
bool
mask_flag
=
strongSelf
->
_mask_flag
;
strongSelf
->
_mask_flag
=
false
;
strongSelf
->
WebSocketSplitter
::
encode
(
*
strongSelf
,(
uint8_t
*
)
buf
->
data
(),
buf
->
size
());
strongSelf
->
_mask_flag
=
mask_flag
;
}
return
buf
->
size
();
});
}
}
/**
* 收到websocket数据包负载
* @param packet
* @param ptr
* @param len
* @param recved
*/
void
onWebSocketDecodePlayload
(
const
WebSocketHeader
&
packet
,
const
uint8_t
*
ptr
,
uint64_t
len
,
uint64_t
recved
)
override
{
_remian_data
.
append
((
char
*
)
ptr
,
len
);
}
/**
* 接收到完整的一个webSocket数据包后回调
* @param header 数据包包头
*/
void
onWebSocketDecodeComplete
(
const
WebSocketHeader
&
header
)
override
{
switch
(
header
.
_opcode
){
case
WebSocketHeader
:
:
CLOSE
:
{
HttpSessionType
::
encode
(
header
,
nullptr
,
0
);
}
break
;
case
WebSocketHeader
:
:
PING
:
{
const_cast
<
WebSocketHeader
&>
(
header
).
_opcode
=
WebSocketHeader
::
PONG
;
HttpSessionType
::
encode
(
header
,(
uint8_t
*
)
_remian_data
.
data
(),
_remian_data
.
size
());
}
break
;
case
WebSocketHeader
:
:
CONTINUATION
:
{
}
break
;
case
WebSocketHeader
:
:
TEXT
:
case
WebSocketHeader
:
:
BINARY
:
{
BufferString
::
Ptr
buffer
=
std
::
make_shared
<
BufferString
>
(
_remian_data
);
_session
->
onRecv
(
buffer
);
}
break
;
default
:
break
;
}
_remian_data
.
clear
();
}
/**
* 发送数据进行websocket协议打包后回调
* @param ptr
* @param len
*/
void
onWebSocketEncodeData
(
const
uint8_t
*
ptr
,
uint64_t
len
)
override
{
SocketHelper
::
send
((
char
*
)
ptr
,
len
);
}
private
:
typedef
function
<
int
(
const
Buffer
::
Ptr
&
buf
)
>
onBeforeSendCB
;
/**
* 该类实现了TcpSession派生类发送数据的截取
* 目的是发送业务数据前进行websocket协议的打包
*/
class
SessionImp
:
public
SessionType
{
public
:
SessionImp
(
const
string
&
identifier
,
const
std
::
shared_ptr
<
ThreadPool
>
&
pTh
,
const
Socket
::
Ptr
&
pSock
)
:
_identifier
(
identifier
),
SessionType
(
pTh
,
pSock
){}
~
SessionImp
(){}
/**
* 设置发送数据截取回调函数
* @param cb 截取回调函数
*/
void
setOnBeforeSendCB
(
const
onBeforeSendCB
&
cb
){
_beforeSendCB
=
cb
;
}
protected
:
/**
* 重载send函数截取数据
* @param buf 需要截取的数据
* @return 数据字节数
*/
int
send
(
const
Buffer
::
Ptr
&
buf
)
override
{
if
(
_beforeSendCB
){
return
_beforeSendCB
(
buf
);
}
return
SessionType
::
send
(
buf
);
}
string
getIdentifier
()
const
override
{
return
_identifier
;
}
private
:
onBeforeSendCB
_beforeSendCB
;
string
_identifier
;
};
private
:
bool
_firstPacket
=
true
;
string
_remian_data
;
weak_ptr
<
TcpServer
>
_weakServer
;
std
::
shared_ptr
<
SessionImp
>
_session
;
};
/**
* 回显会话
*/
class
EchoSession
:
public
TcpSession
{
public
:
EchoSession
(
const
std
::
shared_ptr
<
ThreadPool
>
&
pTh
,
const
Socket
::
Ptr
&
pSock
)
:
TcpSession
(
pTh
,
pSock
){
DebugL
;
}
virtual
~
EchoSession
(){
DebugL
;
}
void
attachServer
(
const
TcpServer
&
server
)
override
{
DebugL
<<
getIdentifier
()
<<
" "
<<
TcpSession
::
getIdentifier
();
}
void
onRecv
(
const
Buffer
::
Ptr
&
buffer
)
override
{
send
(
buffer
);
}
void
onError
(
const
SockException
&
err
)
override
{
WarnL
<<
err
.
what
();
}
//每隔一段时间触发,用来做超时管理
void
onManager
()
override
{
DebugL
;
}
};
typedef
WebSocketSession
<
EchoSession
>
EchoWebSocketSession
;
}
/* namespace Http */
}
/* namespace ZL */
...
...
This diff is collapsed.
Click to expand it.
src/Http/WebSocketSplitter.cpp
查看文件 @
b1951cc3
...
...
@@ -115,6 +115,9 @@ begin_decode:
_mask_offset
=
0
;
_playload_offset
=
0
;
onWebSocketDecodeHeader
(
*
this
);
if
(
_playload_len
==
0
){
onWebSocketDecodeComplete
(
*
this
);
}
}
//进入后面逻辑代表已经获取到了webSocket协议头,
...
...
@@ -129,6 +132,8 @@ begin_decode:
onPlayloadData
(
ptr
,
playload_slice_len
);
if
(
_playload_offset
==
_playload_len
){
onWebSocketDecodeComplete
(
*
this
);
//这是下一个包
remain
-=
playload_slice_len
;
ptr
+=
playload_slice_len
;
...
...
@@ -157,47 +162,48 @@ void WebSocketSplitter::onPlayloadData(uint8_t *ptr, uint64_t len) {
onWebSocketDecodePlayload
(
*
this
,
_mask_flag
?
ptr
-
len
:
ptr
,
len
,
_playload_offset
);
}
void
WebSocketSplitter
::
encode
(
uint8_t
*
data
,
uint64_t
len
)
{
void
WebSocketSplitter
::
encode
(
const
WebSocketHeader
&
header
,
uint8_t
*
data
,
uint64_t
len
)
{
string
ret
;
uint8_t
byte
=
_fin
<<
7
|
((
_reserved
&
0x07
)
<<
4
)
|
(
_opcode
&
0x0F
)
;
uint8_t
byte
=
header
.
_fin
<<
7
|
((
header
.
_reserved
&
0x07
)
<<
4
)
|
(
header
.
_opcode
&
0x0F
)
;
ret
.
push_back
(
byte
);
_mask_flag
=
(
_mask_flag
&&
_mask
.
size
()
>=
4
);
byte
=
_
mask_flag
<<
7
;
auto
mask_flag
=
(
header
.
_mask_flag
&&
header
.
_mask
.
size
()
>=
4
);
byte
=
mask_flag
<<
7
;
_playload_len
=
len
;
if
(
_playload_len
<
126
){
byte
|=
_playload_len
;
if
(
len
<
126
){
byte
|=
len
;
ret
.
push_back
(
byte
);
}
else
if
(
_playload_
len
<=
0xFFFF
){
}
else
if
(
len
<=
0xFFFF
){
byte
|=
126
;
ret
.
push_back
(
byte
);
uint16_t
len
=
htons
(
_playload_
len
);
uint16_t
len
=
htons
(
len
);
ret
.
append
((
char
*
)
&
len
,
2
);
}
else
{
byte
|=
127
;
ret
.
push_back
(
byte
);
uint32_t
len_high
=
htonl
(
_playload_
len
>>
32
)
;
uint32_t
len_low
=
htonl
(
_playload_
len
&
0xFFFFFFFF
);
uint32_t
len_high
=
htonl
(
len
>>
32
)
;
uint32_t
len_low
=
htonl
(
len
&
0xFFFFFFFF
);
ret
.
append
((
char
*
)
&
len_high
,
4
);
ret
.
append
((
char
*
)
&
len_low
,
4
);
}
if
(
_
mask_flag
){
ret
.
append
((
char
*
)
_mask
.
data
(),
4
);
if
(
mask_flag
){
ret
.
append
((
char
*
)
header
.
_mask
.
data
(),
4
);
}
onWebSocketEncodeData
((
uint8_t
*
)
ret
.
data
(),
ret
.
size
());
if
(
_mask_flag
){
uint8_t
*
ptr
=
data
;
for
(
int
i
=
0
;
i
<
len
;
++
i
,
++
ptr
){
*
(
ptr
)
^=
_mask
[
i
%
4
];
if
(
len
>
0
){
if
(
mask_flag
){
uint8_t
*
ptr
=
data
;
for
(
int
i
=
0
;
i
<
len
;
++
i
,
++
ptr
){
*
(
ptr
)
^=
header
.
_mask
[
i
%
4
];
}
}
onWebSocketEncodeData
(
data
,
len
);
}
onWebSocketEncodeData
(
data
,
len
);
}
...
...
This diff is collapsed.
Click to expand it.
src/Http/WebSocketSplitter.h
查看文件 @
b1951cc3
...
...
@@ -87,25 +87,33 @@ public:
* 编码一个数据包
* 将触发2次onWebSocketEncodeData回调
* 第一次是数据头,第二次是负载数据
* @param header 数据头
* @param data 负载数据
* @param len 负载数据长度
*/
void
encode
(
uint8_t
*
data
,
uint64_t
len
);
void
encode
(
const
WebSocketHeader
&
header
,
uint8_t
*
data
,
uint64_t
len
);
protected
:
/**
* 收到一个webSocket数据包包头,后续将继续触发onWebSocketDecodePlayload回调
* @param
packet
数据包头
* @param
header
数据包头
*/
virtual
void
onWebSocketDecodeHeader
(
const
WebSocketHeader
&
packet
)
{};
virtual
void
onWebSocketDecodeHeader
(
const
WebSocketHeader
&
header
)
{};
/**
* 收到webSocket数据包负载
* @param
packet
数据包包头
* @param
header
数据包包头
* @param ptr 负载数据指针
* @param len 负载数据长度
* @param recved 已接收数据长度(包含本次数据长度),等于
packet
._playload_len时则接受完毕
* @param recved 已接收数据长度(包含本次数据长度),等于
header
._playload_len时则接受完毕
*/
virtual
void
onWebSocketDecodePlayload
(
const
WebSocketHeader
&
packet
,
const
uint8_t
*
ptr
,
uint64_t
len
,
uint64_t
recved
)
{};
virtual
void
onWebSocketDecodePlayload
(
const
WebSocketHeader
&
header
,
const
uint8_t
*
ptr
,
uint64_t
len
,
uint64_t
recved
)
{};
/**
* 接收到完整的一个webSocket数据包后回调
* @param header 数据包包头
*/
virtual
void
onWebSocketDecodeComplete
(
const
WebSocketHeader
&
header
)
{};
/**
* websocket数据编码回调
...
...
This diff is collapsed.
Click to expand it.
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论