/*
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
 */
xiongziliang committed
10

xzl committed
11 12
#include "RtmpPusher.h"
#include "Rtmp/utils.h"
xiongzilaing committed
13 14 15
#include "Util/util.h"
#include "Util/onceToken.h"
#include "Thread/ThreadPool.h"
xiongziliang committed
16
using namespace toolkit;
xiongziliang committed
17
using namespace mediakit::Client;
xzl committed
18

xiongziliang committed
19
namespace mediakit {
xzl committed
20

21
/**
 * Creates a pusher on top of a TcpClient bound to the given poller.
 * @param poller event poller handed to the TcpClient base class
 * @param src    RTMP media source to push; kept in _pMediaSrc and resolved
 *               later via lock() when metadata is sent
 */
RtmpPusher::RtmpPusher(const EventPoller::Ptr &poller, const RtmpMediaSource::Ptr &src)
    : TcpClient(poller) {
    _pMediaSrc = src;
}

/**
 * Tears the session down (see teardown()) and emits a debug log line.
 */
RtmpPusher::~RtmpPusher()
{
    teardown();
    DebugL << endl;
}
void RtmpPusher::teardown() {
30 31 32 33 34 35 36 37
    if (alive()) {
        _strApp.clear();
        _strStream.clear();
        _strTcUrl.clear();
        {
            lock_guard<recursive_mutex> lck(_mtxOnResultCB);
            _mapOnResultCB.clear();
        }
xzl committed
38
        {
39 40
            lock_guard<recursive_mutex> lck(_mtxOnStatusCB);
            _dqOnStatusCB.clear();
xzl committed
41
        }
42
        _pPublishTimer.reset();
xiongziliang committed
43
        reset();
xiongziliang committed
44
        shutdown(SockException(Err_shutdown,"teardown"));
45
    }
xzl committed
46 47
}

48
void RtmpPusher::onPublishResult(const SockException &ex,bool handshakeCompleted) {
49 50 51 52 53 54 55 56 57 58 59 60
    if(!handshakeCompleted){
        //播放结果回调
        _pPublishTimer.reset();
        if(_onPublished){
            _onPublished(ex);
        }
    } else {
        //播放成功后异常断开回调
        if(_onShutdown){
            _onShutdown(ex);
        }
    }
xiongziliang committed
61

62 63 64
    if(ex){
        teardown();
    }
xiongziliang committed
65 66
}

xiongziliang committed
67
void RtmpPusher::publish(const string &strUrl)  {
68 69 70
    teardown();
    string strHost = FindField(strUrl.data(), "://", "/");
    _strApp = 	FindField(strUrl.data(), (strHost + "/").data(), "/");
xiongziliang committed
71
    _strStream = FindField(strUrl.data(), (strHost + "/" + _strApp + "/").data(), NULL);
72
    _strTcUrl = string("rtmp://") + strHost + "/" + _strApp;
xzl committed
73

74
    if (!_strApp.size() || !_strStream.size()) {
75
        onPublishResult(SockException(Err_other,"rtmp url非法"),false);
xzl committed
76 77
        return;
    }
78
    DebugL << strHost << " " << _strApp << " " << _strStream;
xzl committed
79

80 81
    auto iPort = atoi(FindField(strHost.data(), ":", NULL).data());
    if (iPort <= 0) {
xzl committed
82
        //rtmp 默认端口1935
83 84
        iPort = 1935;
    } else {
xzl committed
85
        //服务器域名
86 87
        strHost = FindField(strHost.data(), NULL, ":");
    }
xiongziliang committed
88 89

    weak_ptr<RtmpPusher> weakSelf = dynamic_pointer_cast<RtmpPusher>(shared_from_this());
90 91
    float publishTimeOutSec = (*this)[kTimeoutMS].as<int>() / 1000.0;
    _pPublishTimer.reset( new Timer(publishTimeOutSec,  [weakSelf]() {
xiongziliang committed
92 93 94 95
        auto strongSelf=weakSelf.lock();
        if(!strongSelf) {
            return false;
        }
96
        strongSelf->onPublishResult(SockException(Err_timeout,"publish rtmp timeout"), false);
xiongziliang committed
97 98 99 100 101 102 103
        return false;
    },getPoller()));

    if(!(*this)[kNetAdapter].empty()){
        setNetAdapter((*this)[kNetAdapter]);
    }

104
    startConnect(strHost, iPort);
xzl committed
105 106 107
}

void RtmpPusher::onErr(const SockException &ex){
108 109
    //定时器_pPublishTimer为空后表明握手结束了
    onPublishResult(ex,!_pPublishTimer);
xzl committed
110 111
}
void RtmpPusher::onConnect(const SockException &err){
112 113 114 115 116 117
    if(err) {
        onPublishResult(err,false);
        return;
    }
    //推流器不需要多大的接收缓存,节省内存占用
    _sock->setReadBuffer(std::make_shared<BufferRaw>(1 * 1024));
118

119 120
    weak_ptr<RtmpPusher> weakSelf = dynamic_pointer_cast<RtmpPusher>(shared_from_this());
    startClientSession([weakSelf](){
xzl committed
121 122 123 124
        auto strongSelf=weakSelf.lock();
        if(!strongSelf) {
            return;
        }
xiongziliang committed
125 126

        strongSelf->sendChunkSize(60000);
xzl committed
127
        strongSelf->send_connect();
128
    });
xzl committed
129
}
130
/**
 * Receive hook: feeds the received bytes to onParseRtmp(). An exception
 * derived from `exception` is converted into an Err_other publish result.
 * @param pBuf buffer holding the received data
 */
void RtmpPusher::onRecv(const Buffer::Ptr &pBuf) {
    try {
        onParseRtmp(pBuf->data(), pBuf->size());
    } catch (const exception &failure) {
        // An empty _pPublishTimer means the handshake has already finished
        const bool handshakeDone = !_pPublishTimer;
        SockException ex(Err_other, failure.what());
        onPublishResult(ex, handshakeDone);
    }
}


inline void RtmpPusher::send_connect() {
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
    AMFValue obj(AMF_OBJECT);
    obj.set("app", _strApp);
    obj.set("type", "nonprivate");
    obj.set("tcUrl", _strTcUrl);
    obj.set("swfUrl", _strTcUrl);
    sendInvoke("connect", obj);
    addOnResultCB([this](AMFDecoder &dec){
        //TraceL << "connect result";
        dec.load<AMFValue>();
        auto val = dec.load<AMFValue>();
        auto level = val["level"].as_string();
        auto code = val["code"].as_string();
        if(level != "status"){
            throw std::runtime_error(StrPrinter <<"connect 失败:" << level << " " << code << endl);
        }
        send_createStream();
    });
xzl committed
159 160 161
}

inline void RtmpPusher::send_createStream() {
162 163 164 165 166 167 168 169
    AMFValue obj(AMF_NULL);
    sendInvoke("createStream", obj);
    addOnResultCB([this](AMFDecoder &dec){
        //TraceL << "createStream result";
        dec.load<AMFValue>();
        _ui32StreamId = dec.load<int>();
        send_publish();
    });
xzl committed
170 171
}
inline void RtmpPusher::send_publish() {
172 173 174
    AMFEncoder enc;
    enc << "publish" << ++_iReqID << nullptr << _strStream << _strApp ;
    sendRequest(MSG_CMD, enc.data());
xzl committed
175

176 177 178 179 180 181 182 183 184
    addOnStatusCB([this](AMFValue &val) {
        auto level = val["level"].as_string();
        auto code = val["code"].as_string();
        if(level != "status") {
            throw std::runtime_error(StrPrinter <<"publish 失败:" << level << " " << code << endl);
        }
        //start send media
        send_metaData();
    });
xzl committed
185 186 187
}

inline void RtmpPusher::send_metaData(){
188
    auto src = _pMediaSrc.lock();
xzl committed
189
    if (!src) {
190
        throw std::runtime_error("the media source was released");
xzl committed
191
    }
192

xzl committed
193 194 195 196
    AMFEncoder enc;
    enc << "@setDataFrame" << "onMetaData" <<  src->getMetaData();
    sendRequest(MSG_DATA, enc.data());
    
xiongziliang committed
197
    src->getConfigFrame([&](const RtmpPacket::Ptr &pkt){
198
        sendRtmp(pkt->typeId, _ui32StreamId, pkt, pkt->timeStamp, pkt->chunkId );
xzl committed
199 200
    });
    
201
    _pRtmpReader = src->getRing()->attach(getPoller());
xzl committed
202
    weak_ptr<RtmpPusher> weakSelf = dynamic_pointer_cast<RtmpPusher>(shared_from_this());
xiongziliang committed
203
    _pRtmpReader->setReadCB([weakSelf](const RtmpMediaSource::RingDataType &pkt){
204 205 206 207
        auto strongSelf = weakSelf.lock();
        if(!strongSelf) {
            return;
        }
xiongziliang committed
208 209 210 211 212 213 214 215 216 217

        int i = 0;
        int size = pkt->size();
        strongSelf->setSendFlushFlag(false);
        pkt->for_each([&](const RtmpPacket::Ptr &rtmp){
            if(++i == size){
                strongSelf->setSendFlushFlag(true);
            }
            strongSelf->sendRtmp(rtmp->typeId, strongSelf->_ui32StreamId, rtmp, rtmp->timeStamp, rtmp->chunkId);
        });
xzl committed
218
    });
219
    _pRtmpReader->setDetachCB([weakSelf](){
xzl committed
220 221
        auto strongSelf = weakSelf.lock();
        if(strongSelf){
222
            strongSelf->onPublishResult(SockException(Err_other,"媒体源被释放"), !strongSelf->_pPublishTimer);
xzl committed
223 224
        }
    });
225
    onPublishResult(SockException(Err_success,"success"), false);
226 227
    //提升发送性能
    setSocketFlags();
xzl committed
228
}
229 230

void RtmpPusher::setSocketFlags(){
231 232
    GET_CONFIG(int, mergeWriteMS, General::kMergeWriteMS);
    if(mergeWriteMS > 0) {
233
        //提高发送性能
xiongziliang committed
234
        setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
235
        SockUtil::setNoDelay(_sock->rawFD(), false);
236
    }
237 238
}

xzl committed
239
/**
 * Handles a "_result" / "_error" reply: reads the request id from the decoder
 * and runs the callback registered for it, if any. Each callback runs at most
 * once. Unknown ids are logged and ignored.
 *
 * @param dec decoder positioned at the request id; passed on to the callback
 */
void RtmpPusher::onCmd_result(AMFDecoder &dec){
    auto iReqId = dec.load<int>();
    lock_guard<recursive_mutex> lck(_mtxOnResultCB);
    auto it = _mapOnResultCB.find(iReqId);
    if(it == _mapOnResultCB.end()){
        WarnL << "unhandled _result";
        return;
    }

    // Detach the callback before running it. Callbacks register follow-up
    // callbacks in this same container (connect -> createStream -> publish)
    // and may clear it through teardown(); either can invalidate `it` or
    // destroy the functor while it is still executing.
    auto callback = it->second;
    _mapOnResultCB.erase(it);
    callback(dec);
}
/**
 * Handles an "onStatus" message: skips values until the first AMF object and
 * hands it to the oldest pending status callback. Without a pending callback
 * the object is only validated.
 *
 * @param dec decoder positioned after the command name
 * @throws std::runtime_error when no callback is pending and the object's
 *         string "level" is not "status"; a callback may also throw
 */
void RtmpPusher::onCmd_onStatus(AMFDecoder &dec) {
    // Skip everything up to the first object. The loop can only end on an
    // object, so no "not found" check is needed afterwards.
    // NOTE(review): presumably dec.load throws once the data is exhausted - confirm.
    AMFValue val;
    do {
        val = dec.load<AMFValue>();
    } while (val.type() != AMF_OBJECT);

    lock_guard<recursive_mutex> lck(_mtxOnStatusCB);
    if (!_dqOnStatusCB.empty()) {
        // Detach the callback before running it: it can reach user code
        // (through onPublishResult) that tears the session down and clears
        // this queue, which would destroy the running functor and leave
        // pop_front() operating on an empty container.
        auto callback = _dqOnStatusCB.front();
        _dqOnStatusCB.pop_front();
        callback(val);
        return;
    }

    auto level = val["level"];
    auto code = val["code"].as_string();
    if (level.type() == AMF_STRING && level.as_string() != "status") {
        throw std::runtime_error(StrPrinter <<"onStatus 失败:" << level.as_string() << " " << code << endl);
    }
}

/**
 * Dispatches a received rtmp chunk. Only MSG_CMD / MSG_CMD3 messages are
 * handled: the command name is decoded and routed to the matching member
 * handler ("_error" and "_result" -> onCmd_result, "onStatus" ->
 * onCmd_onStatus). Every other message type is ignored.
 *
 * @param chunkData the complete chunk; its strBuf holds the AMF payload
 */
void RtmpPusher::onRtmpChunk(RtmpPacket &chunkData) {
    const bool isCommand = chunkData.typeId == MSG_CMD || chunkData.typeId == MSG_CMD3;
    if (!isCommand) {
        // Other message types are not handled by the pusher
        return;
    }

    using rtmpCMDHandle = void (RtmpPusher::*)(AMFDecoder &dec);
    // Command name -> handler table, built once on first use
    static const unordered_map<string, rtmpCMDHandle> s_cmdHandlers = {
        {"_error", &RtmpPusher::onCmd_result},
        {"_result", &RtmpPusher::onCmd_result},
        {"onStatus", &RtmpPusher::onCmd_onStatus},
    };

    AMFDecoder dec(chunkData.strBuf, 0);
    std::string type = dec.load<std::string>();
    auto found = s_cmdHandlers.find(type);
    if (found == s_cmdHandlers.end()) {
        WarnL << "can not support cmd:" << type;
        return;
    }
    (this->*(found->second))(dec);
}


xiongziliang committed
307
} /* namespace mediakit */
xzl committed
308