FFmpegSource.cpp 12 KB
Newer Older
1
/*
xiongziliang committed
2
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
xiongziliang committed
3
 *
4
 * This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit).
xiongziliang committed
5
 *
xiongziliang committed
6 7 8
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
xiongziliang committed
9
 */
10 11 12 13 14 15

#include "FFmpegSource.h"
#include "Common/config.h"
#include "Common/MediaSource.h"
#include "Util/File.h"
#include "System.h"
16
#include "Thread/WorkThreadPool.h"
17
#include "Network/sockutil.h"
18 19 20

namespace FFmpeg {
#define FFmpeg_FIELD "ffmpeg."
21 22 23
const string kBin = FFmpeg_FIELD"bin";
const string kCmd = FFmpeg_FIELD"cmd";
const string kLog = FFmpeg_FIELD"log";
24
const string kSnap = FFmpeg_FIELD"snap";
25 26

onceToken token([]() {
27
#ifdef _WIN32
28
    string ffmpeg_bin = trim(System::execute("where ffmpeg"));
29
#else
30
    string ffmpeg_bin = trim(System::execute("which ffmpeg"));
31 32 33 34
#endif
    //默认ffmpeg命令路径为环境变量中路径
    mINI::Instance()[kBin] = ffmpeg_bin.empty() ? "ffmpeg" : ffmpeg_bin;
    //ffmpeg日志保存路径
35
    mINI::Instance()[kLog] = "./ffmpeg/ffmpeg.log";
36 37
    mINI::Instance()[kCmd] = "%s -re -i %s -c:a aac -strict -2 -ar 44100 -ab 48k -c:v libx264 -f flv %s";
    mINI::Instance()[kSnap] = "%s -i %s -y -f mjpeg -t 0.001 %s";
38 39 40 41 42 43 44 45 46 47 48
});
}

// Takes one poller from the shared EventPollerPool; timers and delayed tasks of this object run on it.
FFmpegSource::FFmpegSource()
    : _poller(EventPollerPool::Instance().getPoller()) {
}

// Destructor: performs no explicit cleanup here; members release themselves.
// DebugL is presumably the project's debug-level log macro (emits a marker line) — confirm in the logger header.
FFmpegSource::~FFmpegSource() {
    DebugL;
}

49 50 51 52 53 54 55 56 57 58 59 60
/**
 * Tells whether `ip` designates this machine.
 * @param ip host string to test
 * @return true for "127.0.0.1", "localhost", or any address whose "ip" entry is
 *         reported by SockUtil::getInterfaceList(); false otherwise
 */
static bool is_local_ip(const string &ip){
    bool is_local = (ip == "127.0.0.1") || (ip == "localhost");
    if (!is_local) {
        // Not a loopback name: compare against every interface address of this host.
        auto interfaces = SockUtil::getInterfaceList();
        for (auto &iface : interfaces) {
            if (ip == iface["ip"]) {
                is_local = true;
                break;
            }
        }
    }
    return is_local;
}
61

xiongziliang committed
62
void FFmpegSource::setupRecordFlag(bool enable_hls, bool enable_mp4){
63 64 65 66
    _enable_hls = enable_hls;
    _enable_mp4 = enable_mp4;
}

