RtspSession.cpp 46.9 KB
Newer Older
xiongziliang committed
1
/*
xiongziliang committed
2
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
xiongziliang committed
3 4 5
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
xiongziliang committed
6 7 8
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
xzl committed
9
 */
10

xiongzilaing committed
11
#include <atomic>
12
#include <iomanip>
xiongziliang committed
13
#include "Common/config.h"
xiongzilaing committed
14
#include "UDPServer.h"
xzl committed
15
#include "RtspSession.h"
xiongzilaing committed
16
#include "Util/mini.h"
17
#include "Util/MD5.h"
xiongziliang committed
18
#include "Util/base64.h"
xiongzilaing committed
19
#include "Util/onceToken.h"
xzl committed
20 21
#include "Util/TimeTicker.h"
#include "Util/NoticeCenter.h"
xiongzilaing committed
22
#include "Network/sockutil.h"
xzl committed
23

24 25
#define RTSP_SERVER_SEND_RTCP 0

xiongziliang committed
26 27
using namespace std;
using namespace toolkit;
xzl committed
28

xiongziliang committed
29
namespace mediakit {
xzl committed
30

31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
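/**
 * RTSP can carry RTP packets in several ways; the following four are currently supported:
 * 1: rtp over udp - RTP is sent over dedicated UDP ports
 * 2: rtp over udp_multicast - RTP is sent over a shared UDP multicast port
 * 3: rtp over tcp - RTP is sent over the RTSP signalling TCP connection
 * 4: rtp over http - explained in detail below
 *
 * rtp over http disguises the RTSP protocol as HTTP in order to get through firewalls.
 * The player sends two HTTP requests to the RTSP server: the first is an HTTP GET,
 * the second is an HTTP POST.
 *
 * The two requests are bound together by the x-sessioncookie key in the HTTP request headers.
 *
 * The first request (HTTP GET) is used to receive RTP, RTCP and RTSP replies; no further requests are sent on that connection.
 * The second request (HTTP POST) is used to send RTSP requests; it may be disconnected once the RTSP handshake is done, in which case RTP sending must still be kept up.
 * Note that the content payload of the HTTP POST is the base64-encoded RTSP request packet:
 * the player disguises RTSP requests as HTTP content and sends them to the RTSP server, and the server sends its replies on the TCP connection of the first HTTP GET.
 * To the firewall this RTSP session is therefore just two HTTP requests, so it lets the data through.
 *
 * When zlmediakit handles an rtsp over http request, it base64-decodes the content data of the http poster and forwards it to the http getter for processing.
 */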


// Registry of HTTP GET sessions used for rtsp over http, keyed by string
// (presumably the x-sessioncookie value — see the erase in onError).
// Entries are weak references, so the registry does not keep a session alive.
static unordered_map<string, weak_ptr<RtspSession> > g_mapGetter;
// Guards every access to g_mapGetter.
static recursive_mutex g_mtxGetter;
xiongziliang committed
58

xiongziliang committed
59
/**
 * Creates an RTSP session bound to the given socket.
 *
 * The socket's send timeout is set from the configured RTSP keep-alive
 * interval, and its receive buffer starts at 4 KiB to save memory.
 *
 * @param pSock socket this session runs on
 */
RtspSession::RtspSession(const Socket::Ptr &pSock) : TcpSession(pSock) {
    DebugP(this);

    // Send timeout follows the keep-alive setting
    GET_CONFIG(uint32_t, keep_alive_sec, Rtsp::kKeepAliveSecond);
    pSock->setSendTimeOutSecond(keep_alive_sec);

    // Small initial receive buffer (4K) to save memory
    pSock->setReadBuffer(std::make_shared<BufferRaw>(4 * 1024));
}

/**
 * Destructor; only emits a debug trace for this session.
 */
RtspSession::~RtspSession()
{
    DebugP(this);
}

void RtspSession::onError(const SockException& err) {
xiongziliang committed
72
    bool isPlayer = !_pushSrc;
73
    uint64_t duration = _ticker.createdTime()/1000;
74
    WarnP(this) << (isPlayer ? "RTSP播放器(" : "RTSP推流器(")
xiongziliang committed
75 76 77
                << _mediaInfo._vhost << "/"
                << _mediaInfo._app << "/"
                << _mediaInfo._streamid
78 79
                << ")断开:" << err.what()
                << ",耗时(s):" << duration;
xiongziliang committed
80

81 82 83 84
    if (_rtpType == Rtsp::RTP_MULTICAST) {
        //取消UDP端口监听
        UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this);
    }
xzl committed
85

86 87 88 89 90
    if (_http_x_sessioncookie.size() != 0) {
        //移除http getter的弱引用记录
        lock_guard<recursive_mutex> lock(g_mtxGetter);
        g_mapGetter.erase(_http_x_sessioncookie);
    }
91 92

    //流量统计事件广播
93
    GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold);
94
    if(_ui64TotalBytes > iFlowThreshold * 1024){
95
        NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, duration, isPlayer, static_cast<SockInfo &>(*this));
96
    }
97

xzl committed
98 99 100
}

void RtspSession::onManager() {
101
    GET_CONFIG(uint32_t,handshake_sec,Rtsp::kHandshakeSecond);
102 103 104
    GET_CONFIG(uint32_t,keep_alive_sec,Rtsp::kKeepAliveSecond);

    if (_ticker.createdTime() > handshake_sec * 1000) {
105 106 107 108 109
        if (_strSession.size() == 0) {
            shutdown(SockException(Err_timeout,"illegal connection"));
            return;
        }
    }
110

111

112 113
    if ((_rtpType == Rtsp::RTP_UDP || _pushSrc ) && _ticker.elapsedTime() > keep_alive_sec * 1000) {
        //如果是推流端或者rtp over udp类型的播放端,那么就做超时检测
xiongziliang committed
114 115
        shutdown(SockException(Err_timeout,"rtp over udp session timeouted"));
        return;
116
    }
xzl committed
117 118
}

xiongziliang committed
119
/**
 * Handles data received on the session socket.
 *
 * Resets the activity ticker, accounts the received bytes, then either hands
 * the buffer to the _onRecv hook (when one is installed, i.e. http poster data
 * being forwarded to the http getter) or feeds it to the splitter via input().
 *
 * @param pBuf received data
 */
void RtspSession::onRecv(const Buffer::Ptr &pBuf) {
    _ticker.resetTime();
    _ui64TotalBytes += pBuf->size();

    if (!_onRecv) {
        // Regular path: parse the data ourselves
        input(pBuf->data(), pBuf->size());
        return;
    }
    // http poster request data is forwarded to the http getter
    _onRecv(pBuf);
}
xiongziliang committed
130

131 132 133 134 135 136
// Returns true when `str` ends with `substr` (an empty `substr` always matches).
static inline bool end_of(const string &str, const string &substr){
    if (substr.size() > str.size()) {
        return false;
    }
    const auto tail_start = str.size() - substr.size();
    return str.compare(tail_start, substr.size(), substr) == 0;
}

xiongziliang committed
137
/**
 * Handles one complete request and dispatches it to the matching handler.
 *
 * On the first non-GET request the content base and media info are taken from
 * the request URL. Unknown methods are answered with "403 Forbidden" and the
 * session is shut down. Exceptions thrown by a handler shut the session down
 * (a SockException only when it evaluates to true).
 *
 * @param parser parsed request; cleared after a handler has run
 */
void RtspSession::onWholeRtspPacket(Parser &parser) {
    using rtsp_request_handler = void (RtspSession::*)(const Parser &parser);
    // Method name -> handler, built once on first use
    static const unordered_map<string, rtsp_request_handler> s_cmd_functions = {
        {"OPTIONS",       &RtspSession::handleReq_Options},
        {"DESCRIBE",      &RtspSession::handleReq_Describe},
        {"ANNOUNCE",      &RtspSession::handleReq_ANNOUNCE},
        {"RECORD",        &RtspSession::handleReq_RECORD},
        {"SETUP",         &RtspSession::handleReq_Setup},
        {"PLAY",          &RtspSession::handleReq_Play},
        {"PAUSE",         &RtspSession::handleReq_Pause},
        {"TEARDOWN",      &RtspSession::handleReq_Teardown},
        {"GET",           &RtspSession::handleReq_Get},
        {"POST",          &RtspSession::handleReq_Post},
        {"SET_PARAMETER", &RtspSession::handleReq_SET_PARAMETER},
        // GET_PARAMETER shares the SET_PARAMETER handler
        {"GET_PARAMETER", &RtspSession::handleReq_SET_PARAMETER},
    };

    // Request method
    string strCmd = parser.Method();
    _iCseq = atoi(parser["CSeq"].data());
    if (strCmd != "GET" && _strContentBase.empty()) {
        _strContentBase = parser.Url();
        _mediaInfo.parse(parser.FullUrl());
        _mediaInfo._schema = RTSP_SCHEMA;
    }

    const auto handlerIt = s_cmd_functions.find(strCmd);
    if (handlerIt == s_cmd_functions.end()) {
        sendRtspResponse("403 Forbidden");
        shutdown(SockException(Err_shutdown,StrPrinter << "403 Forbidden:" << strCmd));
        return;
    }

    const rtsp_request_handler handler = handlerIt->second;
    try {
        (this->*handler)(parser);
    } catch (SockException &ex) {
        if (ex) {
            shutdown(ex);
        }
    } catch (exception &ex) {
        shutdown(SockException(Err_shutdown,ex.what()));
    }
    parser.Clear();
}

xiongziliang committed
183
void RtspSession::onRtpPacket(const char *data, uint64_t len) {
184 185 186 187 188 189 190 191
    if(!_pushSrc){
        return;
    }

    int trackIdx = -1;
    uint8_t interleaved = data[1];
    if(interleaved %2 == 0){
        trackIdx = getTrackIndexByInterleaved(interleaved);
192 193 194
        if (trackIdx != -1) {
            handleOneRtp(trackIdx,_aTrackInfo[trackIdx],(unsigned char *)data + 4, len - 4);
        }
195
    }else{
196 197
        trackIdx = getTrackIndexByInterleaved(interleaved - 1);
        if (trackIdx != -1) {
198
            onRtcpPacket(trackIdx, _aTrackInfo[trackIdx], (unsigned char *) data + 4, len - 4);
199
        }
200
    }
xiongziliang committed
201 202
}

203
/**
 * Hook for RTCP packets received from the peer; intentionally does nothing.
 *
 * @param iTrackidx index of the track the packet belongs to
 * @param track     track description
 * @param pucData   RTCP payload
 * @param uiLen     payload length in bytes
 */
void RtspSession::onRtcpPacket(int iTrackidx, SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen)
{
    // RTCP is currently ignored
}
xiongziliang committed
206
/**
 * Returns the content length to expect for the given request.
 *
 * For an http POST the content is the base64-encoded RTSP request signalling,
 * so the remaining data size is returned; every other method defers to
 * RtspSplitter.
 *
 * @param parser parsed request header
 */
int64_t RtspSession::getContentLength(Parser &parser) {
    if (parser.Method() != "POST") {
        return RtspSplitter::getContentLength(parser);
    }
    // http post: the content part is the base64-encoded rtsp request packet
    return remainDataSize();
}

xiongziliang committed
214

215
void RtspSession::handleReq_Options(const Parser &parser) {
216 217
    //支持这些命令
    sendRtspResponse("200 OK",{"Public" , "OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, ANNOUNCE, RECORD, SET_PARAMETER, GET_PARAMETER"});
xzl committed
218 219
}

220
void RtspSession::handleReq_ANNOUNCE(const Parser &parser) {
221 222 223
    auto src = dynamic_pointer_cast<RtmpMediaSource>(MediaSource::find(RTSP_SCHEMA,
                                                                       _mediaInfo._vhost,
                                                                       _mediaInfo._app,
224
                                                                       _mediaInfo._streamid));
225 226
    if(src){
        sendRtspResponse("406 Not Acceptable", {"Content-Type", "text/plain"}, "Already publishing.");
227 228 229 230 231
        string err = StrPrinter << "ANNOUNCE:"
                                << "Already publishing:"
                                << _mediaInfo._vhost << " "
                                << _mediaInfo._app << " "
                                << _mediaInfo._streamid << endl;
232 233
        throw SockException(Err_shutdown,err);
    }
xiongziliang committed
234

235 236 237
    auto full_url = parser.FullUrl();
    if(end_of(full_url,".sdp")){
        //去除.sdp后缀,防止EasyDarwin推流器强制添加.sdp后缀
238
        full_url = full_url.substr(0,full_url.length() - 4);
239 240 241
        _mediaInfo.parse(full_url);
    }

242 243 244 245 246 247
    if(_mediaInfo._app.empty() || _mediaInfo._streamid.empty()){
        //推流rtsp url必须最少两级(rtsp://host/app/stream_id),不允许莫名其妙的推流url
        sendRtspResponse("403 Forbidden", {"Content-Type", "text/plain"}, "rtsp推流url非法,最少确保两级rtsp url");
        throw SockException(Err_shutdown,StrPrinter << "rtsp推流url非法:" << full_url);
    }

248 249 250
    SdpParser sdpParser(parser.Content());
    _strSession = makeRandStr(12);
    _aTrackInfo = sdpParser.getAvailableTrack();
xiongziliang committed
251

252 253
    _pushSrc = std::make_shared<RtspMediaSourceImp>(_mediaInfo._vhost,_mediaInfo._app,_mediaInfo._streamid);
    _pushSrc->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
254
    _pushSrc->setSdp(sdpParser.toString());
255

256
    sendRtspResponse("200 OK",{"Content-Base",_strContentBase + "/"});
257 258
}

259
void RtspSession::handleReq_RECORD(const Parser &parser){
260 261
    if (_aTrackInfo.empty() || parser["Session"] != _strSession) {
        send_SessionNotFound();
262
        throw SockException(Err_shutdown,_aTrackInfo.empty() ? "can not find any availabe track when record" : "session not found when record");
263 264 265 266 267 268 269 270
    }
    auto onRes = [this](const string &err,bool enableRtxp,bool enableHls,bool enableMP4){
        bool authSuccess = err.empty();
        if(!authSuccess){
            sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err);
            shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << err));
            return;
        }
271

272 273 274
        //设置转协议
        _pushSrc->setProtocolTranslation(enableRtxp,enableHls,enableMP4);

275 276 277 278
        _StrPrinter rtp_info;
        for(auto &track : _aTrackInfo){
            if (track->_inited == false) {
                //还有track没有setup
xiongziliang committed
279 280
                shutdown(SockException(Err_shutdown,"track not setuped"));
                return;
281 282 283 284 285 286 287 288 289
            }
            rtp_info << "url=" << _strContentBase << "/" << track->_control_surffix << ",";
        }

        rtp_info.pop_back();
        sendRtspResponse("200 OK", {"RTP-Info",rtp_info});
        if(_rtpType == Rtsp::RTP_TCP){
            //如果是rtsp推流服务器,并且是TCP推流,那么加大TCP接收缓存,这样能提升接收性能
            _sock->setReadBuffer(std::make_shared<BufferRaw>(256 * 1024));
290
            setSocketFlags();
291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309
        }
    };

    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
    Broadcast::PublishAuthInvoker invoker = [weakSelf,onRes](const string &err,bool enableRtxp,bool enableHls,bool enableMP4){
        auto strongSelf = weakSelf.lock();
        if(!strongSelf){
            return;
        }
        strongSelf->async([weakSelf,onRes,err,enableRtxp,enableHls,enableMP4](){
            auto strongSelf = weakSelf.lock();
            if(!strongSelf){
                return;
            }
            onRes(err,enableRtxp,enableHls,enableMP4);
        });
    };

    //rtsp推流需要鉴权
310
    auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish,_mediaInfo,invoker,static_cast<SockInfo &>(*this));
311 312
    if(!flag){
        //该事件无人监听,默认不鉴权
313 314 315 316 317
        GET_CONFIG(bool,toRtxp,General::kPublishToRtxp);
        GET_CONFIG(bool,toHls,General::kPublishToHls);
        GET_CONFIG(bool,toMP4,General::kPublishToMP4);
        onRes("",toRtxp,toHls,toMP4);
    }
318 319
}

xiongziliang committed
320
void RtspSession::emitOnPlay(){
321
    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
xiongziliang committed
322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357
    //url鉴权回调
    auto onRes = [weakSelf](const string &err) {
        auto strongSelf = weakSelf.lock();
        if (!strongSelf) {
            return;
        }
        if (!err.empty()) {
            //播放url鉴权失败
            strongSelf->sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err);
            strongSelf->shutdown(SockException(Err_shutdown, StrPrinter << "401 Unauthorized:" << err));
            return;
        }
        strongSelf->onAuthSuccess();
    };

    Broadcast::AuthInvoker invoker = [weakSelf, onRes](const string &err) {
        auto strongSelf = weakSelf.lock();
        if (!strongSelf) {
            return;
        }
        strongSelf->async([onRes, err, weakSelf]() {
            onRes(err);
        });
    };

    //广播通用播放url鉴权事件
    auto flag = _emit_on_play ? false : NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed, _mediaInfo, invoker, static_cast<SockInfo &>(*this));
    if (!flag) {
        //该事件无人监听,默认不鉴权
        onRes("");
    }
    //已经鉴权过了
    _emit_on_play = true;
}

void RtspSession::handleReq_Describe(const Parser &parser) {
358
    //该请求中的认证信息
359
    auto authorization = parser["Authorization"];
xiongziliang committed
360 361 362
    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
    //rtsp专属鉴权是否开启事件回调
    onGetRealm invoker = [weakSelf, authorization](const string &realm) {
363
        auto strongSelf = weakSelf.lock();
xiongziliang committed
364
        if (!strongSelf) {
365 366 367 368
            //本对象已经销毁
            return;
        }
        //切换到自己的线程然后执行
xiongziliang committed
369
        strongSelf->async([weakSelf, realm, authorization]() {
370
            auto strongSelf = weakSelf.lock();
xiongziliang committed
371
            if (!strongSelf) {
372 373 374
                //本对象已经销毁
                return;
            }
xiongziliang committed
375 376 377
            if (realm.empty()) {
                //无需rtsp专属认证, 那么继续url通用鉴权认证(on_play)
                strongSelf->emitOnPlay();
378 379
                return;
            }
xiongziliang committed
380 381 382
            //该流需要rtsp专属认证,开启rtsp专属认证后,将不再触发url通用鉴权认证(on_play)
            strongSelf->_rtsp_realm = realm;
            strongSelf->onAuthUser(realm, authorization);
383 384 385
        });
    };

xiongziliang committed
386 387 388 389 390 391 392 393
    if(_rtsp_realm.empty()){
        //广播是否需要rtsp专属认证事件
        if (!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnGetRtspRealm, _mediaInfo, invoker, static_cast<SockInfo &>(*this))) {
            //无人监听此事件,说明无需认证
            invoker("");
        }
    }else{
        invoker(_rtsp_realm);
394 395 396 397
    }
}
void RtspSession::onAuthSuccess() {
    TraceP(this);
398
    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
xiongziliang committed
399
    MediaSource::findAsync(_mediaInfo,weakSelf.lock(),[weakSelf](const MediaSource::Ptr &src){
400 401 402 403 404 405 406
        auto strongSelf = weakSelf.lock();
        if(!strongSelf){
            return;
        }
        auto rtsp_src = dynamic_pointer_cast<RtspMediaSource>(src);
        if (!rtsp_src) {
            //未找到相应的MediaSource
xiongziliang committed
407
            string err = StrPrinter << "no such stream:" <<  strongSelf->_mediaInfo._vhost << " " <<  strongSelf->_mediaInfo._app << " " << strongSelf->_mediaInfo._streamid;
408
            strongSelf->send_StreamNotFound();
xiongziliang committed
409
            strongSelf->shutdown(SockException(Err_shutdown,err));
410 411
            return;
        }
3503207480@qq.com committed
412
        //找到了相应的rtsp流
413
        strongSelf->_aTrackInfo = SdpParser(rtsp_src->getSdp()).getAvailableTrack();
414 415
        if (strongSelf->_aTrackInfo.empty()) {
            //该流无效
416
            DebugL << "无trackInfo,该流无效";
417
            strongSelf->send_StreamNotFound();
xiongziliang committed
418
            strongSelf->shutdown(SockException(Err_shutdown,"can not find any available track in sdp"));
419 420 421 422 423 424 425 426 427
            return;
        }
        strongSelf->_strSession = makeRandStr(12);
        strongSelf->_pMediaSrc = rtsp_src;
        for(auto &track : strongSelf->_aTrackInfo){
            track->_ssrc = rtsp_src->getSsrc(track->_type);
            track->_seq = rtsp_src->getSeqence(track->_type);
            track->_time_stamp = rtsp_src->getTimeStamp(track->_type);
        }
xiongziliang committed
428

429 430 431 432
        strongSelf->sendRtspResponse("200 OK",
                                     {"Content-Base",strongSelf->_strContentBase + "/",
                                      "x-Accept-Retransmit","our-retransmit",
                                      "x-Accept-Dynamic-Rate","1"
433
                                     },rtsp_src->getSdp());
434
    });
xzl committed
435
}
436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451
/**
 * Answers "401 Unauthorized" with a WWW-Authenticate challenge.
 *
 * The challenge is Digest (with a freshly generated nonce stored in
 * _strNonce) unless the Rtsp::kAuthBasic setting selects Basic.
 *
 * @param realm realm to put into the challenge
 * @param why   reason, only used in the shutdown message
 * @param close when true the session is shut down after replying
 */
void RtspSession::onAuthFailed(const string &realm,const string &why,bool close) {
    GET_CONFIG(bool,authBasic,Rtsp::kAuthBasic);
    if (authBasic) {
        // Basic (base64) authentication is supported too, but not recommended
        sendRtspResponse("401 Unauthorized",
                         {"WWW-Authenticate",
                          StrPrinter << "Basic realm=\"" << realm << "\"" });
    } else {
        // Prefer md5 (digest) authentication from the client
        _strNonce = makeRandStr(32);
        sendRtspResponse("401 Unauthorized",
                         {"WWW-Authenticate",
                          StrPrinter << "Digest realm=\"" << realm << "\",nonce=\"" << _strNonce << "\"" });
    }

    if (!close) {
        return;
    }
    shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << why));
}

455
void RtspSession::onAuthBasic(const string &realm,const string &strBase64){
456 457 458
    //base64认证
    char user_pwd_buf[512];
    av_base64_decode((uint8_t *)user_pwd_buf,strBase64.data(),strBase64.size());
459
    auto user_pwd_vec = split(user_pwd_buf,":");
460 461
    if(user_pwd_vec.size() < 2){
        //认证信息格式不合法,回复401 Unauthorized
462
        onAuthFailed(realm,"can not find user and passwd when basic64 auth");
463 464 465 466
        return;
    }
    auto user = user_pwd_vec[0];
    auto pwd = user_pwd_vec[1];
467
    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
468
    onAuth invoker = [pwd,realm,weakSelf](bool encrypted,const string &good_pwd){
469 470 471 472
        auto strongSelf = weakSelf.lock();
        if(!strongSelf){
            //本对象已经销毁
            return;
473
        }
474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489
        //切换到自己的线程执行
        strongSelf->async([weakSelf,good_pwd,pwd,realm](){
            auto strongSelf = weakSelf.lock();
            if(!strongSelf){
                //本对象已经销毁
                return;
            }
            //base64忽略encrypted参数,上层必须传入明文密码
            if(pwd == good_pwd){
                //提供的密码且匹配正确
                strongSelf->onAuthSuccess();
                return;
            }
            //密码错误
            strongSelf->onAuthFailed(realm,StrPrinter << "password mismatch when base64 auth:" << pwd << " != " << good_pwd);
        });
490
    };
491

492
    //此时必须提供明文密码
493
    if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,_mediaInfo,realm,user, true,invoker,static_cast<SockInfo &>(*this))){
494
        //表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之
495
        WarnP(this) << "请监听kBroadcastOnRtspAuth事件!";
496 497 498 499 500
        //但是我们还是忽略认证以便完成播放
        //我们输入的密码是明文
        invoker(false,pwd);
    }
}
501

502
void RtspSession::onAuthDigest(const string &realm,const string &strMd5){
503
    DebugP(this) << strMd5;
504 505 506
    auto mapTmp = Parser::parseArgs(strMd5,",","=");
    decltype(mapTmp) map;
    for(auto &pr : mapTmp){
507
        map[trim(string(pr.first)," \"")] = trim(pr.second," \"");
508 509 510
    }
    //check realm
    if(realm != map["realm"]){
511
        onAuthFailed(realm,StrPrinter << "realm not mached:" << realm << " != " << map["realm"]);
512 513 514 515
        return ;
    }
    //check nonce
    auto nonce = map["nonce"];
516 517
    if(_strNonce != nonce){
        onAuthFailed(realm,StrPrinter << "nonce not mached:" << nonce << " != " << _strNonce);
518 519 520 521 522 523 524
        return ;
    }
    //check username and uri
    auto username = map["username"];
    auto uri = map["uri"];
    auto response = map["response"];
    if(username.empty() || uri.empty() || response.empty()){
525
        onAuthFailed(realm,StrPrinter << "username/uri/response empty:" << username << "," << uri << "," << response);
526 527 528
        return ;
    }

529
    auto realInvoker = [this,realm,nonce,uri,username,response](bool ignoreAuth,bool encrypted,const string &good_pwd){
530 531
        if(ignoreAuth){
            //忽略认证
532 533
            TraceP(this) << "auth ignored";
            onAuthSuccess();
534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550
            return;
        }
        /*
        response计算方法如下:
        RTSP客户端应该使用username + password并计算response如下:
        (1)当password为MD5编码,则
            response = md5( password:nonce:md5(public_method:url)  );
        (2)当password为ANSI字符串,则
            response= md5( md5(username:realm:password):nonce:md5(public_method:url) );
         */
        auto encrypted_pwd = good_pwd;
        if(!encrypted){
            //提供的是明文密码
            encrypted_pwd = MD5(username+ ":" + realm + ":" + good_pwd).hexdigest();
        }

        auto good_response = MD5( encrypted_pwd + ":" + nonce + ":" + MD5(string("DESCRIBE") + ":" + uri).hexdigest()).hexdigest();
xiongziliang committed
551
        if(strcasecmp(good_response.data(),response.data()) == 0){
552
            //认证成功!md5不区分大小写
553
            onAuthSuccess();
554 555
        }else{
            //认证失败!
556
            onAuthFailed(realm, StrPrinter << "password mismatch when md5 auth:" << good_response << " != " << response );
557 558
        }
    };
559 560 561 562 563 564 565 566 567 568 569 570 571 572 573

    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
    onAuth invoker = [realInvoker,weakSelf](bool encrypted,const string &good_pwd){
        auto strongSelf = weakSelf.lock();
        if(!strongSelf){
            return;
        }
        //切换到自己的线程确保realInvoker执行时,this指针有效
        strongSelf->async([realInvoker,weakSelf,encrypted,good_pwd](){
            auto strongSelf = weakSelf.lock();
            if(!strongSelf){
                return;
            }
            realInvoker(false,encrypted,good_pwd);
        });
574 575 576
    };

    //此时可以提供明文或md5加密的密码
577
    if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,_mediaInfo,realm,username, false,invoker,static_cast<SockInfo &>(*this))){
578
        //表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之
579
        WarnP(this) << "请监听kBroadcastOnRtspAuth事件!";
580 581 582 583 584
        //但是我们还是忽略认证以便完成播放
        realInvoker(true,true,"");
    }
}

585 586 587 588 589
void RtspSession::onAuthUser(const string &realm,const string &authorization){
    if(authorization.empty()){
        onAuthFailed(realm,"", false);
        return;
    }
590 591
    //请求中包含认证信息
    auto authType = FindField(authorization.data(),NULL," ");
592
    auto authStr = FindField(authorization.data()," ",NULL);
593 594
    if(authType.empty() || authStr.empty()){
        //认证信息格式不合法,回复401 Unauthorized
595
        onAuthFailed(realm,"can not find auth type or auth string");
596 597 598 599
        return;
    }
    if(authType == "Basic"){
        //base64认证,需要明文密码
600
        onAuthBasic(realm,authStr);
601 602
    }else if(authType == "Digest"){
        //md5认证
603
        onAuthDigest(realm,authStr);
604 605
    }else{
        //其他认证方式?不支持!
606
        onAuthFailed(realm,StrPrinter << "unsupported auth type:" << authType);
607 608
    }
}
xzl committed
609
inline void RtspSession::send_StreamNotFound() {
610
    sendRtspResponse("404 Stream Not Found",{"Connection","Close"});
xzl committed
611 612
}
inline void RtspSession::send_UnsupportedTransport() {
613
    sendRtspResponse("461 Unsupported Transport",{"Connection","Close"});
xzl committed
614 615 616
}

inline void RtspSession::send_SessionNotFound() {
617
    sendRtspResponse("454 Session Not Found",{"Connection","Close"});
xzl committed
618
}
619 620

void RtspSession::handleReq_Setup(const Parser &parser) {
xzl committed
621
//处理setup命令,该函数可能进入多次
622
    auto controlSuffix = split(parser.FullUrl(),"/").back();// parser.FullUrl().substr(_strContentBase.size());
xiongziliang committed
623
    if(controlSuffix.front() == '/'){
624
        controlSuffix = controlSuffix.substr(1);
xiongziliang committed
625
    }
626 627 628
    int trackIdx = getTrackIndexByControlSuffix(controlSuffix);
    if (trackIdx == -1) {
        //未找到相应track
629 630
        throw SockException(Err_shutdown, StrPrinter << "can not find any track by control suffix:" << controlSuffix);
    }
631 632 633
    SdpTrack::Ptr &trackRef = _aTrackInfo[trackIdx];
    if (trackRef->_inited) {
        //已经初始化过该Track
634
        throw SockException(Err_shutdown, "can not setup one track twice");
635 636 637 638 639 640 641 642 643 644 645 646 647
    }
    trackRef->_inited = true; //现在初始化

    if(_rtpType == Rtsp::RTP_Invalid){
        auto &strTransport = parser["Transport"];
        if(strTransport.find("TCP") != string::npos){
            _rtpType = Rtsp::RTP_TCP;
        }else if(strTransport.find("multicast") != string::npos){
            _rtpType = Rtsp::RTP_MULTICAST;
        }else{
            _rtpType = Rtsp::RTP_UDP;
        }
    }
xzl committed
648

649
    //允许接收rtp、rtcp包
650
    RtspSplitter::enableRecvRtp(_rtpType == Rtsp::RTP_TCP);
xiongziliang committed
651

652 653
    switch (_rtpType) {
    case Rtsp::RTP_TCP: {
654 655 656 657 658 659 660 661 662 663 664 665 666
        if(_pushSrc){
            //rtsp推流时,interleaved由推流者决定
            auto key_values =  Parser::parseArgs(parser["Transport"],";","=");
            int interleaved_rtp = -1 , interleaved_rtcp = -1;
            if(2 == sscanf(key_values["interleaved"].data(),"%d-%d",&interleaved_rtp,&interleaved_rtcp)){
                trackRef->_interleaved = interleaved_rtp;
            }else{
                throw SockException(Err_shutdown, "can not find interleaved when setup of rtp over tcp");
            }
        }else{
            //rtsp播放时,由于数据共享分发,所以interleaved必须由服务器决定
            trackRef->_interleaved = 2 * trackRef->_type;
        }
667 668 669 670 671 672 673 674 675 676
        sendRtspResponse("200 OK",
                         {"Transport",StrPrinter << "RTP/AVP/TCP;unicast;"
                                                 << "interleaved=" << (int)trackRef->_interleaved << "-" << (int)trackRef->_interleaved + 1 << ";"
                                                 << "ssrc=" << printSSRC(trackRef->_ssrc),
                          "x-Transport-Options" , "late-tolerance=1.400000",
                          "x-Dynamic-Rate" , "1"
                         });
    }
        break;
    case Rtsp::RTP_UDP: {
677 678 679 680
        std::pair<Socket::Ptr, Socket::Ptr> pr;
        try{
            pr = makeSockPair(_sock->getPoller(), get_local_ip());
        }catch(std::exception &ex) {
681 682
            //分配端口失败
            send_NotAcceptable();
683
            throw SockException(Err_shutdown, ex.what());
684
        }
685

686 687
        _apRtpSock[trackIdx] = pr.first;
        _apRtcpSock[trackIdx] = pr.second;
688

689 690 691
        //设置客户端内网端口信息
        string strClientPort = FindField(parser["Transport"].data(), "client_port=", NULL);
        uint16_t ui16RtpPort = atoi( FindField(strClientPort.data(), NULL, "-").data());
692 693 694 695
        uint16_t ui16RtcpPort = atoi( FindField(strClientPort.data(), "-" , NULL).data());

        struct sockaddr_in peerAddr;
        //设置rtp发送目标地址
696 697 698 699
        peerAddr.sin_family = AF_INET;
        peerAddr.sin_port = htons(ui16RtpPort);
        peerAddr.sin_addr.s_addr = inet_addr(get_peer_ip().data());
        bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero);
700
        pr.first->setSendPeerAddr((struct sockaddr *)(&peerAddr));
701

702
        //设置rtcp发送目标地址
703 704 705 706
        peerAddr.sin_family = AF_INET;
        peerAddr.sin_port = htons(ui16RtcpPort);
        peerAddr.sin_addr.s_addr = inet_addr(get_peer_ip().data());
        bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero);
707
        pr.second->setSendPeerAddr((struct sockaddr *)(&peerAddr));
708

709 710 711 712 713
        //尝试获取客户端nat映射地址
        startListenPeerUdpData(trackIdx);
        //InfoP(this) << "分配端口:" << srv_port;

        sendRtspResponse("200 OK",
714 715 716 717
                         {"Transport", StrPrinter << "RTP/AVP/UDP;unicast;"
                                                  << "client_port=" << strClientPort << ";"
                                                  << "server_port=" << pr.first->get_local_port() << "-" << pr.second->get_local_port() << ";"
                                                  << "ssrc=" << printSSRC(trackRef->_ssrc)
718 719 720 721 722 723 724 725
                         });
    }
        break;
    case Rtsp::RTP_MULTICAST: {
        if(!_multicaster){
            _multicaster = RtpMultiCaster::get(getPoller(),get_local_ip(),_mediaInfo._vhost, _mediaInfo._app, _mediaInfo._streamid);
            if (!_multicaster) {
                send_NotAcceptable();
726 727
                throw SockException(Err_shutdown, "can not get a available udp multicast socket");
            }
728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743
            weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
            _multicaster->setDetachCB(this, [weakSelf]() {
                auto strongSelf = weakSelf.lock();
                if(!strongSelf) {
                    return;
                }
                strongSelf->safeShutdown(SockException(Err_shutdown,"ring buffer detached"));
            });
        }
        int iSrvPort = _multicaster->getPort(trackRef->_type);
        //我们用trackIdx区分rtp和rtcp包
        //由于组播udp端口是共享的,而rtcp端口为组播udp端口+1,所以rtcp端口需要改成共享端口
        auto pSockRtcp = UDPServer::Instance().getSock(getPoller(),get_local_ip().data(),2*trackIdx + 1,iSrvPort + 1);
        if (!pSockRtcp) {
            //分配端口失败
            send_NotAcceptable();
744
            throw SockException(Err_shutdown, "open shared rtcp socket failed");
745 746
        }
        startListenPeerUdpData(trackIdx);
747
        GET_CONFIG(uint32_t,udpTTL,MultiCast::kUdpTTL);
748

749 750 751 752 753 754 755 756 757 758 759 760 761
        sendRtspResponse("200 OK",
                         {"Transport",StrPrinter << "RTP/AVP;multicast;"
                                                 << "destination=" << _multicaster->getIP() << ";"
                                                 << "source=" << get_local_ip() << ";"
                                                 << "port=" << iSrvPort << "-" << pSockRtcp->get_local_port() << ";"
                                                 << "ttl=" << udpTTL << ";"
                                                 << "ssrc=" << printSSRC(trackRef->_ssrc)
                         });
    }
        break;
    default:
        break;
    }
xzl committed
762 763
}

764
void RtspSession::handleReq_Play(const Parser &parser) {
765 766
    if (_aTrackInfo.empty() || parser["Session"] != _strSession) {
        send_SessionNotFound();
xiongziliang committed
767
        throw SockException(Err_shutdown,_aTrackInfo.empty() ? "can not find any available track when play" : "session not found when play");
768
    }
xiongziliang committed
769 770 771 772 773 774 775 776 777 778
    auto pMediaSrc = _pMediaSrc.lock();
    if(!pMediaSrc){
        send_StreamNotFound();
        shutdown(SockException(Err_shutdown,"rtsp stream released"));
        return;
    }

    bool useBuf = true;
    _enableSendRtp = false;
    float iStartTime = 0;
779
    auto strRange = parser["Range"];
xiongziliang committed
780 781 782 783 784
    if (strRange.size()) {
        //这个是seek操作
        auto strStart = FindField(strRange.data(), "npt=", "-");
        if (strStart == "now") {
            strStart = "0";
785
        }
xiongziliang committed
786 787 788 789 790 791 792 793 794 795 796 797 798
        iStartTime = 1000 * atof(strStart.data());
        InfoP(this) << "rtsp seekTo(ms):" << iStartTime;
        useBuf = !pMediaSrc->seekTo(iStartTime);
    } else if (pMediaSrc->totalReaderCount() == 0) {
        //第一个消费者
        pMediaSrc->seekTo(0);
    }

    _StrPrinter rtp_info;
    for (auto &track : _aTrackInfo) {
        if (track->_inited == false) {
            //还有track没有setup
            shutdown(SockException(Err_shutdown, "track not setuped"));
799
            return;
800
        }
xiongziliang committed
801 802 803
        track->_ssrc = pMediaSrc->getSsrc(track->_type);
        track->_seq = pMediaSrc->getSeqence(track->_type);
        track->_time_stamp = pMediaSrc->getTimeStamp(track->_type);
804

xiongziliang committed
805 806 807 808
        rtp_info << "url=" << _strContentBase << "/" << track->_control_surffix << ";"
                 << "seq=" << track->_seq << ";"
                 << "rtptime=" << (int) (track->_time_stamp * (track->_samplerate / 1000)) << ",";
    }
809

xiongziliang committed
810 811 812 813 814
    rtp_info.pop_back();
    sendRtspResponse("200 OK",
                     {"Range", StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) << (useBuf? pMediaSrc->getTimeStamp(TrackInvalid) / 1000.0 : iStartTime / 1000),
                      "RTP-Info",rtp_info
                     });
815

xiongziliang committed
816 817
    _enableSendRtp = true;
    setSocketFlags();
xiongziliang committed
818

xiongziliang committed
819 820 821 822
    if (!_pRtpReader && _rtpType != Rtsp::RTP_MULTICAST) {
        weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
        _pRtpReader = pMediaSrc->getRing()->attach(getPoller(), useBuf);
        _pRtpReader->setDetachCB([weakSelf]() {
823
            auto strongSelf = weakSelf.lock();
xiongziliang committed
824
            if (!strongSelf) {
825 826
                return;
            }
xiongziliang committed
827 828 829 830 831 832 833 834 835 836
            strongSelf->shutdown(SockException(Err_shutdown, "rtsp ring buffer detached"));
        });
        _pRtpReader->setReadCB([weakSelf](const RtspMediaSource::RingDataType &pack) {
            auto strongSelf = weakSelf.lock();
            if (!strongSelf) {
                return;
            }
            if (strongSelf->_enableSendRtp) {
                strongSelf->sendRtpPacket(pack);
            }
837 838
        });
    }
xzl committed
839 840
}

841
void RtspSession::handleReq_Pause(const Parser &parser) {
842 843
    if (parser["Session"] != _strSession) {
        send_SessionNotFound();
844 845
        throw SockException(Err_shutdown,"session not found when pause");
    }
846

847 848
    sendRtspResponse("200 OK");
    _enableSendRtp = false;
xzl committed
849 850
}

851
void RtspSession::handleReq_Teardown(const Parser &parser) {
852
    sendRtspResponse("200 OK");
853
    throw SockException(Err_shutdown,"rtsp player send teardown request");
xzl committed
854 855
}

856
/**
 * Handles the HTTP GET half of an rtsp-over-http tunnel.
 *
 * Remembers the "x-sessioncookie" header, answers with an HTTP/1.0 "200 OK"
 * and registers this session in g_mapGetter under that cookie so that the
 * matching HTTP POST connection can find it.
 *
 * @param parser the parsed HTTP GET request
 */
void RtspSession::handleReq_Get(const Parser &parser) {
    _http_x_sessioncookie = parser["x-sessioncookie"];

    sendRtspResponse("200 OK",
                     {"Cache-Control", "no-store",
                      "Pragma", "no-store",
                      "Content-Type", "application/x-rtsp-tunnelled",
                     },
                     "",
                     "HTTP/1.0");

    // register this http getter so that the http poster can bind to it
    auto self = dynamic_pointer_cast<RtspSession>(shared_from_this());
    lock_guard<recursive_mutex> lock(g_mtxGetter);
    g_mapGetter[_http_x_sessioncookie] = self;
}

869
void RtspSession::handleReq_Post(const Parser &parser) {
870 871 872 873 874
    lock_guard<recursive_mutex> lock(g_mtxGetter);
    string sessioncookie = parser["x-sessioncookie"];
    //Poster 找到 Getter
    auto it = g_mapGetter.find(sessioncookie);
    if (it == g_mapGetter.end()) {
875 876
        throw SockException(Err_shutdown,"can not find http getter by x-sessioncookie");
    }
877

878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904
    //Poster 找到Getter的SOCK
    auto httpGetterWeak = it->second;
    //移除http getter的弱引用记录
    g_mapGetter.erase(sessioncookie);

    //http poster收到请求后转发给http getter处理
    _onRecv = [this,httpGetterWeak](const Buffer::Ptr &pBuf){
        auto httpGetterStrong = httpGetterWeak.lock();
        if(!httpGetterStrong){
            shutdown(SockException(Err_shutdown,"http getter released"));
            return;
        }

        //切换到http getter的线程
        httpGetterStrong->async([pBuf,httpGetterWeak](){
            auto httpGetterStrong = httpGetterWeak.lock();
            if(!httpGetterStrong){
                return;
            }
            httpGetterStrong->onRecv(std::make_shared<BufferString>(decodeBase64(string(pBuf->data(),pBuf->size()))));
        });
    };

    if(!parser.Content().empty()){
        //http poster后面的粘包
        _onRecv(std::make_shared<BufferString>(parser.Content()));
    }
905 906 907 908 909 910

    sendRtspResponse("200 OK",
                     {"Cache-Control","no-store",
                      "Pragma","no-store",
                      "Content-Type","application/x-rtsp-tunnelled",
                     },"","HTTP/1.0");
xzl committed
911 912
}

913
void RtspSession::handleReq_SET_PARAMETER(const Parser &parser) {
914 915
    //TraceP(this) <<endl;
    sendRtspResponse("200 OK");
xzl committed
916 917 918
}

inline void RtspSession::send_NotAcceptable() {
919
    sendRtspResponse("406 Not Acceptable",{"Connection","Close"});
xzl committed
920
}
921

922

xiongziliang committed
923
/**
 * Receives an rtp packet after sorting and writes it into the push source.
 *
 * @param rtppt    the sorted rtp packet
 * @param trackidx index of the track the packet belongs to (unused here)
 */
void RtspSession::onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) {
    _pushSrc->onWrite(rtppt, false);
}
926
/**
 * Processes one udp datagram received from the peer.
 *
 * Even channel numbers carry rtp, odd channel numbers carry rtcp. The first
 * datagram seen on a channel is used to record the peer address on the
 * matching socket (except for rtp while pushing, where the data is handed to
 * handleOneRtp instead).
 *
 * @param intervaled channel number: 2*track for rtp, 2*track+1 for rtcp
 * @param pBuf       the received datagram
 * @param addr       address the datagram came from
 */
inline void RtspSession::onRcvPeerUdpData(int intervaled, const Buffer::Ptr &pBuf, const struct sockaddr& addr) {
    // any datagram from the peer counts as a keep-alive
    _ticker.resetTime();

    const bool firstOnChannel = !_udpSockConnected.count(intervaled);

    if (intervaled % 2 != 0) {
        // rtcp channel
        const int rtcpTrack = (intervaled - 1) / 2;
        if (firstOnChannel) {
            _udpSockConnected.emplace(intervaled);
            _apRtcpSock[rtcpTrack]->setSendPeerAddr(&addr);
        }
        onRtcpPacket(rtcpTrack, _aTrackInfo[rtcpTrack], (unsigned char *) pBuf->data(), pBuf->size());
        return;
    }

    // rtp channel
    const int rtpTrack = intervaled / 2;
    if (_pushSrc) {
        // rtp data pushed to us by an rtsp publisher
        handleOneRtp(rtpTrack, _aTrackInfo[rtpTrack], (unsigned char *) pBuf->data(), pBuf->size());
        return;
    }
    if (firstOnChannel) {
        // hole-punching rtp packet sent by an rtsp player
        _udpSockConnected.emplace(intervaled);
        _apRtpSock[rtpTrack]->setSendPeerAddr(&addr);
    }
}


950
inline void RtspSession::startListenPeerUdpData(int trackIdx) {
951
    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
952
    auto srcIP = inet_addr(get_peer_ip().data());
953 954 955 956 957
    auto onUdpData = [weakSelf,srcIP](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr,int intervaled){
        auto strongSelf=weakSelf.lock();
        if(!strongSelf) {
            return false;
        }
958 959 960

        if (((struct sockaddr_in *) pPeerAddr)->sin_addr.s_addr != srcIP) {
            WarnP(strongSelf.get()) << ((intervaled % 2 == 0) ? "收到其他地址的rtp数据:" : "收到其他地址的rtcp数据:")
961
                                    << SockUtil::inet_ntoa(((struct sockaddr_in *) pPeerAddr)->sin_addr);
962 963 964
            return true;
        }

965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002
        struct sockaddr addr=*pPeerAddr;
        strongSelf->async([weakSelf,pBuf,addr,intervaled]() {
            auto strongSelf=weakSelf.lock();
            if(!strongSelf) {
                return;
            }
            strongSelf->onRcvPeerUdpData(intervaled,pBuf,addr);
        });
        return true;
    };

    switch (_rtpType){
        case Rtsp::RTP_MULTICAST:{
            //组播使用的共享rtcp端口
            UDPServer::Instance().listenPeer(get_peer_ip().data(), this, [onUdpData](
                    int intervaled, const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr) {
                return onUdpData(pBuf,pPeerAddr,intervaled);
            });
        }
            break;
        case Rtsp::RTP_UDP:{
            auto setEvent = [&](Socket::Ptr &sock,int intervaled){
                if(!sock){
                    WarnP(this) << "udp端口为空:" << intervaled;
                    return;
                }
                sock->setOnRead([onUdpData,intervaled](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr , int addr_len){
                    onUdpData(pBuf,pPeerAddr,intervaled);
                });
            };
            setEvent(_apRtpSock[trackIdx], 2*trackIdx );
            setEvent(_apRtcpSock[trackIdx], 2*trackIdx + 1 );
        }
            break;

        default:
            break;
    }
1003

xzl committed
1004 1005
}

1006
/**
 * Formats the current UTC time for use as the value of the "Date" header,
 * e.g. "Sun, Nov 06 1994 08:49:37 GMT".
 *
 * The time is broken down with the re-entrant gmtime_r()/gmtime_s() instead
 * of gmtime(), because gmtime() returns a pointer to storage shared by all
 * threads.
 *
 * @return the formatted date, or an empty string if the time could not be
 *         converted or formatted
 */
static std::string dateStr(){
    char buf[64] = {0};
    time_t tt = time(nullptr);
    struct tm utc;
#if defined(_WIN32)
    if (gmtime_s(&utc, &tt) != 0) {
        return std::string();
    }
#else
    if (gmtime_r(&tt, &utc) == nullptr) {
        return std::string();
    }
#endif
    // strftime() returns 0 when the result does not fit; buf is then not usable
    if (strftime(buf, sizeof(buf), "%a, %b %d %Y %H:%M:%S GMT", &utc) == 0) {
        return std::string();
    }
    return buf;
}

bool RtspSession::sendRtspResponse(const string &res_code,
1014 1015 1016 1017 1018 1019 1020 1021 1022
                                   const StrCaseMap &header_const,
                                   const string &sdp,
                                   const char *protocol){
    auto header = header_const;
    header.emplace("CSeq",StrPrinter << _iCseq);
    if(!_strSession.empty()){
        header.emplace("Session",_strSession);
    }

xiongziliang committed
1023
    header.emplace("Server",SERVER_NAME);
1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041
    header.emplace("Date",dateStr());

    if(!sdp.empty()){
        header.emplace("Content-Length",StrPrinter << sdp.size());
        header.emplace("Content-Type","application/sdp");
    }

    _StrPrinter printer;
    printer << protocol << " " << res_code << "\r\n";
    for (auto &pr : header){
        printer << pr.first << ": " << pr.second << "\r\n";
    }

    printer << "\r\n";

    if(!sdp.empty()){
        printer << sdp;
    }
1042
//	DebugP(this) << printer;
1043
    return send(std::make_shared<BufferString>(printer)) > 0 ;
1044 1045 1046
}

/**
 * Sends a buffer through TcpSession::send after adding its size to the
 * session's byte counter.
 *
 * @param pkt the buffer to send
 * @return whatever TcpSession::send returns
 */
int RtspSession::send(const Buffer::Ptr &pkt){
    const auto byteCount = pkt->size();
    _ui64TotalBytes += byteCount;
    return TcpSession::send(pkt);
}

bool RtspSession::sendRtspResponse(const string &res_code,
1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068
                                   const std::initializer_list<string> &header,
                                   const string &sdp,
                                   const char *protocol) {
    string key;
    StrCaseMap header_map;
    int i = 0;
    for(auto &val : header){
        if(++i % 2 == 0){
            header_map.emplace(key,val);
        }else{
            key = val;
        }
    }
    return sendRtspResponse(res_code,header_map,sdp,protocol);
1069 1070 1071
}

/**
 * Finds the index of the first track with the given type.
 *
 * @param type track type to look for
 * @return the matching index; 0 when nothing matches but exactly one track
 *         exists; -1 otherwise
 */
inline int RtspSession::getTrackIndexByTrackType(TrackType type) {
    const int trackCount = static_cast<int>(_aTrackInfo.size());
    for (int idx = 0; idx < trackCount; ++idx) {
        if (_aTrackInfo[idx]->_type == type) {
            return idx;
        }
    }
    // with a single track, fall back to it even though the type did not match
    return trackCount == 1 ? 0 : -1;
}
/**
 * Finds the index of the first track whose control suffix equals the given one.
 *
 * @param controlSuffix control suffix to look for
 * @return the matching index; 0 when nothing matches but exactly one track
 *         exists; -1 otherwise
 */
inline int RtspSession::getTrackIndexByControlSuffix(const string &controlSuffix) {
    const int trackCount = static_cast<int>(_aTrackInfo.size());
    for (int idx = 0; idx < trackCount; ++idx) {
        if (controlSuffix == _aTrackInfo[idx]->_control_surffix) {
            return idx;
        }
    }
    // with a single track, fall back to it even though the suffix did not match
    return trackCount == 1 ? 0 : -1;
}

xiongziliang committed
1094
/**
 * Finds the index of the first track whose interleaved channel equals the given one.
 *
 * @param interleaved interleaved channel number to look for
 * @return the matching index; 0 when nothing matches but exactly one track
 *         exists; -1 otherwise
 */
inline int RtspSession::getTrackIndexByInterleaved(int interleaved){
    const int trackCount = static_cast<int>(_aTrackInfo.size());
    for (int idx = 0; idx < trackCount; ++idx) {
        if (_aTrackInfo[idx]->_interleaved == interleaved) {
            return idx;
        }
    }
    // with a single track, fall back to it even though the channel did not match
    return trackCount == 1 ? 0 : -1;
}

xiongziliang committed
1106
/**
 * Request to close the media pushed through this session.
 *
 * Nothing happens when this session is not pushing, or when the push source
 * still has readers and the close is not forced.
 *
 * @param sender the media source asking for the close
 * @param force  close even if the push source still has readers
 * @return true when a shutdown of this session was requested, false otherwise
 */
bool RtspSession::close(MediaSource &sender,bool force) {
    // this callback is triggered from another thread
    if (!_pushSrc) {
        return false;
    }
    if (!force && _pushSrc->totalReaderCount()) {
        return false;
    }

    string err = StrPrinter << "close media:"
                            << sender.getSchema() << "/"
                            << sender.getVhost() << "/"
                            << sender.getApp() << "/"
                            << sender.getId() << " "
                            << force;
    safeShutdown(SockException(Err_shutdown, err));
    return true;
}
1115

1116 1117 1118
/**
 * Reports the reader count: the push source's total when this session is
 * pushing, otherwise the sender's own reader count.
 *
 * @param sender the media source asking for the count
 */
int RtspSession::totalReaderCount(MediaSource &sender) {
    if (_pushSrc) {
        return _pushSrc->totalReaderCount();
    }
    return sender.readerCount();
}
1119

1120
/**
 * Sends a batch of rtp packets to the player using the negotiated transport.
 *
 * - RTP_TCP: every packet goes through send(); flushing is switched off for
 *   the batch and switched back on right before the last packet.
 * - RTP_UDP: every packet goes out on the rtp socket of its track, skipping
 *   the first 4 bytes of the stored packet; only the last packet of the batch
 *   asks the socket to flush.
 * - other transports: nothing is sent here.
 *
 * @param pkt the batch of rtp packets read from the ring buffer
 */
void RtspSession::sendRtpPacket(const RtspMediaSource::RingDataType &pkt) {
    //InfoP(this) <<(int)pkt.Interleaved;
    switch (_rtpType) {
        case Rtsp::RTP_TCP: {
            int i = 0;
            int size = pkt->size();
            setSendFlushFlag(false);
            pkt->for_each([&](const RtpPacket::Ptr &rtp) {
                if (++i == size) {
                    // last packet of the batch: let it flush everything queued before
                    setSendFlushFlag(true);
                }
                send(rtp);
            });
        }
            break;
        case Rtsp::RTP_UDP: {
            int i = 0;
            int size = pkt->size();
            pkt->for_each([&](const RtpPacket::Ptr &rtp) {
                // count every packet up front so that the "last packet" test stays
                // aligned with the batch even when a packet is skipped below
                const bool isLast = (++i == size);

                int iTrackIndex = getTrackIndexByTrackType(rtp->type);
                if (iTrackIndex < 0) {
                    // no track matches this packet's type; indexing _apRtpSock with -1
                    // would read outside the array, so the packet is dropped instead
                    return;
                }

                auto &pSock = _apRtpSock[iTrackIndex];
                if (!pSock) {
                    shutdown(SockException(Err_shutdown, "udp sock not opened yet"));
                    return;
                }

                BufferRtp::Ptr buffer(new BufferRtp(rtp, 4));
                _ui64TotalBytes += buffer->size();
                pSock->send(buffer, nullptr, 0, isLast);
            });
        }
            break;
        default:
            break;
    }

    // NOTE(review): the disabled section below still accesses pkt as if it were a single
    // rtp packet (pkt->type, pkt->length), while the code above treats it as a batch;
    // it looks like it needs updating before RTSP_SERVER_SEND_RTCP can be enabled - confirm.
#if RTSP_SERVER_SEND_RTCP
    int iTrackIndex = getTrackIndexByTrackType(pkt->type);
    if(iTrackIndex == -1){
        return;
    }
    RtcpCounter &counter = _aRtcpCnt[iTrackIndex];
    counter.pktCnt += 1;
    counter.octCount += (pkt->length - pkt->offset);
    auto &ticker = _aRtcpTicker[iTrackIndex];
    if (ticker.elapsedTime() > 5 * 1000) {
        //send rtcp every 5 second
        ticker.resetTime();
        // stored as-is in network byte order
        memcpy(&counter.timeStamp, pkt->payload + 8 , 4);
        sendSenderReport(_rtpType == Rtsp::RTP_TCP,iTrackIndex);
    }
#endif
}

xiongziliang committed
1175
void RtspSession::sendSenderReport(bool overTcp,int iTrackIndex) {
xiongziliang committed
1176 1177
    static const char s_cname[] = "ZLMediaKitRtsp";
    uint8_t aui8Rtcp[4 + 28 + 10 + sizeof(s_cname) + 1] = {0};
1178 1179 1180 1181 1182 1183
    uint8_t *pui8Rtcp_SR = aui8Rtcp + 4, *pui8Rtcp_SDES = pui8Rtcp_SR + 28;
    auto &track = _aTrackInfo[iTrackIndex];
    auto &counter = _aRtcpCnt[iTrackIndex];

    aui8Rtcp[0] = '$';
    aui8Rtcp[1] = track->_interleaved + 1;
xiongziliang committed
1184 1185
    aui8Rtcp[2] = (sizeof(aui8Rtcp) - 4) >> 8;
    aui8Rtcp[3] = (sizeof(aui8Rtcp) - 4) & 0xFF;
1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224

    pui8Rtcp_SR[0] = 0x80;
    pui8Rtcp_SR[1] = 0xC8;
    pui8Rtcp_SR[2] = 0x00;
    pui8Rtcp_SR[3] = 0x06;

    uint32_t ssrc=htonl(track->_ssrc);
    memcpy(&pui8Rtcp_SR[4], &ssrc, 4);

    uint64_t msw;
    uint64_t lsw;
    struct timeval tv;
    gettimeofday(&tv, NULL);
    msw = tv.tv_sec + 0x83AA7E80; /* 0x83AA7E80 is the number of seconds from 1900 to 1970 */
    lsw = (uint32_t) ((double) tv.tv_usec * (double) (((uint64_t) 1) << 32) * 1.0e-6);

    msw = htonl(msw);
    memcpy(&pui8Rtcp_SR[8], &msw, 4);

    lsw = htonl(lsw);
    memcpy(&pui8Rtcp_SR[12], &lsw, 4);
    //直接使用网络字节序
    memcpy(&pui8Rtcp_SR[16], &counter.timeStamp, 4);

    uint32_t pktCnt = htonl(counter.pktCnt);
    memcpy(&pui8Rtcp_SR[20], &pktCnt, 4);

    uint32_t octCount = htonl(counter.octCount);
    memcpy(&pui8Rtcp_SR[24], &octCount, 4);

    pui8Rtcp_SDES[0] = 0x81;
    pui8Rtcp_SDES[1] = 0xCA;
    pui8Rtcp_SDES[2] = 0x00;
    pui8Rtcp_SDES[3] = 0x06;

    memcpy(&pui8Rtcp_SDES[4], &ssrc, 4);

    pui8Rtcp_SDES[8] = 0x01;
    pui8Rtcp_SDES[9] = 0x0f;
xiongziliang committed
1225 1226
    memcpy(&pui8Rtcp_SDES[10], s_cname, sizeof(s_cname));
    pui8Rtcp_SDES[10 + sizeof(s_cname)] = 0x00;
1227 1228 1229 1230 1231 1232

    if(overTcp){
        send(obtainBuffer((char *) aui8Rtcp, sizeof(aui8Rtcp)));
    }else {
        _apRtcpSock[iTrackIndex]->send((char *) aui8Rtcp + 4, sizeof(aui8Rtcp) - 4);
    }
xzl committed
1233 1234
}

1235
void RtspSession::setSocketFlags(){
1236 1237
    GET_CONFIG(int, mergeWriteMS, General::kMergeWriteMS);
    if(mergeWriteMS > 0) {
1238 1239 1240
        //推流模式下,关闭TCP_NODELAY会增加推流端的延时,但是服务器性能将提高
        SockUtil::setNoDelay(_sock->rawFD(), false);
        //播放模式下,开启MSG_MORE会增加延时,但是能提高发送性能
xiongziliang committed
1241
        setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
1242 1243 1244
    }
}

xzl committed
1245
}
xiongziliang committed
1246
/* namespace mediakit */
xzl committed
1247