RtmpSession.cpp 20.4 KB
Newer Older
xiongziliang committed
1
/*
xiongziliang committed
2
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
xiongziliang committed
3 4 5
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
xiongziliang committed
6 7 8
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
xzl committed
9 10 11
 */

#include "RtmpSession.h"
12
#include "Common/config.h"
xzl committed
13
#include "Util/onceToken.h"
xiongziliang committed
14
namespace mediakit {
xzl committed
15

xiongziliang committed
16
/**
 * Creates an RTMP session on top of the given socket.
 *
 * The socket's send timeout is set from the Rtmp::kKeepAliveSecond setting
 * and its read buffer is replaced by a 4 KB BufferRaw.
 *
 * @param pSock  Socket handed to the TcpSession base class.
 */
RtmpSession::RtmpSession(const Socket::Ptr &pSock) : TcpSession(pSock) {
    DebugP(this);

    GET_CONFIG(uint32_t, keep_alive_sec, Rtmp::kKeepAliveSecond);
    pSock->setSendTimeOutSecond(keep_alive_sec);

    // Start with a 4 KB receive buffer to save memory.
    pSock->setReadBuffer(std::make_shared<BufferRaw>(4 * 1024));
}

/** Emits a debug log line when the session object is destroyed. */
RtmpSession::~RtmpSession() {
    DebugP(this);
}

void RtmpSession::onError(const SockException& err) {
xiongziliang committed
29
    bool isPlayer = !_pPublisherSrc;
30
    uint64_t duration = _ticker.createdTime()/1000;
31
    WarnP(this) << (isPlayer ? "RTMP播放器(" : "RTMP推流器(")
xiongziliang committed
32 33 34
                << _mediaInfo._vhost << "/"
                << _mediaInfo._app << "/"
                << _mediaInfo._streamid
35 36
                << ")断开:" << err.what()
                << ",耗时(s):" << duration;
37 38

    //流量统计事件广播
39
    GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold);
40

41
    if(_ui64TotalBytes > iFlowThreshold * 1024){
42
        NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, duration, isPlayer, static_cast<SockInfo &>(*this));
43
    }
xzl committed
44 45 46
}

void RtmpSession::onManager() {
47
    GET_CONFIG(uint32_t,handshake_sec,Rtmp::kHandshakeSecond);
48 49
    GET_CONFIG(uint32_t,keep_alive_sec,Rtmp::kKeepAliveSecond);

50 51 52 53 54 55 56 57 58 59 60
    if (_ticker.createdTime() > handshake_sec * 1000) {
        if (!_pRingReader && !_pPublisherSrc) {
            shutdown(SockException(Err_timeout,"illegal connection"));
        }
    }
    if (_pPublisherSrc) {
        //publisher
        if (_ticker.elapsedTime() > keep_alive_sec * 1000) {
            shutdown(SockException(Err_timeout,"recv data from rtmp pusher timeout"));
        }
    }
xzl committed
61 62
}

63
/**
 * Receives a chunk of bytes from the socket.
 *
 * Resets the activity ticker, adds the chunk size to the byte counter and
 * feeds the data to the RTMP parser. Any exception thrown while parsing
 * shuts the session down with the exception text as the reason.
 *
 * @param pBuf  Buffer holding the received bytes.
 */
void RtmpSession::onRecv(const Buffer::Ptr &pBuf) {
    _ticker.resetTime();
    try {
        _ui64TotalBytes += pBuf->size();
        onParseRtmp(pBuf->data(), pBuf->size());
    } catch (const std::exception &ex) {
        shutdown(SockException(Err_shutdown, ex.what()));
    }
}

/**
 * Handles the RTMP "connect" command.
 *
 * Sends chunk size, acknowledgement window and peer bandwidth messages,
 * records the requested app and tcUrl, replies with a _result (or _error)
 * status object and finally sends an "onBWDone" invoke.
 *
 * @param dec  Decoder positioned at the command object of the request.
 * @throws std::runtime_error when the application is rejected (currently
 *         unreachable because `ok` is hard-wired to true).
 */
