RtpSession.cpp 2.48 KB
Newer Older
Gemfield committed
1
/*
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
 */

#if defined(ENABLE_RTPPROXY)
Gemfield committed
12 13
#include "RtpSession.h"
#include "RtpSelector.h"
14
namespace mediakit{
Gemfield committed
15 16 17 18 19 20 21 22

/**
 * Builds a session on top of the given socket and records the peer address.
 *
 * The peer address is queried once via getpeername() and stored in `addr`,
 * which is later handed to the RTP process together with every packet.
 *
 * @param sock socket this session is bound to; forwarded to TcpSession.
 */
RtpSession::RtpSession(const Socket::Ptr &sock) : TcpSession(sock) {
    DebugP(this);
    // Start from a well-defined value so that `addr` is never read
    // uninitialized if the lookup below fails.
    addr = {};
    socklen_t addr_len = sizeof(addr);
    if (getpeername(sock->rawFD(), &addr, &addr_len) != 0) {
        WarnL << "getpeername failed";
    }
}
/**
 * Tears the session down.
 *
 * If an RTP process was attached to this session, it is unregistered from
 * the RtpSelector singleton using the SSRC this session was bound to.
 */
RtpSession::~RtpSession() {
    DebugP(this);
    if (_process) {
        RtpSelector::Instance().delProcess(_ssrc, _process.get());
    }
}

/**
 * Feeds freshly received bytes into the RTP splitter.
 *
 * Any exception raised while splitting shuts the session down: a
 * SockException is forwarded unchanged, any other std::exception is wrapped
 * into a SockException carrying Err_other and the exception message.
 *
 * @param data buffer holding the received bytes.
 */
void RtpSession::onRecv(const Buffer::Ptr &data) {
    try {
        auto bytes = data->data();
        auto byte_count = data->size();
        RtpSplitter::input(bytes, byte_count);
    } catch (const SockException &sock_err) {
        shutdown(sock_err);
    } catch (const std::exception &generic_err) {
        shutdown(SockException(Err_other, generic_err.what()));
    }
}

/**
 * Error callback: logs the session's SSRC followed by the error description
 * at warning level.
 *
 * @param err the error reported for this session.
 */
void RtpSession::onError(const SockException &err) {
    WarnL << _ssrc
          << " "
          << err.what();
}

/**
 * Periodic housekeeping callback.
 *
 * - With an attached RTP process: shuts down once the process reports that
 *   it is no longer alive.
 * - Without an attached process: shuts down once the ticker's created-time
 *   exceeds the limit below (no process was ever bound to this connection).
 */
void RtpSession::onManager() {
    // Limit compared against _ticker.createdTime(); presumably milliseconds
    // (i.e. 10 seconds) -- confirm against the Ticker implementation.
    constexpr int kNoProcessLimit = 10 * 1000;

    if (_process) {
        if (!_process->alive()) {
            shutdown(SockException(Err_timeout, "receive rtp timeout"));
        }
        return;
    }

    if (_ticker.createdTime() > kNoProcessLimit) {
        shutdown(SockException(Err_timeout, "illegal connection"));
    }
}

void RtpSession::onRtpPacket(const char *data, uint64_t len) {
53 54 55 56
    if (!_process) {
        if (!RtpSelector::getSSRC(data + 2, len - 2, _ssrc)) {
            return;
        }
Gemfield committed
57
        _process = RtpSelector::Instance().getProcess(_ssrc, true);
58
        _process->setListener(dynamic_pointer_cast<RtpSession>(shared_from_this()));
Gemfield committed
59
    }
60
    _process->inputRtp(_sock,data + 2, len - 2, &addr);
Gemfield committed
61 62
    _ticker.resetTime();
}
63

64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
/**
 * Request to close the media source served by this session.
 *
 * @param sender media source asking to be closed; used to build the message.
 * @param force  when false, the request is refused while the RTP process
 *               still reports readers.
 * @return false if no RTP process is attached or the request was refused;
 *         true once a shutdown has been scheduled via safeShutdown().
 */
bool RtpSession::close(MediaSource &sender, bool force) {
    // This callback is triggered on another thread.
    if (!_process) {
        return false;
    }
    if (!force && _process->totalReaderCount()) {
        return false;
    }

    string err = StrPrinter << "close media:"
                            << sender.getSchema() << "/"
                            << sender.getVhost() << "/"
                            << sender.getApp() << "/"
                            << sender.getId() << " "
                            << force;
    safeShutdown(SockException(Err_shutdown, err));
    return true;
}

/**
 * Reports the reader count for the media served by this session.
 *
 * @param sender media source used as a fallback when no RTP process is
 *               attached.
 * @return the RTP process' total reader count if a process is attached,
 *         otherwise the sender's own total reader count.
 */
int RtpSession::totalReaderCount(MediaSource &sender) {
    // This callback is triggered on another thread.
    if (_process) {
        return _process->totalReaderCount();
    }
    return sender.totalReaderCount();
}

79 80
}//namespace mediakit
#endif//defined(ENABLE_RTPPROXY)