RtspSession.cpp 47.1 KB
Newer Older
xiongziliang committed
1
/*
xiongziliang committed
2
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
xiongziliang committed
3 4 5
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
xiongziliang committed
6 7 8
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
xzl committed
9
 */
10

xiongzilaing committed
11
#include <atomic>
12
#include <iomanip>
xiongziliang committed
13
#include "Common/config.h"
xiongzilaing committed
14
#include "UDPServer.h"
xzl committed
15
#include "RtspSession.h"
xiongzilaing committed
16
#include "Util/mini.h"
17
#include "Util/MD5.h"
xiongziliang committed
18
#include "Util/base64.h"
xiongzilaing committed
19
#include "Util/onceToken.h"
xzl committed
20 21
#include "Util/TimeTicker.h"
#include "Util/NoticeCenter.h"
xiongzilaing committed
22
#include "Network/sockutil.h"
xzl committed
23

24 25
#define RTSP_SERVER_SEND_RTCP 0

xiongziliang committed
26 27
using namespace std;
using namespace toolkit;
xzl committed
28

xiongziliang committed
29
namespace mediakit {
xzl committed
30

31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
/**
 * rtsp协议有多种方式传输rtp数据包,目前已支持包括以下4种
 * 1: rtp over udp ,这种方式是rtp通过单独的udp端口传输
 * 2: rtp over udp_multicast,这种方式是rtp通过共享udp组播端口传输
 * 3: rtp over tcp,这种方式是通过rtsp信令tcp通道完成传输
 * 4: rtp over http,下面着重讲解:rtp over http
 *
 * rtp over http 是把rtsp协议伪装成http协议以达到穿透防火墙的目的,
 * 此时播放器会发送两次http请求至rtsp服务器,第一次是http get请求,
 * 第二次是http post请求。
 *
 * 这两次请求通过http请求头中的x-sessioncookie键完成绑定
 *
 * 第一次http get请求用于接收rtp、rtcp和rtsp回复,后续该链接不再发送其他请求
 * 第二次http post请求用于发送rtsp请求,rtsp握手结束后可能会断开连接,此时我们还要维持rtp发送
 * 需要指出的是http post请求中的content负载就是base64编码后的rtsp请求包,
 * 播放器会把rtsp请求伪装成http content负载发送至rtsp服务器,然后rtsp服务器又把回复发送给第一次http get请求的tcp链接
 * 这样,对防火墙而言,本次rtsp会话就是两次http请求,防火墙就会放行数据
 *
 * zlmediakit在处理rtsp over http的请求时,会把http poster中的content数据base64解码后转发给http getter处理
 */


//rtsp over http 情况下get请求实例,在请求实例用于接收rtp数据包
static unordered_map<string, weak_ptr<RtspSession> > g_mapGetter;
//对g_mapGetter上锁保护
static recursive_mutex g_mtxGetter;
xiongziliang committed
58

59
RtspSession::RtspSession(const Socket::Ptr &sock) : TcpSession(sock) {
xiongziliang committed
60
    DebugP(this);
61
    GET_CONFIG(uint32_t,keep_alive_sec,Rtsp::kKeepAliveSecond);
62
    sock->setSendTimeOutSecond(keep_alive_sec);
xiongziliang committed
63
    //起始接收buffer缓存设置为4K,节省内存
64
    sock->setReadBuffer(std::make_shared<BufferRaw>(4 * 1024));
xzl committed
65 66 67
}

RtspSession::~RtspSession() {
68
    DebugP(this);
xzl committed
69 70
}

71 72 73
void RtspSession::onError(const SockException &err) {
    bool isPlayer = !_push_src;
    uint64_t duration = _alive_ticker.createdTime() / 1000;
74
    WarnP(this) << (isPlayer ? "RTSP播放器(" : "RTSP推流器(")
75 76 77
                << _media_info._vhost << "/"
                << _media_info._app << "/"
                << _media_info._streamid
78 79
                << ")断开:" << err.what()
                << ",耗时(s):" << duration;
xiongziliang committed
80

81
    if (_rtp_type == Rtsp::RTP_MULTICAST) {
82 83 84
        //取消UDP端口监听
        UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this);
    }
xzl committed
85

86 87 88 89 90
    if (_http_x_sessioncookie.size() != 0) {
        //移除http getter的弱引用记录
        lock_guard<recursive_mutex> lock(g_mtxGetter);
        g_mapGetter.erase(_http_x_sessioncookie);
    }
91 92

    //流量统计事件广播
93
    GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold);
94 95
    if(_bytes_usage > iFlowThreshold * 1024){
        NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _bytes_usage, duration, isPlayer, static_cast<SockInfo &>(*this));
96
    }
97

xzl committed
98 99 100
}

void RtspSession::onManager() {
101
    GET_CONFIG(uint32_t,handshake_sec,Rtsp::kHandshakeSecond);
102 103
    GET_CONFIG(uint32_t,keep_alive_sec,Rtsp::kKeepAliveSecond);

104 105
    if (_alive_ticker.createdTime() > handshake_sec * 1000) {
        if (_sessionid.size() == 0) {
106 107 108 109
            shutdown(SockException(Err_timeout,"illegal connection"));
            return;
        }
    }
110

111
    if ((_rtp_type == Rtsp::RTP_UDP || _push_src ) && _alive_ticker.elapsedTime() > keep_alive_sec * 1000) {
112
        //如果是推流端或者rtp over udp类型的播放端,那么就做超时检测
xiongziliang committed
113 114
        shutdown(SockException(Err_timeout,"rtp over udp session timeouted"));
        return;
115
    }
xzl committed
116 117
}

118 119 120 121
void RtspSession::onRecv(const Buffer::Ptr &buf) {
    _alive_ticker.resetTime();
    _bytes_usage += buf->size();
    if (_on_recv) {
122
        //http poster的请求数据转发给http getter处理
123
        _on_recv(buf);
124
    } else {
125
        input(buf->data(), buf->size());
126
    }
xiongziliang committed
127
}
xiongziliang committed
128

129 130 131 132
//字符串是否以xx结尾
static inline bool end_of(const string &str, const string &substr){
    auto pos = str.rfind(substr);
    return pos != string::npos && pos == str.size() - substr.size();
133
}
134