void RtmpSession::onCmd_connect(AMFDecoder &dec) {
    auto params = dec.load<AMFValue>();

    // Echo back the object encoding requested by the client, default 0.
    double amfVer = 0;
    AMFValue objectEncoding = params["objectEncoding"];
    if (objectEncoding) {
        amfVer = objectEncoding.as_number();
    }

    // set chunk size
    sendChunkSize(60000);
    // window acknowledgement size
    sendAcknowledgementSize(5000000);
    // set peer bandwidth
    sendPeerBandwidth(5000000);

    _mediaInfo._app = params["app"].as_string();
    _strTcUrl = params["tcUrl"].as_string();
    if (_strTcUrl.empty()) {
        // No tcUrl supplied: build one from the default vhost.
        _strTcUrl = string(RTMP_SCHEMA) + "://" + DEFAULT_VHOST + "/" + _mediaInfo._app;
    }

    bool ok = true; //(app == APP_NAME);

    AMFValue version(AMF_OBJECT);
    version.set("fmsVer", "FMS/3,0,1,123");
    version.set("capabilities", 31.0);

    const char *level = ok ? "status" : "error";
    const char *code = ok ? "NetConnection.Connect.Success" : "NetConnection.Connect.InvalidApp";
    const char *description = ok ? "Connection succeeded." : "InvalidApp.";

    AMFValue status(AMF_OBJECT);
    status.set("level", level);
    status.set("code", code);
    status.set("description", description);
    status.set("objectEncoding", amfVer);
    sendReply(ok ? "_result" : "_error", version, status);
    if (!ok) {
        throw std::runtime_error("Unsupported application: " + _mediaInfo._app);
    }

    AMFEncoder invoke;
    invoke << "onBWDone" << 0.0 << nullptr;
    sendResponse(MSG_CMD, invoke.data());
}

/**
 * Handles the RTMP "createStream" command by replying with _result and the
 * STREAM_MEDIA id.
 *
 * @param dec  Unused.
 */
void RtmpSession::onCmd_createStream(AMFDecoder &dec) {
    sendReply("_result", nullptr, double(STREAM_MEDIA));
}

/**
 * Handles the RTMP "publish" command.
 *
 * Parses the stream id into _mediaInfo, asks listeners of
 * Broadcast::kBroadcastMediaPublish to authorise the push and, once a
 * verdict is available, replies with an onStatus message. On success the
 * session becomes a publisher (a RtmpMediaSourceImp is created); on failure
 * the session is shut down. When nobody listens for the event, publishing is
 * allowed with the protocol-translation defaults from the configuration.
 *
 * @param dec  Decoder positioned after the request id.
 */