67
void FFmpegSource::play(const string &ffmpeg_cmd_key, const string &src_url,const string &dst_url,int timeout_ms,const onPlay &cb) {
xiongziliang committed
68
    GET_CONFIG(string,ffmpeg_bin,FFmpeg::kBin);
69
    GET_CONFIG(string,ffmpeg_cmd_default,FFmpeg::kCmd);
xiongziliang committed
70
    GET_CONFIG(string,ffmpeg_log,FFmpeg::kLog);
71 72 73

    _src_url = src_url;
    _dst_url = dst_url;
74
    _ffmpeg_cmd_key = ffmpeg_cmd_key;
75 76
    _media_info.parse(dst_url);

77 78 79 80 81 82 83 84 85 86
    auto ffmpeg_cmd = ffmpeg_cmd_default;
    if (!ffmpeg_cmd_key.empty()) {
        auto cmd_it = mINI::Instance().find(ffmpeg_cmd_key);
        if (cmd_it != mINI::Instance().end()) {
            ffmpeg_cmd = cmd_it->second;
        } else{
            WarnL << "配置文件中,ffmpeg命令模板(" << ffmpeg_cmd_key << ")不存在,已采用默认模板(" << ffmpeg_cmd_default << ")";
        }
    }

87
    char cmd[1024] = {0};
88
    snprintf(cmd, sizeof(cmd), ffmpeg_cmd.data(), ffmpeg_bin.data(), src_url.data(), dst_url.data());
89
    _process.run(cmd,ffmpeg_log.empty() ? "" : File::absolutePath("",ffmpeg_log));
90 91
    InfoL << cmd;

92
    if (is_local_ip(_media_info._host)) {
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
        //推流给自己的,通过判断流是否注册上来判断是否正常
        if(_media_info._schema != RTSP_SCHEMA && _media_info._schema != RTMP_SCHEMA){
            cb(SockException(Err_other,"本服务只支持rtmp/rtsp推流"));
            return;
        }
        weak_ptr<FFmpegSource> weakSelf = shared_from_this();
        findAsync(timeout_ms,[cb,weakSelf,timeout_ms](const MediaSource::Ptr &src){
            auto strongSelf = weakSelf.lock();
            if(!strongSelf){
                //自己已经销毁
                return;
            }
            if(src){
                //推流给自己成功
                cb(SockException());
108
                strongSelf->onGetMediaSource(src);
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
                strongSelf->startTimer(timeout_ms);
                return;
            }
            //推流失败
            if(!strongSelf->_process.wait(false)){
                //ffmpeg进程已经退出
                cb(SockException(Err_other,StrPrinter << "ffmpeg已经退出,exit code = " << strongSelf->_process.exit_code()));
                return;
            }
            //ffmpeg进程还在线,但是等待推流超时
            cb(SockException(Err_other,"等待超时"));
        });
    } else{
        //推流给其他服务器的,通过判断FFmpeg进程是否在线判断是否成功
        weak_ptr<FFmpegSource> weakSelf = shared_from_this();
124
        _timer = std::make_shared<Timer>(timeout_ms / 1000.0f,[weakSelf,cb,timeout_ms](){
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
            auto strongSelf = weakSelf.lock();
            if(!strongSelf){
                //自身已经销毁
                return false;
            }
            //FFmpeg还在线,那么我们认为推流成功
            if(strongSelf->_process.wait(false)){
                cb(SockException());
                strongSelf->startTimer(timeout_ms);
                return false;
            }
            //ffmpeg进程已经退出
            cb(SockException(Err_other,StrPrinter << "ffmpeg已经退出,exit code = " << strongSelf->_process.exit_code()));
            return false;
        },_poller);
    }
}

