PusherProxy.cpp 3.25 KB
Newer Older
ziyue committed
1
/*
monktan committed
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
 *
 * This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit).
 *
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
 */

#include "PusherProxy.h"

using namespace toolkit;

namespace mediakit {

17 18
PusherProxy::PusherProxy(const MediaSource::Ptr &src, int retry_count, const EventPoller::Ptr &poller)
            : MediaPusher(src, poller){
monktan committed
19 20
    _retry_count = retry_count;
    _on_close = [](const SockException &) {};
21
    _weak_src = src;
monktan committed
22 23 24 25 26 27 28 29 30 31 32 33 34 35
}

PusherProxy::~PusherProxy() {
    _timer.reset();
}

void PusherProxy::setPushCallbackOnce(const function<void(const SockException &ex)> &cb) {
    _on_publish = cb;
}

void PusherProxy::setOnClose(const function<void(const SockException &ex)> &cb) {
    _on_close = cb;
}

36 37 38
void PusherProxy::publish(const string &dst_url) {
    std::weak_ptr<PusherProxy> weak_self = shared_from_this();
    std::shared_ptr<int> failed_cnt(new int(0));
monktan committed
39

40 41 42 43 44
    setOnPublished([weak_self, dst_url, failed_cnt](const SockException &err) {
        auto strong_self = weak_self.lock();
        if (!strong_self) {
            return;
        }
monktan committed
45

46 47 48 49
        if (strong_self->_on_publish) {
            strong_self->_on_publish(err);
            strong_self->_on_publish = nullptr;
        }
monktan committed
50

51
        auto src = strong_self->_weak_src.lock();
monktan committed
52 53
        if (!err) {
            // 推流成功
54 55 56
            *failed_cnt = 0;
            InfoL << "Publish " << dst_url << " success";
        } else if (src && (*failed_cnt < strong_self->_retry_count || strong_self->_retry_count < 0)) {
monktan committed
57
            // 推流失败,延时重试推送
58
            strong_self->rePublish(dst_url, (*failed_cnt)++);
monktan committed
59
        } else {
60 61
            //如果媒体源已经注销, 或达到了最大重试次数,回调关闭
            strong_self->_on_close(err);
monktan committed
62 63 64
        }
    });

65 66 67 68 69
    setOnShutdown([weak_self, dst_url, failed_cnt](const SockException &err) {
        auto strong_self = weak_self.lock();
        if (!strong_self) {
            return;
        }
monktan committed
70

71
        auto src = strong_self->_weak_src.lock();
monktan committed
72
        //推流异常中断,延时重试播放
73 74
        if (src && (*failed_cnt < strong_self->_retry_count || strong_self->_retry_count < 0)) {
            strong_self->rePublish(dst_url, (*failed_cnt)++);
monktan committed
75
        } else {
76 77
            //如果媒体源已经注销, 或达到了最大重试次数,回调关闭
            strong_self->_on_close(err);
monktan committed
78 79 80
        }
    });

81
    MediaPusher::publish(dst_url);
monktan committed
82 83
}

84 85 86 87
void PusherProxy::rePublish(const string &dst_url, int failed_cnt) {
    auto delay = MAX(2 * 1000, MIN(failed_cnt * 3000, 60 * 1000));
    weak_ptr<PusherProxy> weak_self = shared_from_this();
    _timer = std::make_shared<Timer>(delay / 1000.0f, [weak_self, dst_url, failed_cnt]() {
monktan committed
88
        //推流失败次数越多,则延时越长
89 90
        auto strong_self = weak_self.lock();
        if (!strong_self) {
monktan committed
91 92
            return false;
        }
93 94
        WarnL << "推流重试[" << failed_cnt << "]:" << dst_url;
        strong_self->MediaPusher::publish(dst_url);
monktan committed
95 96 97 98 99
        return false;
    }, getPoller());
}

} /* namespace mediakit */