void RtmpSession::onCmd_publish(AMFDecoder &dec) {
    // The token's destructor logs how long it took to answer the request; it
    // is kept alive by every callback that may still produce the answer.
    std::shared_ptr<Ticker> pTicker(new Ticker);
    weak_ptr<RtmpSession> weakSelf = dynamic_pointer_cast<RtmpSession>(shared_from_this());
    std::shared_ptr<onceToken> pToken(new onceToken(nullptr, [pTicker, weakSelf]() {
        if (auto strongSelf = weakSelf.lock()) {
            DebugP(strongSelf.get()) << "publish 回复时间:" << pTicker->elapsedTime() << "ms";
        }
    }));

    dec.load<AMFValue>();/* NULL */
    _mediaInfo.parse(_strTcUrl + "/" + getStreamId(dec.load<std::string>()));
    _mediaInfo._schema = RTMP_SCHEMA;

    // Sends the verdict to the client and, on success, turns this session
    // into a publisher. Captures `this`; every caller below only invokes it
    // while the session is known to be alive.
    auto onRes = [this, pToken](const string &err, bool enableRtxp, bool enableHls, bool enableMP4) {
        auto existing = dynamic_pointer_cast<RtmpMediaSource>(MediaSource::find(RTMP_SCHEMA,
                                                                                _mediaInfo._vhost,
                                                                                _mediaInfo._app,
                                                                                _mediaInfo._streamid));
        const bool authSuccess = err.empty();
        // Publishing is only allowed when authorised, the stream does not
        // exist yet and this session is not already publishing.
        const bool ok = authSuccess && !existing && !_pPublisherSrc;

        const char *code = ok ? "NetStream.Publish.Start"
                              : (authSuccess ? "NetStream.Publish.BadName" : "NetStream.Publish.BadAuth");
        const char *description = ok ? "Started publishing stream."
                                     : (authSuccess ? "Already publishing." : err.data());

        AMFValue status(AMF_OBJECT);
        status.set("level", ok ? "status" : "error");
        status.set("code", code);
        status.set("description", description);
        status.set("clientid", "0");
        sendReply("onStatus", nullptr, status);

        if (!ok) {
            string errMsg = StrPrinter << (authSuccess ? "already publishing:" : err.data()) << " "
                                    << _mediaInfo._vhost << " "
                                    << _mediaInfo._app << " "
                                    << _mediaInfo._streamid;
            shutdown(SockException(Err_shutdown, errMsg));
            return;
        }

        _pPublisherSrc.reset(new RtmpMediaSourceImp(_mediaInfo._vhost, _mediaInfo._app, _mediaInfo._streamid));
        _pPublisherSrc->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
        // Configure protocol translation.
        _pPublisherSrc->setProtocolTranslation(enableRtxp, enableHls, enableMP4);

        // For an RTMP pusher, enlarge the TCP receive buffer to improve
        // receive performance.
        _sock->setReadBuffer(std::make_shared<BufferRaw>(256 * 1024));
        setSocketFlags();
    };

    if (_mediaInfo._app.empty() || _mediaInfo._streamid.empty()) {
        // Reject push urls without an app or a stream id.
        onRes("rtmp推流url非法", false, false, false);
        return;
    }

    // Hands the verdict back to this session's own thread via async().
    // NOTE: `invoker` is passed to emitEvent() as declared; keep its type and
    // value category unchanged.
    Broadcast::PublishAuthInvoker invoker = [weakSelf, onRes, pToken](const string &err, bool enableRtxp, bool enableHls, bool enableMP4) {
        auto strongSelf = weakSelf.lock();
        if (!strongSelf) {
            return;
        }
        strongSelf->async([weakSelf, onRes, err, pToken, enableRtxp, enableHls, enableMP4]() {
            auto strongSelf = weakSelf.lock();
            if (!strongSelf) {
                return;
            }
            onRes(err, enableRtxp, enableHls, enableMP4);
        });
    };

    auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish,_mediaInfo,invoker,static_cast<SockInfo &>(*this));
    if (flag) {
        return;
    }

    // Nobody listens for the event: treat authentication as successful.
    GET_CONFIG(bool, toRtxp, General::kPublishToRtxp);
    GET_CONFIG(bool, toHls, General::kPublishToHls);
    GET_CONFIG(bool, toMP4, General::kPublishToMP4);
    onRes("", toRtxp, toHls, toMP4);
}

/**
 * Handles the RTMP "deleteStream" command.
 *
 * Replies with NetStream.Unpublish.Success and then throws so that the
 * caller's exception handling terminates the session.
 *
 * @param dec  Unused.
 * @throws std::runtime_error always, after the reply has been sent.
 */
void RtmpSession::onCmd_deleteStream(AMFDecoder &dec) {
    AMFValue reply(AMF_OBJECT);
    reply.set("level", "status");
    reply.set("code", "NetStream.Unpublish.Success");
    reply.set("description", "Stop publishing.");
    sendReply("onStatus", nullptr, reply);

    throw std::runtime_error(StrPrinter << "Stop publishing" << endl);
}

