WebApi.cpp 19.4 KB
Newer Older
1 2 3 4 5 6 7 8 9
#include <signal.h>
#include <functional>
#include <sstream>
#include <unordered_map>
#include "jsoncpp/json.h"
#include "Util/util.h"
#include "Util/logger.h"
#include "Util/onceToken.h"
#include "Util/NoticeCenter.h"
xiongziliang committed
10
#ifdef ENABLE_MYSQL
11
#include "Util/SqlPool.h"
xiongziliang committed
12
#endif //ENABLE_MYSQL
13 14 15 16 17
#include "Common/config.h"
#include "Common/MediaSource.h"
#include "Http/HttpRequester.h"
#include "Http/HttpSession.h"
#include "Network/TcpServer.h"
xiongziliang committed
18
#include "Player/PlayerProxy.h"
19 20 21 22 23

using namespace Json;
using namespace toolkit;
using namespace mediakit;

xiongziliang committed
24 25 26 27

typedef map<string,variant,StrCaseCompare> ApiArgsType;


28 29
#define API_ARGS TcpSession &sender, \
                 HttpSession::KeyValue &headerIn, \
30
                 HttpSession::KeyValue &headerOut, \
xiongziliang committed
31
                 ApiArgsType &allArgs, \
32 33 34 35 36
                 Json::Value &val

#define API_REGIST(field, name, ...) \
    s_map_api.emplace("/index/"#field"/"#name,[](API_ARGS,const HttpSession::HttpResponseInvoker &invoker){ \
         static auto lam = [&](API_ARGS) __VA_ARGS__ ;  \
37
         lam(sender,headerIn, headerOut, allArgs, val); \
38 39 40 41 42 43 44 45 46 47 48 49 50
         invoker("200 OK", headerOut, val.toStyledString()); \
     });

