FFmpegSource.cpp 10.5 KB
Newer Older
1
/*
xiongziliang committed
2
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
xiongziliang committed
3 4 5
 *
 * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
 *
xiongziliang committed
6 7 8
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
xiongziliang committed
9
 */
10 11 12 13 14 15

#include "FFmpegSource.h"
#include "Common/config.h"
#include "Common/MediaSource.h"
#include "Util/File.h"
#include "System.h"
16
#include "Thread/WorkThreadPool.h"
17 18 19

namespace FFmpeg {
#define FFmpeg_FIELD "ffmpeg."
20 21 22
const string kBin = FFmpeg_FIELD"bin";
const string kCmd = FFmpeg_FIELD"cmd";
const string kLog = FFmpeg_FIELD"log";
23
const string kSnap = FFmpeg_FIELD"snap";
24 25

onceToken token([]() {
26
#ifdef _WIN32
27
    string ffmpeg_bin = trim(System::execute("where ffmpeg"));
28
#else
29
    string ffmpeg_bin = trim(System::execute("which ffmpeg"));
30 31 32 33
#endif
    //默认ffmpeg命令路径为环境变量中路径
    mINI::Instance()[kBin] = ffmpeg_bin.empty() ? "ffmpeg" : ffmpeg_bin;
    //ffmpeg日志保存路径
34
    mINI::Instance()[kLog] = "./ffmpeg/ffmpeg.log";
35 36
    mINI::Instance()[kCmd] = "%s -re -i %s -c:a aac -strict -2 -ar 44100 -ab 48k -c:v libx264 -f flv %s";
    mINI::Instance()[kSnap] = "%s -i %s -y -f mjpeg -t 0.001 %s";
37 38 39 40 41 42 43 44 45 46 47 48 49
});
}

FFmpegSource::FFmpegSource() {
    _poller = EventPollerPool::Instance().getPoller();
}

FFmpegSource::~FFmpegSource() {
    DebugL;
}


void FFmpegSource::play(const string &src_url,const string &dst_url,int timeout_ms,const onPlay &cb) {
xiongziliang committed
50 51 52
    GET_CONFIG(string,ffmpeg_bin,FFmpeg::kBin);
    GET_CONFIG(string,ffmpeg_cmd,FFmpeg::kCmd);
    GET_CONFIG(string,ffmpeg_log,FFmpeg::kLog);
53 54 55 56 57 58 59

    _src_url = src_url;
    _dst_url = dst_url;
    _media_info.parse(dst_url);

    char cmd[1024] = {0};
    snprintf(cmd, sizeof(cmd),ffmpeg_cmd.data(),ffmpeg_bin.data(),src_url.data(),dst_url.data());
60
    _process.run(cmd,ffmpeg_log.empty() ? "" : File::absolutePath("",ffmpeg_log));
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
    InfoL << cmd;

    if(_media_info._host == "127.0.0.1"){
        //推流给自己的,通过判断流是否注册上来判断是否正常
        if(_media_info._schema != RTSP_SCHEMA && _media_info._schema != RTMP_SCHEMA){
            cb(SockException(Err_other,"本服务只支持rtmp/rtsp推流"));
            return;
        }
        weak_ptr<FFmpegSource> weakSelf = shared_from_this();
        findAsync(timeout_ms,[cb,weakSelf,timeout_ms](const MediaSource::Ptr &src){
            auto strongSelf = weakSelf.lock();
            if(!strongSelf){
                //自己已经销毁
                return;
            }
            if(src){
                //推流给自己成功
                cb(SockException());
79
                strongSelf->onGetMediaSource(src);
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
                strongSelf->startTimer(timeout_ms);
                return;
            }
            //推流失败
            if(!strongSelf->_process.wait(false)){
                //ffmpeg进程已经退出
                cb(SockException(Err_other,StrPrinter << "ffmpeg已经退出,exit code = " << strongSelf->_process.exit_code()));
                return;
            }
            //ffmpeg进程还在线,但是等待推流超时
            cb(SockException(Err_other,"等待超时"));
        });
    } else{
        //推流给其他服务器的,通过判断FFmpeg进程是否在线判断是否成功
        weak_ptr<FFmpegSource> weakSelf = shared_from_this();
        _timer = std::make_shared<Timer>(timeout_ms / 1000,[weakSelf,cb,timeout_ms](){
            auto strongSelf = weakSelf.lock();
            if(!strongSelf){
                //自身已经销毁
                return false;
            }
            //FFmpeg还在线,那么我们认为推流成功
            if(strongSelf->_process.wait(false)){
                cb(SockException());
                strongSelf->startTimer(timeout_ms);
                return false;
            }
            //ffmpeg进程已经退出
            cb(SockException(Err_other,StrPrinter << "ffmpeg已经退出,exit code = " << strongSelf->_process.exit_code()));
            return false;
        },_poller);
    }
}