xiongziliang committed
135
void RtspSession::onWholeRtspPacket(Parser &parser) {
136 137 138 139 140 141
    string method = parser.Method(); //提取出请求命令字
    _cseq = atoi(parser["CSeq"].data());
    if(_content_base.empty() && method != "GET"){
        _content_base = parser.Url();
        _media_info.parse(parser.FullUrl());
        _media_info._schema = RTSP_SCHEMA;
142
    }
xzl committed
143

144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
    typedef void (RtspSession::*rtsp_request_handler)(const Parser &parser);
    static unordered_map<string, rtsp_request_handler> s_cmd_functions;
    static onceToken token( []() {
        s_cmd_functions.emplace("OPTIONS",&RtspSession::handleReq_Options);
        s_cmd_functions.emplace("DESCRIBE",&RtspSession::handleReq_Describe);
        s_cmd_functions.emplace("ANNOUNCE",&RtspSession::handleReq_ANNOUNCE);
        s_cmd_functions.emplace("RECORD",&RtspSession::handleReq_RECORD);
        s_cmd_functions.emplace("SETUP",&RtspSession::handleReq_Setup);
        s_cmd_functions.emplace("PLAY",&RtspSession::handleReq_Play);
        s_cmd_functions.emplace("PAUSE",&RtspSession::handleReq_Pause);
        s_cmd_functions.emplace("TEARDOWN",&RtspSession::handleReq_Teardown);
        s_cmd_functions.emplace("GET",&RtspSession::handleReq_Get);
        s_cmd_functions.emplace("POST",&RtspSession::handleReq_Post);
        s_cmd_functions.emplace("SET_PARAMETER",&RtspSession::handleReq_SET_PARAMETER);
        s_cmd_functions.emplace("GET_PARAMETER",&RtspSession::handleReq_SET_PARAMETER);
    }, []() {});

161
    auto it = s_cmd_functions.find(method);
162
    if (it == s_cmd_functions.end()) {
163
        sendRtspResponse("403 Forbidden");
164
        shutdown(SockException(Err_shutdown,StrPrinter << "403 Forbidden:" << method));
165
        return;
166
    }
167 168 169 170 171 172 173 174 175 176 177 178

    auto &fun = it->second;
    try {
        (this->*fun)(parser);
    }catch (SockException &ex){
        if(ex){
            shutdown(ex);
        }
    }catch (exception &ex){
        shutdown(SockException(Err_shutdown,ex.what()));
    }
    parser.Clear();
xiongziliang committed
179 180
}

xiongziliang committed
181
void RtspSession::onRtpPacket(const char *data, uint64_t len) {
182
    if(!_push_src){
183 184 185 186 187
        return;
    }

    uint8_t interleaved = data[1];
    if(interleaved %2 == 0){
188 189
        auto track_idx = getTrackIndexByInterleaved(interleaved);
        handleOneRtp(track_idx, _sdp_track[track_idx]->_type, _sdp_track[track_idx]->_samplerate, (unsigned char *) data + 4, len - 4);
190
    }else{
191 192
        auto track_idx = getTrackIndexByInterleaved(interleaved - 1);
        onRtcpPacket(track_idx, _sdp_track[track_idx], (unsigned char *) data + 4, len - 4);
193
    }
xiongziliang committed
194 195
}

196
void RtspSession::onRtcpPacket(int track_idx, SdpTrack::Ptr &track, unsigned char *data, unsigned int len){}
197

xiongziliang committed
198
int64_t RtspSession::getContentLength(Parser &parser) {
199 200 201 202 203
    if(parser.Method() == "POST"){
        //http post请求的content数据部分是base64编码后的rtsp请求信令包
        return remainDataSize();
    }
    return RtspSplitter::getContentLength(parser);
xzl committed
204 205
}

206
void RtspSession::handleReq_Options(const Parser &parser) {
207 208
    //支持这些命令
    sendRtspResponse("200 OK",{"Public" , "OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, ANNOUNCE, RECORD, SET_PARAMETER, GET_PARAMETER"});
xzl committed
209 210
}

211
void RtspSession::handleReq_ANNOUNCE(const Parser &parser) {
212
    auto src = dynamic_pointer_cast<RtspMediaSource>(MediaSource::find(RTSP_SCHEMA,
213 214 215
                                                                       _media_info._vhost,
                                                                       _media_info._app,
                                                                       _media_info._streamid));
216 217
    if(src){
        sendRtspResponse("406 Not Acceptable", {"Content-Type", "text/plain"}, "Already publishing.");
218 219
        string err = StrPrinter << "ANNOUNCE:"
                                << "Already publishing:"
220 221 222
                                << _media_info._vhost << " "
                                << _media_info._app << " "
                                << _media_info._streamid << endl;
223 224
        throw SockException(Err_shutdown,err);
    }
xiongziliang committed
225

226 227 228
    auto full_url = parser.FullUrl();
    if(end_of(full_url,".sdp")){
        //去除.sdp后缀,防止EasyDarwin推流器强制添加.sdp后缀
229
        full_url = full_url.substr(0,full_url.length() - 4);
230
        _media_info.parse(full_url);
231 232
    }

233
    if(_media_info._app.empty() || _media_info._streamid.empty()){
234 235 236 237 238
        //推流rtsp url必须最少两级(rtsp://host/app/stream_id),不允许莫名其妙的推流url
        sendRtspResponse("403 Forbidden", {"Content-Type", "text/plain"}, "rtsp推流url非法,最少确保两级rtsp url");
        throw SockException(Err_shutdown,StrPrinter << "rtsp推流url非法:" << full_url);
    }

239
    SdpParser sdpParser(parser.Content());
240 241
    _sessionid = makeRandStr(12);
    _sdp_track = sdpParser.getAvailableTrack();
xiongziliang committed
242

243 244 245
    _push_src = std::make_shared<RtspMediaSourceImp>(_media_info._vhost, _media_info._app, _media_info._streamid);
    _push_src->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
    _push_src->setSdp(sdpParser.toString());
246

247
    sendRtspResponse("200 OK",{"Content-Base", _content_base + "/"});
248 249
}

250
void RtspSession::handleReq_RECORD(const Parser &parser){
251
    if (_sdp_track.empty() || parser["Session"] != _sessionid) {
252
        send_SessionNotFound();
253
        throw SockException(Err_shutdown, _sdp_track.empty() ? "can not find any availabe track when record" : "session not found when record");
254
    }
255
    auto onRes = [this](const string &err, bool enableHls, bool enableMP4){
256 257 258 259 260 261
        bool authSuccess = err.empty();
        if(!authSuccess){
            sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err);
            shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << err));
            return;
        }
262

263
        //设置转协议
264
        _push_src->setProtocolTranslation(enableHls, enableMP4);
265

266
        _StrPrinter rtp_info;
267
        for(auto &track : _sdp_track){
268 269
            if (track->_inited == false) {
                //还有track没有setup
xiongziliang committed
270 271
                shutdown(SockException(Err_shutdown,"track not setuped"));
                return;
272
            }
273
            rtp_info << "url=" << _content_base << "/" << track->_control_surffix << ",";
274 275 276 277
        }

        rtp_info.pop_back();
        sendRtspResponse("200 OK", {"RTP-Info",rtp_info});
278
        if(_rtp_type == Rtsp::RTP_TCP){
279
            //如果是rtsp推流服务器,并且是TCP推流,那么加大TCP接收缓存,这样能提升接收性能
280
            getSock()->setReadBuffer(std::make_shared<BufferRaw>(256 * 1024));
281
            setSocketFlags();
282 283 284 285
        }
    };

    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
286
    Broadcast::PublishAuthInvoker invoker = [weakSelf, onRes](const string &err, bool enableHls, bool enableMP4) {
287
        auto strongSelf = weakSelf.lock();
288
        if (!strongSelf) {
289 290
            return;
        }
291
        strongSelf->async([weakSelf, onRes, err, enableHls, enableMP4]() {
292
            auto strongSelf = weakSelf.lock();
293
            if (!strongSelf) {
294 295
                return;
            }
296
            onRes(err, enableHls, enableMP4);
297 298 299 300
        });
    };

    //rtsp推流需要鉴权