198 199
/**
 * Answers a play request and, on success, starts streaming.
 *
 * On failure (authentication error or missing source) an error onStatus is
 * sent and the session is shut down. On success the sequence
 * StreamBegin, Play.Reset, Play.Start, |RtmpSampleAccess, Data.Start,
 * Play.PublishNotify and (if present) onMetaData is sent, followed by the
 * source's config frames; the session is then attached to the source's ring
 * buffer so that later packets are forwarded through onSendMedia().
 *
 * @param err  Authentication error text; empty means authentication passed.
 * @param src  Media source to play; may be null when it was not found.
 */
void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr &src){
    const bool authSuccess = err.empty();
    const bool ok = authSuccess && static_cast<bool>(src);
    if (ok) {
        // stream begin
        sendUserControl(CONTROL_STREAM_BEGIN, STREAM_MEDIA);
    }

    // onStatus(NetStream.Play.Reset) or the matching error status
    const char *resetCode = ok ? "NetStream.Play.Reset"
                               : (authSuccess ? "NetStream.Play.StreamNotFound" : "NetStream.Play.BadAuth");
    const char *resetDescription = ok ? "Resetting and playing."
                                      : (authSuccess ? "No such stream." : err.data());
    AMFValue status(AMF_OBJECT);
    status.set("level", ok ? "status" : "error");
    status.set("code", resetCode);
    status.set("description", resetDescription);
    status.set("details", _mediaInfo._streamid);
    status.set("clientid", "0");
    sendReply("onStatus", nullptr, status);

    if (!ok) {
        string errMsg = StrPrinter << (authSuccess ? "no such stream:" : err.data()) << " "
                                 << _mediaInfo._vhost << " "
                                 << _mediaInfo._app << " "
                                 << _mediaInfo._streamid;
        shutdown(SockException(Err_shutdown, errMsg));
        return;
    }

    // onStatus(NetStream.Play.Start)
    status.clear();
    status.set("level", "status");
    status.set("code", "NetStream.Play.Start");
    status.set("description", "Started playing.");
    status.set("details", _mediaInfo._streamid);
    status.set("clientid", "0");
    sendReply("onStatus", nullptr, status);

    // |RtmpSampleAccess(true, true)
    AMFEncoder invoke;
    invoke << "|RtmpSampleAccess" << true << true;
    sendResponse(MSG_DATA, invoke.data());

    // onStatus(NetStream.Data.Start)
    invoke.clear();
    AMFValue dataStart(AMF_OBJECT);
    dataStart.set("code", "NetStream.Data.Start");
    invoke << "onStatus" << dataStart;
    sendResponse(MSG_DATA, invoke.data());

    // onStatus(NetStream.Play.PublishNotify)
    status.clear();
    status.set("level", "status");
    status.set("code", "NetStream.Play.PublishNotify");
    status.set("description", "Now published.");
    status.set("details", _mediaInfo._streamid);
    status.set("clientid", "0");
    sendReply("onStatus", nullptr, status);

    auto &metadata = src->getMetaData();
    if (metadata) {
        // Metadata is only sent when the source has some; certain pushers
        // never produce it.
        // onMetaData
        invoke.clear();
        invoke << "onMetaData" << metadata;
        sendResponse(MSG_DATA, invoke.data());

        auto duration = metadata["duration"];
        if (duration && duration.as_number() > 0) {
            // A positive duration means on-demand playback: use absolute
            // timestamps.
            _stamp[0].setPlayBack();
            _stamp[1].setPlayBack();
        }
    }

    // Send the initial (config) frames of the source.
    src->getConfigFrame([this](const RtmpPacket::Ptr &cfg) {
        onSendMedia(cfg);
    });

    // Audio is synchronised to video.
    _stamp[0].syncTo(_stamp[1]);
    _pRingReader = src->getRing()->attach(getPoller());
    weak_ptr<RtmpSession> weakSelf = dynamic_pointer_cast<RtmpSession>(shared_from_this());

    _pRingReader->setReadCB([weakSelf](const RtmpMediaSource::RingDataType &pkt) {
        auto strongSelf = weakSelf.lock();
        if (!strongSelf || strongSelf->_paused) {
            return;
        }
        // Only the last packet of the batch is sent with the flush flag set.
        int remaining = pkt->size();
        strongSelf->setSendFlushFlag(false);
        pkt->for_each([&](const RtmpPacket::Ptr &rtmp) {
            if (--remaining == 0) {
                strongSelf->setSendFlushFlag(true);
            }
            strongSelf->onSendMedia(rtmp);
        });
    });

    _pRingReader->setDetachCB([weakSelf]() {
        if (auto strongSelf = weakSelf.lock()) {
            strongSelf->shutdown(SockException(Err_shutdown,"rtmp ring buffer detached"));
        }
    });

    _pPlayerSrc = src;
    if (src->totalReaderCount() == 1) {
        src->seekTo(0);
    }

    // Improve server-side send performance.
    setSocketFlags();
}
310

