Commit 458a9252 by xiongziliang

完善服务器日志打印

parent 69624f84
...@@ -124,7 +124,7 @@ int64_t HttpSession::onRecvHeader(const char *header,uint64_t len) { ...@@ -124,7 +124,7 @@ int64_t HttpSession::onRecvHeader(const char *header,uint64_t len) {
string cmd = _parser.Method(); string cmd = _parser.Method();
auto it = g_mapCmdIndex.find(cmd); auto it = g_mapCmdIndex.find(cmd);
if (it == g_mapCmdIndex.end()) { if (it == g_mapCmdIndex.end()) {
WarnL << cmd; WarnP(this) << cmd;
sendResponse("403 Forbidden", makeHttpHeader(true), ""); sendResponse("403 Forbidden", makeHttpHeader(true), "");
shutdown(); shutdown();
return 0; return 0;
...@@ -156,7 +156,7 @@ void HttpSession::onRecv(const Buffer::Ptr &pBuf) { ...@@ -156,7 +156,7 @@ void HttpSession::onRecv(const Buffer::Ptr &pBuf) {
} }
void HttpSession::onError(const SockException& err) { void HttpSession::onError(const SockException& err) {
//WarnL << err.what(); // WarnP(this) << err.what();
GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold);
if(_ui64TotalBytes > iFlowThreshold * 1024){ if(_ui64TotalBytes > iFlowThreshold * 1024){
...@@ -174,7 +174,7 @@ void HttpSession::onManager() { ...@@ -174,7 +174,7 @@ void HttpSession::onManager() {
if(_ticker.elapsedTime() > keepAliveSec * 1000){ if(_ticker.elapsedTime() > keepAliveSec * 1000){
//1分钟超时 //1分钟超时
WarnL<<"HttpSession timeouted!"; // WarnP(this) <<"HttpSession timeouted!";
shutdown(); shutdown();
} }
} }
...@@ -441,7 +441,6 @@ inline bool HttpSession::Handle_Req_GET(int64_t &content_len) { ...@@ -441,7 +441,6 @@ inline bool HttpSession::Handle_Req_GET(int64_t &content_len) {
if (iRead < iReq || !*piLeft) { if (iRead < iReq || !*piLeft) {
//文件读完 //文件读完
//InfoL << "send complete!" << iRead << " " << iReq << " " << *piLeft;
if(iRead>0) { if(iRead>0) {
sendBuf->setSize(iRead); sendBuf->setSize(iRead);
strongSelf->send(sendBuf); strongSelf->send(sendBuf);
...@@ -456,7 +455,6 @@ inline bool HttpSession::Handle_Req_GET(int64_t &content_len) { ...@@ -456,7 +455,6 @@ inline bool HttpSession::Handle_Req_GET(int64_t &content_len) {
int iSent = strongSelf->send(sendBuf); int iSent = strongSelf->send(sendBuf);
if(iSent == -1) { if(iSent == -1) {
//套机制销毁 //套机制销毁
//InfoL << "send error";
return false; return false;
} }
if(strongSelf->isSocketBusy()){ if(strongSelf->isSocketBusy()){
...@@ -583,7 +581,6 @@ inline void HttpSession::sendResponse(const char* pcStatus, const KeyValue& head ...@@ -583,7 +581,6 @@ inline void HttpSession::sendResponse(const char* pcStatus, const KeyValue& head
} }
printer << "\r\n" << strContent; printer << "\r\n" << strContent;
auto strSend = printer << endl; auto strSend = printer << endl;
//DebugL << strSend;
send(strSend); send(strSend);
_ticker.resetTime(); _ticker.resetTime();
} }
......
...@@ -34,7 +34,7 @@ namespace mediakit { ...@@ -34,7 +34,7 @@ namespace mediakit {
static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE; static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE;
RtmpSession::RtmpSession(const Socket::Ptr &pSock) : TcpSession(pSock) { RtmpSession::RtmpSession(const Socket::Ptr &pSock) : TcpSession(pSock) {
DebugL << get_peer_ip(); DebugP(this);
//设置15秒发送超时时间 //设置15秒发送超时时间
pSock->setSendTimeOutSecond(15); pSock->setSendTimeOutSecond(15);
//起始接收buffer缓存设置为4K,节省内存 //起始接收buffer缓存设置为4K,节省内存
...@@ -42,11 +42,11 @@ RtmpSession::RtmpSession(const Socket::Ptr &pSock) : TcpSession(pSock) { ...@@ -42,11 +42,11 @@ RtmpSession::RtmpSession(const Socket::Ptr &pSock) : TcpSession(pSock) {
} }
RtmpSession::~RtmpSession() { RtmpSession::~RtmpSession() {
DebugL << get_peer_ip(); DebugP(this);
} }
void RtmpSession::onError(const SockException& err) { void RtmpSession::onError(const SockException& err) {
DebugL << err.what(); DebugP(this) << err.what();
//流量统计事件广播 //流量统计事件广播
GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold);
...@@ -65,14 +65,14 @@ void RtmpSession::onError(const SockException& err) { ...@@ -65,14 +65,14 @@ void RtmpSession::onError(const SockException& err) {
void RtmpSession::onManager() { void RtmpSession::onManager() {
if (_ticker.createdTime() > 15 * 1000) { if (_ticker.createdTime() > 15 * 1000) {
if (!_pRingReader && !_pPublisherSrc) { if (!_pRingReader && !_pPublisherSrc) {
WarnL << "非法链接:" << get_peer_ip(); WarnP(this) << "非法链接";
shutdown(); shutdown();
} }
} }
if (_pPublisherSrc) { if (_pPublisherSrc) {
//publisher //publisher
if (_ticker.elapsedTime() > 15 * 1000) { if (_ticker.elapsedTime() > 15 * 1000) {
WarnL << "数据接收超时:" << get_peer_ip(); WarnP(this) << "数据接收超时";
shutdown(); shutdown();
} }
} }
...@@ -84,7 +84,7 @@ void RtmpSession::onRecv(const Buffer::Ptr &pBuf) { ...@@ -84,7 +84,7 @@ void RtmpSession::onRecv(const Buffer::Ptr &pBuf) {
_ui64TotalBytes += pBuf->size(); _ui64TotalBytes += pBuf->size();
onParseRtmp(pBuf->data(), pBuf->size()); onParseRtmp(pBuf->data(), pBuf->size());
} catch (exception &e) { } catch (exception &e) {
WarnL << e.what(); WarnP(this) << e.what();
shutdown(); shutdown();
} }
} }
...@@ -134,8 +134,12 @@ void RtmpSession::onCmd_createStream(AMFDecoder &dec) { ...@@ -134,8 +134,12 @@ void RtmpSession::onCmd_createStream(AMFDecoder &dec) {
void RtmpSession::onCmd_publish(AMFDecoder &dec) { void RtmpSession::onCmd_publish(AMFDecoder &dec) {
std::shared_ptr<Ticker> pTicker(new Ticker); std::shared_ptr<Ticker> pTicker(new Ticker);
std::shared_ptr<onceToken> pToken(new onceToken(nullptr,[pTicker](){ weak_ptr<RtmpSession> weakSelf = dynamic_pointer_cast<RtmpSession>(shared_from_this());
DebugL << "publish 回复时间:" << pTicker->elapsedTime() << "ms"; std::shared_ptr<onceToken> pToken(new onceToken(nullptr,[pTicker,weakSelf](){
auto strongSelf = weakSelf.lock();
if(strongSelf){
DebugP(strongSelf.get()) << "publish 回复时间:" << pTicker->elapsedTime() << "ms";
}
})); }));
dec.load<AMFValue>();/* NULL */ dec.load<AMFValue>();/* NULL */
_mediaInfo.parse(_strTcUrl + "/" + dec.load<std::string>()); _mediaInfo.parse(_strTcUrl + "/" + dec.load<std::string>());
...@@ -155,7 +159,7 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { ...@@ -155,7 +159,7 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
status.set("clientid", "0"); status.set("clientid", "0");
sendReply("onStatus", nullptr, status); sendReply("onStatus", nullptr, status);
if (!ok) { if (!ok) {
WarnL << "onPublish:" WarnP(this) << "onPublish:"
<< (authSuccess ? "Already publishing:" : err.data()) << " " << (authSuccess ? "Already publishing:" : err.data()) << " "
<< _mediaInfo._vhost << " " << _mediaInfo._vhost << " "
<< _mediaInfo._app << " " << _mediaInfo._app << " "
...@@ -169,7 +173,6 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { ...@@ -169,7 +173,6 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
_sock->setReadBuffer(std::make_shared<BufferRaw>(256 * 1024)); _sock->setReadBuffer(std::make_shared<BufferRaw>(256 * 1024));
}; };
weak_ptr<RtmpSession> weakSelf = dynamic_pointer_cast<RtmpSession>(shared_from_this());
Broadcast::AuthInvoker invoker = [weakSelf,onRes,pToken](const string &err){ Broadcast::AuthInvoker invoker = [weakSelf,onRes,pToken](const string &err){
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(!strongSelf){ if(!strongSelf){
...@@ -219,7 +222,7 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr ...@@ -219,7 +222,7 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr
status.set("clientid", "0"); status.set("clientid", "0");
sendReply("onStatus", nullptr, status); sendReply("onStatus", nullptr, status);
if (!ok) { if (!ok) {
WarnL << (authSuccess ? "No such stream:" : err.data()) << " " WarnP(this) << (authSuccess ? "No such stream:" : err.data()) << " "
<< _mediaInfo._vhost << " " << _mediaInfo._vhost << " "
<< _mediaInfo._app << " " << _mediaInfo._app << " "
<< _mediaInfo._streamid << _mediaInfo._streamid
...@@ -264,7 +267,7 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr ...@@ -264,7 +267,7 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr
sendResponse(MSG_DATA, invoke.data()); sendResponse(MSG_DATA, invoke.data());
src->getConfigFrame([&](const RtmpPacket::Ptr &pkt) { src->getConfigFrame([&](const RtmpPacket::Ptr &pkt) {
//DebugL<<"send initial frame"; //DebugP(this)<<"send initial frame";
onSendMedia(pkt); onSendMedia(pkt);
}); });
...@@ -318,10 +321,13 @@ void RtmpSession::doPlayResponse(const string &err,const std::function<void(bool ...@@ -318,10 +321,13 @@ void RtmpSession::doPlayResponse(const string &err,const std::function<void(bool
void RtmpSession::doPlay(AMFDecoder &dec){ void RtmpSession::doPlay(AMFDecoder &dec){
std::shared_ptr<Ticker> pTicker(new Ticker); std::shared_ptr<Ticker> pTicker(new Ticker);
std::shared_ptr<onceToken> pToken(new onceToken(nullptr,[pTicker](){
DebugL << "play 回复时间:" << pTicker->elapsedTime() << "ms";
}));
weak_ptr<RtmpSession> weakSelf = dynamic_pointer_cast<RtmpSession>(shared_from_this()); weak_ptr<RtmpSession> weakSelf = dynamic_pointer_cast<RtmpSession>(shared_from_this());
std::shared_ptr<onceToken> pToken(new onceToken(nullptr,[pTicker,weakSelf](){
auto strongSelf = weakSelf.lock();
if(strongSelf) {
DebugP(strongSelf.get()) << "play 回复时间:" << pTicker->elapsedTime() << "ms";
}
}));
Broadcast::AuthInvoker invoker = [weakSelf,pToken](const string &err){ Broadcast::AuthInvoker invoker = [weakSelf,pToken](const string &err){
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(!strongSelf){ if(!strongSelf){
...@@ -353,7 +359,7 @@ void RtmpSession::onCmd_play(AMFDecoder &dec) { ...@@ -353,7 +359,7 @@ void RtmpSession::onCmd_play(AMFDecoder &dec) {
void RtmpSession::onCmd_pause(AMFDecoder &dec) { void RtmpSession::onCmd_pause(AMFDecoder &dec) {
dec.load<AMFValue>();/* NULL */ dec.load<AMFValue>();/* NULL */
bool paused = dec.load<bool>(); bool paused = dec.load<bool>();
TraceL << paused; TraceP(this) << paused;
AMFValue status(AMF_OBJECT); AMFValue status(AMF_OBJECT);
status.set("level", "status"); status.set("level", "status");
status.set("code", paused ? "NetStream.Pause.Notify" : "NetStream.Unpause.Notify"); status.set("code", paused ? "NetStream.Pause.Notify" : "NetStream.Unpause.Notify");
...@@ -406,7 +412,7 @@ void RtmpSession::onProcessCmd(AMFDecoder &dec) { ...@@ -406,7 +412,7 @@ void RtmpSession::onProcessCmd(AMFDecoder &dec) {
std::string method = dec.load<std::string>(); std::string method = dec.load<std::string>();
auto it = g_mapCmd.find(method); auto it = g_mapCmd.find(method);
if (it == g_mapCmd.end()) { if (it == g_mapCmd.end()) {
TraceL << "can not support cmd:" << method; TraceP(this) << "can not support cmd:" << method;
return; return;
} }
_dNowReqID = dec.load<double>(); _dNowReqID = dec.load<double>();
...@@ -427,7 +433,7 @@ void RtmpSession::onRtmpChunk(RtmpPacket &chunkData) { ...@@ -427,7 +433,7 @@ void RtmpSession::onRtmpChunk(RtmpPacket &chunkData) {
case MSG_DATA3: { case MSG_DATA3: {
AMFDecoder dec(chunkData.strBuf, chunkData.typeId == MSG_CMD3 ? 1 : 0); AMFDecoder dec(chunkData.strBuf, chunkData.typeId == MSG_CMD3 ? 1 : 0);
std::string type = dec.load<std::string>(); std::string type = dec.load<std::string>();
TraceL << "notify:" << type; TraceP(this) << "notify:" << type;
if (type == "@setDataFrame") { if (type == "@setDataFrame") {
setMetaData(dec); setMetaData(dec);
} }
...@@ -446,7 +452,7 @@ void RtmpSession::onRtmpChunk(RtmpPacket &chunkData) { ...@@ -446,7 +452,7 @@ void RtmpSession::onRtmpChunk(RtmpPacket &chunkData) {
} }
break; break;
default: default:
WarnL << "unhandled message:" << (int) chunkData.typeId << hexdump(chunkData.strBuf.data(), chunkData.strBuf.size()); WarnP(this) << "unhandled message:" << (int) chunkData.typeId << hexdump(chunkData.strBuf.data(), chunkData.strBuf.size());
break; break;
} }
} }
...@@ -454,7 +460,7 @@ void RtmpSession::onRtmpChunk(RtmpPacket &chunkData) { ...@@ -454,7 +460,7 @@ void RtmpSession::onRtmpChunk(RtmpPacket &chunkData) {
void RtmpSession::onCmd_seek(AMFDecoder &dec) { void RtmpSession::onCmd_seek(AMFDecoder &dec) {
dec.load<AMFValue>();/* NULL */ dec.load<AMFValue>();/* NULL */
auto milliSeconds = dec.load<AMFValue>().as_number(); auto milliSeconds = dec.load<AMFValue>().as_number();
InfoL << "rtmp seekTo(ms):" << milliSeconds; InfoP(this) << "rtmp seekTo(ms):" << milliSeconds;
auto stongSrc = _pPlayerSrc.lock(); auto stongSrc = _pPlayerSrc.lock();
if (stongSrc) { if (stongSrc) {
stongSrc->seekTo(milliSeconds); stongSrc->seekTo(milliSeconds);
......
...@@ -85,7 +85,7 @@ private: ...@@ -85,7 +85,7 @@ private:
if(!force && _pPublisherSrc->readerCount() != 0){ if(!force && _pPublisherSrc->readerCount() != 0){
return false; return false;
} }
InfoL << "kick out:" << _mediaInfo._vhost << " " << _mediaInfo._app << " " << _mediaInfo._streamid; InfoP(this) << "kick out:" << _mediaInfo._vhost << " " << _mediaInfo._app << " " << _mediaInfo._streamid;
safeShutdown(); safeShutdown();
return true; return true;
} }
......
...@@ -76,15 +76,15 @@ RtspSession::RtspSession(const Socket::Ptr &pSock) : TcpSession(pSock) { ...@@ -76,15 +76,15 @@ RtspSession::RtspSession(const Socket::Ptr &pSock) : TcpSession(pSock) {
pSock->setSendTimeOutSecond(15); pSock->setSendTimeOutSecond(15);
//起始接收buffer缓存设置为4K,节省内存 //起始接收buffer缓存设置为4K,节省内存
pSock->setReadBuffer(std::make_shared<BufferRaw>(4 * 1024)); pSock->setReadBuffer(std::make_shared<BufferRaw>(4 * 1024));
DebugL << get_peer_ip(); DebugP(this);
} }
RtspSession::~RtspSession() { RtspSession::~RtspSession() {
DebugL << get_peer_ip(); DebugP(this);
} }
void RtspSession::onError(const SockException& err) { void RtspSession::onError(const SockException& err) {
TraceL << err.getErrCode() << " " << err.what(); TraceP(this) << err.getErrCode() << " " << err.what();
if (_rtpType == Rtsp::RTP_MULTICAST) { if (_rtpType == Rtsp::RTP_MULTICAST) {
//取消UDP端口监听 //取消UDP端口监听
UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this); UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this);
...@@ -113,7 +113,7 @@ void RtspSession::onError(const SockException& err) { ...@@ -113,7 +113,7 @@ void RtspSession::onError(const SockException& err) {
void RtspSession::onManager() { void RtspSession::onManager() {
if (_ticker.createdTime() > 15 * 1000) { if (_ticker.createdTime() > 15 * 1000) {
if (_strSession.size() == 0) { if (_strSession.size() == 0) {
WarnL << "非法链接:" << get_peer_ip(); WarnP(this) << "非法链接";
shutdown(); shutdown();
return; return;
} }
...@@ -122,7 +122,7 @@ void RtspSession::onManager() { ...@@ -122,7 +122,7 @@ void RtspSession::onManager() {
if ((_rtpType == Rtsp::RTP_UDP || _pushSrc ) && _ticker.elapsedTime() > 15 * 1000) { if ((_rtpType == Rtsp::RTP_UDP || _pushSrc ) && _ticker.elapsedTime() > 15 * 1000) {
//如果是推流端或者rtp over udp类型的播放端,那么就做超时检测 //如果是推流端或者rtp over udp类型的播放端,那么就做超时检测
WarnL << "RTSP会话超时:" << get_peer_ip(); WarnP(this) << "RTSP会话超时";
shutdown(); shutdown();
return; return;
} }
...@@ -135,7 +135,7 @@ void RtspSession::onRecv(const Buffer::Ptr &pBuf) { ...@@ -135,7 +135,7 @@ void RtspSession::onRecv(const Buffer::Ptr &pBuf) {
//http poster的请求数据转发给http getter处理 //http poster的请求数据转发给http getter处理
_onRecv(pBuf); _onRecv(pBuf);
} else { } else {
// TraceL << pBuf->size() << "\r\n" << pBuf->data(); // TraceP(this) << pBuf->size() << "\r\n" << pBuf->data();
input(pBuf->data(),pBuf->size()); input(pBuf->data(),pBuf->size());
} }
} }
...@@ -173,7 +173,7 @@ void RtspSession::onWholeRtspPacket(Parser &parser) { ...@@ -173,7 +173,7 @@ void RtspSession::onWholeRtspPacket(Parser &parser) {
} }
} else{ } else{
shutdown(); shutdown();
WarnL << "不支持的rtsp命令:" << strCmd; WarnP(this) << "不支持的rtsp命令:" << strCmd;
} }
} }
...@@ -223,7 +223,7 @@ bool RtspSession::handleReq_ANNOUNCE(const Parser &parser) { ...@@ -223,7 +223,7 @@ bool RtspSession::handleReq_ANNOUNCE(const Parser &parser) {
false)); false));
if(src){ if(src){
sendRtspResponse("406 Not Acceptable", {"Content-Type", "text/plain"}, "Already publishing."); sendRtspResponse("406 Not Acceptable", {"Content-Type", "text/plain"}, "Already publishing.");
WarnL << "ANNOUNCE:" WarnP(this) << "ANNOUNCE:"
<< "Already publishing:" << "Already publishing:"
<< _mediaInfo._vhost << " " << _mediaInfo._vhost << " "
<< _mediaInfo._app << " " << _mediaInfo._app << " "
...@@ -312,7 +312,7 @@ bool RtspSession::handleReq_Describe(const Parser &parser) { ...@@ -312,7 +312,7 @@ bool RtspSession::handleReq_Describe(const Parser &parser) {
auto rtsp_src = dynamic_pointer_cast<RtspMediaSource>(src); auto rtsp_src = dynamic_pointer_cast<RtspMediaSource>(src);
if (!rtsp_src) { if (!rtsp_src) {
//未找到相应的MediaSource //未找到相应的MediaSource
WarnL << "No such stream:" << strongSelf->_mediaInfo._vhost << " " << strongSelf->_mediaInfo._app << " " << strongSelf->_mediaInfo._streamid; WarnP(strongSelf.get()) << "No such stream:" << strongSelf->_mediaInfo._vhost << " " << strongSelf->_mediaInfo._app << " " << strongSelf->_mediaInfo._streamid;
strongSelf->send_StreamNotFound(); strongSelf->send_StreamNotFound();
strongSelf->shutdown(); strongSelf->shutdown();
return; return;
...@@ -436,7 +436,7 @@ void RtspSession::onAuthBasic(const weak_ptr<RtspSession> &weakSelf,const string ...@@ -436,7 +436,7 @@ void RtspSession::onAuthBasic(const weak_ptr<RtspSession> &weakSelf,const string
//此时必须提供明文密码 //此时必须提供明文密码
if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,strongSelf->_mediaInfo,realm,user, true,invoker,*strongSelf)){ if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,strongSelf->_mediaInfo,realm,user, true,invoker,*strongSelf)){
//表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之 //表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之
WarnL << "请监听kBroadcastOnRtspAuth事件!"; WarnP(strongSelf.get()) << "请监听kBroadcastOnRtspAuth事件!";
//但是我们还是忽略认证以便完成播放 //但是我们还是忽略认证以便完成播放
//我们输入的密码是明文 //我们输入的密码是明文
invoker(false,pwd); invoker(false,pwd);
...@@ -449,7 +449,7 @@ void RtspSession::onAuthDigest(const weak_ptr<RtspSession> &weakSelf,const strin ...@@ -449,7 +449,7 @@ void RtspSession::onAuthDigest(const weak_ptr<RtspSession> &weakSelf,const strin
return; return;
} }
DebugL << strMd5; DebugP(strongSelf.get()) << strMd5;
auto mapTmp = Parser::parseArgs(strMd5,",","="); auto mapTmp = Parser::parseArgs(strMd5,",","=");
decltype(mapTmp) map; decltype(mapTmp) map;
for(auto &pr : mapTmp){ for(auto &pr : mapTmp){
...@@ -457,14 +457,14 @@ void RtspSession::onAuthDigest(const weak_ptr<RtspSession> &weakSelf,const strin ...@@ -457,14 +457,14 @@ void RtspSession::onAuthDigest(const weak_ptr<RtspSession> &weakSelf,const strin
} }
//check realm //check realm
if(realm != map["realm"]){ if(realm != map["realm"]){
TraceL << "realm not mached:" << realm << "," << map["realm"]; TraceP(strongSelf.get()) << "realm not mached:" << realm << "," << map["realm"];
onAuthFailed(weakSelf,realm); onAuthFailed(weakSelf,realm);
return ; return ;
} }
//check nonce //check nonce
auto nonce = map["nonce"]; auto nonce = map["nonce"];
if(strongSelf->_strNonce != nonce){ if(strongSelf->_strNonce != nonce){
TraceL << "nonce not mached:" << nonce << "," << strongSelf->_strNonce; TraceP(strongSelf.get()) << "nonce not mached:" << nonce << "," << strongSelf->_strNonce;
onAuthFailed(weakSelf,realm); onAuthFailed(weakSelf,realm);
return ; return ;
} }
...@@ -473,16 +473,20 @@ void RtspSession::onAuthDigest(const weak_ptr<RtspSession> &weakSelf,const strin ...@@ -473,16 +473,20 @@ void RtspSession::onAuthDigest(const weak_ptr<RtspSession> &weakSelf,const strin
auto uri = map["uri"]; auto uri = map["uri"];
auto response = map["response"]; auto response = map["response"];
if(username.empty() || uri.empty() || response.empty()){ if(username.empty() || uri.empty() || response.empty()){
TraceL << "username/uri/response empty:" << username << "," << uri << "," << response; TraceP(strongSelf.get()) << "username/uri/response empty:" << username << "," << uri << "," << response;
onAuthFailed(weakSelf,realm); onAuthFailed(weakSelf,realm);
return ; return ;
} }
auto realInvoker = [weakSelf,realm,nonce,uri,username,response](bool ignoreAuth,bool encrypted,const string &good_pwd){ auto realInvoker = [weakSelf,realm,nonce,uri,username,response](bool ignoreAuth,bool encrypted,const string &good_pwd){
auto strongSelf = weakSelf.lock();
if(!strongSelf){
return;
}
if(ignoreAuth){ if(ignoreAuth){
//忽略认证 //忽略认证
onAuthSuccess(weakSelf); onAuthSuccess(weakSelf);
TraceL << "auth ignored"; TraceP(strongSelf.get()) << "auth ignored";
return; return;
} }
/* /*
...@@ -503,11 +507,11 @@ void RtspSession::onAuthDigest(const weak_ptr<RtspSession> &weakSelf,const strin ...@@ -503,11 +507,11 @@ void RtspSession::onAuthDigest(const weak_ptr<RtspSession> &weakSelf,const strin
if(strcasecmp(good_response.data(),response.data()) == 0){ if(strcasecmp(good_response.data(),response.data()) == 0){
//认证成功!md5不区分大小写 //认证成功!md5不区分大小写
onAuthSuccess(weakSelf); onAuthSuccess(weakSelf);
TraceL << "onAuthSuccess"; TraceP(strongSelf.get()) << "onAuthSuccess";
}else{ }else{
//认证失败! //认证失败!
onAuthFailed(weakSelf,realm); onAuthFailed(weakSelf,realm);
TraceL << "onAuthFailed"; TraceP(strongSelf.get()) << "onAuthFailed";
} }
}; };
onAuth invoker = [realInvoker](bool encrypted,const string &good_pwd){ onAuth invoker = [realInvoker](bool encrypted,const string &good_pwd){
...@@ -517,7 +521,7 @@ void RtspSession::onAuthDigest(const weak_ptr<RtspSession> &weakSelf,const strin ...@@ -517,7 +521,7 @@ void RtspSession::onAuthDigest(const weak_ptr<RtspSession> &weakSelf,const strin
//此时可以提供明文或md5加密的密码 //此时可以提供明文或md5加密的密码
if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,strongSelf->_mediaInfo,realm,username, false,invoker,*strongSelf)){ if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,strongSelf->_mediaInfo,realm,username, false,invoker,*strongSelf)){
//表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之 //表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之
WarnL << "请监听kBroadcastOnRtspAuth事件!"; WarnP(strongSelf.get()) << "请监听kBroadcastOnRtspAuth事件!";
//但是我们还是忽略认证以便完成播放 //但是我们还是忽略认证以便完成播放
realInvoker(true,true,""); realInvoker(true,true,"");
} }
...@@ -602,14 +606,14 @@ bool RtspSession::handleReq_Setup(const Parser &parser) { ...@@ -602,14 +606,14 @@ bool RtspSession::handleReq_Setup(const Parser &parser) {
auto pSockRtp = std::make_shared<Socket>(_sock->getPoller()); auto pSockRtp = std::make_shared<Socket>(_sock->getPoller());
if (!pSockRtp->bindUdpSock(0,get_local_ip().data())) { if (!pSockRtp->bindUdpSock(0,get_local_ip().data())) {
//分配端口失败 //分配端口失败
WarnL << "分配rtp端口失败"; WarnP(this) << "分配rtp端口失败";
send_NotAcceptable(); send_NotAcceptable();
return false; return false;
} }
auto pSockRtcp = std::make_shared<Socket>(_sock->getPoller()); auto pSockRtcp = std::make_shared<Socket>(_sock->getPoller());
if (!pSockRtcp->bindUdpSock(pSockRtp->get_local_port() + 1,get_local_ip().data())) { if (!pSockRtcp->bindUdpSock(pSockRtp->get_local_port() + 1,get_local_ip().data())) {
//分配端口失败 //分配端口失败
WarnL << "分配rtcp端口失败"; WarnP(this) << "分配rtcp端口失败";
send_NotAcceptable(); send_NotAcceptable();
return false; return false;
} }
...@@ -637,7 +641,7 @@ bool RtspSession::handleReq_Setup(const Parser &parser) { ...@@ -637,7 +641,7 @@ bool RtspSession::handleReq_Setup(const Parser &parser) {
//尝试获取客户端nat映射地址 //尝试获取客户端nat映射地址
startListenPeerUdpData(trackIdx); startListenPeerUdpData(trackIdx);
//InfoL << "分配端口:" << srv_port; //InfoP(this) << "分配端口:" << srv_port;
sendRtspResponse("200 OK", sendRtspResponse("200 OK",
{"Transport",StrPrinter << "RTP/AVP/UDP;unicast;" {"Transport",StrPrinter << "RTP/AVP/UDP;unicast;"
...@@ -669,7 +673,7 @@ bool RtspSession::handleReq_Setup(const Parser &parser) { ...@@ -669,7 +673,7 @@ bool RtspSession::handleReq_Setup(const Parser &parser) {
auto pSockRtcp = UDPServer::Instance().getSock(get_local_ip().data(),2*trackIdx + 1,iSrvPort + 1); auto pSockRtcp = UDPServer::Instance().getSock(get_local_ip().data(),2*trackIdx + 1,iSrvPort + 1);
if (!pSockRtcp) { if (!pSockRtcp) {
//分配端口失败 //分配端口失败
WarnL << "分配rtcp端口失败"; WarnP(this) << "分配rtcp端口失败";
send_NotAcceptable(); send_NotAcceptable();
return false; return false;
} }
...@@ -724,7 +728,7 @@ bool RtspSession::handleReq_Play(const Parser &parser) { ...@@ -724,7 +728,7 @@ bool RtspSession::handleReq_Play(const Parser &parser) {
strStart = "0"; strStart = "0";
} }
auto iStartTime = 1000 * atof(strStart.data()); auto iStartTime = 1000 * atof(strStart.data());
InfoL << "rtsp seekTo(ms):" << iStartTime; InfoP(this) << "rtsp seekTo(ms):" << iStartTime;
useBuf = !pMediaSrc->seekTo(iStartTime); useBuf = !pMediaSrc->seekTo(iStartTime);
}else if(pMediaSrc->readerCount() == 0){ }else if(pMediaSrc->readerCount() == 0){
//第一个消费者 //第一个消费者
...@@ -824,7 +828,7 @@ bool RtspSession::handleReq_Pause(const Parser &parser) { ...@@ -824,7 +828,7 @@ bool RtspSession::handleReq_Pause(const Parser &parser) {
bool RtspSession::handleReq_Teardown(const Parser &parser) { bool RtspSession::handleReq_Teardown(const Parser &parser) {
sendRtspResponse("200 OK"); sendRtspResponse("200 OK");
TraceL << "播放器断开连接!"; TraceP(this) << "播放器断开连接!";
return true; return true;
} }
...@@ -850,7 +854,7 @@ bool RtspSession::handleReq_Post(const Parser &parser) { ...@@ -850,7 +854,7 @@ bool RtspSession::handleReq_Post(const Parser &parser) {
//Poster 找到 Getter //Poster 找到 Getter
auto it = g_mapGetter.find(sessioncookie); auto it = g_mapGetter.find(sessioncookie);
if (it == g_mapGetter.end()) { if (it == g_mapGetter.end()) {
WarnL << "Http Poster未找到Http Getter"; WarnP(this) << "Http Poster未找到Http Getter";
return false; return false;
} }
...@@ -863,7 +867,7 @@ bool RtspSession::handleReq_Post(const Parser &parser) { ...@@ -863,7 +867,7 @@ bool RtspSession::handleReq_Post(const Parser &parser) {
_onRecv = [this,httpGetterWeak](const Buffer::Ptr &pBuf){ _onRecv = [this,httpGetterWeak](const Buffer::Ptr &pBuf){
auto httpGetterStrong = httpGetterWeak.lock(); auto httpGetterStrong = httpGetterWeak.lock();
if(!httpGetterStrong){ if(!httpGetterStrong){
WarnL << "Http Getter已经释放"; WarnP(this) << "Http Getter已经释放";
shutdown(); shutdown();
return; return;
} }
...@@ -886,7 +890,7 @@ bool RtspSession::handleReq_Post(const Parser &parser) { ...@@ -886,7 +890,7 @@ bool RtspSession::handleReq_Post(const Parser &parser) {
} }
bool RtspSession::handleReq_SET_PARAMETER(const Parser &parser) { bool RtspSession::handleReq_SET_PARAMETER(const Parser &parser) {
//TraceL<<endl; //TraceP(this) <<endl;
sendRtspResponse("200 OK"); sendRtspResponse("200 OK");
return true; return true;
} }
...@@ -926,15 +930,17 @@ inline void RtspSession::startListenPeerUdpData(int trackIdx) { ...@@ -926,15 +930,17 @@ inline void RtspSession::startListenPeerUdpData(int trackIdx) {
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this()); weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
auto srcIP = inet_addr(get_peer_ip().data()); auto srcIP = inet_addr(get_peer_ip().data());
auto onUdpData = [weakSelf,srcIP](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr,int intervaled){ auto onUdpData = [weakSelf,srcIP](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr,int intervaled){
if (((struct sockaddr_in *) pPeerAddr)->sin_addr.s_addr != srcIP) {
WarnL << ((intervaled % 2 == 0) ? "收到其他地址的rtp数据:" : "收到其他地址的rtcp数据:")
<< inet_ntoa(((struct sockaddr_in *) pPeerAddr)->sin_addr);
return true;
}
auto strongSelf=weakSelf.lock(); auto strongSelf=weakSelf.lock();
if(!strongSelf) { if(!strongSelf) {
return false; return false;
} }
if (((struct sockaddr_in *) pPeerAddr)->sin_addr.s_addr != srcIP) {
WarnP(strongSelf.get()) << ((intervaled % 2 == 0) ? "收到其他地址的rtp数据:" : "收到其他地址的rtcp数据:")
<< inet_ntoa(((struct sockaddr_in *) pPeerAddr)->sin_addr);
return true;
}
struct sockaddr addr=*pPeerAddr; struct sockaddr addr=*pPeerAddr;
strongSelf->async([weakSelf,pBuf,addr,intervaled]() { strongSelf->async([weakSelf,pBuf,addr,intervaled]() {
auto strongSelf=weakSelf.lock(); auto strongSelf=weakSelf.lock();
...@@ -958,7 +964,7 @@ inline void RtspSession::startListenPeerUdpData(int trackIdx) { ...@@ -958,7 +964,7 @@ inline void RtspSession::startListenPeerUdpData(int trackIdx) {
case Rtsp::RTP_UDP:{ case Rtsp::RTP_UDP:{
auto setEvent = [&](Socket::Ptr &sock,int intervaled){ auto setEvent = [&](Socket::Ptr &sock,int intervaled){
if(!sock){ if(!sock){
WarnL << "udp端口为空:" << intervaled; WarnP(this) << "udp端口为空:" << intervaled;
return; return;
} }
sock->setOnRead([onUdpData,intervaled](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr){ sock->setOnRead([onUdpData,intervaled](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr){
...@@ -1012,13 +1018,13 @@ bool RtspSession::sendRtspResponse(const string &res_code, ...@@ -1012,13 +1018,13 @@ bool RtspSession::sendRtspResponse(const string &res_code,
if(!sdp.empty()){ if(!sdp.empty()){
printer << sdp; printer << sdp;
} }
// DebugL << printer; // DebugP(this) << printer;
return send(std::make_shared<BufferString>(printer)) > 0 ; return send(std::make_shared<BufferString>(printer)) > 0 ;
} }
int RtspSession::send(const Buffer::Ptr &pkt){ int RtspSession::send(const Buffer::Ptr &pkt){
// if(!_enableSendRtp){ // if(!_enableSendRtp){
// DebugL << pkt->data(); // DebugP(this) << pkt->data();
// } // }
_ui64TotalBytes += pkt->size(); _ui64TotalBytes += pkt->size();
return TcpSession::send(pkt); return TcpSession::send(pkt);
...@@ -1083,14 +1089,14 @@ bool RtspSession::close(bool force) { ...@@ -1083,14 +1089,14 @@ bool RtspSession::close(bool force) {
if(!force && _pushSrc->readerCount() != 0){ if(!force && _pushSrc->readerCount() != 0){
return false; return false;
} }
InfoL << "kick out:" << _mediaInfo._vhost << " " << _mediaInfo._app << " " << _mediaInfo._streamid; InfoP(this) << "kick out:" << _mediaInfo._vhost << " " << _mediaInfo._app << " " << _mediaInfo._streamid;
safeShutdown(); safeShutdown();
return true; return true;
} }
inline void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) { inline void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) {
//InfoL<<(int)pkt.Interleaved; //InfoP(this) <<(int)pkt.Interleaved;
switch (_rtpType) { switch (_rtpType) {
case Rtsp::RTP_TCP: { case Rtsp::RTP_TCP: {
BufferRtp::Ptr buffer(new BufferRtp(pkt)); BufferRtp::Ptr buffer(new BufferRtp(pkt));
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论