Commit 1ce9af35 by xiongziliang

1、复用rtsp url解析代码

2、修复rtsp/rtmp推流异常回调紊乱的问题
parent e4f9e377
...@@ -60,8 +60,8 @@ void RtmpPusher::teardown() { ...@@ -60,8 +60,8 @@ void RtmpPusher::teardown() {
} }
} }
void RtmpPusher::onPublishResult(const SockException &ex) { void RtmpPusher::onPublishResult(const SockException &ex,bool handshakeCompleted) {
if(_pPublishTimer){ if(!handshakeCompleted){
//播放结果回调 //播放结果回调
_pPublishTimer.reset(); _pPublishTimer.reset();
if(_onPublished){ if(_onPublished){
...@@ -87,7 +87,7 @@ void RtmpPusher::publish(const string &strUrl) { ...@@ -87,7 +87,7 @@ void RtmpPusher::publish(const string &strUrl) {
_strTcUrl = string("rtmp://") + strHost + "/" + _strApp; _strTcUrl = string("rtmp://") + strHost + "/" + _strApp;
if (!_strApp.size() || !_strStream.size()) { if (!_strApp.size() || !_strStream.size()) {
onPublishResult(SockException(Err_other,"rtmp url非法")); onPublishResult(SockException(Err_other,"rtmp url非法"),false);
return; return;
} }
DebugL << strHost << " " << _strApp << " " << _strStream; DebugL << strHost << " " << _strApp << " " << _strStream;
...@@ -102,13 +102,13 @@ void RtmpPusher::publish(const string &strUrl) { ...@@ -102,13 +102,13 @@ void RtmpPusher::publish(const string &strUrl) {
} }
weak_ptr<RtmpPusher> weakSelf = dynamic_pointer_cast<RtmpPusher>(shared_from_this()); weak_ptr<RtmpPusher> weakSelf = dynamic_pointer_cast<RtmpPusher>(shared_from_this());
float playTimeOutSec = (*this)[kTimeoutMS].as<int>() / 1000.0; float publishTimeOutSec = (*this)[kTimeoutMS].as<int>() / 1000.0;
_pPublishTimer.reset( new Timer(playTimeOutSec, [weakSelf]() { _pPublishTimer.reset( new Timer(publishTimeOutSec, [weakSelf]() {
auto strongSelf=weakSelf.lock(); auto strongSelf=weakSelf.lock();
if(!strongSelf) { if(!strongSelf) {
return false; return false;
} }
strongSelf->onPublishResult(SockException(Err_timeout,"publish rtmp timeout")); strongSelf->onPublishResult(SockException(Err_timeout,"publish rtmp timeout"), false);
return false; return false;
},getPoller())); },getPoller()));
...@@ -120,11 +120,12 @@ void RtmpPusher::publish(const string &strUrl) { ...@@ -120,11 +120,12 @@ void RtmpPusher::publish(const string &strUrl) {
} }
void RtmpPusher::onErr(const SockException &ex){ void RtmpPusher::onErr(const SockException &ex){
onPublishResult(ex); //定时器_pPublishTimer为空后表明握手结束了
onPublishResult(ex,!_pPublishTimer);
} }
void RtmpPusher::onConnect(const SockException &err){ void RtmpPusher::onConnect(const SockException &err){
if(err) { if(err) {
onPublishResult(err); onPublishResult(err,false);
return; return;
} }
//推流器不需要多大的接收缓存,节省内存占用 //推流器不需要多大的接收缓存,节省内存占用
...@@ -146,7 +147,8 @@ void RtmpPusher::onRecv(const Buffer::Ptr &pBuf){ ...@@ -146,7 +147,8 @@ void RtmpPusher::onRecv(const Buffer::Ptr &pBuf){
onParseRtmp(pBuf->data(), pBuf->size()); onParseRtmp(pBuf->data(), pBuf->size());
} catch (exception &e) { } catch (exception &e) {
SockException ex(Err_other, e.what()); SockException ex(Err_other, e.what());
onPublishResult(ex); //定时器_pPublishTimer为空后表明握手结束了
onPublishResult(ex,!_pPublishTimer);
} }
} }
...@@ -223,10 +225,10 @@ inline void RtmpPusher::send_metaData(){ ...@@ -223,10 +225,10 @@ inline void RtmpPusher::send_metaData(){
_pRtmpReader->setDetachCB([weakSelf](){ _pRtmpReader->setDetachCB([weakSelf](){
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(strongSelf){ if(strongSelf){
strongSelf->onPublishResult(SockException(Err_other,"媒体源被释放")); strongSelf->onPublishResult(SockException(Err_other,"媒体源被释放"), !strongSelf->_pPublishTimer);
} }
}); });
onPublishResult(SockException(Err_success,"success")); onPublishResult(SockException(Err_success,"success"), false);
//提升发送性能 //提升发送性能
setSocketFlags(); setSocketFlags();
} }
......
...@@ -63,7 +63,7 @@ protected: ...@@ -63,7 +63,7 @@ protected:
send(buffer); send(buffer);
} }
private: private:
void onPublishResult(const SockException &ex); void onPublishResult(const SockException &ex,bool handshakeCompleted);
template<typename FUN> template<typename FUN>
inline void addOnResultCB(const FUN &fun) { inline void addOnResultCB(const FUN &fun) {
......
...@@ -242,5 +242,58 @@ string SdpParser::toString() const { ...@@ -242,5 +242,58 @@ string SdpParser::toString() const {
return title + video + audio; return title + video + audio;
} }
bool RtspUrl::parse(const string &strUrl) {
auto schema = FindField(strUrl.data(), nullptr, "://");
bool isSSL = strcasecmp(schema.data(), "rtsps") == 0;
//查找"://"与"/"之间的字符串,用于提取用户名密码
auto middle_url = FindField(strUrl.data(), "://", "/");
if (middle_url.empty()) {
middle_url = FindField(strUrl.data(), "://", nullptr);
}
auto pos = middle_url.rfind('@');
if (pos == string::npos) {
//并没有用户名密码
return setup(isSSL, strUrl, "", "");
}
//包含用户名密码
auto user_pwd = middle_url.substr(0, pos);
auto suffix = strUrl.substr(schema.size() + 3 + pos + 1);
auto url = StrPrinter << "rtsp://" << suffix << endl;
if (user_pwd.find(":") == string::npos) {
return setup(isSSL, url, user_pwd, "");
}
auto user = FindField(user_pwd.data(), nullptr, ":");
auto pwd = FindField(user_pwd.data(), ":", nullptr);
return setup(isSSL, url, user, pwd);
}
bool RtspUrl::setup(bool isSSL, const string &strUrl, const string &strUser, const string &strPwd) {
auto ip = FindField(strUrl.data(), "://", "/");
if (ip.empty()) {
ip = split(FindField(strUrl.data(), "://", NULL), "?")[0];
}
auto port = atoi(FindField(ip.data(), ":", NULL).data());
if (port <= 0 || port > UINT16_MAX) {
//rtsp 默认端口554
port = isSSL ? 322 : 554;
} else {
//服务器域名
ip = FindField(ip.data(), NULL, ":");
}
if (ip.empty()) {
return false;
}
_url = std::move(strUrl);
_user = std::move(strUser);
_passwd = std::move(strPwd);
_host = std::move(ip);
_port = port;
_is_ssl = isSSL;
return true;
}
}//namespace mediakit }//namespace mediakit
...@@ -125,6 +125,24 @@ private: ...@@ -125,6 +125,24 @@ private:
map<string, SdpTrack::Ptr> _track_map; map<string, SdpTrack::Ptr> _track_map;
}; };
/**
* 解析rtsp url的工具类
*/
class RtspUrl{
public:
string _url;
string _user;
string _passwd;
string _host;
uint16_t _port;
bool _is_ssl;
public:
RtspUrl() = default;
~RtspUrl() = default;
bool parse(const string &url);
private:
bool setup(bool,const string &, const string &, const string &);
};
/** /**
* rtsp sdp基类 * rtsp sdp基类
......
...@@ -81,70 +81,25 @@ void RtspPlayer::teardown(){ ...@@ -81,70 +81,25 @@ void RtspPlayer::teardown(){
} }
void RtspPlayer::play(const string &strUrl){ void RtspPlayer::play(const string &strUrl){
Rtsp::eRtpType eType = (Rtsp::eRtpType)(int)(*this)[kRtpType]; RtspUrl url;
auto schema = FindField(strUrl.data(), nullptr,"://"); if(!url.parse(strUrl)){
bool isSSL = strcasecmp(schema.data(),"rtsps") == 0; onPlayResult_l(SockException(Err_other,StrPrinter << "illegal rtsp url:" << strUrl),false);
//查找"://"与"/"之间的字符串,用于提取用户名密码
auto middle_url = FindField(strUrl.data(),"://","/");
if(middle_url.empty()){
middle_url = FindField(strUrl.data(),"://", nullptr);
}
auto pos = middle_url.rfind('@');
if(pos == string::npos){
//并没有用户名密码
play(isSSL,strUrl,"","",eType);
return;
}
//包含用户名密码
auto user_pwd = middle_url.substr(0,pos);
auto suffix = strUrl.substr(schema.size() + 3 + pos + 1);
auto url = StrPrinter << "rtsp://" << suffix << endl;
if(user_pwd.find(":") == string::npos){
play(isSSL,url,user_pwd,"",eType);
return; return;
} }
auto user = FindField(user_pwd.data(),nullptr,":");
auto pwd = FindField(user_pwd.data(),":",nullptr);
play(isSSL,url,user,pwd,eType);
}
void RtspPlayer::play(bool isSSL,const string &strUrl, const string &strUser, const string &strPwd, Rtsp::eRtpType eType ) {
DebugL << strUrl << " "
<< (strUser.size() ? strUser : "null") << " "
<< (strPwd.size() ? strPwd:"null") << " "
<< eType;
teardown(); teardown();
if(strUser.size()){ if (url._user.size()) {
(*this)[kRtspUser] = strUser; (*this)[kRtspUser] = url._user;
} }
if(strPwd.size()){ if (url._passwd.size()) {
(*this)[kRtspPwd] = strPwd; (*this)[kRtspPwd] = url._passwd;
(*this)[kRtspPwdIsMD5] = false; (*this)[kRtspPwdIsMD5] = false;
} }
_eType = eType; _strUrl = url._url;
_eType = (Rtsp::eRtpType)(int)(*this)[kRtpType];
auto ip = FindField(strUrl.data(), "://", "/"); DebugL << url._url << " " << (url._user.size() ? url._user : "null") << " " << (url._passwd.size() ? url._passwd : "null") << " " << _eType;
if (ip.empty()) {
ip = split(FindField(strUrl.data(), "://", NULL),"?")[0];
}
auto port = atoi(FindField(ip.data(), ":", NULL).data());
if (port <= 0) {
//rtsp 默认端口554
port = isSSL ? 322 : 554;
} else {
//服务器域名
ip = FindField(ip.data(), NULL, ":");
}
if(ip.empty()){
onPlayResult_l(SockException(Err_other,StrPrinter << "illegal rtsp url:" << strUrl),false);
return;
}
_strUrl = strUrl;
weak_ptr<RtspPlayer> weakSelf = dynamic_pointer_cast<RtspPlayer>(shared_from_this()); weak_ptr<RtspPlayer> weakSelf = dynamic_pointer_cast<RtspPlayer>(shared_from_this());
float playTimeOutSec = (*this)[kTimeoutMS].as<int>() / 1000.0; float playTimeOutSec = (*this)[kTimeoutMS].as<int>() / 1000.0;
...@@ -160,8 +115,9 @@ void RtspPlayer::play(bool isSSL,const string &strUrl, const string &strUser, co ...@@ -160,8 +115,9 @@ void RtspPlayer::play(bool isSSL,const string &strUrl, const string &strUser, co
if(!(*this)[kNetAdapter].empty()){ if(!(*this)[kNetAdapter].empty()){
setNetAdapter((*this)[kNetAdapter]); setNetAdapter((*this)[kNetAdapter]);
} }
startConnect(ip, port , playTimeOutSec); startConnect(url._host, url._port, playTimeOutSec);
} }
void RtspPlayer::onConnect(const SockException &err){ void RtspPlayer::onConnect(const SockException &err){
if(err.getErrCode() != Err_success) { if(err.getErrCode() != Err_success) {
onPlayResult_l(err,false); onPlayResult_l(err,false);
......
...@@ -106,7 +106,6 @@ private: ...@@ -106,7 +106,6 @@ private:
int getTrackIndexByInterleaved(int interleaved) const; int getTrackIndexByInterleaved(int interleaved) const;
int getTrackIndexByTrackType(TrackType trackType) const; int getTrackIndexByTrackType(TrackType trackType) const;
void play(bool isSSL,const string &strUrl, const string &strUser, const string &strPwd, Rtsp::eRtpType eType);
void handleResSETUP(const Parser &parser, unsigned int uiTrackIndex); void handleResSETUP(const Parser &parser, unsigned int uiTrackIndex);
void handleResDESCRIBE(const Parser &parser); void handleResDESCRIBE(const Parser &parser);
bool handleAuthenticationFailure(const string &wwwAuthenticateParamsStr); bool handleAuthenticationFailure(const string &wwwAuthenticateParamsStr);
......
...@@ -43,98 +43,71 @@ void RtspPusher::teardown() { ...@@ -43,98 +43,71 @@ void RtspPusher::teardown() {
} }
void RtspPusher::publish(const string &strUrl) { void RtspPusher::publish(const string &strUrl) {
auto userAndPwd = FindField(strUrl.data(),"://","@"); RtspUrl url;
Rtsp::eRtpType eType = (Rtsp::eRtpType)(int)(*this)[ kRtpType]; if(!url.parse(strUrl)){
if(userAndPwd.empty()){ onPublishResult(SockException(Err_other,StrPrinter << "illegal rtsp url:" << strUrl),false);
publish(strUrl,"","",eType);
return; return;
} }
auto suffix = FindField(strUrl.data(),"@",nullptr);
auto url = StrPrinter << "rtsp://" << suffix << endl;
if(userAndPwd.find(":") == string::npos){
publish(url,userAndPwd,"",eType);
return;
}
auto user = FindField(userAndPwd.data(),nullptr,":");
auto pwd = FindField(userAndPwd.data(),":",nullptr);
publish(url,user,pwd,eType);
}
void RtspPusher::onPublishResult(const SockException &ex) {
if(_pPublishTimer){
//播放结果回调
_pPublishTimer.reset();
if(_onPublished){
_onPublished(ex);
}
} else {
//播放成功后异常断开回调
if(_onShutdown){
_onShutdown(ex);
}
}
if(ex){
teardown();
}
}
void RtspPusher::publish(const string & strUrl, const string &strUser, const string &strPwd, Rtsp::eRtpType eType ) {
DebugL << strUrl << " "
<< (strUser.size() ? strUser : "null") << " "
<< (strPwd.size() ? strPwd:"null") << " "
<< eType;
teardown(); teardown();
if(strUser.size()){ if (url._user.size()) {
(*this)[kRtspUser] = strUser; (*this)[kRtspUser] = url._user;
} }
if(strPwd.size()){ if (url._passwd.size()) {
(*this)[kRtspPwd] = strPwd; (*this)[kRtspPwd] = url._passwd;
(*this)[kRtspPwdIsMD5] = false; (*this)[kRtspPwdIsMD5] = false;
} }
_eType = eType;
auto ip = FindField(strUrl.data(), "://", "/");
if (!ip.size()) {
ip = FindField(strUrl.data(), "://", NULL);
}
auto port = atoi(FindField(ip.data(), ":", NULL).data());
if (port <= 0) {
//rtsp 默认端口554
port = 554;
} else {
//服务器域名
ip = FindField(ip.data(), NULL, ":");
}
_strUrl = strUrl; _strUrl = strUrl;
_eType = (Rtsp::eRtpType)(int)(*this)[kRtpType];
DebugL << url._url << " " << (url._user.size() ? url._user : "null") << " " << (url._passwd.size() ? url._passwd : "null") << " " << _eType;
weak_ptr<RtspPusher> weakSelf = dynamic_pointer_cast<RtspPusher>(shared_from_this()); weak_ptr<RtspPusher> weakSelf = dynamic_pointer_cast<RtspPusher>(shared_from_this());
float playTimeOutSec = (*this)[kTimeoutMS].as<int>() / 1000.0; float publishTimeOutSec = (*this)[kTimeoutMS].as<int>() / 1000.0;
_pPublishTimer.reset( new Timer(playTimeOutSec, [weakSelf]() { _pPublishTimer.reset( new Timer(publishTimeOutSec, [weakSelf]() {
auto strongSelf=weakSelf.lock(); auto strongSelf=weakSelf.lock();
if(!strongSelf) { if(!strongSelf) {
return false; return false;
} }
strongSelf->onPublishResult(SockException(Err_timeout,"publish rtsp timeout")); strongSelf->onPublishResult(SockException(Err_timeout,"publish rtsp timeout"),false);
return false; return false;
},getPoller())); },getPoller()));
if(!(*this)[kNetAdapter].empty()){ if(!(*this)[kNetAdapter].empty()){
setNetAdapter((*this)[kNetAdapter]); setNetAdapter((*this)[kNetAdapter]);
} }
startConnect(ip, port , playTimeOutSec);
startConnect(url._host, url._port, publishTimeOutSec);
}
void RtspPusher::onPublishResult(const SockException &ex, bool handshakeCompleted) {
if(!handshakeCompleted){
//播放结果回调
_pPublishTimer.reset();
if(_onPublished){
_onPublished(ex);
}
} else {
//播放成功后异常断开回调
if(_onShutdown){
_onShutdown(ex);
}
}
if(ex){
teardown();
}
} }
void RtspPusher::onErr(const SockException &ex) { void RtspPusher::onErr(const SockException &ex) {
onPublishResult(ex); //定时器_pPublishTimer为空后表明握手结束了
onPublishResult(ex,!_pPublishTimer);
} }
void RtspPusher::onConnect(const SockException &err) { void RtspPusher::onConnect(const SockException &err) {
if(err) { if(err) {
onPublishResult(err); onPublishResult(err,false);
return; return;
} }
//推流器不需要多大的接收缓存,节省内存占用 //推流器不需要多大的接收缓存,节省内存占用
...@@ -147,7 +120,8 @@ void RtspPusher::onRecv(const Buffer::Ptr &pBuf){ ...@@ -147,7 +120,8 @@ void RtspPusher::onRecv(const Buffer::Ptr &pBuf){
input(pBuf->data(), pBuf->size()); input(pBuf->data(), pBuf->size());
} catch (exception &e) { } catch (exception &e) {
SockException ex(Err_other, e.what()); SockException ex(Err_other, e.what());
onPublishResult(ex); //定时器_pPublishTimer为空后表明握手结束了
onPublishResult(ex,!_pPublishTimer);
} }
} }
...@@ -377,7 +351,7 @@ void RtspPusher::sendRecord() { ...@@ -377,7 +351,7 @@ void RtspPusher::sendRecord() {
_pRtspReader->setDetachCB([weakSelf](){ _pRtspReader->setDetachCB([weakSelf](){
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(strongSelf){ if(strongSelf){
strongSelf->onPublishResult(SockException(Err_other,"媒体源被释放")); strongSelf->onPublishResult(SockException(Err_other,"媒体源被释放"), !strongSelf->_pPublishTimer);
} }
}); });
if(_eType != Rtsp::RTP_TCP){ if(_eType != Rtsp::RTP_TCP){
...@@ -392,7 +366,7 @@ void RtspPusher::sendRecord() { ...@@ -392,7 +366,7 @@ void RtspPusher::sendRecord() {
return true; return true;
},getPoller())); },getPoller()));
} }
onPublishResult(SockException(Err_success,"success")); onPublishResult(SockException(Err_success,"success"), false);
//提升发送性能 //提升发送性能
setSocketFlags(); setSocketFlags();
}; };
......
...@@ -48,8 +48,7 @@ protected: ...@@ -48,8 +48,7 @@ protected:
void onWholeRtspPacket(Parser &parser) override ; void onWholeRtspPacket(Parser &parser) override ;
void onRtpPacket(const char *data,uint64_t len) override {}; void onRtpPacket(const char *data,uint64_t len) override {};
private: private:
void publish(const string &strUrl, const string &strUser, const string &strPwd, Rtsp::eRtpType eType ); void onPublishResult(const SockException &ex, bool handshakeCompleted);
void onPublishResult(const SockException &ex);
void sendAnnounce(); void sendAnnounce();
void sendSetup(unsigned int uiTrackIndex); void sendSetup(unsigned int uiTrackIndex);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论