311 312 313 314 315 316 317 318 319
void RtmpSession::doPlayResponse(const string &err,const std::function<void(bool)> &cb){
    if(!err.empty()){
        //鉴权失败,直接返回播放失败
        sendPlayResponse(err, nullptr);
        cb(false);
        return;
    }

    //鉴权成功,查找媒体源并回复
320
    weak_ptr<RtmpSession> weakSelf = dynamic_pointer_cast<RtmpSession>(shared_from_this());
xiongziliang committed
321
    MediaSource::findAsync(_mediaInfo,weakSelf.lock(),[weakSelf,cb](const MediaSource::Ptr &src){
322
        auto rtmp_src = dynamic_pointer_cast<RtmpMediaSource>(src);
323 324
        auto strongSelf = weakSelf.lock();
        if(strongSelf){
325
            strongSelf->sendPlayResponse("", rtmp_src);
326
        }
327
        cb(rtmp_src.operator bool());
328 329 330
    });
}

331
/**
 * Starts the play workflow for the stream described by _mediaInfo.
 *
 * Emits Broadcast::kBroadcastMediaPlayed so listeners can authorise the
 * request; the verdict is forwarded to doPlayResponse() through async().
 * When nobody listens for the event, the request is accepted without
 * authentication.
 *
 * @param dec  Unused here; the callers have already consumed the arguments.
 */
void RtmpSession::doPlay(AMFDecoder &dec){
    // The token's destructor logs how long it took to answer the request; it
    // is kept alive by every callback that may still produce the answer.
    std::shared_ptr<Ticker> pTicker(new Ticker);
    weak_ptr<RtmpSession> weakSelf = dynamic_pointer_cast<RtmpSession>(shared_from_this());
    std::shared_ptr<onceToken> pToken(new onceToken(nullptr, [pTicker, weakSelf]() {
        if (auto strongSelf = weakSelf.lock()) {
            DebugP(strongSelf.get()) << "play 回复时间:" << pTicker->elapsedTime() << "ms";
        }
    }));

    // NOTE: `invoker` is passed to emitEvent() as declared; keep its type and
    // value category unchanged.
    Broadcast::AuthInvoker invoker = [weakSelf, pToken](const string &err) {
        auto strongSelf = weakSelf.lock();
        if (!strongSelf) {
            return;
        }
        strongSelf->async([weakSelf, err, pToken]() {
            auto strongSelf = weakSelf.lock();
            if (!strongSelf) {
                return;
            }
            strongSelf->doPlayResponse(err, [pToken](bool) {});
        });
    };

    auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,_mediaInfo,invoker,static_cast<SockInfo &>(*this));
    if (flag) {
        return;
    }

    // Nobody listens for the event: play without authentication.
    doPlayResponse("", [pToken](bool) {});
}
/**
 * Handles the RTMP "play2" command by running the play workflow with the
 * media info that is already stored in the session.
 *
 * @param dec  Forwarded to doPlay().
 */
void RtmpSession::onCmd_play2(AMFDecoder &dec) {
    doPlay(dec);
}
363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394