void FFmpegSource::findAsync(int maxWaitMS, const function<void(const MediaSource::Ptr &src)> &cb) {
    auto src = MediaSource::find(_media_info._schema,
                                 _media_info._vhost,
                                 _media_info._app,
118
                                 _media_info._streamid);
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
    if(src || !maxWaitMS){
        cb(src);
        return;
    }

    void *listener_tag = this;
    //若干秒后执行等待媒体注册超时回调
    auto onRegistTimeout = _poller->doDelayTask(maxWaitMS,[cb,listener_tag](){
        //取消监听该事件
        NoticeCenter::Instance().delListener(listener_tag,Broadcast::kBroadcastMediaChanged);
        cb(nullptr);
        return 0;
    });

    weak_ptr<FFmpegSource> weakSelf = shared_from_this();
    auto onRegist = [listener_tag,weakSelf,cb,onRegistTimeout](BroadcastMediaChangedArgs) {
        auto strongSelf = weakSelf.lock();
        if(!strongSelf) {
            //本身已经销毁,取消延时任务
            onRegistTimeout->cancel();
            NoticeCenter::Instance().delListener(listener_tag,Broadcast::kBroadcastMediaChanged);
            return;
        }

xiongziliang committed
143 144 145 146 147
        if (!bRegist ||
            sender.getSchema() != strongSelf->_media_info._schema ||
            sender.getVhost() != strongSelf->_media_info._vhost ||
            sender.getApp() != strongSelf->_media_info._app ||
            sender.getId() != strongSelf->_media_info._streamid) {
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
            //不是自己感兴趣的事件,忽略之
            return;
        }

        //查找的流终于注册上了;取消延时任务,防止多次回调
        onRegistTimeout->cancel();
        //取消事件监听
        NoticeCenter::Instance().delListener(listener_tag,Broadcast::kBroadcastMediaChanged);

        //切换到自己的线程再回复
        strongSelf->_poller->async([listener_tag,weakSelf,cb](){
            auto strongSelf = weakSelf.lock();
            if(!strongSelf) {
                return;
            }
            //再找一遍媒体源,一般能找到
            strongSelf->findAsync(0,cb);
        }, false);
    };
    //监听媒体注册事件
    NoticeCenter::Instance().addListener(listener_tag, Broadcast::kBroadcastMediaChanged, onRegist);
}

/**
 * 定时检查媒体是否在线
 */