301
    auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, _media_info, invoker, static_cast<SockInfo &>(*this));
302 303
    if(!flag){
        //该事件无人监听,默认不鉴权
304 305
        GET_CONFIG(bool,toHls,General::kPublishToHls);
        GET_CONFIG(bool,toMP4,General::kPublishToMP4);
306
        onRes("",toHls,toMP4);
307
    }
308 309
}

xiongziliang committed
310
void RtspSession::emitOnPlay(){
311
    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
xiongziliang committed
312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337
    //url鉴权回调
    auto onRes = [weakSelf](const string &err) {
        auto strongSelf = weakSelf.lock();
        if (!strongSelf) {
            return;
        }
        if (!err.empty()) {
            //播放url鉴权失败
            strongSelf->sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err);
            strongSelf->shutdown(SockException(Err_shutdown, StrPrinter << "401 Unauthorized:" << err));
            return;
        }
        strongSelf->onAuthSuccess();
    };

    Broadcast::AuthInvoker invoker = [weakSelf, onRes](const string &err) {
        auto strongSelf = weakSelf.lock();
        if (!strongSelf) {
            return;
        }
        strongSelf->async([onRes, err, weakSelf]() {
            onRes(err);
        });
    };

    //广播通用播放url鉴权事件
338
    auto flag = _emit_on_play ? false : NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed, _media_info, invoker, static_cast<SockInfo &>(*this));
xiongziliang committed
339 340 341 342 343 344 345 346 347
    if (!flag) {
        //该事件无人监听,默认不鉴权
        onRes("");
    }
    //已经鉴权过了
    _emit_on_play = true;
}

void RtspSession::handleReq_Describe(const Parser &parser) {
348
    //该请求中的认证信息
349
    auto authorization = parser["Authorization"];
xiongziliang committed
350 351 352
    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
    //rtsp专属鉴权是否开启事件回调
    onGetRealm invoker = [weakSelf, authorization](const string &realm) {
353
        auto strongSelf = weakSelf.lock();
xiongziliang committed
354
        if (!strongSelf) {
355 356 357 358
            //本对象已经销毁
            return;
        }
        //切换到自己的线程然后执行
xiongziliang committed
359
        strongSelf->async([weakSelf, realm, authorization]() {
360
            auto strongSelf = weakSelf.lock();
xiongziliang committed
361
            if (!strongSelf) {
362 363 364
                //本对象已经销毁
                return;
            }
xiongziliang committed
365 366 367
            if (realm.empty()) {
                //无需rtsp专属认证, 那么继续url通用鉴权认证(on_play)
                strongSelf->emitOnPlay();
368 369
                return;
            }
xiongziliang committed
370 371 372
            //该流需要rtsp专属认证,开启rtsp专属认证后,将不再触发url通用鉴权认证(on_play)
            strongSelf->_rtsp_realm = realm;
            strongSelf->onAuthUser(realm, authorization);
373 374 375
        });
    };

xiongziliang committed
376 377
    if(_rtsp_realm.empty()){
        //广播是否需要rtsp专属认证事件
378
        if (!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnGetRtspRealm, _media_info, invoker, static_cast<SockInfo &>(*this))) {
xiongziliang committed
379 380 381 382 383
            //无人监听此事件,说明无需认证
            invoker("");
        }
    }else{
        invoker(_rtsp_realm);
384 385
    }
}
386

387 388
void RtspSession::onAuthSuccess() {
    TraceP(this);
389
    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
390
    MediaSource::findAsync(_media_info, weakSelf.lock(), [weakSelf](const MediaSource::Ptr &src){
391 392 393 394 395 396 397
        auto strongSelf = weakSelf.lock();
        if(!strongSelf){
            return;
        }
        auto rtsp_src = dynamic_pointer_cast<RtspMediaSource>(src);
        if (!rtsp_src) {
            //未找到相应的MediaSource
398
            string err = StrPrinter << "no such stream:" << strongSelf->_media_info._vhost << " " << strongSelf->_media_info._app << " " << strongSelf->_media_info._streamid;
399
            strongSelf->send_StreamNotFound();
xiongziliang committed
400
            strongSelf->shutdown(SockException(Err_shutdown,err));
401 402
            return;
        }
3503207480@qq.com committed
403
        //找到了相应的rtsp流
404 405
        strongSelf->_sdp_track = SdpParser(rtsp_src->getSdp()).getAvailableTrack();
        if (strongSelf->_sdp_track.empty()) {
406
            //该流无效
407
            DebugL << "无trackInfo,该流无效";
408
            strongSelf->send_StreamNotFound();
xiongziliang committed
409
            strongSelf->shutdown(SockException(Err_shutdown,"can not find any available track in sdp"));
410 411
            return;
        }
412 413 414
        strongSelf->_sessionid = makeRandStr(12);
        strongSelf->_play_src = rtsp_src;
        for(auto &track : strongSelf->_sdp_track){
415 416 417 418
            track->_ssrc = rtsp_src->getSsrc(track->_type);
            track->_seq = rtsp_src->getSeqence(track->_type);
            track->_time_stamp = rtsp_src->getTimeStamp(track->_type);
        }
xiongziliang committed
419

420
        strongSelf->sendRtspResponse("200 OK",
421
                                     {"Content-Base", strongSelf->_content_base + "/",
422 423
                                      "x-Accept-Retransmit","our-retransmit",
                                      "x-Accept-Dynamic-Rate","1"
424
                                     },rtsp_src->getSdp());
425
    });
xzl committed
426
}
427

428 429 430 431
void RtspSession::onAuthFailed(const string &realm,const string &why,bool close) {
    GET_CONFIG(bool,authBasic,Rtsp::kAuthBasic);
    if (!authBasic) {
        //我们需要客户端优先以md5方式认证
432
        _auth_nonce = makeRandStr(32);
433 434
        sendRtspResponse("401 Unauthorized",
                         {"WWW-Authenticate",
435
                          StrPrinter << "Digest realm=\"" << realm << "\",nonce=\"" << _auth_nonce << "\"" });
436 437 438 439 440 441 442 443
    }else {
        //当然我们也支持base64认证,但是我们不建议这样做
        sendRtspResponse("401 Unauthorized",
                         {"WWW-Authenticate",
                          StrPrinter << "Basic realm=\"" << realm << "\"" });
    }
    if(close){
        shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << why));
444 445 446
    }
}