void FFmpegSource::findAsync(int maxWaitMS, const function<void(const MediaSource::Ptr &src)> &cb) {
    auto src = MediaSource::find(_media_info._schema,
                                 _media_info._vhost,
                                 _media_info._app,
147
                                 _media_info._streamid);
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
    if(src || !maxWaitMS){
        cb(src);
        return;
    }

    void *listener_tag = this;
    //若干秒后执行等待媒体注册超时回调
    auto onRegistTimeout = _poller->doDelayTask(maxWaitMS,[cb,listener_tag](){
        //取消监听该事件
        NoticeCenter::Instance().delListener(listener_tag,Broadcast::kBroadcastMediaChanged);
        cb(nullptr);
        return 0;
    });

    weak_ptr<FFmpegSource> weakSelf = shared_from_this();
    auto onRegist = [listener_tag,weakSelf,cb,onRegistTimeout](BroadcastMediaChangedArgs) {
        auto strongSelf = weakSelf.lock();
        if(!strongSelf) {
            //本身已经销毁,取消延时任务
            onRegistTimeout->cancel();
            NoticeCenter::Instance().delListener(listener_tag,Broadcast::kBroadcastMediaChanged);
            return;
        }

xiongziliang committed
172 173 174 175 176
        if (!bRegist ||
            sender.getSchema() != strongSelf->_media_info._schema ||
            sender.getVhost() != strongSelf->_media_info._vhost ||
            sender.getApp() != strongSelf->_media_info._app ||
            sender.getId() != strongSelf->_media_info._streamid) {
177 178 179 180 181 182 183 184 185 186
            //不是自己感兴趣的事件,忽略之
            return;
        }

        //查找的流终于注册上了;取消延时任务,防止多次回调
        onRegistTimeout->cancel();
        //取消事件监听
        NoticeCenter::Instance().delListener(listener_tag,Broadcast::kBroadcastMediaChanged);

        //切换到自己的线程再回复
xiongziliang committed
187
        strongSelf->_poller->async([weakSelf,cb](){
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204
            auto strongSelf = weakSelf.lock();
            if(!strongSelf) {
                return;
            }
            //再找一遍媒体源,一般能找到
            strongSelf->findAsync(0,cb);
        }, false);
    };
    //监听媒体注册事件
    NoticeCenter::Instance().addListener(listener_tag, Broadcast::kBroadcastMediaChanged, onRegist);
}

/**
 * 定时检查媒体是否在线
 */
void FFmpegSource::startTimer(int timeout_ms) {
    weak_ptr<FFmpegSource> weakSelf = shared_from_this();
205
    _timer = std::make_shared<Timer>(1.0f, [weakSelf, timeout_ms]() {
206 207 208 209 210
        auto strongSelf = weakSelf.lock();
        if (!strongSelf) {
            //自身已经销毁
            return false;
        }
211
        if (is_local_ip(strongSelf->_media_info._host)) {
212 213 214 215
            //推流给自己的,我们通过检查是否已经注册来判断FFmpeg是否工作正常
            strongSelf->findAsync(0, [&](const MediaSource::Ptr &src) {
                //同步查找流
                if (!src) {
xiongziliang committed
216
                    //流不在线,重新拉流, 这里原先是10秒超时,实际发现10秒不够,改成20秒了
217
                    if(strongSelf->_replay_ticker.elapsedTime() > 20 * 1000){
218 219
                        //上次重试时间超过10秒,那么再重试FFmpeg拉流
                        strongSelf->_replay_ticker.resetTime();
220
                        strongSelf->play(strongSelf->_ffmpeg_cmd_key, strongSelf->_src_url, strongSelf->_dst_url, timeout_ms, [](const SockException &) {});
221
                    }
222 223 224 225 226 227
                }
            });
        } else {
            //推流给其他服务器的,我们通过判断FFmpeg进程是否在线,如果FFmpeg推流中断,那么它应该会自动退出
            if (!strongSelf->_process.wait(false)) {
                //ffmpeg不在线,重新拉流
228
                strongSelf->play(strongSelf->_ffmpeg_cmd_key, strongSelf->_src_url, strongSelf->_dst_url, timeout_ms, [weakSelf](const SockException &ex) {
229 230 231 232 233 234 235 236 237 238 239 240
                    if(!ex){
                        //没有错误
                        return;
                    }
                    auto strongSelf = weakSelf.lock();
                    if (!strongSelf) {
                        //自身已经销毁
                        return;
                    }
                    //上次重试时间超过10秒,那么再重试FFmpeg拉流
                    strongSelf->startTimer(10 * 1000);
                });
241 242 243 244
            }
        }
        return true;
    }, _poller);
245
}
246

247 248 249
// Stores the callback that close() invokes once closing has been accepted.
// @param cb callable taking no arguments; may be empty, in which case close() skips it
void FFmpegSource::setOnClose(const function<void()> &cb){
    _onClose = cb;
}
250

