Commit d2a78e5a by xiongziliang

完成rtsp推流

parent 212a761e
......@@ -42,8 +42,6 @@ using namespace toolkit;
namespace mediakit {
static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE;
/**
* rtsp协议有多种方式传输rtp数据包,目前已支持包括以下4种
* 1: rtp over udp ,这种方式是rtp通过单独的udp端口传输
......@@ -72,6 +70,8 @@ static unordered_map<string, weak_ptr<RtspSession> > g_mapGetter;
//对g_mapGetter上锁保护
static recursive_mutex g_mtxGetter;
static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE;
RtspSession::RtspSession(const std::shared_ptr<ThreadPool> &pTh, const Socket::Ptr &pSock) : TcpSession(pTh, pSock) {
//设置10秒发送缓存
pSock->setSendBufSecond(10);
......@@ -134,13 +134,23 @@ void RtspSession::onManager() {
}
}
void RtspSession::onRecv(const Buffer::Ptr &pBuf) {
_ticker.resetTime();
_ui64TotalBytes += pBuf->size();
if (_onRecv) {
//http poster的请求数据转发给http getter处理
_onRecv(pBuf);
} else {
// TraceL << pBuf->size() << "\r\n" << pBuf->data();
input(pBuf->data(),pBuf->size());
}
}
int64_t RtspSession::onRecvHeader(const char *header,uint64_t len) {
_parser.Parse(header); //rtsp请求解析
string strCmd = _parser.Method(); //提取出请求命令字
_iCseq = atoi(_parser["CSeq"].data());
void RtspSession::onWholeRtspPacket(Parser &parser) {
string strCmd = parser.Method(); //提取出请求命令字
_iCseq = atoi(parser["CSeq"].data());
typedef int (RtspSession::*rtsp_request_handler)();
typedef bool (RtspSession::*rtsp_request_handler)(const Parser &parser);
static unordered_map<string, rtsp_request_handler> s_handler_map;
static onceToken token( []() {
s_handler_map.emplace("OPTIONS",&RtspSession::handleReq_Options);
......@@ -158,63 +168,51 @@ int64_t RtspSession::onRecvHeader(const char *header,uint64_t len) {
}, []() {});
auto it = s_handler_map.find(strCmd);
int ret = 0;
if (it != s_handler_map.end()) {
auto fun = it->second;
ret = (this->*fun)();
if(ret == -1){
auto &fun = it->second;
if(!(this->*fun)(parser)){
shutdown();
}
} else{
shutdown();
WarnL << "不支持的rtsp命令:" << strCmd;
}
_parser.Clear();
return ret;
}
void RtspSession::onRecv(const Buffer::Ptr &pBuf) {
_ticker.resetTime();
_ui64TotalBytes += pBuf->size();
if (_onRecv) {
//http poster的请求数据转发给http getter处理
_onRecv(pBuf);
} else {
inputRtspOrRtcp(pBuf->data(),pBuf->size());
void RtspSession::onRtpPacket(const char *data, uint64_t len) {
if(len > 1600){
//没有大于MTU的包
return;
}
int trackIdx = -1;
uint8_t interleaved = data[1];
if(interleaved %2 == 0){
trackIdx = getTrackIndexByInterleaved(interleaved);
}
if (trackIdx != -1) {
handleOneRtp(trackIdx,_aTrackInfo[trackIdx],(unsigned char *)data + 4, len - 4);
}
}
void RtspSession::inputRtspOrRtcp(const char *data,uint64_t len) {
// DebugL << data;
if(data[0] == '$' && _rtpType == PlayerBase::RTP_TCP){
//这是rtcp
return;
int64_t RtspSession::getContentLength(Parser &parser) {
if(parser.Method() == "POST"){
//http post请求的content数据部分是base64编码后的rtsp请求信令包
return remainDataSize();
}
input(data,len);
return RtspSplitter::getContentLength(parser);
}
int RtspSession::handleReq_Options() {
bool RtspSession::handleReq_Options(const Parser &parser) {
//支持这些命令
sendRtspResponse("200 OK",{"Public" , "OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, ANNOUNCE, RECORD, SET_PARAMETER, GET_PARAMETER"});
return 0;
}
void RtspSession::onRecvContent(const char *data, uint64_t len) {
// DebugL << data;
if(_onContent){
_onContent(data,len);
_onContent = nullptr;
}
return true;
}
int RtspSession::handleReq_ANNOUNCE() {
auto parseCopy = _parser;
_onContent = [this,parseCopy](const char *data, uint64_t len){
_parser = parseCopy;
_strSdp.assign(data,len);
bool RtspSession::handleReq_ANNOUNCE(const Parser &parser) {
_strSdp = parser.Content();
//解析url获取媒体名称
_mediaInfo.parse(_parser.FullUrl());
_mediaInfo.parse(parser.FullUrl());
auto src = dynamic_pointer_cast<RtmpMediaSource>(MediaSource::find(RTSP_SCHEMA,
_mediaInfo._vhost,
......@@ -228,26 +226,24 @@ int RtspSession::handleReq_ANNOUNCE() {
<< _mediaInfo._vhost << " "
<< _mediaInfo._app << " "
<< _mediaInfo._streamid << endl;
shutdown();
return;
return false;
}
_strSession = makeRandStr(12);
_aTrackInfo = SdpAttr(_strSdp).getAvailableTrack();
_strUrl = _parser.Url();
_strUrl = parser.Url();
_pushSrc = std::make_shared<RtspToRtmpMediaSource>(_mediaInfo._vhost,_mediaInfo._app,_mediaInfo._streamid);
_pushSrc->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
_pushSrc->onGetSDP(_strSdp);
sendRtspResponse("200 OK");
};
return atoi(_parser["Content-Length"].data());
return true;
}
int RtspSession::handleReq_RECORD(){
if (_aTrackInfo.empty() || _parser["Session"] != _strSession) {
bool RtspSession::handleReq_RECORD(const Parser &parser){
if (_aTrackInfo.empty() || parser["Session"] != _strSession) {
send_SessionNotFound();
return -1;
return false;
}
auto onRes = [this](const string &err){
bool authSuccess = err.empty();
......@@ -295,26 +291,23 @@ int RtspSession::handleReq_RECORD(){
//该事件无人监听,默认不鉴权
onRes("");
}
return 0;
return true;
}
int RtspSession::handleReq_Describe() {
{
bool RtspSession::handleReq_Describe(const Parser &parser) {
//解析url获取媒体名称
_strUrl = _parser.Url();
_mediaInfo.parse(_parser.FullUrl());
}
_strUrl = parser.Url();
_mediaInfo.parse(parser.FullUrl());
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());\
auto parserCopy = _parser;
findStream([weakSelf,parserCopy](bool success){
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
auto authorization = parser["Authorization"];
findStream([weakSelf,authorization](bool success){
auto strongSelf = weakSelf.lock();
if(!strongSelf){
return;
}
//恢复现场
strongSelf->_parser = parserCopy;
if(!success){
//未找到相应的MediaSource
......@@ -324,7 +317,6 @@ int RtspSession::handleReq_Describe() {
return;
}
//该请求中的认证信息
auto authorization = strongSelf->_parser["Authorization"];
onGetRealm invoker = [weakSelf,authorization](const string &realm){
if(realm.empty()){
//无需认证,回复sdp
......@@ -344,7 +336,7 @@ int RtspSession::handleReq_Describe() {
invoker("");
}
});
return 0;
return true;
}
void RtspSession::onAuthSuccess(const weak_ptr<RtspSession> &weakSelf) {
auto strongSelf = weakSelf.lock();
......@@ -544,23 +536,23 @@ inline void RtspSession::send_UnsupportedTransport() {
inline void RtspSession::send_SessionNotFound() {
sendRtspResponse("454 Session Not Found",{"Connection","Close"});
}
int RtspSession::handleReq_Setup() {
bool RtspSession::handleReq_Setup(const Parser &parser) {
//处理setup命令,该函数可能进入多次
auto controlSuffix = _parser.FullUrl().substr(1 + _parser.FullUrl().rfind('/'));
auto controlSuffix = parser.FullUrl().substr(1 + parser.FullUrl().rfind('/'));
int trackIdx = getTrackIndexByControlSuffix(controlSuffix);
if (trackIdx == -1) {
//未找到相应track
return -1;
return false;
}
SdpTrack::Ptr &trackRef = _aTrackInfo[trackIdx];
if (trackRef->_inited) {
//已经初始化过该Track
return -1;
return false;
}
trackRef->_inited = true; //现在初始化
if(_rtpType == PlayerBase::RTP_Invalid){
auto strTransport = _parser["Transport"];
auto strTransport = parser["Transport"];
if(strTransport.find("TCP") != string::npos){
_rtpType = PlayerBase::RTP_TCP;
}else if(strTransport.find("multicast") != string::npos){
......@@ -570,8 +562,12 @@ int RtspSession::handleReq_Setup() {
}
}
//允许接收rtp、rtcp包
RtspSplitter::enableRecvRtp(_rtpType == PlayerBase::RTP_TCP);
switch (_rtpType) {
case PlayerBase::RTP_TCP: {
trackRef->_interleaved = trackRef->_type * 2;
sendRtspResponse("200 OK",
{"Transport",StrPrinter << "RTP/AVP/TCP;unicast;"
<< "interleaved=" << trackRef->_type * 2 << "-" << trackRef->_type * 2 + 1 << ";"
......@@ -588,19 +584,19 @@ int RtspSession::handleReq_Setup() {
//分配端口失败
WarnL << "分配rtp端口失败";
send_NotAcceptable();
return -1;
return false;
}
auto pSockRtcp = std::make_shared<Socket>(_sock->getPoller());
if (!pSockRtcp->bindUdpSock(pSockRtp->get_local_port() + 1,get_local_ip().data())) {
//分配端口失败
WarnL << "分配rtcp端口失败";
send_NotAcceptable();
return -1;
return false;
}
_apRtpSock[trackIdx] = pSockRtp;
_apRtcpSock[trackIdx] = pSockRtcp;
//设置客户端内网端口信息
string strClientPort = FindField(_parser["Transport"].data(), "client_port=", NULL);
string strClientPort = FindField(parser["Transport"].data(), "client_port=", NULL);
uint16_t ui16PeerPort = atoi( FindField(strClientPort.data(), NULL, "-").data());
struct sockaddr_in peerAddr;
peerAddr.sin_family = AF_INET;
......@@ -625,7 +621,7 @@ int RtspSession::handleReq_Setup() {
_pBrdcaster = RtpBroadCaster::get(get_local_ip(),_mediaInfo._vhost, _mediaInfo._app, _mediaInfo._streamid);
if (!_pBrdcaster) {
send_NotAcceptable();
return -1;
return false;
}
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
_pBrdcaster->setDetachCB(this, [weakSelf]() {
......@@ -644,7 +640,7 @@ int RtspSession::handleReq_Setup() {
//分配端口失败
WarnL << "分配rtcp端口失败";
send_NotAcceptable();
return -1;
return false;
}
startListenPeerUdpData(trackIdx);
GET_CONFIG_AND_REGISTER(uint32_t,udpTTL,MultiCast::kUdpTTL);
......@@ -662,15 +658,15 @@ int RtspSession::handleReq_Setup() {
default:
break;
}
return 0;
return true;
}
int RtspSession::handleReq_Play() {
if (_aTrackInfo.empty() || _parser["Session"] != _strSession) {
bool RtspSession::handleReq_Play(const Parser &parser) {
if (_aTrackInfo.empty() || parser["Session"] != _strSession) {
send_SessionNotFound();
return -1;
return false;
}
auto strRange = _parser["Range"];
auto strRange = parser["Range"];
auto onRes = [this,strRange](const string &err){
bool authSuccess = err.empty();
if(!authSuccess){
......@@ -750,9 +746,6 @@ int RtspSession::handleReq_Play() {
if(!strongSelf) {
return;
}
if(!strongSelf->_enableSendRtp) {
return;
}
strongSelf->async([weakSelf,pack](){
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
......@@ -791,28 +784,28 @@ int RtspSession::handleReq_Play() {
//后面是seek或恢复命令,不需要鉴权
onRes("");
}
return 0;
return true;
}
int RtspSession::handleReq_Pause() {
if (_parser["Session"] != _strSession) {
bool RtspSession::handleReq_Pause(const Parser &parser) {
if (parser["Session"] != _strSession) {
send_SessionNotFound();
return -1;
return false;
}
sendRtspResponse("200 OK");
_enableSendRtp = false;
return 0;
return true;
}
int RtspSession::handleReq_Teardown() {
bool RtspSession::handleReq_Teardown(const Parser &parser) {
sendRtspResponse("200 OK");
TraceL << "播放器断开连接!";
return 0;
return true;
}
int RtspSession::handleReq_Get() {
_http_x_sessioncookie = _parser["x-sessioncookie"];
bool RtspSession::handleReq_Get(const Parser &parser) {
_http_x_sessioncookie = parser["x-sessioncookie"];
sendRtspResponse("200 OK",
{"Connection","Close",
"Cache-Control","no-store",
......@@ -823,18 +816,18 @@ int RtspSession::handleReq_Get() {
//注册http getter,以便http poster绑定
lock_guard<recursive_mutex> lock(g_mtxGetter);
g_mapGetter[_http_x_sessioncookie] = dynamic_pointer_cast<RtspSession>(shared_from_this());
return 0;
return true;
}
int RtspSession::handleReq_Post() {
bool RtspSession::handleReq_Post(const Parser &parser) {
lock_guard<recursive_mutex> lock(g_mtxGetter);
string sessioncookie = _parser["x-sessioncookie"];
string sessioncookie = parser["x-sessioncookie"];
//Poster 找到 Getter
auto it = g_mapGetter.find(sessioncookie);
if (it == g_mapGetter.end()) {
WarnL << "Http Poster未找到Http Getter";
return -1;
return false;
}
//Poster 找到Getter的SOCK
......@@ -842,22 +835,6 @@ int RtspSession::handleReq_Post() {
//移除http getter的弱引用记录
g_mapGetter.erase(sessioncookie);
auto nextPacketSize = remainDataSize();
if(nextPacketSize > 0){
//防止http poster中的content部分粘包(后续content都是base64编码的rtsp请求包)
_onContent = [this](const char *data,uint64_t len){
BufferRaw::Ptr buffer = std::make_shared<BufferRaw>();
buffer->assign(data,len);
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
async([weakSelf,buffer](){
auto strongSelf = weakSelf.lock();
if(strongSelf){
strongSelf->onRecv(buffer);
}
},false);
};
}
//http poster收到请求后转发给http getter处理
_onRecv = [this,httpGetterWeak](const Buffer::Ptr &pBuf){
auto httpGetterStrong = httpGetterWeak.lock();
......@@ -877,13 +854,17 @@ int RtspSession::handleReq_Post() {
});
};
return nextPacketSize;
if(!parser.Content().empty()){
//http poster后面的粘包
_onRecv(std::make_shared<BufferString>(parser.Content()));
}
return true;
}
int RtspSession::handleReq_SET_PARAMETER() {
bool RtspSession::handleReq_SET_PARAMETER(const Parser &parser) {
//TraceL<<endl;
sendRtspResponse("200 OK");
return 0;
return true;
}
inline void RtspSession::send_NotAcceptable() {
......@@ -1027,7 +1008,7 @@ inline void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) {
void RtspSession::onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) {
if(_pushSrc){
_pushSrc->onWrite(rtppt,true);
_pushSrc->onWrite(rtppt, false);
}
}
inline void RtspSession::onRcvPeerUdpData(int iTrackIdx, const Buffer::Ptr &pBuf, const struct sockaddr& addr) {
......@@ -1153,6 +1134,9 @@ bool RtspSession::sendRtspResponse(const string &res_code,
}
int RtspSession::send(const Buffer::Ptr &pkt){
// if(!_enableSendRtp){
// DebugL << pkt->data();
// }
_ui64TotalBytes += pkt->size();
return TcpSession::send(pkt);
}
......@@ -1200,6 +1184,15 @@ inline int RtspSession::getTrackIndexByControlSuffix(const string &controlSuffix
return -1;
}
inline int RtspSession::getTrackIndexByInterleaved(int interleaved){
for (unsigned int i = 0; i < _aTrackInfo.size(); i++) {
if (_aTrackInfo[i]->_interleaved == interleaved) {
return i;
}
}
return -1;
}
bool RtspSession::close() {
InfoL << "kick out:" << _mediaInfo._vhost << " " << _mediaInfo._app << " " << _mediaInfo._streamid;
safeShutdown();
......
......@@ -30,15 +30,15 @@
#include <set>
#include <vector>
#include <unordered_map>
#include "Util/util.h"
#include "Util/logger.h"
#include "Common/config.h"
#include "Network/TcpSession.h"
#include "Player/PlayerBase.h"
#include "Rtsp.h"
#include "RtpBroadCaster.h"
#include "RtspMediaSource.h"
#include "Player/PlayerBase.h"
#include "Util/util.h"
#include "Util/logger.h"
#include "Network/TcpSession.h"
#include "Http/HttpRequestSplitter.h"
#include "RtspSplitter.h"
#include "RtpReceiver.h"
#include "RtspToRtmpMediaSource.h"
......@@ -66,7 +66,7 @@ private:
uint32_t _offset;
};
class RtspSession: public TcpSession, public HttpRequestSplitter, public RtpReceiver , public MediaSourceEvent{
class RtspSession: public TcpSession, public RtspSplitter, public RtpReceiver , public MediaSourceEvent{
public:
typedef std::shared_ptr<RtspSession> Ptr;
typedef std::function<void(const string &realm)> onGetRealm;
......@@ -80,26 +80,43 @@ public:
void onError(const SockException &err) override;
void onManager() override;
protected:
//HttpRequestSplitter override
int64_t onRecvHeader(const char *data,uint64_t len) override ;
void onRecvContent(const char *data,uint64_t len) override;
//RtspSplitter override
/**
* 收到完整的rtsp包回调,包括sdp等content数据
* @param parser rtsp包
*/
void onWholeRtspPacket(Parser &parser) override;
/**
* 收到rtp包回调
* @param data
* @param len
*/
void onRtpPacket(const char *data,uint64_t len) override;
/**
* 从rtsp头中获取Content长度
* @param parser
* @return
*/
int64_t getContentLength(Parser &parser) override;
//RtpReceiver override
void onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) override;
//MediaSourceEvent override
bool close() override ;
private:
void inputRtspOrRtcp(const char *data,uint64_t len);
int handleReq_Options(); //处理options方法
int handleReq_Describe(); //处理describe方法
int handleReq_ANNOUNCE(); //处理options方法
int handleReq_RECORD(); //处理options方法
int handleReq_Setup(); //处理setup方法
int handleReq_Play(); //处理play方法
int handleReq_Pause(); //处理pause方法
int handleReq_Teardown(); //处理teardown方法
int handleReq_Get(); //处理Get方法
int handleReq_Post(); //处理Post方法
int handleReq_SET_PARAMETER(); //处理SET_PARAMETER方法
bool handleReq_Options(const Parser &parser); //处理options方法
bool handleReq_Describe(const Parser &parser); //处理describe方法
bool handleReq_ANNOUNCE(const Parser &parser); //处理options方法
bool handleReq_RECORD(const Parser &parser); //处理options方法
bool handleReq_Setup(const Parser &parser); //处理setup方法
bool handleReq_Play(const Parser &parser); //处理play方法
bool handleReq_Pause(const Parser &parser); //处理pause方法
bool handleReq_Teardown(const Parser &parser); //处理teardown方法
bool handleReq_Get(const Parser &parser); //处理Get方法
bool handleReq_Post(const Parser &parser); //处理Post方法
bool handleReq_SET_PARAMETER(const Parser &parser); //处理SET_PARAMETER方法
void inline send_StreamNotFound(); //rtsp资源未找到
void inline send_UnsupportedTransport(); //不支持的传输模式
......@@ -111,6 +128,7 @@ private:
inline string printSSRC(uint32_t ui32Ssrc);
inline int getTrackIndexByTrackType(TrackType type);
inline int getTrackIndexByControlSuffix(const string &controlSuffix);
inline int getTrackIndexByInterleaved(int interleaved);
inline void onRcvPeerUdpData(int iTrackIdx, const Buffer::Ptr &pBuf, const struct sockaddr &addr);
inline void startListenPeerUdpData(int iTrackIdx);
......@@ -131,7 +149,6 @@ private:
int send(const Buffer::Ptr &pkt) override;
private:
Ticker _ticker;
Parser _parser; //rtsp解析类
int _iCseq = 0;
string _strUrl;
string _strSdp;
......@@ -162,7 +179,6 @@ private:
//quicktime 请求rtsp会产生两次tcp连接,
//一次发送 get 一次发送post,需要通过x-sessioncookie关联起来
string _http_x_sessioncookie;
function<void(const char *data,uint64_t len)> _onContent;
function<void(const Buffer::Ptr &pBuf)> _onRecv;
std::function<void()> _delayTask;
......
......@@ -59,9 +59,10 @@ int64_t RtspSplitter::onRecvHeader(const char *data, uint64_t len) {
return 0;
}
_parser.Parse(data);
auto ret = atoi(_parser["Content-Length"].data());
auto ret = getContentLength(_parser);
if(ret == 0){
onWholeRtspPacket(_parser);
_parser.Clear();
}
return ret;
}
......@@ -69,12 +70,17 @@ int64_t RtspSplitter::onRecvHeader(const char *data, uint64_t len) {
void RtspSplitter::onRecvContent(const char *data, uint64_t len) {
_parser.setContent(string(data,len));
onWholeRtspPacket(_parser);
_parser.Clear();
}
void RtspSplitter::enableRecvRtp(bool enable) {
_enableRecvRtp = enable;
}
int64_t RtspSplitter::getContentLength(Parser &parser) {
return atoi(parser["Content-Length"].data());
}
}//namespace mediakit
......
......@@ -36,6 +36,12 @@ class RtspSplitter : public HttpRequestSplitter{
public:
RtspSplitter(){}
virtual ~RtspSplitter(){}
/**
* 是否允许接收rtp包
* @param enable
*/
void enableRecvRtp(bool enable);
protected:
/**
* 收到完整的rtsp包回调,包括sdp等content数据
......@@ -51,10 +57,11 @@ protected:
virtual void onRtpPacket(const char *data,uint64_t len) = 0;
/**
* 是否允许接收rtp包
* @param enable
* 从rtsp头中获取Content长度
* @param parser
* @return
*/
void enableRecvRtp(bool enable);
virtual int64_t getContentLength(Parser &parser);
protected:
const char *onSearchPacketTail(const char *data,int len) override ;
int64_t onRecvHeader(const char *data,uint64_t len) override;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论