447
void RtspSession::onAuthBasic(const string &realm,const string &auth_base64){
448 449
    //base64认证
    char user_pwd_buf[512];
450 451 452
    av_base64_decode((uint8_t *) user_pwd_buf, auth_base64.data(), auth_base64.size());
    auto user_pwd_vec = split(user_pwd_buf, ":");
    if (user_pwd_vec.size() < 2) {
453
        //认证信息格式不合法,回复401 Unauthorized
454
        onAuthFailed(realm, "can not find user and passwd when basic64 auth");
455 456 457 458
        return;
    }
    auto user = user_pwd_vec[0];
    auto pwd = user_pwd_vec[1];
459
    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
460
    onAuth invoker = [pwd, realm, weakSelf](bool encrypted, const string &good_pwd) {
461
        auto strongSelf = weakSelf.lock();
462
        if (!strongSelf) {
463 464
            //本对象已经销毁
            return;
465
        }
466
        //切换到自己的线程执行
467
        strongSelf->async([weakSelf, good_pwd, pwd, realm]() {
468
            auto strongSelf = weakSelf.lock();
469
            if (!strongSelf) {
470 471 472 473
                //本对象已经销毁
                return;
            }
            //base64忽略encrypted参数,上层必须传入明文密码
474
            if (pwd == good_pwd) {
475 476 477 478 479
                //提供的密码且匹配正确
                strongSelf->onAuthSuccess();
                return;
            }
            //密码错误
480
            strongSelf->onAuthFailed(realm, StrPrinter << "password mismatch when base64 auth:" << pwd << " != " << good_pwd);
481
        });
482
    };
483

484
    //此时必须提供明文密码
485
    if (!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth, _media_info, realm, user, true, invoker, static_cast<SockInfo &>(*this))) {
486
        //表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之
487
        WarnP(this) << "请监听kBroadcastOnRtspAuth事件!";
488 489
        //但是我们还是忽略认证以便完成播放
        //我们输入的密码是明文
490
        invoker(false, pwd);
491 492
    }
}
493

494 495 496
void RtspSession::onAuthDigest(const string &realm,const string &auth_md5){
    DebugP(this) << auth_md5;
    auto mapTmp = Parser::parseArgs(auth_md5, ",", "=");
497 498
    decltype(mapTmp) map;
    for(auto &pr : mapTmp){
499
        map[trim(string(pr.first)," \"")] = trim(pr.second," \"");
500 501 502
    }
    //check realm
    if(realm != map["realm"]){
503
        onAuthFailed(realm,StrPrinter << "realm not mached:" << realm << " != " << map["realm"]);
504 505 506 507
        return ;
    }
    //check nonce
    auto nonce = map["nonce"];
508 509
    if(_auth_nonce != nonce){
        onAuthFailed(realm,StrPrinter << "nonce not mached:" << nonce << " != " << _auth_nonce);
510 511 512 513 514 515 516
        return ;
    }
    //check username and uri
    auto username = map["username"];
    auto uri = map["uri"];
    auto response = map["response"];
    if(username.empty() || uri.empty() || response.empty()){
517
        onAuthFailed(realm,StrPrinter << "username/uri/response empty:" << username << "," << uri << "," << response);
518 519 520
        return ;
    }

521
    auto realInvoker = [this,realm,nonce,uri,username,response](bool ignoreAuth,bool encrypted,const string &good_pwd){
522 523
        if(ignoreAuth){
            //忽略认证
524 525
            TraceP(this) << "auth ignored";
            onAuthSuccess();
526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542
            return;
        }
        /*
        response计算方法如下:
        RTSP客户端应该使用username + password并计算response如下:
        (1)当password为MD5编码,则
            response = md5( password:nonce:md5(public_method:url)  );
        (2)当password为ANSI字符串,则
            response= md5( md5(username:realm:password):nonce:md5(public_method:url) );
         */
        auto encrypted_pwd = good_pwd;
        if(!encrypted){
            //提供的是明文密码
            encrypted_pwd = MD5(username+ ":" + realm + ":" + good_pwd).hexdigest();
        }

        auto good_response = MD5( encrypted_pwd + ":" + nonce + ":" + MD5(string("DESCRIBE") + ":" + uri).hexdigest()).hexdigest();
xiongziliang committed
543
        if(strcasecmp(good_response.data(),response.data()) == 0){
544
            //认证成功!md5不区分大小写
545
            onAuthSuccess();
546 547
        }else{
            //认证失败!
548
            onAuthFailed(realm, StrPrinter << "password mismatch when md5 auth:" << good_response << " != " << response );
549 550
        }
    };
551 552 553 554 555 556 557 558 559 560 561 562 563 564 565

    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
    onAuth invoker = [realInvoker,weakSelf](bool encrypted,const string &good_pwd){
        auto strongSelf = weakSelf.lock();
        if(!strongSelf){
            return;
        }
        //切换到自己的线程确保realInvoker执行时,this指针有效
        strongSelf->async([realInvoker,weakSelf,encrypted,good_pwd](){
            auto strongSelf = weakSelf.lock();
            if(!strongSelf){
                return;
            }
            realInvoker(false,encrypted,good_pwd);
        });
566 567 568
    };

    //此时可以提供明文或md5加密的密码
569
    if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth, _media_info, realm, username, false, invoker, static_cast<SockInfo &>(*this))){
570
        //表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之
571
        WarnP(this) << "请监听kBroadcastOnRtspAuth事件!";
572 573 574 575 576
        //但是我们还是忽略认证以便完成播放
        realInvoker(true,true,"");
    }
}

577 578 579 580 581
void RtspSession::onAuthUser(const string &realm,const string &authorization){
    if(authorization.empty()){
        onAuthFailed(realm,"", false);
        return;
    }
582 583
    //请求中包含认证信息
    auto authType = FindField(authorization.data(),NULL," ");
584
    auto authStr = FindField(authorization.data()," ",NULL);
585 586
    if(authType.empty() || authStr.empty()){
        //认证信息格式不合法,回复401 Unauthorized
587
        onAuthFailed(realm,"can not find auth type or auth string");
588 589 590 591
        return;
    }
    if(authType == "Basic"){
        //base64认证,需要明文密码
592
        onAuthBasic(realm,authStr);
593 594
    }else if(authType == "Digest"){
        //md5认证
595
        onAuthDigest(realm,authStr);
596 597
    }else{
        //其他认证方式?不支持!
598
        onAuthFailed(realm,StrPrinter << "unsupported auth type:" << authType);
599 600
    }
}
601

xzl committed
602
inline void RtspSession::send_StreamNotFound() {
603
    sendRtspResponse("404 Stream Not Found",{"Connection","Close"});
xzl committed
604
}
605

xzl committed
606
inline void RtspSession::send_UnsupportedTransport() {
607
    sendRtspResponse("461 Unsupported Transport",{"Connection","Close"});
xzl committed
608 609 610
}

inline void RtspSession::send_SessionNotFound() {
611
    sendRtspResponse("454 Session Not Found",{"Connection","Close"});
xzl committed
612
}
613 614