251
/**
 * Close request for the media source.
 * The request is first offered to the delegate (if any); when the delegate refuses,
 * nothing else happens. Otherwise the _onClose callback (if set) is invoked.
 * @param sender media source being closed
 * @param force  forwarded unchanged to the delegate
 * @return false when the delegate refused to close, true otherwise
 */
bool FFmpegSource::close(MediaSource &sender, bool force) {
    auto delegate = getDelegate();
    const bool delegate_refused = delegate && !delegate->close(sender, force);
    if (delegate_refused) {
        // Closing failed.
        return false;
    }
    // Nobody is watching this stream any more, so stop.
    if (_onClose) {
        _onClose();
    }
    return true;
}
263

264 265 266 267 268 269 270 271 272 273 274 275
// Reports the origin type of the stream; always MediaOriginType::ffmpeg_pull. `sender` is not used.
MediaOriginType FFmpegSource::getOriginType(MediaSource &sender) const{
    return MediaOriginType::ffmpeg_pull;
}

// Reports the origin url of the stream: the src_url most recently passed to play(). `sender` is not used.
string FFmpegSource::getOriginUrl(MediaSource &sender) const{
    return _src_url;
}

// Reports the origin socket of the stream; this implementation has none and always returns null.
// `sender` is not used.
std::shared_ptr<SockInfo> FFmpegSource::getOriginSock(MediaSource &sender) const {
    return nullptr;
}

276
/**
 * Hooks this object in as the listener of `src`.
 * The listener previously attached to `src` becomes our delegate, and the recordings
 * requested through setupRecordFlag() are switched on.
 * @param src media source that has just been found
 */
void FFmpegSource::onGetMediaSource(const MediaSource::Ptr &src) {
    auto previous = src->getListener(true);
    if (previous.lock().get() == this) {
        // Already installed; bailing out prevents infinite recursion when this function is entered repeatedly.
        return;
    }

    setDelegate(previous);
    src->setListener(shared_from_this());

    if (_enable_hls) {
        src->setupRecord(Recorder::type_hls, true, "");
    }
    if (_enable_mp4) {
        src->setupRecord(Recorder::type_mp4, true, "");
    }
}
290 291 292 293 294

void FFmpegSnap::makeSnap(const string &play_url, const string &save_path, float timeout_sec,  const function<void(bool)> &cb) {
    GET_CONFIG(string,ffmpeg_bin,FFmpeg::kBin);
    GET_CONFIG(string,ffmpeg_snap,FFmpeg::kSnap);
    GET_CONFIG(string,ffmpeg_log,FFmpeg::kLog);
295 296 297 298 299 300 301
    Ticker ticker;
    WorkThreadPool::Instance().getPoller()->async([timeout_sec, play_url,save_path,cb, ticker](){
        auto elapsed_ms = ticker.elapsedTime();
        if (elapsed_ms > timeout_sec * 1000) {
            //超时,后台线程负载太高,当代太久才启动该任务
            cb(false);
            return;
302 303 304
        }
        char cmd[1024] = {0};
        snprintf(cmd, sizeof(cmd),ffmpeg_snap.data(),ffmpeg_bin.data(),play_url.data(),save_path.data());
305
        std::shared_ptr<Process> process = std::make_shared<Process>();
306
        process->run(cmd,ffmpeg_log.empty() ? "" : File::absolutePath("",ffmpeg_log));
307
        //定时器延时应该减去后台任务启动的延时
308
        auto delayTask = EventPollerPool::Instance().getPoller()->doDelayTask((uint64_t)(timeout_sec * 1000 - elapsed_ms),[process,cb](){
309 310 311 312 313 314 315
            if(process->wait(false)){
                //FFmpeg进程还在运行,超时就关闭它
                process->kill(2000);
            }
            return 0;
        });

316 317 318 319 320 321 322 323 324
        //等待FFmpeg进程退出
        process->wait(true);
        //FFmpeg进程退出了可以取消定时器了
        delayTask->cancel();
        //执行回调函数
        cb(process->exit_code() == 0);
    });
}