/**
 * Normalises the stream name received in a play/publish command.
 *
 * The part after the first '?' is treated as url parameters and re-appended
 * unchanged. In the part before it, a "prefix:name" form is rewritten to
 * "name.prefix": vlc and ffplay send rtmp://127.0.0.1/record/mp4:0 when
 * asked to play rtmp://127.0.0.1/record/0.mp4, and this restores "0.mp4".
 *
 * @param str  Raw stream name, optionally followed by "?params".
 * @return     The normalised stream id, with "?params" appended when the
 *             parameter part is non-empty.
 */
string RtmpSession::getStreamId(const string &str){
    // Split off the url parameters, if any.
    const auto query_pos = str.find('?');
    const bool has_query = (query_pos != string::npos);
    string id = has_query ? str.substr(0, query_pos) : str;
    const string query = has_query ? str.substr(query_pos + 1) : string();

    // "mp4:0" -> "0.mp4"
    const auto colon_pos = id.find(":");
    if (colon_pos != string::npos) {
        const string prefix = id.substr(0, colon_pos);
        const string name = id.substr(colon_pos + 1);
        id = name + "." + prefix;
    }

    // Only re-attach the separator when there are parameters to carry.
    return query.empty() ? id : id + '?' + query;
}

395
/**
 * Handles the RTMP "play" command.
 *
 * Skips the null command object, builds the full url from the stored tcUrl
 * and the normalised stream name, stores it in _mediaInfo and starts the
 * play workflow.
 *
 * @param dec  Decoder positioned after the request id.
 */
void RtmpSession::onCmd_play(AMFDecoder &dec) {
    dec.load<AMFValue>();/* NULL */

    const std::string stream_name = dec.load<std::string>();
    _mediaInfo.parse(_strTcUrl + "/" + getStreamId(stream_name));
    _mediaInfo._schema = RTMP_SCHEMA;

    doPlay(dec);
}

/**
 * Handles the RTMP "pause" command.
 *
 * Replies with a Pause/Unpause notify status, sends a stream EOF (pause) or
 * stream begin (resume) user-control message and records the new state in
 * _paused.
 *
 * @param dec  Decoder positioned after the request id.
 */
void RtmpSession::onCmd_pause(AMFDecoder &dec) {
    dec.load<AMFValue>();/* NULL */
    bool paused = dec.load<bool>();
    TraceP(this) << paused;

    const char *code = paused ? "NetStream.Pause.Notify" : "NetStream.Unpause.Notify";
    const char *description = paused ? "Paused stream." : "Unpaused stream.";

    AMFValue status(AMF_OBJECT);
    status.set("level", "status");
    status.set("code", code);
    status.set("description", description);
    sendReply("onStatus", nullptr, status);

    // stream EOF when pausing, stream begin when resuming
    sendUserControl(paused ? CONTROL_STREAM_EOF : CONTROL_STREAM_BEGIN, STREAM_MEDIA);
    _paused = paused;
}

/**
 * Stores the metadata sent by a publisher.
 *
 * @param dec  Decoder positioned at the notification name, which must be
 *             "onMetaData", followed by the metadata value.
 * @throws std::runtime_error when the session is not a publisher or the
 *         notification is not "onMetaData".
 */
void RtmpSession::setMetaData(AMFDecoder &dec) {
    if (!_pPublisherSrc) {
        throw std::runtime_error("not a publisher");
    }

    const std::string notify_name = dec.load<std::string>();
    if (notify_name != "onMetaData") {
        throw std::runtime_error("can only set metadata");
    }

    auto metadata = dec.load<AMFValue>();
//    dumpMetadata(metadata);
    _pPublisherSrc->setMetaData(metadata);
    // Remember that the publisher supplied metadata itself.
    _set_meta_data = true;
}

/**
 * Dispatches an RTMP command to its handler.
 *
 * Reads the command name; unknown commands are ignored silently. For known
 * commands the request id is read into _dNowReqID before the handler runs.
 *
 * @param dec  Decoder positioned at the command name.
 */