void RtspSession::handleReq_Setup(const Parser &parser) {
615 616
    //处理setup命令,该函数可能进入多次
    auto controlSuffix = split(parser.FullUrl(),"/").back();
xiongziliang committed
617
    if(controlSuffix.front() == '/'){
618
        controlSuffix = controlSuffix.substr(1);
xiongziliang committed
619
    }
620
    int trackIdx = getTrackIndexByControlSuffix(controlSuffix);
621
    SdpTrack::Ptr &trackRef = _sdp_track[trackIdx];
622 623
    if (trackRef->_inited) {
        //已经初始化过该Track
624
        throw SockException(Err_shutdown, "can not setup one track twice");
625 626 627
    }
    trackRef->_inited = true; //现在初始化

628
    if(_rtp_type == Rtsp::RTP_Invalid){
629 630
        auto &strTransport = parser["Transport"];
        if(strTransport.find("TCP") != string::npos){
631
            _rtp_type = Rtsp::RTP_TCP;
632
        }else if(strTransport.find("multicast") != string::npos){
633
            _rtp_type = Rtsp::RTP_MULTICAST;
634
        }else{
635
            _rtp_type = Rtsp::RTP_UDP;
636 637
        }
    }
xzl committed
638

639
    //允许接收rtp、rtcp包
640
    RtspSplitter::enableRecvRtp(_rtp_type == Rtsp::RTP_TCP);
xiongziliang committed
641

642
    switch (_rtp_type) {
643
    case Rtsp::RTP_TCP: {
644
        if(_push_src){
645 646 647 648 649 650 651 652 653 654 655 656
            //rtsp推流时,interleaved由推流者决定
            auto key_values =  Parser::parseArgs(parser["Transport"],";","=");
            int interleaved_rtp = -1 , interleaved_rtcp = -1;
            if(2 == sscanf(key_values["interleaved"].data(),"%d-%d",&interleaved_rtp,&interleaved_rtcp)){
                trackRef->_interleaved = interleaved_rtp;
            }else{
                throw SockException(Err_shutdown, "can not find interleaved when setup of rtp over tcp");
            }
        }else{
            //rtsp播放时,由于数据共享分发,所以interleaved必须由服务器决定
            trackRef->_interleaved = 2 * trackRef->_type;
        }
657
        sendRtspResponse("200 OK",
658 659 660 661 662 663
                         {"Transport", StrPrinter << "RTP/AVP/TCP;unicast;"
                                                  << "interleaved=" << (int) trackRef->_interleaved << "-"
                                                  << (int) trackRef->_interleaved + 1 << ";"
                                                  << "ssrc=" << printSSRC(trackRef->_ssrc),
                          "x-Transport-Options", "late-tolerance=1.400000",
                          "x-Dynamic-Rate", "1"
664 665 666
                         });
    }
        break;
667

668
    case Rtsp::RTP_UDP: {
669 670 671 672
        std::pair<Socket::Ptr, Socket::Ptr> pr = std::make_pair(createSocket(),createSocket());
        try {
            makeSockPair(pr, get_local_ip());
        } catch (std::exception &ex) {
673 674
            //分配端口失败
            send_NotAcceptable();
675
            throw SockException(Err_shutdown, ex.what());
676
        }
677

678 679
        _rtp_socks[trackIdx] = pr.first;
        _rtcp_socks[trackIdx] = pr.second;
680

681 682
        //设置客户端内网端口信息
        string strClientPort = FindField(parser["Transport"].data(), "client_port=", NULL);
683 684
        uint16_t ui16RtpPort = atoi(FindField(strClientPort.data(), NULL, "-").data());
        uint16_t ui16RtcpPort = atoi(FindField(strClientPort.data(), "-", NULL).data());
685 686 687

        struct sockaddr_in peerAddr;
        //设置rtp发送目标地址
688 689 690 691
        peerAddr.sin_family = AF_INET;
        peerAddr.sin_port = htons(ui16RtpPort);
        peerAddr.sin_addr.s_addr = inet_addr(get_peer_ip().data());
        bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero);
692
        pr.first->setSendPeerAddr((struct sockaddr *) (&peerAddr));
693

694
        //设置rtcp发送目标地址
695 696 697 698
        peerAddr.sin_family = AF_INET;
        peerAddr.sin_port = htons(ui16RtcpPort);
        peerAddr.sin_addr.s_addr = inet_addr(get_peer_ip().data());
        bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero);
699
        pr.second->setSendPeerAddr((struct sockaddr *) (&peerAddr));
700

701 702 703 704 705
        //尝试获取客户端nat映射地址
        startListenPeerUdpData(trackIdx);
        //InfoP(this) << "分配端口:" << srv_port;

        sendRtspResponse("200 OK",
706 707
                         {"Transport", StrPrinter << "RTP/AVP/UDP;unicast;"
                                                  << "client_port=" << strClientPort << ";"
708 709
                                                  << "server_port=" << pr.first->get_local_port() << "-"
                                                  << pr.second->get_local_port() << ";"
710
                                                  << "ssrc=" << printSSRC(trackRef->_ssrc)
711 712 713 714 715
                         });
    }
        break;
    case Rtsp::RTP_MULTICAST: {
        if(!_multicaster){
716
            _multicaster = RtpMultiCaster::get(*this, get_local_ip(), _media_info._vhost, _media_info._app, _media_info._streamid);
717 718
            if (!_multicaster) {
                send_NotAcceptable();
719 720
                throw SockException(Err_shutdown, "can not get a available udp multicast socket");
            }
721 722 723 724 725 726 727 728 729
            weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
            _multicaster->setDetachCB(this, [weakSelf]() {
                auto strongSelf = weakSelf.lock();
                if(!strongSelf) {
                    return;
                }
                strongSelf->safeShutdown(SockException(Err_shutdown,"ring buffer detached"));
            });
        }
730
        int iSrvPort = _multicaster->getMultiCasterPort(trackRef->_type);
731 732
        //我们用trackIdx区分rtp和rtcp包
        //由于组播udp端口是共享的,而rtcp端口为组播udp端口+1,所以rtcp端口需要改成共享端口
733
        auto pSockRtcp = UDPServer::Instance().getSock(*this, get_local_ip().data(), 2 * trackIdx + 1, iSrvPort + 1);
734 735 736
        if (!pSockRtcp) {
            //分配端口失败
            send_NotAcceptable();
737
            throw SockException(Err_shutdown, "open shared rtcp socket failed");
738 739
        }
        startListenPeerUdpData(trackIdx);
740
        GET_CONFIG(uint32_t,udpTTL,MultiCast::kUdpTTL);
741

742
        sendRtspResponse("200 OK",
743
                         {"Transport", StrPrinter << "RTP/AVP;multicast;"
744
                                                  << "destination=" << _multicaster->getMultiCasterIP() << ";"
745 746 747 748
                                                  << "source=" << get_local_ip() << ";"
                                                  << "port=" << iSrvPort << "-" << pSockRtcp->get_local_port() << ";"
                                                  << "ttl=" << udpTTL << ";"
                                                  << "ssrc=" << printSSRC(trackRef->_ssrc)
749 750 751 752 753 754
                         });
    }
        break;
    default:
        break;
    }
xzl committed
755 756
}