#define API_REGIST_INVOKER(field, name, ...) \
    s_map_api.emplace("/index/"#field"/"#name,[](API_ARGS,const HttpSession::HttpResponseInvoker &invoker) __VA_ARGS__);

//异步http api lambad定义
typedef std::function<void(API_ARGS,const HttpSession::HttpResponseInvoker &invoker)> AsyncHttpApi;
//api列表
static map<string, AsyncHttpApi> s_map_api;

namespace API {
typedef enum {
51
    InvalidArgs = -300,
52 53 54 55 56 57 58 59
    SqlFailed = -200,
    AuthFailed = -100,
    OtherFailed = -1,
    Success = 0
} ApiErr;

#define API_FIELD "api."
const char kApiDebug[] = API_FIELD"apiDebug";
60 61
const char kSecret[] = API_FIELD"secret";

62
static onceToken token([]() {
xiongziliang committed
63
    mINI::Instance()[kApiDebug] = "1";
64
    mINI::Instance()[kSecret] = "035c73f7-bb6b-4889-a715-d9eb2d1925cc";
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
});
}//namespace API

// Base class of exceptions an API handler throws to finish a request early.
// what() becomes the "msg" field and code() the "code" field of the JSON reply
// (see the catch handlers in addHttpListener()).
class ApiRetException: public std::runtime_error {
public:
    // str:  message reported to the client
    // code: one of API::ApiErr
    ApiRetException(const char *str = "success", int code = API::Success)
        : runtime_error(str), _code(code) {}
    ~ApiRetException() = default;
    // const so the code is readable through a const reference (e.g. `catch (const ApiRetException &)`).
    int code() const { return _code; }
private:
    int _code;
};

// Thrown when authentication fails; always reports API::AuthFailed.
class AuthException : public ApiRetException {
public:
    // str: message reported to the client
    AuthException(const char *str)
        : ApiRetException(str, API::AuthFailed) {
    }

    ~AuthException() = default;
};

85
class InvalidArgsException: public ApiRetException {
86
public:
87 88 89 90 91 92 93 94
    InvalidArgsException(const char *str):ApiRetException(str,API::InvalidArgs){}
    ~InvalidArgsException() = default;
};

// Thrown by a handler to finish immediately with the default success reply
// (message "success", code API::Success).
class SuccessException: public ApiRetException {
public:
    SuccessException()
        : ApiRetException("success", API::Success) {
    }

    ~SuccessException() = default;
};

97 98

//获取HTTP请求中url参数、content参数
xiongziliang committed
99 100 101
static ApiArgsType getAllArgs(const Parser &parser) {
    ApiArgsType allArgs;
    if(parser["Content-Type"].find("application/x-www-form-urlencoded") == 0){
102 103
        auto contentArgs = parser.parseArgs(parser.Content());
        for (auto &pr : contentArgs) {
xiongziliang committed
104
            allArgs[pr.first] = HttpSession::urlDecode(pr.second);
105
        }
xiongziliang committed
106 107 108 109 110 111 112 113 114 115 116
    }else if(parser["Content-Type"].find("application/json") == 0){
        try {
            stringstream ss(parser.Content());
            Value jsonArgs;
            ss >> jsonArgs;
            auto keys = jsonArgs.getMemberNames();
            for (auto key = keys.begin(); key != keys.end(); ++key){
                allArgs[*key] = jsonArgs[*key].asString();
            }
        }catch (std::exception &ex){
            WarnL << ex.what();
117
        }
xiongziliang committed
118 119
    }else if(!parser["Content-Type"].empty()){
        WarnL << "invalid Content-Type:" << parser["Content-Type"];
120
    }
xiongziliang committed
121 122 123 124 125 126

    auto &urlArgs = parser.getUrlArgs();
    for (auto &pr : urlArgs) {
        allArgs[pr.first] = HttpSession::urlDecode(pr.second);
    }
    return std::move(allArgs);
127 128 129
}

static inline void addHttpListener(){
130
    GET_CONFIG(bool, api_debug, API::kApiDebug);
131 132 133 134 135 136 137 138 139
    //注册监听kBroadcastHttpRequest事件
    NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastHttpRequest, [](BroadcastHttpRequestArgs) {
        auto it = s_map_api.find(parser.Url());
        if (it == s_map_api.end()) {
            consumed = false;
            return;
        }
        //该api已被消费
        consumed = true;
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
        //执行API
        Json::Value val;
        val["code"] = API::Success;
        HttpSession::KeyValue headerOut;
        auto allArgs = getAllArgs(parser);
        HttpSession::KeyValue &headerIn = parser.getValues();
        headerOut["Content-Type"] = "application/json; charset=utf-8";
        if(api_debug){
            auto newInvoker = [invoker,parser,allArgs](const string &codeOut,
                                                       const HttpSession::KeyValue &headerOut,
                                                       const string &contentOut){
                stringstream ss;
                for(auto &pr : allArgs ){
                    ss << pr.first << " : " << pr.second << "\r\n";
                }

                DebugL << "\r\n# request:\r\n" << parser.Method() << " " << parser.FullUrl() << "\r\n"
                       << "# content:\r\n" << parser.Content() << "\r\n"
                       << "# args:\r\n" << ss.str()
                       << "# response:\r\n"
                       << contentOut << "\r\n";

                invoker(codeOut,headerOut,contentOut);
            };
            ((HttpSession::HttpResponseInvoker &)invoker) = newInvoker;
        }
166

167
        try {
168
            it->second(sender,headerIn, headerOut, allArgs, val, invoker);
169 170 171 172
        }  catch(ApiRetException &ex){
            val["code"] = ex.code();
            val["msg"] = ex.what();
            invoker("200 OK", headerOut, val.toStyledString());
xiongziliang committed
173 174 175
        }
#ifdef ENABLE_MYSQL
        catch(SqlException &ex){
176 177 178 179
            val["code"] = API::SqlFailed;
            val["msg"] = StrPrinter << "操作数据库失败:" << ex.what() << ":" << ex.getSql();
            WarnL << ex.what() << ":" << ex.getSql();
            invoker("200 OK", headerOut, val.toStyledString());
xiongziliang committed
180 181 182
        }
#endif// ENABLE_MYSQL
        catch (std::exception &ex) {
183 184 185 186
            val["code"] = API::OtherFailed;
            val["msg"] = ex.what();
            invoker("200 OK", headerOut, val.toStyledString());
        }
187 188 189
    });
}