void RtmpSession::onProcessCmd(AMFDecoder &dec) {
    using rtmpCMDHandle = void (RtmpSession::*)(AMFDecoder &dec);
    // Command name -> handler table, built once on first use.
    static const unordered_map<string, rtmpCMDHandle> s_cmd_functions = {
        {"connect",      &RtmpSession::onCmd_connect},
        {"createStream", &RtmpSession::onCmd_createStream},
        {"publish",      &RtmpSession::onCmd_publish},
        {"deleteStream", &RtmpSession::onCmd_deleteStream},
        {"play",         &RtmpSession::onCmd_play},
        {"play2",        &RtmpSession::onCmd_play2},
        {"seek",         &RtmpSession::onCmd_seek},
        {"pause",        &RtmpSession::onCmd_pause},
    };

    const std::string method = dec.load<std::string>();
    const auto entry = s_cmd_functions.find(method);
    if (entry == s_cmd_functions.end()) {
//        TraceP(this) << "can not support cmd:" << method;
        return;
    }

    _dNowReqID = dec.load<double>();
    const rtmpCMDHandle handler = entry->second;
    (this->*handler)(dec);
}

/**
 * Handles one fully reassembled RTMP message.
 *
 * - MSG_CMD / MSG_CMD3: decoded and dispatched through onProcessCmd().
 * - MSG_DATA / MSG_DATA3: only "@setDataFrame" is handled (metadata); other
 *   notifications are logged.
 * - MSG_AUDIO / MSG_VIDEO: optionally re-stamped, then written to the
 *   publisher source. If no metadata was received before the first
 *   non-config frame, default metadata from TitleMeta is installed.
 * - anything else is logged with a hex dump.
 *
 * @param chunkData  The message; for audio/video it is moved into the
 *                   packet handed to the publisher source.
 * @throws std::runtime_error when media arrives on a session that is not a
 *         publisher (and whatever the command/metadata handlers throw).
 */
void RtmpSession::onRtmpChunk(RtmpPacket &chunkData) {
    switch (chunkData.typeId) {
    case MSG_CMD:
    case MSG_CMD3: {
        // The AMF3 variant is decoded with second argument 1, the AMF0
        // variant with 0.
        // NOTE(review): presumably a start offset skipping the AMF3 format
        // byte; confirm against the AMFDecoder constructor.
        AMFDecoder dec(chunkData.strBuf, chunkData.typeId == MSG_CMD3 ? 1 : 0);
        onProcessCmd(dec);
    }
        break;

    case MSG_DATA:
    case MSG_DATA3: {
        // Fixed: this branch used to test for MSG_CMD3, which can never match
        // here, so AMF3 data messages were decoded like AMF0 ones. Mirror the
        // command branch and test for MSG_DATA3.
        AMFDecoder dec(chunkData.strBuf, chunkData.typeId == MSG_DATA3 ? 1 : 0);
        std::string type = dec.load<std::string>();
        if (type == "@setDataFrame") {
            setMetaData(dec);
        }else{
            TraceP(this) << "unknown notify:" << type;
        }
    }
        break;
    case MSG_AUDIO:
    case MSG_VIDEO: {
        if (!_pPublisherSrc) {
            throw std::runtime_error("Not a rtmp publisher!");
        }
        GET_CONFIG(bool,rtmp_modify_stamp,Rtmp::kModifyStamp);
        if(rtmp_modify_stamp){
            // Replace the sender's timestamp with the revised one; one Stamp
            // instance is used per media type (typeId % 2).
            int64_t dts_out;
            _stamp[chunkData.typeId % 2].revise(chunkData.timeStamp, chunkData.timeStamp, dts_out, dts_out, true);
            chunkData.timeStamp = dts_out;
        }

        if(!_set_meta_data && !chunkData.isCfgFrame()){
            // The pusher sent media without metadata: install a default.
            _set_meta_data = true;
            _pPublisherSrc->setMetaData(TitleMeta().getMetadata());
        }
        _pPublisherSrc->onWrite(std::make_shared<RtmpPacket>(std::move(chunkData)));
    }
        break;
    default:
        WarnP(this) << "unhandled message:" << (int) chunkData.typeId << hexdump(chunkData.strBuf.data(), chunkData.strBuf.size());
        break;
    }
}