757
void RtspSession::handleReq_Play(const Parser &parser) {
758
    if (_sdp_track.empty() || parser["Session"] != _sessionid) {
759
        send_SessionNotFound();
760
        throw SockException(Err_shutdown, _sdp_track.empty() ? "can not find any available track when play" : "session not found when play");
761
    }
762 763
    auto play_src = _play_src.lock();
    if(!play_src){
xiongziliang committed
764 765 766 767 768
        send_StreamNotFound();
        shutdown(SockException(Err_shutdown,"rtsp stream released"));
        return;
    }

769 770
    bool useGOP = true;
    _enable_send_rtp = false;
xiongziliang committed
771
    float iStartTime = 0;
772
    auto strRange = parser["Range"];
xiongziliang committed
773 774 775 776 777
    if (strRange.size()) {
        //这个是seek操作
        auto strStart = FindField(strRange.data(), "npt=", "-");
        if (strStart == "now") {
            strStart = "0";
778
        }
xiongziliang committed
779 780
        iStartTime = 1000 * atof(strStart.data());
        InfoP(this) << "rtsp seekTo(ms):" << iStartTime;
781 782
        useGOP = !play_src->seekTo(iStartTime);
    } else if (play_src->totalReaderCount() == 0) {
xiongziliang committed
783
        //第一个消费者
784
        play_src->seekTo(0);
xiongziliang committed
785 786 787
    }

    _StrPrinter rtp_info;
788
    for (auto &track : _sdp_track) {
xiongziliang committed
789 790 791
        if (track->_inited == false) {
            //还有track没有setup
            shutdown(SockException(Err_shutdown, "track not setuped"));
792
            return;
793
        }
794 795 796
        track->_ssrc = play_src->getSsrc(track->_type);
        track->_seq = play_src->getSeqence(track->_type);
        track->_time_stamp = play_src->getTimeStamp(track->_type);
797

798
        rtp_info << "url=" << _content_base << "/" << track->_control_surffix << ";"
xiongziliang committed
799 800 801
                 << "seq=" << track->_seq << ";"
                 << "rtptime=" << (int) (track->_time_stamp * (track->_samplerate / 1000)) << ",";
    }
802

xiongziliang committed
803 804
    rtp_info.pop_back();
    sendRtspResponse("200 OK",
805
                     {"Range", StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) << (useGOP ? play_src->getTimeStamp(TrackInvalid) / 1000.0 : iStartTime / 1000),
xiongziliang committed
806 807
                      "RTP-Info",rtp_info
                     });
808

809
    _enable_send_rtp = true;
xiongziliang committed
810
    setSocketFlags();
xiongziliang committed
811

812
    if (!_play_reader && _rtp_type != Rtsp::RTP_MULTICAST) {
xiongziliang committed
813
        weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
814 815
        _play_reader = play_src->getRing()->attach(getPoller(), useGOP);
        _play_reader->setDetachCB([weakSelf]() {
816
            auto strongSelf = weakSelf.lock();
xiongziliang committed
817
            if (!strongSelf) {
818 819
                return;
            }
xiongziliang committed
820 821
            strongSelf->shutdown(SockException(Err_shutdown, "rtsp ring buffer detached"));
        });
822
        _play_reader->setReadCB([weakSelf](const RtspMediaSource::RingDataType &pack) {
xiongziliang committed
823 824 825 826
            auto strongSelf = weakSelf.lock();
            if (!strongSelf) {
                return;
            }
827
            if (strongSelf->_enable_send_rtp) {
xiongziliang committed
828 829
                strongSelf->sendRtpPacket(pack);
            }
830 831
        });
    }
xzl committed
832 833
}

834
void RtspSession::handleReq_Pause(const Parser &parser) {
835
    if (parser["Session"] != _sessionid) {
836
        send_SessionNotFound();
837 838
        throw SockException(Err_shutdown,"session not found when pause");
    }
839

840
    sendRtspResponse("200 OK");
841
    _enable_send_rtp = false;
xzl committed
842 843
}

844
void RtspSession::handleReq_Teardown(const Parser &parser) {
845
    sendRtspResponse("200 OK");
846
    throw SockException(Err_shutdown,"rtsp player send teardown request");
xzl committed
847 848
}

849
void RtspSession::handleReq_Get(const Parser &parser) {
850 851 852 853 854 855 856 857 858 859
    _http_x_sessioncookie = parser["x-sessioncookie"];
    sendRtspResponse("200 OK",
                     {"Cache-Control","no-store",
                      "Pragma","no-store",
                      "Content-Type","application/x-rtsp-tunnelled",
                     },"","HTTP/1.0");

    //注册http getter,以便http poster绑定
    lock_guard<recursive_mutex> lock(g_mtxGetter);
    g_mapGetter[_http_x_sessioncookie] = dynamic_pointer_cast<RtspSession>(shared_from_this());
xzl committed
860 861
}

862
void RtspSession::handleReq_Post(const Parser &parser) {
863 864 865 866 867
    lock_guard<recursive_mutex> lock(g_mtxGetter);
    string sessioncookie = parser["x-sessioncookie"];
    //Poster 找到 Getter
    auto it = g_mapGetter.find(sessioncookie);
    if (it == g_mapGetter.end()) {
868 869
        throw SockException(Err_shutdown,"can not find http getter by x-sessioncookie");
    }
870

871 872 873 874 875 876
    //Poster 找到Getter的SOCK
    auto httpGetterWeak = it->second;
    //移除http getter的弱引用记录
    g_mapGetter.erase(sessioncookie);

    //http poster收到请求后转发给http getter处理
877
    _on_recv = [this,httpGetterWeak](const Buffer::Ptr &buf){
878 879 880 881 882 883 884
        auto httpGetterStrong = httpGetterWeak.lock();
        if(!httpGetterStrong){
            shutdown(SockException(Err_shutdown,"http getter released"));
            return;
        }

        //切换到http getter的线程
885
        httpGetterStrong->async([buf,httpGetterWeak](){
886 887 888 889
            auto httpGetterStrong = httpGetterWeak.lock();
            if(!httpGetterStrong){
                return;
            }
890
            httpGetterStrong->onRecv(std::make_shared<BufferString>(decodeBase64(string(buf->data(), buf->size()))));
891 892 893 894 895
        });
    };

    if(!parser.Content().empty()){
        //http poster后面的粘包
896
        _on_recv(std::make_shared<BufferString>(parser.Content()));
897
    }
898 899 900 901 902 903

    sendRtspResponse("200 OK",
                     {"Cache-Control","no-store",
                      "Pragma","no-store",
                      "Content-Type","application/x-rtsp-tunnelled",
                     },"","HTTP/1.0");
xzl committed
904 905
}

906
void RtspSession::handleReq_SET_PARAMETER(const Parser &parser) {
907 908
    //TraceP(this) <<endl;
    sendRtspResponse("200 OK");
xzl committed
909 910 911
}

inline void RtspSession::send_NotAcceptable() {
912
    sendRtspResponse("406 Not Acceptable",{"Connection","Close"});
xzl committed
913
}
914

915 916 917 918 919 920 921
void RtspSession::onRtpSorted(const RtpPacket::Ptr &rtp, int track_idx) {
    if (_start_stamp[track_idx] == -1) {
        //记录起始时间戳
        _start_stamp[track_idx] = rtp->timeStamp;
    }
    //时间戳增量
    rtp->timeStamp -= _start_stamp[track_idx];
922
    _push_src->onWrite(rtp, false);
xiongziliang committed
923
}
924 925