void FFmpegSource::startTimer(int timeout_ms) {
    weak_ptr<FFmpegSource> weakSelf = shared_from_this();
    _timer = std::make_shared<Timer>(1, [weakSelf, timeout_ms]() {
        auto strongSelf = weakSelf.lock();
        if (!strongSelf) {
            //自身已经销毁
            return false;
        }
        if (strongSelf->_media_info._host == "127.0.0.1") {
            //推流给自己的,我们通过检查是否已经注册来判断FFmpeg是否工作正常
            strongSelf->findAsync(0, [&](const MediaSource::Ptr &src) {
                //同步查找流
                if (!src) {
                    //流不在线,重新拉流
188 189 190 191 192
                    if(strongSelf->_replay_ticker.elapsedTime() > 10 * 1000){
                        //上次重试时间超过10秒,那么再重试FFmpeg拉流
                        strongSelf->_replay_ticker.resetTime();
                        strongSelf->play(strongSelf->_src_url, strongSelf->_dst_url, timeout_ms, [](const SockException &) {});
                    }
193 194 195 196 197 198
                }
            });
        } else {
            //推流给其他服务器的,我们通过判断FFmpeg进程是否在线,如果FFmpeg推流中断,那么它应该会自动退出
            if (!strongSelf->_process.wait(false)) {
                //ffmpeg不在线,重新拉流
199 200 201 202 203 204 205 206 207 208 209 210 211
                strongSelf->play(strongSelf->_src_url, strongSelf->_dst_url, timeout_ms, [weakSelf](const SockException &ex) {
                    if(!ex){
                        //没有错误
                        return;
                    }
                    auto strongSelf = weakSelf.lock();
                    if (!strongSelf) {
                        //自身已经销毁
                        return;
                    }
                    //上次重试时间超过10秒,那么再重试FFmpeg拉流
                    strongSelf->startTimer(10 * 1000);
                });
212 213 214 215
            }
        }
        return true;
    }, _poller);
216
}
217

218 219 220
void FFmpegSource::setOnClose(const function<void()> &cb){
    _onClose = cb;
}
221

222 223 224 225 226 227 228 229 230 231 232 233
bool FFmpegSource::close(MediaSource &sender, bool force) {
    auto listener = _listener.lock();
    if(listener && !listener->close(sender,force)){
        //关闭失败
        return false;
    }
    //该流无人观看,我们停止吧
    if(_onClose){
        _onClose();
    }
    return true;
}
234

235 236 237 238 239
int FFmpegSource::totalReaderCount(MediaSource &sender) {
    auto listener = _listener.lock();
    if(listener){
        return listener->totalReaderCount(sender);
    }
240
    return sender.readerCount();
241 242
}

243 244 245 246 247 248 249 250 251
void FFmpegSource::onNoneReader(MediaSource &sender){
    auto listener = _listener.lock();
    if(listener){
        listener->onNoneReader(sender);
        return;
    }
    MediaSourceEvent::onNoneReader(sender);
}

252 253 254 255 256 257 258
void FFmpegSource::onRegist(MediaSource &sender, bool regist){
    auto listener = _listener.lock();
    if(listener){
        listener->onRegist(sender, regist);
    }
}

259 260 261 262
void FFmpegSource::onGetMediaSource(const MediaSource::Ptr &src) {
    _listener = src->getListener();
    src->setListener(shared_from_this());
}
263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290

void FFmpegSnap::makeSnap(const string &play_url, const string &save_path, float timeout_sec,  const function<void(bool)> &cb) {
    GET_CONFIG(string,ffmpeg_bin,FFmpeg::kBin);
    GET_CONFIG(string,ffmpeg_snap,FFmpeg::kSnap);
    GET_CONFIG(string,ffmpeg_log,FFmpeg::kLog);

    std::shared_ptr<Process> process = std::make_shared<Process>();
    auto delayTask = EventPollerPool::Instance().getPoller()->doDelayTask(timeout_sec * 1000,[process,cb](){
        if(process->wait(false)){
            //FFmpeg进程还在运行,超时就关闭它
            process->kill(2000);
        }
        return 0;
    });

    WorkThreadPool::Instance().getPoller()->async([process,play_url,save_path,delayTask,cb](){
        char cmd[1024] = {0};
        snprintf(cmd, sizeof(cmd),ffmpeg_snap.data(),ffmpeg_bin.data(),play_url.data(),save_path.data());
        process->run(cmd,ffmpeg_log.empty() ? "" : File::absolutePath("",ffmpeg_log));
        //等待FFmpeg进程退出
        process->wait(true);
        //FFmpeg进程退出了可以取消定时器了
        delayTask->cancel();
        //执行回调函数
        cb(process->exit_code() == 0);
    });
}