/**
 * Handles the RTMP "seek" command.
 *
 * Replies with NetStream.Seek.Notify, then reads the requested position in
 * milliseconds and forwards it to the source being played, if it is still
 * alive.
 *
 * @param dec  Decoder positioned after the request id.
 */
void RtmpSession::onCmd_seek(AMFDecoder &dec) {
    dec.load<AMFValue>();/* NULL */

    // The unused local AMFEncoder that used to be declared here was removed.
    AMFValue status(AMF_OBJECT);
    status.set("level", "status");
    status.set("code", "NetStream.Seek.Notify");
    status.set("description", "Seeking.");
    sendReply("onStatus", nullptr, status);

    auto milliSeconds = dec.load<AMFValue>().as_number();
    InfoP(this) << "rtmp seekTo(ms):" << milliSeconds;
    // _pPlayerSrc is a weak reference; the source may already be gone.
    auto strongSrc = _pPlayerSrc.lock();
    if (strongSrc) {
        strongSrc->seekTo(milliSeconds);
    }
}

xiongziliang committed
516
/**
 * Sends one media packet to the player.
 *
 * The packet's timestamp is passed through the Stamp instance selected by
 * typeId % 2 and the revised value is used as the outgoing timestamp.
 *
 * @param pkt  Packet to send.
 */
void RtmpSession::onSendMedia(const RtmpPacket::Ptr &pkt) {
    // Timestamps sent to an RTMP player start from zero.
    auto &stamp = _stamp[pkt->typeId % 2];
    int64_t revised_dts;
    stamp.revise(pkt->timeStamp, 0, revised_dts, revised_dts);
    sendRtmp(pkt->typeId, pkt->streamId, pkt, revised_dts, pkt->chunkId);
}

523

524 525
/**
 * Request to close the published stream.
 *
 * According to the original comment this callback is triggered from another
 * thread, which is why safeShutdown() is used.
 *
 * @param sender  Media source asking for the close; used for the log text.
 * @param force   Close even when the stream still has readers.
 * @return        false when this session is not a publisher, or when the
 *                stream still has readers and `force` is false; true when a
 *                shutdown was requested.
 */
bool RtmpSession::close(MediaSource &sender,bool force)  {
    if (!_pPublisherSrc) {
        return false;
    }
    if (!force && _pPublisherSrc->totalReaderCount()) {
        return false;
    }

    string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
    safeShutdown(SockException(Err_shutdown, err));
    return true;
}

534 535 536 537
/**
 * Returns the total reader count of the publisher source when this session
 * is a publisher, otherwise the reader count of `sender`.
 */
int RtmpSession::totalReaderCount(MediaSource &sender) {
    if (_pPublisherSrc) {
        return _pPublisherSrc->totalReaderCount();
    }
    return sender.readerCount();
}

538
void RtmpSession::setSocketFlags(){
539 540
    GET_CONFIG(int, mergeWriteMS, General::kMergeWriteMS);
    if(mergeWriteMS > 0) {
541 542 543
        //推流模式下,关闭TCP_NODELAY会增加推流端的延时,但是服务器性能将提高
        SockUtil::setNoDelay(_sock->rawFD(), false);
        //播放模式下,开启MSG_MORE会增加延时,但是能提高发送性能
xiongziliang committed
544
        setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
545 546
    }
}
547 548 549

void RtmpSession::dumpMetadata(const AMFValue &metadata) {
    if(metadata.type() != AMF_OBJECT && metadata.type() != AMF_ECMA_ARRAY){
550
        WarnL << "invalid metadata type:" << metadata.type();
551 552 553 554 555 556 557 558
        return ;
    }
    _StrPrinter printer;
    metadata.object_for_each([&](const string &key, const AMFValue &val){
            printer << "\r\n" << key << "\t:" << val.to_string() ;
    });
    InfoL << _mediaInfo._vhost << " " << _mediaInfo._app << " " << _mediaInfo._streamid << (string)printer;
}
xiongziliang committed
559
} /* namespace mediakit */