inline void RtspSession::onRcvPeerUdpData(int interleaved, const Buffer::Ptr &buf, const struct sockaddr &addr) {
926
    //这是rtcp心跳包,说明播放器还存活
927
    _alive_ticker.resetTime();
928

929 930
    if (interleaved % 2 == 0) {
        if (_push_src) {
931
            //这是rtsp推流上来的rtp包
932 933 934
            auto &ref = _sdp_track[interleaved / 2];
            handleOneRtp(interleaved / 2, ref->_type, ref->_samplerate, (unsigned char *) buf->data(), buf->size());
        } else if (!_udp_connected_flags.count(interleaved)) {
935
            //这是rtsp播放器的rtp打洞包
936 937
            _udp_connected_flags.emplace(interleaved);
            _rtp_socks[interleaved / 2]->setSendPeerAddr(&addr);
938
        }
939
    } else {
940
        //rtcp包
941 942 943
        if (!_udp_connected_flags.count(interleaved)) {
            _udp_connected_flags.emplace(interleaved);
            _rtcp_socks[(interleaved - 1) / 2]->setSendPeerAddr(&addr);
944
        }
945
        onRtcpPacket((interleaved - 1) / 2, _sdp_track[(interleaved - 1) / 2], (unsigned char *) buf->data(), buf->size());
946
    }
xzl committed
947 948
}

949
inline void RtspSession::startListenPeerUdpData(int track_idx) {
950
    weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
951
    auto srcIP = inet_addr(get_peer_ip().data());
952 953 954
    auto onUdpData = [weakSelf,srcIP](const Buffer::Ptr &buf, struct sockaddr *peer_addr, int interleaved){
        auto strongSelf = weakSelf.lock();
        if (!strongSelf) {
955 956
            return false;
        }
957

958 959 960
        if (((struct sockaddr_in *) peer_addr)->sin_addr.s_addr != srcIP) {
            WarnP(strongSelf.get()) << ((interleaved % 2 == 0) ? "收到其他地址的rtp数据:" : "收到其他地址的rtcp数据:")
                                    << SockUtil::inet_ntoa(((struct sockaddr_in *) peer_addr)->sin_addr);
961 962 963
            return true;
        }

964 965 966 967
        struct sockaddr addr = *peer_addr;
        strongSelf->async([weakSelf, buf, addr, interleaved]() {
            auto strongSelf = weakSelf.lock();
            if (!strongSelf) {
968 969
                return;
            }
970
            strongSelf->onRcvPeerUdpData(interleaved, buf, addr);
971 972 973 974
        });
        return true;
    };

975
    switch (_rtp_type){
976 977
        case Rtsp::RTP_MULTICAST:{
            //组播使用的共享rtcp端口
978 979 980
            UDPServer::Instance().listenPeer(get_peer_ip().data(), this,
                    [onUdpData]( int interleaved, const Buffer::Ptr &buf, struct sockaddr *peer_addr) {
                return onUdpData(buf, peer_addr, interleaved);
981 982 983 984
            });
        }
            break;
        case Rtsp::RTP_UDP:{
985
            auto setEvent = [&](Socket::Ptr &sock,int interleaved){
986
                if(!sock){
987
                    WarnP(this) << "udp端口为空:" << interleaved;
988 989
                    return;
                }
990 991
                sock->setOnRead([onUdpData,interleaved](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr , int addr_len){
                    onUdpData(pBuf, pPeerAddr, interleaved);
992 993
                });
            };
994 995
            setEvent(_rtp_socks[track_idx], 2 * track_idx );
            setEvent(_rtcp_socks[track_idx], 2 * track_idx + 1 );
996 997 998 999 1000 1001
        }
            break;

        default:
            break;
    }
1002

xzl committed
1003 1004
}

1005
static string dateStr(){
1006 1007 1008 1009
    char buf[64];
    time_t tt = time(NULL);
    strftime(buf, sizeof buf, "%a, %b %d %Y %H:%M:%S GMT", gmtime(&tt));
    return buf;
1010 1011
}

1012
bool RtspSession::sendRtspResponse(const string &res_code, const StrCaseMap &header_const, const string &sdp, const char *protocol){
1013
    auto header = header_const;
1014 1015 1016
    header.emplace("CSeq",StrPrinter << _cseq);
    if(!_sessionid.empty()){
        header.emplace("Session", _sessionid);
1017 1018
    }

xiongziliang committed
1019
    header.emplace("Server",SERVER_NAME);
1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037
    header.emplace("Date",dateStr());

    if(!sdp.empty()){
        header.emplace("Content-Length",StrPrinter << sdp.size());
        header.emplace("Content-Type","application/sdp");
    }

    _StrPrinter printer;
    printer << protocol << " " << res_code << "\r\n";
    for (auto &pr : header){
        printer << pr.first << ": " << pr.second << "\r\n";
    }

    printer << "\r\n";

    if(!sdp.empty()){
        printer << sdp;
    }
1038
//	DebugP(this) << printer;
1039
    return send(std::make_shared<BufferString>(printer)) > 0 ;
1040 1041 1042
}

int RtspSession::send(const Buffer::Ptr &pkt){
xiongziliang committed
1043
//	if(!_enableSendRtp){
1044
//		DebugP(this) << pkt->data();
xiongziliang committed
1045
//	}
1046
    _bytes_usage += pkt->size();
1047
    return TcpSession::send(pkt);
1048 1049
}

1050
bool RtspSession::sendRtspResponse(const string &res_code, const std::initializer_list<string> &header, const string &sdp, const char *protocol) {
1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061
    string key;
    StrCaseMap header_map;
    int i = 0;
    for(auto &val : header){
        if(++i % 2 == 0){
            header_map.emplace(key,val);
        }else{
            key = val;
        }
    }
    return sendRtspResponse(res_code,header_map,sdp,protocol);
1062 1063 1064
}

inline int RtspSession::getTrackIndexByTrackType(TrackType type) {
1065 1066
    for (unsigned int i = 0; i < _sdp_track.size(); i++) {
        if (type == _sdp_track[i]->_type) {
1067 1068 1069
            return i;
        }
    }
1070
    if(_sdp_track.size() == 1){
xiongziliang committed
1071 1072
        return 0;
    }
1073
    throw SockException(Err_shutdown, StrPrinter << "no such track with type:" << (int) type);
1074
}
1075

1076
inline int RtspSession::getTrackIndexByControlSuffix(const string &controlSuffix) {
1077 1078
    for (unsigned int i = 0; i < _sdp_track.size(); i++) {
        if (controlSuffix == _sdp_track[i]->_control_surffix) {
1079 1080 1081
            return i;
        }
    }
1082
    if(_sdp_track.size() == 1){
xiongziliang committed
1083
        return 0;
1084
    }
1085
    throw SockException(Err_shutdown, StrPrinter << "no such track with suffix:" << controlSuffix);
1086 1087
}