190
// Returns true when `args` contains `first` with a non-empty value.
// Uses find() rather than operator[] so that merely validating a request does
// not insert empty entries into the caller's map (and const maps are accepted).
template <typename Args,typename First>
bool checkArgs(Args &&args,First &&first){
    auto it = args.find(first);
    return it != args.end() && !it->second.empty();
}

// Returns true when every listed key is present in `args` with a non-empty value.
// Evaluation stops at the first missing key.
template <typename Args,typename First,typename ...KeyTypes>
bool checkArgs(Args &&args,First &&first,KeyTypes && ...keys){
    // `args` is reused for every key, so it is passed on as an lvalue.
    return checkArgs(args,first) && checkArgs(args,std::forward<KeyTypes>(keys)...);
}

// Throws InvalidArgsException unless every listed argument is present and non-empty
// in `allArgs` (which must be in scope at the point of use).
// Wrapped in do { } while(0) so the macro behaves as a single statement and cannot
// capture a following `else`.
#define CHECK_ARGS(...)  \
    do { \
        if(!checkArgs(allArgs,##__VA_ARGS__)){ \
            throw InvalidArgsException("缺少必要参数:" #__VA_ARGS__); \
        } \
    } while(0)

205
// Authenticates the caller: requests whose peer address is not "127.0.0.1" must
// carry a "secret" argument equal to `api_secret`, otherwise AuthException
// (or InvalidArgsException when the argument is missing) is thrown.
// Requires `sender`, `allArgs` and `api_secret` to be in scope at the point of use.
// Wrapped in do { } while(0) so the macro behaves as a single statement.
#define CHECK_SECRET() \
    do { \
        if(sender.get_peer_ip() != "127.0.0.1"){ \
            CHECK_ARGS("secret"); \
            if(api_secret != allArgs["secret"]){ \
                throw AuthException("secret错误"); \
            } \
        } \
    } while(0)

213
// Pull proxies created through addStreamProxy, keyed by getProxyKey() ("vhost/app/stream").
// Every access in this file is made with s_proxyMapMtx held.
static unordered_map<string ,PlayerProxy::Ptr> s_proxyMap;
214
// Guards s_proxyMap.
static recursive_mutex s_proxyMapMtx;
215 216 217
// Builds the s_proxyMap key of a pull proxy: "<vhost>/<app>/<stream>".
static inline string getProxyKey(const string &vhost,const string &app,const string &stream){
    string key(vhost);
    key += "/";
    key += app;
    key += "/";
    key += stream;
    return key;
}
218

219 220 221 222 223
/**
 * 安装api接口
 * 所有api都支持GET和POST两种方式
 * POST方式参数支持application/json和application/x-www-form-urlencoded方式
 */
224 225 226
void installWebApi() {
    addHttpListener();

227
    GET_CONFIG(string,api_secret,API::kSecret);
228

229 230
    //获取线程负载
    //测试url http://127.0.0.1/index/api/getThreadsLoad
231 232 233 234
    API_REGIST_INVOKER(api, getThreadsLoad, {
        EventPollerPool::Instance().getExecutorDelay([invoker, headerOut](const vector<int> &vecDelay) {
            Value val;
            auto vec = EventPollerPool::Instance().getExecutorLoad();
xiongziliang committed
235
            int i = API::Success;
236 237 238 239 240 241 242 243 244 245
            for (auto load : vec) {
                Value obj(objectValue);
                obj["load"] = load;
                obj["delay"] = vecDelay[i++];
                val["data"].append(obj);
            }
            invoker("200 OK", headerOut, val.toStyledString());
        });
    });

246 247
    //获取服务器配置
    //测试url http://127.0.0.1/index/api/getServerConfig
248
    API_REGIST(api, getServerConfig, {
249
        CHECK_SECRET();
250 251 252 253 254 255 256
        Value obj;
        for (auto &pr : mINI::Instance()) {
            obj[pr.first] = (string &) pr.second;
        }
        val["data"].append(obj);
    });

257 258 259
    //设置服务器配置
    //测试url(比如关闭http api调试) http://127.0.0.1/index/api/setServerConfig?api.apiDebug=0
    //你也可以通过http post方式传参,可以通过application/x-www-form-urlencoded或application/json方式传参
260
    API_REGIST(api, setServerConfig, {
261
        CHECK_SECRET();
262
        auto &ini = mINI::Instance();
xiongziliang committed
263
        int changed = API::Success;
264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282
        for (auto &pr : allArgs) {
            if (ini.find(pr.first) == ini.end()) {
                //没有这个key
                continue;
            }
            if (ini[pr.first] == pr.second) {
                continue;
            }
            ini[pr.first] = pr.second;
            //替换成功
            ++changed;
        }
        if (changed > 0) {
            NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastReloadConfig);
            ini.dumpFile();
        }
        val["changed"] = changed;
    });

283 284 285

    //获取服务器api列表
    //测试url http://127.0.0.1/index/api/getApiList
286
    API_REGIST(api,getApiList,{
287
        CHECK_SECRET();
288 289 290 291 292
        for(auto &pr : s_map_api){
            val["data"].append(pr.first);
        }
    });

293 294
    //重启服务器,只有Daemon方式才能重启,否则是直接关闭!
    //测试url http://127.0.0.1/index/api/restartServer
295
    API_REGIST(api,restartServer,{
296
        CHECK_SECRET();
297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312
        EventPollerPool::Instance().getPoller()->doDelayTask(1000,[](){
            //尝试正常退出
            ::kill(getpid(), SIGINT);

            //3秒后强制退出
            EventPollerPool::Instance().getPoller()->doDelayTask(3000,[](){
                exit(0);
                return 0;
            });

            return 0;
        });
        val["msg"] = "服务器将在一秒后自动重启";
    });


313 314 315 316
    //获取流列表,可选筛选参数
    //测试url0(获取所有流) http://127.0.0.1/index/api/getMediaList
    //测试url1(获取虚拟主机为"__defaultVost__"的流) http://127.0.0.1/index/api/getMediaList?vhost=__defaultVost__
    //测试url2(获取rtsp类型的流) http://127.0.0.1/index/api/getMediaList?schema=rtsp
317
    API_REGIST(api,getMediaList,{
318
        CHECK_SECRET();
319
        //获取所有MediaSource列表
xiongziliang committed
320
        val["code"] = API::Success;
321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340
        val["msg"] = "success";
        MediaSource::for_each_media([&](const string &schema,
                                        const string &vhost,
                                        const string &app,
                                        const string &stream,
                                        const MediaSource::Ptr &media){
            if(!allArgs["schema"].empty() && allArgs["schema"] != schema){
                return;
            }
            if(!allArgs["vhost"].empty() && allArgs["vhost"] != vhost){
                return;
            }
            if(!allArgs["app"].empty() && allArgs["app"] != app){
                return;
            }
            Value item;
            item["schema"] = schema;
            item["vhost"] = vhost;
            item["app"] = app;
            item["stream"] = stream;
xiongziliang committed
341
            val["data"].append(item);
342 343 344
        });
    });

345
    //主动关断流,包括关断拉流、推流
346
    //测试url http://127.0.0.1/index/api/close_stream?schema=rtsp&vhost=__defaultVhost__&app=live&stream=obs&force=1
347
    API_REGIST(api,close_stream,{
348
        CHECK_SECRET();
349
        CHECK_ARGS("schema","vhost","app","stream");
350 351 352 353 354 355
        //踢掉推流器
        auto src = MediaSource::find(allArgs["schema"],
                                     allArgs["vhost"],
                                     allArgs["app"],
                                     allArgs["stream"]);
        if(src){
356
            bool flag = src->close(allArgs["force"].as<bool>());
357
            val["code"] = flag ? 0 : -1;
358
            val["msg"] = flag ? "success" : "close failed";
359 360
        }else{
            val["code"] = -2;
361
            val["msg"] = "can not find the stream";
362 363 364
        }
    });

365 366 367 368 369 370 371 372 373 374
    //获取所有TcpSession列表信息
    //可以根据本地端口和远端ip来筛选
    //测试url(筛选某端口下的tcp会话) http://127.0.0.1/index/api/getAllSession?local_port=1935
    API_REGIST(api,getAllSession,{
        CHECK_SECRET();
        Value jsession;
        uint16_t local_port = allArgs["local_port"].as<uint16_t>();
        string &peer_ip = allArgs["peer_ip"];

        SessionMap::Instance().for_each_session([&](const string &id,const TcpSession::Ptr &session){
xiongziliang committed
375
            if(local_port != API::Success && local_port != session->get_local_port()){
376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392
                return;
            }
            if(!peer_ip.empty() && peer_ip != session->get_peer_ip()){
                return;
            }
            jsession["peer_ip"] = session->get_peer_ip();
            jsession["peer_port"] = session->get_peer_port();
            jsession["local_ip"] = session->get_local_ip();
            jsession["local_port"] = session->get_local_port();
            jsession["id"] = id;
            jsession["typeid"] = typeid(*session).name();
            val["data"].append(jsession);
        });
    });

    //断开tcp连接,比如说可以断开rtsp、rtmp播放器等
    //测试url http://127.0.0.1/index/api/kick_session?id=123456
393
    API_REGIST(api,kick_session,{
394
        CHECK_SECRET();
395
        CHECK_ARGS("id");
396
        //踢掉tcp会话
xiongziliang committed
397
        auto session = SessionMap::Instance().get(allArgs["id"]);
398
        if(!session){
xiongziliang committed
399
            val["code"] = API::OtherFailed;
400 401 402 403
            val["msg"] = "can not find the target";
            return;
        }
        session->safeShutdown();
xiongziliang committed
404
        val["code"] = API::Success;
405 406 407
        val["msg"] = "success";
    });

408 409 410 411 412 413 414 415 416
    static auto addStreamProxy = [](const string &vhost,
                                    const string &app,
                                    const string &stream,
                                    const string &url,
                                    bool enable_hls,
                                    bool enable_mp4,
                                    int rtp_type,
                                    const function<void(const SockException &ex,const string &key)> &cb){
        auto key = getProxyKey(vhost,app,stream);
417 418 419 420 421 422
        lock_guard<recursive_mutex> lck(s_proxyMapMtx);
        if(s_proxyMap.find(key) != s_proxyMap.end()){
            //已经在拉流了
            cb(SockException(Err_success),key);
            return;
        }
xiongziliang committed
423
        //添加拉流代理
424
        PlayerProxy::Ptr player(new PlayerProxy(vhost,app,stream,enable_hls,enable_mp4));
425 426
        s_proxyMap[key] = player;
        
xiongziliang committed
427
        //指定RTP over TCP(播放rtsp时有效)
428
        (*player)[kRtpType] = rtp_type;
429
        //开始播放,如果播放失败或者播放中止,将会自动重试若干次,默认一直重试
430
        player->setPlayCallbackOnce([cb,player,key](const SockException &ex){
431
            if(ex){
432
                lock_guard<recursive_mutex> lck(s_proxyMapMtx);
433
                s_proxyMap.erase(key);
434 435
            }
            const_cast<PlayerProxy::Ptr &>(player).reset();
436
            cb(ex,key);
437
        });
438 439 440 441 442 443

        //被主动关闭拉流
        player->setOnClose([key](){
            lock_guard<recursive_mutex> lck(s_proxyMapMtx);
            s_proxyMap.erase(key);
        });
444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467
        player->play(url);
    };

    //动态添加rtsp/rtmp拉流代理
    //测试url http://127.0.0.1/index/api/addStreamProxy?vhost=__defaultVhost__&app=proxy&stream=0&url=rtmp://127.0.0.1/live/obs
    API_REGIST_INVOKER(api,addStreamProxy,{
        CHECK_SECRET();
        CHECK_ARGS("vhost","app","stream","url");
        addStreamProxy(allArgs["vhost"],
                       allArgs["app"],
                       allArgs["stream"],
                       allArgs["url"],
                       allArgs["enable_hls"],
                       allArgs["enable_mp4"],
                       allArgs["rtp_type"],
                       [invoker,val,headerOut](const SockException &ex,const string &key){
                           if(ex){
                               const_cast<Value &>(val)["code"] = API::OtherFailed;
                               const_cast<Value &>(val)["msg"] = ex.what();
                           }else{
                               const_cast<Value &>(val)["data"]["key"] = key;
                           }
                           invoker("200 OK", headerOut, val.toStyledString());
                       });
xiongziliang committed
468 469
    });

470 471
    //关闭拉流代理
    //测试url http://127.0.0.1/index/api/delStreamProxy?key=__defaultVhost__/proxy/0
xiongziliang committed
472
    API_REGIST(api,delStreamProxy,{
473
        CHECK_SECRET();
474
        CHECK_ARGS("key");
xiongziliang committed
475
        lock_guard<recursive_mutex> lck(s_proxyMapMtx);
476
        val["data"]["flag"] = s_proxyMap.erase(allArgs["key"]) == 1;
xiongziliang committed
477 478 479
    });


480 481 482
    ////////////以下是注册的Hook API////////////
    API_REGIST(hook,on_publish,{
        //开始推流事件
483
        throw SuccessException();
484 485 486 487
    });

    API_REGIST(hook,on_play,{
        //开始播放事件
488
        throw SuccessException();
489 490 491 492
    });

    API_REGIST(hook,on_flow_report,{
        //流量统计hook api
493
        throw SuccessException();
494
    });
xiongziliang committed
495 496

    API_REGIST(hook,on_rtsp_realm,{
497
        //rtsp是否需要鉴权,默认需要鉴权
xiongziliang committed
498
        val["code"] = API::Success;
xiongziliang committed
499 500 501 502 503 504
        val["realm"] = "zlmediakit_reaml";
    });

    API_REGIST(hook,on_rtsp_auth,{
        //rtsp鉴权密码,密码等于用户名
        //rtsp可以有双重鉴权!后面还会触发on_play事件
505
        CHECK_ARGS("user_name");
xiongziliang committed
506
        val["code"] = API::Success;
xiongziliang committed
507
        val["encrypted"] = false;
508
        val["passwd"] = allArgs["user_name"].data();
xiongziliang committed
509 510 511 512
    });

    API_REGIST(hook,on_stream_changed,{
        //媒体注册或反注册事件
513
        throw SuccessException();
xiongziliang committed
514 515
    });

516 517
    API_REGIST_INVOKER(hook,on_stream_not_found,{
        //媒体未找到事件,我们都及时拉流hks作为替代品,目的是为了测试按需拉流
518 519
        CHECK_SECRET();
        CHECK_ARGS("vhost","app","stream");
520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535
        addStreamProxy(allArgs["vhost"],
                       allArgs["app"],
                       allArgs["stream"],
                       "rtmp://live.hkstv.hk.lxdns.com/live/hks2",
                       false,
                       false,
                       0,
                       [invoker,val,headerOut](const SockException &ex,const string &key){
                           if(ex){
                               const_cast<Value &>(val)["code"] = API::OtherFailed;
                               const_cast<Value &>(val)["msg"] = ex.what();
                           }else{
                               const_cast<Value &>(val)["data"]["key"] = key;
                           }
                           invoker("200 OK", headerOut, val.toStyledString());
                       });
xiongziliang committed
536 537 538 539 540
    });


    API_REGIST(hook,on_record_mp4,{
        //录制mp4分片完毕事件
541
        throw SuccessException();
xiongziliang committed
542 543
    });

544 545 546 547
    API_REGIST(hook,on_shell_login,{
        //shell登录调试事件
        throw SuccessException();
    });
548 549 550 551 552 553 554

    API_REGIST(hook,on_stream_none_reader,{
        //无人观看流默认关闭
        val["close"] = true;
    });


555
}
xiongziliang committed
556

557 558 559
void unInstallWebApi(){
    lock_guard<recursive_mutex> lck(s_proxyMapMtx);
    s_proxyMap.clear();
560
}