xiongziliang committed
1088
inline int RtspSession::getTrackIndexByInterleaved(int interleaved){
1089 1090
    for (unsigned int i = 0; i < _sdp_track.size(); i++) {
        if (_sdp_track[i]->_interleaved == interleaved) {
1091 1092 1093
            return i;
        }
    }
1094
    if(_sdp_track.size() == 1){
xiongziliang committed
1095 1096
        return 0;
    }
1097
    throw SockException(Err_shutdown, StrPrinter << "no such track with interleaved:" << interleaved);
xiongziliang committed
1098 1099
}

1100
bool RtspSession::close(MediaSource &sender, bool force) {
1101
    //此回调在其他线程触发
1102
    if(!_push_src || (!force && _push_src->totalReaderCount())){
1103 1104
        return false;
    }
1105 1106 1107
    string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
    safeShutdown(SockException(Err_shutdown,err));
    return true;
1108
}
1109

1110
int RtspSession::totalReaderCount(MediaSource &sender) {
1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124
    return _push_src ? _push_src->totalReaderCount() : sender.readerCount();
}

inline void RtspSession::onSendRtpPacket(const RtpPacket::Ptr &pkt){
#if RTSP_SERVER_SEND_RTCP
    int track_index = getTrackIndexByTrackType(pkt->type);
    RtcpCounter &counter = _rtcp_counter[track_index];
    counter.pktCnt += 1;
    counter.octCount += (pkt->size() - pkt->offset);
    auto &ticker = _rtcp_send_tickers[track_index];
    if (ticker.elapsedTime() > 5 * 1000) {
        //send rtcp every 5 second
        ticker.resetTime();
        //直接保存网络字节序
xiongziliang committed
1125
        memcpy(&counter.time_stamp, pkt->data() + 8, 4);
1126 1127 1128
        sendSenderReport(_rtp_type == Rtsp::RTP_TCP, track_index);
    }
#endif
1129
}
1130

1131
void RtspSession::sendRtpPacket(const RtspMediaSource::RingDataType &pkt) {
1132
    switch (_rtp_type) {
1133
        case Rtsp::RTP_TCP: {
1134 1135 1136 1137
            int i = 0;
            int size = pkt->size();
            setSendFlushFlag(false);
            pkt->for_each([&](const RtpPacket::Ptr &rtp) {
1138
                onSendRtpPacket(rtp);
1139 1140 1141 1142 1143
                if (++i == size) {
                    setSendFlushFlag(true);
                }
                send(rtp);
            });
1144 1145 1146
        }
            break;
        case Rtsp::RTP_UDP: {
1147 1148 1149
            int i = 0;
            int size = pkt->size();
            pkt->for_each([&](const RtpPacket::Ptr &rtp) {
1150 1151 1152
                onSendRtpPacket(rtp);
                int track_index = getTrackIndexByTrackType(rtp->type);
                auto &pSock = _rtp_socks[track_index];
1153 1154 1155 1156 1157
                if (!pSock) {
                    shutdown(SockException(Err_shutdown, "udp sock not opened yet"));
                    return;
                }
                BufferRtp::Ptr buffer(new BufferRtp(rtp, 4));
1158
                _bytes_usage += buffer->size();
1159 1160
                pSock->send(buffer, nullptr, 0, ++i == size);
            });
1161 1162 1163 1164 1165 1166 1167
        }
            break;
        default:
            break;
    }
}

1168
void RtspSession::sendSenderReport(bool over_tcp, int track_index) {
xiongziliang committed
1169
    static const char s_cname[] = "ZLMediaKitRtsp";
1170 1171 1172 1173
    uint8_t rtcp_buf[4 + 28 + 10 + sizeof(s_cname) + 1] = {0};
    uint8_t *rtcp_sr = rtcp_buf + 4, *rtcp_sdes = rtcp_sr + 28;
    auto &track = _sdp_track[track_index];
    auto &counter = _rtcp_counter[track_index];
1174

1175 1176 1177 1178
    rtcp_buf[0] = '$';
    rtcp_buf[1] = track->_interleaved + 1;
    rtcp_buf[2] = (sizeof(rtcp_buf) - 4) >> 8;
    rtcp_buf[3] = (sizeof(rtcp_buf) - 4) & 0xFF;
1179

1180 1181 1182 1183
    rtcp_sr[0] = 0x80;
    rtcp_sr[1] = 0xC8;
    rtcp_sr[2] = 0x00;
    rtcp_sr[3] = 0x06;
1184

1185 1186
    uint32_t ssrc = htonl(track->_ssrc);
    memcpy(&rtcp_sr[4], &ssrc, 4);
1187 1188 1189 1190 1191 1192 1193 1194 1195

    uint64_t msw;
    uint64_t lsw;
    struct timeval tv;
    gettimeofday(&tv, NULL);
    msw = tv.tv_sec + 0x83AA7E80; /* 0x83AA7E80 is the number of seconds from 1900 to 1970 */
    lsw = (uint32_t) ((double) tv.tv_usec * (double) (((uint64_t) 1) << 32) * 1.0e-6);

    msw = htonl(msw);
1196
    memcpy(&rtcp_sr[8], &msw, 4);
1197 1198

    lsw = htonl(lsw);
1199
    memcpy(&rtcp_sr[12], &lsw, 4);
1200
    //直接使用网络字节序
1201
    memcpy(&rtcp_sr[16], &counter.timeStamp, 4);
1202 1203

    uint32_t pktCnt = htonl(counter.pktCnt);
1204
    memcpy(&rtcp_sr[20], &pktCnt, 4);
1205 1206

    uint32_t octCount = htonl(counter.octCount);
1207
    memcpy(&rtcp_sr[24], &octCount, 4);
1208

1209 1210 1211 1212
    rtcp_sdes[0] = 0x81;
    rtcp_sdes[1] = 0xCA;
    rtcp_sdes[2] = 0x00;
    rtcp_sdes[3] = 0x06;
1213

1214
    memcpy(&rtcp_sdes[4], &ssrc, 4);
1215

1216 1217 1218 1219
    rtcp_sdes[8] = 0x01;
    rtcp_sdes[9] = 0x0f;
    memcpy(&rtcp_sdes[10], s_cname, sizeof(s_cname));
    rtcp_sdes[10 + sizeof(s_cname)] = 0x00;
1220

1221 1222 1223 1224
    if (over_tcp) {
        send(obtainBuffer((char *) rtcp_buf, sizeof(rtcp_buf)));
    } else {
        _rtcp_socks[track_index]->send((char *) rtcp_buf + 4, sizeof(rtcp_buf) - 4);
1225
    }
xzl committed
1226 1227
}

1228
void RtspSession::setSocketFlags(){
1229 1230
    GET_CONFIG(int, mergeWriteMS, General::kMergeWriteMS);
    if(mergeWriteMS > 0) {
1231
        //推流模式下,关闭TCP_NODELAY会增加推流端的延时,但是服务器性能将提高
1232
        SockUtil::setNoDelay(getSock()->rawFD(), false);
1233
        //播放模式下,开启MSG_MORE会增加延时,但是能提高发送性能
xiongziliang committed
1234
        setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
1235 1236 1237
    }
}

xzl committed
1238
}
xiongziliang committed
1239
/* namespace mediakit */