Commit 90bbdf95 by baiyfcu Committed by GitHub

Merge pull request #18 from xiongziliang/master

update
parents a4621896 81a38404
[submodule "ZLToolKit"] [submodule "ZLToolKit"]
path = 3rdpart/ZLToolKit path = 3rdpart/ZLToolKit
url = https://gitee.com/xiahcu/ZLToolKit url = ../ZLToolKit
[submodule "3rdpart/media-server"] [submodule "3rdpart/media-server"]
path = 3rdpart/media-server path = 3rdpart/media-server
url = https://gitee.com/xiahcu/media-server url = ../media-server
Subproject commit 5030af90126ea8f01ded6744ae8abdf549d00a81 Subproject commit 17e82574991134f798ae32f82d48e2d6c6b97b06
Subproject commit 576216c64bf3bcdc5e787da2adb3e169bdd97118 Subproject commit 43facc343afc2b5b70bbbc3c177f20dfa936f2bf
...@@ -41,13 +41,17 @@ INCLUDE_DIRECTORIES(${ToolKit_Root}) ...@@ -41,13 +41,17 @@ INCLUDE_DIRECTORIES(${ToolKit_Root})
INCLUDE_DIRECTORIES(${MediaKit_Root}) INCLUDE_DIRECTORIES(${MediaKit_Root})
INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR}/3rdpart) INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR}/3rdpart)
set(ENABLE_HLS true) option(ENABLE_HLS "Enable HLS" true)
set(ENABLE_OPENSSL true) option(ENABLE_OPENSSL "Enable OpenSSL" true)
set(ENABLE_MYSQL false) option(ENABLE_MYSQL "Enable MySQL" false)
set(ENABLE_FAAC false) option(ENABLE_FAAC "Enable FAAC" false)
set(ENABLE_X264 false) option(ENABLE_X264 "Enable x264" false)
set(ENABLE_MP4 true) option(ENABLE_MP4 "Enable MP4" true)
set(ENABLE_RTPPROXY true) option(ENABLE_RTPPROXY "Enable RTPPROXY" true)
option(ENABLE_API "Enable C API SDK" true)
option(ENABLE_CXX_API "Enable C++ API SDK" false)
option(ENABLE_TESTS "Enable Tests" true)
option(ENABLE_SERVER "Enable Server" true)
set(LINK_LIB_LIST zlmediakit zltoolkit) set(LINK_LIB_LIST zlmediakit zltoolkit)
...@@ -113,6 +117,7 @@ if(ENABLE_HLS) ...@@ -113,6 +117,7 @@ if(ENABLE_HLS)
add_library(mpeg STATIC ${src_mpeg}) add_library(mpeg STATIC ${src_mpeg})
list(APPEND LINK_LIB_LIST mpeg) list(APPEND LINK_LIB_LIST mpeg)
list(APPEND CXX_API_TARGETS mpeg)
if(WIN32) if(WIN32)
set_target_properties(mpeg PROPERTIES COMPILE_FLAGS ${VS_FALGS} ) set_target_properties(mpeg PROPERTIES COMPILE_FLAGS ${VS_FALGS} )
...@@ -136,6 +141,7 @@ if(ENABLE_MP4) ...@@ -136,6 +141,7 @@ if(ENABLE_MP4)
add_library(mov STATIC ${src_mov}) add_library(mov STATIC ${src_mov})
add_library(flv STATIC ${src_flv}) add_library(flv STATIC ${src_flv})
list(APPEND LINK_LIB_LIST mov flv) list(APPEND LINK_LIB_LIST mov flv)
list(APPEND CXX_API_TARGETS mov flv)
if(WIN32) if(WIN32)
set_target_properties(mov flv PROPERTIES COMPILE_FLAGS ${VS_FALGS} ) set_target_properties(mov flv PROPERTIES COMPILE_FLAGS ${VS_FALGS} )
...@@ -153,6 +159,7 @@ if(ENABLE_RTPPROXY AND ENABLE_HLS) ...@@ -153,6 +159,7 @@ if(ENABLE_RTPPROXY AND ENABLE_HLS)
add_library(rtp STATIC ${src_rtp}) add_library(rtp STATIC ${src_rtp})
add_definitions(-DENABLE_RTPPROXY) add_definitions(-DENABLE_RTPPROXY)
list(APPEND LINK_LIB_LIST rtp) list(APPEND LINK_LIB_LIST rtp)
list(APPEND CXX_API_TARGETS rtp)
endif() endif()
#收集源代码 #收集源代码
...@@ -174,6 +181,23 @@ endif () ...@@ -174,6 +181,23 @@ endif ()
#添加库 #添加库
add_library(zltoolkit STATIC ${ToolKit_src_list}) add_library(zltoolkit STATIC ${ToolKit_src_list})
add_library(zlmediakit STATIC ${MediaKit_src_list}) add_library(zlmediakit STATIC ${MediaKit_src_list})
list(APPEND CXX_API_TARGETS zltoolkit zlmediakit)
#安装目录
if (WIN32)
set(INSTALL_PATH_LIB $ENV{HOME}/${CMAKE_PROJECT_NAME}/lib)
set(INSTALL_PATH_INCLUDE $ENV{HOME}/${CMAKE_PROJECT_NAME}/include)
else ()
set(INSTALL_PATH_LIB lib)
set(INSTALL_PATH_INCLUDE include)
endif ()
if(ENABLE_CXX_API)
# 保留目录结构
install(DIRECTORY ${ToolKit_Root}/ DESTINATION ${INSTALL_PATH_INCLUDE}/ZLToolKit REGEX "(.*[.](md|cpp)|win32)$" EXCLUDE)
install(DIRECTORY ${MediaKit_Root}/ DESTINATION ${INSTALL_PATH_INCLUDE}/ZLMediaKit REGEX ".*[.](md|cpp)$" EXCLUDE)
install(TARGETS ${CXX_API_TARGETS} DESTINATION ${INSTALL_PATH_LIB})
endif()
if (WIN32) if (WIN32)
list(APPEND LINK_LIB_LIST WS2_32 Iphlpapi shlwapi) list(APPEND LINK_LIB_LIST WS2_32 Iphlpapi shlwapi)
...@@ -188,11 +212,17 @@ execute_process(COMMAND cp -r ${CMAKE_CURRENT_SOURCE_DIR}/www ${EXECUTABLE_OUTPU ...@@ -188,11 +212,17 @@ execute_process(COMMAND cp -r ${CMAKE_CURRENT_SOURCE_DIR}/www ${EXECUTABLE_OUTPU
execute_process(COMMAND cp ${CMAKE_CURRENT_SOURCE_DIR}/conf/config.ini ${EXECUTABLE_OUTPUT_PATH}/) execute_process(COMMAND cp ${CMAKE_CURRENT_SOURCE_DIR}/conf/config.ini ${EXECUTABLE_OUTPUT_PATH}/)
#添加c库 #添加c库
add_subdirectory(api) if(ENABLE_API)
add_subdirectory(api)
endif()
if (NOT IOS) if (NOT IOS)
#测试程序 #测试程序
add_subdirectory(tests) if(ENABLE_TESTS)
add_subdirectory(tests)
endif()
#主服务器 #主服务器
add_subdirectory(server) if(ENABLE_SERVER)
add_subdirectory(server)
endif()
endif () endif ()
...@@ -170,6 +170,6 @@ bash build_docker_images.sh ...@@ -170,6 +170,6 @@ bash build_docker_images.sh
欢迎捐赠以便更好的推动项目的发展,谢谢您的支持! 欢迎捐赠以便更好的推动项目的发展,谢谢您的支持!
[支付宝](https://raw.githubusercontent.com/xiongziliang/other/master/IMG_3919.JPG) [支付宝](https://gitee.com/xiahcu/other/raw/master/IMG_3919.JPG)
[微信](https://raw.githubusercontent.com/xiongziliang/other/master/IMG_3920.JPG) [微信](https://gitee.com/xiahcu/other/raw/master/IMG_3920.JPG)
...@@ -128,7 +128,7 @@ It is recommended to compile on Ubuntu or MacOS,compiling on windows is cumber ...@@ -128,7 +128,7 @@ It is recommended to compile on Ubuntu or MacOS,compiling on windows is cumber
### Before build ### Before build
- **You must use git to clone the complete code. Do not download the source code by downloading zip package. Otherwise, the sub-module code will not be downloaded by default.You can do it like this:** - **You must use git to clone the complete code. Do not download the source code by downloading zip package. Otherwise, the sub-module code will not be downloaded by default.You can do it like this:**
``` ```
git clone https://github.com/zlmediakit/ZLMediaKit.git git clone https://github.com/xiongziliang/ZLMediaKit.git
cd ZLMediaKit cd ZLMediaKit
git submodule update --init git submodule update --init
``` ```
......
...@@ -22,15 +22,6 @@ else () ...@@ -22,15 +22,6 @@ else ()
target_link_libraries(mk_api ${LINK_LIB_LIST}) target_link_libraries(mk_api ${LINK_LIB_LIST})
add_subdirectory(tests) add_subdirectory(tests)
#安装目录
if (WIN32)
set(INSTALL_PATH_LIB $ENV{HOME}/${CMAKE_PROJECT_NAME}/lib)
set(INSTALL_PATH_INCLUDE $ENV{HOME}/${CMAKE_PROJECT_NAME}/include)
else ()
set(INSTALL_PATH_LIB lib)
set(INSTALL_PATH_INCLUDE include)
endif ()
file(GLOB api_header_list include/*.h) file(GLOB api_header_list include/*.h)
install(FILES ${api_header_list} DESTINATION ${INSTALL_PATH_INCLUDE}) install(FILES ${api_header_list} DESTINATION ${INSTALL_PATH_INCLUDE})
install(TARGETS mk_api ARCHIVE DESTINATION ${INSTALL_PATH_LIB} LIBRARY DESTINATION ${INSTALL_PATH_LIB}) install(TARGETS mk_api ARCHIVE DESTINATION ${INSTALL_PATH_LIB} LIBRARY DESTINATION ${INSTALL_PATH_LIB})
......
...@@ -102,6 +102,13 @@ API_EXPORT void API_CALL mk_env_init1(int thread_num, ...@@ -102,6 +102,13 @@ API_EXPORT void API_CALL mk_env_init1(int thread_num,
API_EXPORT void API_CALL mk_set_option(const char *key, const char *val); API_EXPORT void API_CALL mk_set_option(const char *key, const char *val);
/** /**
* 获取配置项的值
* @param key 配置项名
*/
API_EXPORT const char * API_CALL mk_get_option(const char *key);
/**
* 创建http[s]服务器 * 创建http[s]服务器
* @param port htt监听端口,推荐80,传入0则随机分配 * @param port htt监听端口,推荐80,传入0则随机分配
* @param ssl 是否为ssl类型服务器 * @param ssl 是否为ssl类型服务器
......
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
#define MK_MEDIA_H_ #define MK_MEDIA_H_
#include "mk_common.h" #include "mk_common.h"
#include "mk_events_objects.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -99,6 +100,15 @@ API_EXPORT void API_CALL mk_media_input_h265(mk_media ctx, void *data, int len, ...@@ -99,6 +100,15 @@ API_EXPORT void API_CALL mk_media_input_h265(mk_media ctx, void *data, int len,
API_EXPORT void API_CALL mk_media_input_aac(mk_media ctx, void *data, int len, uint32_t dts, void *adts); API_EXPORT void API_CALL mk_media_input_aac(mk_media ctx, void *data, int len, uint32_t dts, void *adts);
/** /**
* 输入单帧PCM音频,启用ENABLE_FAAC编译时,该函数才有效
* @param ctx 对象指针
* @param data 单帧PCM数据
* @param len 单帧PCM数据字节数
* @param dts 时间戳,毫秒
*/
API_EXPORT void API_CALL mk_media_input_pcm(mk_media ctx, void *data, int len, uint32_t pts);
/**
* 输入单帧G711音频 * 输入单帧G711音频
* @param ctx 对象指针 * @param ctx 对象指针
* @param data 单帧G711数据 * @param data 单帧G711数据
...@@ -149,6 +159,22 @@ API_EXPORT void API_CALL mk_media_set_on_seek(mk_media ctx, on_mk_media_seek cb, ...@@ -149,6 +159,22 @@ API_EXPORT void API_CALL mk_media_set_on_seek(mk_media ctx, on_mk_media_seek cb,
*/ */
API_EXPORT int API_CALL mk_media_total_reader_count(mk_media ctx); API_EXPORT int API_CALL mk_media_total_reader_count(mk_media ctx);
/**
* 生成的MediaSource注册或注销事件
* @param user_data 设置回调时的用户数据指针
* @param sender 生成的MediaSource对象
* @param regist 1为注册事件,0为注销事件
*/
typedef void(API_CALL *on_mk_media_source_regist)(void *user_data, mk_media_source sender, int regist);
/**
* 设置MediaSource注册或注销事件回调函数
* @param ctx 对象指针
* @param cb 回调指针
* @param user_data 用户数据指针
*/
API_EXPORT void API_CALL mk_media_set_on_regist(mk_media ctx, on_mk_media_source_regist cb, void *user_data);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -30,10 +30,8 @@ static TcpServer::Ptr http_server[2]; ...@@ -30,10 +30,8 @@ static TcpServer::Ptr http_server[2];
static TcpServer::Ptr shell_server; static TcpServer::Ptr shell_server;
#ifdef ENABLE_RTPPROXY #ifdef ENABLE_RTPPROXY
#include "Rtp/UdpRecver.h" #include "Rtp/RtpServer.h"
#include "Rtp/RtpSession.h" static std::shared_ptr<RtpServer> rtpServer;
static std::shared_ptr<UdpRecver> udpRtpServer;
static TcpServer::Ptr tcpRtpServer;
#endif #endif
//////////////////////////environment init/////////////////////////// //////////////////////////environment init///////////////////////////
...@@ -57,8 +55,7 @@ API_EXPORT void API_CALL mk_stop_all_server(){ ...@@ -57,8 +55,7 @@ API_EXPORT void API_CALL mk_stop_all_server(){
CLEAR_ARR(rtmp_server); CLEAR_ARR(rtmp_server);
CLEAR_ARR(http_server); CLEAR_ARR(http_server);
#ifdef ENABLE_RTPPROXY #ifdef ENABLE_RTPPROXY
udpRtpServer = nullptr; rtpServer = nullptr;
tcpRtpServer = nullptr;
#endif #endif
stopAllTcpServer(); stopAllTcpServer();
} }
...@@ -119,6 +116,17 @@ API_EXPORT void API_CALL mk_set_option(const char *key, const char *val) { ...@@ -119,6 +116,17 @@ API_EXPORT void API_CALL mk_set_option(const char *key, const char *val) {
mINI::Instance()[key] = val; mINI::Instance()[key] = val;
} }
API_EXPORT const char * API_CALL mk_get_option(const char *key)
{
assert(key);
if (mINI::Instance().find(key) == mINI::Instance().end()) {
WarnL << "key:" << key << " not existed!";
return nullptr;
}
return mINI::Instance()[key].data();
}
API_EXPORT uint16_t API_CALL mk_http_server_start(uint16_t port, int ssl) { API_EXPORT uint16_t API_CALL mk_http_server_start(uint16_t port, int ssl) {
ssl = MAX(0,MIN(ssl,1)); ssl = MAX(0,MIN(ssl,1));
try { try {
...@@ -173,18 +181,12 @@ API_EXPORT uint16_t API_CALL mk_rtmp_server_start(uint16_t port, int ssl) { ...@@ -173,18 +181,12 @@ API_EXPORT uint16_t API_CALL mk_rtmp_server_start(uint16_t port, int ssl) {
API_EXPORT uint16_t API_CALL mk_rtp_server_start(uint16_t port){ API_EXPORT uint16_t API_CALL mk_rtp_server_start(uint16_t port){
#ifdef ENABLE_RTPPROXY #ifdef ENABLE_RTPPROXY
try { try {
//创建rtp tcp服务器 //创建rtp 服务器
tcpRtpServer = std::make_shared<TcpServer>(); rtpServer = std::make_shared<RtpServer>();
tcpRtpServer->start<RtpSession>(port); rtpServer->start(port);
return rtpServer->getPort();
//创建rtp udp服务器
auto ret = tcpRtpServer->getPort();
udpRtpServer = std::make_shared<UdpRecver>();
udpRtpServer->initSock(port);
return ret;
} catch (std::exception &ex) { } catch (std::exception &ex) {
tcpRtpServer.reset(); rtpServer.reset();
udpRtpServer.reset();
WarnL << ex.what(); WarnL << ex.what();
return 0; return 0;
} }
......
...@@ -42,6 +42,12 @@ public: ...@@ -42,6 +42,12 @@ public:
_on_seek = cb; _on_seek = cb;
_on_seek_data = user_data; _on_seek_data = user_data;
} }
void setOnRegist(on_mk_media_source_regist cb, void *user_data){
_on_regist = cb;
_on_regist_data = user_data;
}
protected: protected:
// 通知其停止推流 // 通知其停止推流
bool close(MediaSource &sender,bool force) override{ bool close(MediaSource &sender,bool force) override{
...@@ -70,12 +76,21 @@ protected: ...@@ -70,12 +76,21 @@ protected:
int totalReaderCount(MediaSource &sender) override{ int totalReaderCount(MediaSource &sender) override{
return _channel->totalReaderCount(); return _channel->totalReaderCount();
} }
void onRegist(MediaSource &sender, bool regist) override{
if (_on_regist) {
_on_regist(_on_regist_data, &sender, regist);
}
}
private: private:
DevChannel::Ptr _channel; DevChannel::Ptr _channel;
on_mk_media_close _on_close = nullptr; on_mk_media_close _on_close = nullptr;
on_mk_media_seek _on_seek = nullptr; on_mk_media_seek _on_seek = nullptr;
on_mk_media_source_regist _on_regist = nullptr;
void *_on_seek_data; void *_on_seek_data;
void *_on_close_data; void *_on_close_data;
void *_on_regist_data;
}; };
API_EXPORT void API_CALL mk_media_set_on_close(mk_media ctx, on_mk_media_close cb, void *user_data){ API_EXPORT void API_CALL mk_media_set_on_close(mk_media ctx, on_mk_media_close cb, void *user_data){
...@@ -90,6 +105,12 @@ API_EXPORT void API_CALL mk_media_set_on_seek(mk_media ctx, on_mk_media_seek cb, ...@@ -90,6 +105,12 @@ API_EXPORT void API_CALL mk_media_set_on_seek(mk_media ctx, on_mk_media_seek cb,
(*obj)->setOnSeek(cb, user_data); (*obj)->setOnSeek(cb, user_data);
} }
API_EXPORT void API_CALL mk_media_set_on_regist(mk_media ctx, on_mk_media_source_regist cb, void *user_data){
assert(ctx);
MediaHelper::Ptr *obj = (MediaHelper::Ptr *) ctx;
(*obj)->setOnRegist(cb, user_data);
}
API_EXPORT int API_CALL mk_media_total_reader_count(mk_media ctx){ API_EXPORT int API_CALL mk_media_total_reader_count(mk_media ctx){
assert(ctx); assert(ctx);
MediaHelper::Ptr *obj = (MediaHelper::Ptr *) ctx; MediaHelper::Ptr *obj = (MediaHelper::Ptr *) ctx;
...@@ -157,6 +178,16 @@ API_EXPORT void API_CALL mk_media_input_aac(mk_media ctx, void *data, int len, u ...@@ -157,6 +178,16 @@ API_EXPORT void API_CALL mk_media_input_aac(mk_media ctx, void *data, int len, u
(*obj)->getChannel()->inputAAC((char *) data, len, dts, (char *) adts); (*obj)->getChannel()->inputAAC((char *) data, len, dts, (char *) adts);
} }
API_EXPORT void API_CALL mk_media_input_pcm(mk_media ctx, void *data , int len, uint32_t pts){
#ifdef ENABLE_FAAC
assert(ctx && data && len > 0);
MediaHelper::Ptr* obj = (MediaHelper::Ptr*) ctx;
(*obj)->getChannel()->inputPCM((char*)data, len, pts);
#else
WarnL << "aac编码未启用,该方法无效,编译时请打开ENABLE_FAAC选项";
#endif //ENABLE_FAAC
}
API_EXPORT void API_CALL mk_media_input_g711(mk_media ctx, void* data, int len, uint32_t dts){ API_EXPORT void API_CALL mk_media_input_g711(mk_media ctx, void* data, int len, uint32_t dts){
assert(ctx && data && len > 0); assert(ctx && data && len > 0);
MediaHelper::Ptr* obj = (MediaHelper::Ptr*) ctx; MediaHelper::Ptr* obj = (MediaHelper::Ptr*) ctx;
......
...@@ -122,6 +122,8 @@ rootPath=./www ...@@ -122,6 +122,8 @@ rootPath=./www
sendBufSize=65536 sendBufSize=65536
#https服务器监听端口 #https服务器监听端口
sslport=443 sslport=443
#是否显示文件夹菜单,开启后可以浏览文件夹
dirMenu=1
[multicast] [multicast]
#rtp组播截止组播ip地址 #rtp组播截止组播ip地址
......
{
"id": "95afe791-f716-426e-99c4-a797e112ab2c",
"name": "127.0.0.1",
"values": [
{
"key": "ZLMediaKit_URL",
"value": "127.0.0.1",
"enabled": true
},
{
"key": "ZLMediaKit_secret",
"value": "035c73f7-bb6b-4889-a715-d9eb2d1925cc",
"enabled": true
},
{
"key": "defaultVhost",
"value": "__defaultVhost__",
"enabled": true
}
],
"_postman_variable_scope": "environment",
"_postman_exported_at": "2020-07-11T15:16:04.479Z",
"_postman_exported_using": "Postman/7.27.1"
}
\ No newline at end of file
把这两个json文件导入postman就可以愉快的测试ZLMediaKit的restful接口了
\ No newline at end of file
...@@ -240,6 +240,22 @@ int FFmpegSource::totalReaderCount(MediaSource &sender) { ...@@ -240,6 +240,22 @@ int FFmpegSource::totalReaderCount(MediaSource &sender) {
return sender.readerCount(); return sender.readerCount();
} }
void FFmpegSource::onNoneReader(MediaSource &sender){
auto listener = _listener.lock();
if(listener){
listener->onNoneReader(sender);
return;
}
MediaSourceEvent::onNoneReader(sender);
}
void FFmpegSource::onRegist(MediaSource &sender, bool regist){
auto listener = _listener.lock();
if(listener){
listener->onRegist(sender, regist);
}
}
void FFmpegSource::onGetMediaSource(const MediaSource::Ptr &src) { void FFmpegSource::onGetMediaSource(const MediaSource::Ptr &src) {
_listener = src->getListener(); _listener = src->getListener();
src->setListener(shared_from_this()); src->setListener(shared_from_this());
......
...@@ -61,6 +61,9 @@ private: ...@@ -61,6 +61,9 @@ private:
//MediaSourceEvent override //MediaSourceEvent override
bool close(MediaSource &sender,bool force) override; bool close(MediaSource &sender,bool force) override;
int totalReaderCount(MediaSource &sender) override; int totalReaderCount(MediaSource &sender) override;
void onNoneReader(MediaSource &sender) override;
void onRegist(MediaSource &sender, bool regist) override;
private: private:
Process _process; Process _process;
Timer::Ptr _timer; Timer::Ptr _timer;
......
...@@ -34,6 +34,9 @@ ...@@ -34,6 +34,9 @@
#include "Thread/WorkThreadPool.h" #include "Thread/WorkThreadPool.h"
#include "Rtp/RtpSelector.h" #include "Rtp/RtpSelector.h"
#include "FFmpegSource.h" #include "FFmpegSource.h"
#if defined(ENABLE_RTPPROXY)
#include "Rtp/RtpServer.h"
#endif
using namespace Json; using namespace Json;
using namespace toolkit; using namespace toolkit;
using namespace mediakit; using namespace mediakit;
...@@ -244,15 +247,24 @@ bool checkArgs(Args &&args,First &&first,KeyTypes && ...keys){ ...@@ -244,15 +247,24 @@ bool checkArgs(Args &&args,First &&first,KeyTypes && ...keys){
} \ } \
} }
//拉流代理器列表
static unordered_map<string ,PlayerProxy::Ptr> s_proxyMap; static unordered_map<string ,PlayerProxy::Ptr> s_proxyMap;
static recursive_mutex s_proxyMapMtx; static recursive_mutex s_proxyMapMtx;
static inline string getProxyKey(const string &vhost,const string &app,const string &stream){
return vhost + "/" + app + "/" + stream;
}
//FFmpeg拉流代理器列表
static unordered_map<string ,FFmpegSource::Ptr> s_ffmpegMap; static unordered_map<string ,FFmpegSource::Ptr> s_ffmpegMap;
static recursive_mutex s_ffmpegMapMtx; static recursive_mutex s_ffmpegMapMtx;
#if defined(ENABLE_RTPPROXY)
//rtp服务器列表
static unordered_map<string, RtpServer::Ptr> s_rtpServerMap;
static recursive_mutex s_rtpServerMapMtx;
#endif
static inline string getProxyKey(const string &vhost,const string &app,const string &stream){
return vhost + "/" + app + "/" + stream;
}
/** /**
* 安装api接口 * 安装api接口
* 所有api都支持GET和POST两种方式 * 所有api都支持GET和POST两种方式
...@@ -729,22 +741,68 @@ void installWebApi() { ...@@ -729,22 +741,68 @@ void installWebApi() {
}); });
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
api_regist1("/index/api/getSsrcInfo",[](API_ARGS1){ api_regist1("/index/api/getRtpInfo",[](API_ARGS1){
CHECK_SECRET(); CHECK_SECRET();
CHECK_ARGS("ssrc"); CHECK_ARGS("stream_id");
uint32_t ssrc = 0;
stringstream ss(allArgs["ssrc"]);
ss >> std::hex >> ssrc;
auto process = RtpSelector::Instance().getProcess(ssrc,false); auto process = RtpSelector::Instance().getProcess(allArgs["stream_id"], false);
if(!process){ if (!process) {
val["exist"] = false; val["exist"] = false;
return; return;
} }
val["exist"] = true; val["exist"] = true;
val["peer_ip"] = process->get_peer_ip(); val["peer_ip"] = process->get_peer_ip();
val["peer_port"] = process->get_peer_port(); val["peer_port"] = process->get_peer_port();
val["local_port"] = process->get_local_port();
val["local_ip"] = process->get_local_ip();
}); });
api_regist1("/index/api/openRtpServer",[](API_ARGS1){
CHECK_SECRET();
CHECK_ARGS("port", "enable_tcp", "stream_id");
auto stream_id = allArgs["stream_id"];
lock_guard<recursive_mutex> lck(s_rtpServerMapMtx);
if(s_rtpServerMap.find(stream_id) != s_rtpServerMap.end()) {
//为了防止RtpProcess所有权限混乱的问题,不允许重复添加相同的stream_id
throw InvalidArgsException("该stream_id已存在");
}
RtpServer::Ptr server = std::make_shared<RtpServer>();
server->start(allArgs["port"], stream_id, allArgs["enable_tcp"].as<bool>());
server->setOnDetach([stream_id]() {
//设置rtp超时移除事件
lock_guard<recursive_mutex> lck(s_rtpServerMapMtx);
s_rtpServerMap.erase(stream_id);
});
//保存对象
s_rtpServerMap.emplace(stream_id, server);
//回复json
val["port"] = server->getPort();
});
api_regist1("/index/api/closeRtpServer",[](API_ARGS1){
CHECK_SECRET();
CHECK_ARGS("stream_id");
lock_guard<recursive_mutex> lck(s_rtpServerMapMtx);
val["hit"] = (int) s_rtpServerMap.erase(allArgs["stream_id"]);
});
api_regist1("/index/api/listRtpServer",[](API_ARGS1){
CHECK_SECRET();
lock_guard<recursive_mutex> lck(s_rtpServerMapMtx);
for (auto &pr : s_rtpServerMap) {
Value obj;
obj["stream_id"] = pr.first;
obj["port"] = pr.second->getPort();
val["data"].append(obj);
}
});
#endif//ENABLE_RTPPROXY #endif//ENABLE_RTPPROXY
// 开始录制hls或MP4 // 开始录制hls或MP4
...@@ -1045,4 +1103,10 @@ void unInstallWebApi(){ ...@@ -1045,4 +1103,10 @@ void unInstallWebApi(){
lock_guard<recursive_mutex> lck(s_ffmpegMapMtx); lock_guard<recursive_mutex> lck(s_ffmpegMapMtx);
s_ffmpegMap.clear(); s_ffmpegMap.clear();
} }
{
#if defined(ENABLE_RTPPROXY)
lock_guard<recursive_mutex> lck(s_rtpServerMapMtx);
s_rtpServerMap.clear();
#endif
}
} }
\ No newline at end of file
...@@ -336,6 +336,7 @@ void installWebHook(){ ...@@ -336,6 +336,7 @@ void installWebHook(){
//监听播放失败(未找到特定的流)事件 //监听播放失败(未找到特定的流)事件
NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastNotFoundStream,[](BroadcastNotFoundStreamArgs){ NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastNotFoundStream,[](BroadcastNotFoundStreamArgs){
if(!hook_enable || hook_stream_not_found.empty()){ if(!hook_enable || hook_stream_not_found.empty()){
closePlayer();
return; return;
} }
auto body = make_json(args); auto body = make_json(args);
......
...@@ -20,13 +20,11 @@ ...@@ -20,13 +20,11 @@
#include "Network/TcpServer.h" #include "Network/TcpServer.h"
#include "Poller/EventPoller.h" #include "Poller/EventPoller.h"
#include "Common/config.h" #include "Common/config.h"
#include "Rtsp/UDPServer.h"
#include "Rtsp/RtspSession.h" #include "Rtsp/RtspSession.h"
#include "Rtp/RtpSession.h"
#include "Rtmp/RtmpSession.h" #include "Rtmp/RtmpSession.h"
#include "Shell/ShellSession.h" #include "Shell/ShellSession.h"
#include "Http/WebSocketSession.h" #include "Http/WebSocketSession.h"
#include "Rtp/UdpRecver.h" #include "Rtp/RtpServer.h"
#include "WebApi.h" #include "WebApi.h"
#include "WebHook.h" #include "WebHook.h"
...@@ -283,8 +281,7 @@ int start_main(int argc,char *argv[]) { ...@@ -283,8 +281,7 @@ int start_main(int argc,char *argv[]) {
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
//GB28181 rtp推流端口,支持UDP/TCP //GB28181 rtp推流端口,支持UDP/TCP
UdpRecver recver; RtpServer::Ptr rtpServer = std::make_shared<RtpServer>();
TcpServer::Ptr tcpRtpServer(new TcpServer());
#endif//defined(ENABLE_RTPPROXY) #endif//defined(ENABLE_RTPPROXY)
try { try {
...@@ -307,12 +304,8 @@ int start_main(int argc,char *argv[]) { ...@@ -307,12 +304,8 @@ int start_main(int argc,char *argv[]) {
if(shellPort) { shellSrv->start<ShellSession>(shellPort); } if(shellPort) { shellSrv->start<ShellSession>(shellPort); }
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
if(rtpPort){ //创建rtp服务器
//创建rtp udp服务器 if(rtpPort){ rtpServer->start(rtpPort); }
recver.initSock(rtpPort);
//创建rtp tcp服务器
tcpRtpServer->start<RtpSession>(rtpPort);
}
#endif//defined(ENABLE_RTPPROXY) #endif//defined(ENABLE_RTPPROXY)
}catch (std::exception &ex){ }catch (std::exception &ex){
......
...@@ -178,39 +178,44 @@ static void eraseIfEmpty(MAP &map, IT0 it0, IT1 it1, IT2 it2) { ...@@ -178,39 +178,44 @@ static void eraseIfEmpty(MAP &map, IT0 it0, IT1 it1, IT2 it2) {
} }
} }
} }
}; }
void MediaSource::findAsync_l(const MediaInfo &info, const std::shared_ptr<TcpSession> &session, bool retry, const function<void(const MediaSource::Ptr &src)> &cb){ void MediaSource::findAsync_l(const MediaInfo &info, const std::shared_ptr<TcpSession> &session, bool retry, const function<void(const MediaSource::Ptr &src)> &cb){
auto src = MediaSource::find_l(info._schema, info._vhost, info._app, info._streamid, true); auto src = MediaSource::find_l(info._schema, info._vhost, info._app, info._streamid, true);
if(src || !retry){ if (src || !retry) {
cb(src); cb(src);
return; return;
} }
void *listener_tag = session.get(); void *listener_tag = session.get();
weak_ptr<TcpSession> weakSession = session; weak_ptr<TcpSession> weakSession = session;
//广播未找到流,此时可以立即去拉流,这样还来得及
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastNotFoundStream,info, static_cast<SockInfo &>(*session));
//最多等待一定时间,如果这个时间内,流未注册上,那么返回未找到流
GET_CONFIG(int,maxWaitMS,General::kMaxStreamWaitTimeMS);
//若干秒后执行等待媒体注册超时回调 GET_CONFIG(int, maxWaitMS, General::kMaxStreamWaitTimeMS);
auto onRegistTimeout = session->getPoller()->doDelayTask(maxWaitMS,[cb,listener_tag](){ auto onTimeout = session->getPoller()->doDelayTask(maxWaitMS, [cb, listener_tag]() {
//取消监听该事件 //最多等待一定时间,如果这个时间内,流未注册上,那么返回未找到流
NoticeCenter::Instance().delListener(listener_tag,Broadcast::kBroadcastMediaChanged); NoticeCenter::Instance().delListener(listener_tag, Broadcast::kBroadcastMediaChanged);
cb(nullptr); cb(nullptr);
return 0; return 0;
}); });
auto onRegist = [listener_tag,weakSession,info,cb,onRegistTimeout](BroadcastMediaChangedArgs) { auto cancelAll = [onTimeout, listener_tag]() {
//取消延时任务,防止多次回调
onTimeout->cancel();
//取消媒体注册事件监听
NoticeCenter::Instance().delListener(listener_tag, Broadcast::kBroadcastMediaChanged);
};
function<void()> closePlayer = [cb, cancelAll]() {
cancelAll();
//告诉播放器,流不存在,这样会立即断开播放器
cb(nullptr);
};
auto onRegist = [weakSession, info, cb, cancelAll](BroadcastMediaChangedArgs) {
auto strongSession = weakSession.lock(); auto strongSession = weakSession.lock();
if(!strongSession) { if (!strongSession) {
//自己已经销毁 //自己已经销毁
//取消延时任务,防止多次回调 cancelAll();
onRegistTimeout->cancel();
//取消事件监听
NoticeCenter::Instance().delListener(listener_tag,Broadcast::kBroadcastMediaChanged);
return; return;
} }
...@@ -223,24 +228,24 @@ void MediaSource::findAsync_l(const MediaInfo &info, const std::shared_ptr<TcpSe ...@@ -223,24 +228,24 @@ void MediaSource::findAsync_l(const MediaInfo &info, const std::shared_ptr<TcpSe
return; return;
} }
//取消延时任务,防止多次回调 cancelAll();
onRegistTimeout->cancel();
//取消事件监听
NoticeCenter::Instance().delListener(listener_tag,Broadcast::kBroadcastMediaChanged);
//播发器请求的流终于注册上了,切换到自己的线程再回复 //播发器请求的流终于注册上了,切换到自己的线程再回复
strongSession->async([weakSession,info,cb](){ strongSession->async([weakSession, info, cb]() {
auto strongSession = weakSession.lock(); auto strongSession = weakSession.lock();
if(!strongSession) { if (!strongSession) {
return; return;
} }
DebugL << "收到媒体注册事件,回复播放器:" << info._schema << "/" << info._vhost << "/" << info._app << "/" << info._streamid; DebugL << "收到媒体注册事件,回复播放器:" << info._schema << "/" << info._vhost << "/" << info._app << "/" << info._streamid;
//再找一遍媒体源,一般能找到 //再找一遍媒体源,一般能找到
findAsync_l(info,strongSession,false,cb); findAsync_l(info, strongSession, false, cb);
}, false); }, false);
}; };
//监听媒体注册事件 //监听媒体注册事件
NoticeCenter::Instance().addListener(listener_tag, Broadcast::kBroadcastMediaChanged, onRegist); NoticeCenter::Instance().addListener(listener_tag, Broadcast::kBroadcastMediaChanged, onRegist);
//广播未找到流,此时可以立即去拉流,这样还来得及
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastNotFoundStream, info, static_cast<SockInfo &>(*session), closePlayer);
} }
void MediaSource::findAsync(const MediaInfo &info, const std::shared_ptr<TcpSession> &session,const function<void(const Ptr &src)> &cb){ void MediaSource::findAsync(const MediaInfo &info, const std::shared_ptr<TcpSession> &session,const function<void(const Ptr &src)> &cb){
...@@ -295,7 +300,7 @@ void MediaSource::regist() { ...@@ -295,7 +300,7 @@ void MediaSource::regist() {
//注册该源,注册后服务器才能找到该源 //注册该源,注册后服务器才能找到该源
{ {
lock_guard<recursive_mutex> lock(g_mtxMediaSrc); lock_guard<recursive_mutex> lock(g_mtxMediaSrc);
g_mapMediaSrc[_strSchema][_strVhost][_strApp][_strId] = shared_from_this(); g_mapMediaSrc[_strSchema][_strVhost][_strApp][_strId] = shared_from_this();
} }
_StrPrinter codec_info; _StrPrinter codec_info;
auto tracks = getTracks(true); auto tracks = getTracks(true);
...@@ -326,6 +331,11 @@ void MediaSource::regist() { ...@@ -326,6 +331,11 @@ void MediaSource::regist() {
InfoL << _strSchema << " " << _strVhost << " " << _strApp << " " << _strId << " " << codec_info; InfoL << _strSchema << " " << _strVhost << " " << _strApp << " " << _strId << " " << codec_info;
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaChanged, true, *this); NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaChanged, true, *this);
auto listener = _listener.lock();
if (listener) {
listener->onRegist(*this, true);
}
} }
//反注册该源 //反注册该源
...@@ -352,6 +362,11 @@ bool MediaSource::unregist() { ...@@ -352,6 +362,11 @@ bool MediaSource::unregist() {
if(ret){ if(ret){
InfoL << _strSchema << " " << _strVhost << " " << _strApp << " " << _strId; InfoL << _strSchema << " " << _strVhost << " " << _strApp << " " << _strId;
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaChanged, false, *this); NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaChanged, false, *this);
auto listener = _listener.lock();
if (listener) {
listener->onRegist(*this, false);
}
} }
return ret; return ret;
} }
......
...@@ -53,9 +53,11 @@ public: ...@@ -53,9 +53,11 @@ public:
virtual bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) { return false; }; virtual bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) { return false; };
// 获取录制状态 // 获取录制状态
virtual bool isRecording(MediaSource &sender, Recorder::type type) { return false; }; virtual bool isRecording(MediaSource &sender, Recorder::type type) { return false; };
private:
// 通知无人观看 // 通知无人观看
void onNoneReader(MediaSource &sender); virtual void onNoneReader(MediaSource &sender);
//流注册或注销事件
virtual void onRegist(MediaSource &sender, bool regist) {};
private: private:
Timer::Ptr _async_close_timer; Timer::Ptr _async_close_timer;
}; };
......
...@@ -278,6 +278,22 @@ int MultiMediaSourceMuxer::totalReaderCount(MediaSource &sender) { ...@@ -278,6 +278,22 @@ int MultiMediaSourceMuxer::totalReaderCount(MediaSource &sender) {
return listener->totalReaderCount(sender); return listener->totalReaderCount(sender);
} }
void MultiMediaSourceMuxer::onNoneReader(MediaSource &sender){
auto listener = _listener.lock();
if (!listener) {
MediaSourceEvent::onNoneReader(sender);
return;
}
listener->onNoneReader(sender);
}
void MultiMediaSourceMuxer::onRegist(MediaSource &sender, bool regist){
auto listener = _listener.lock();
if (listener) {
listener->onRegist(sender, regist);
}
}
bool MultiMediaSourceMuxer::setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) { bool MultiMediaSourceMuxer::setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) {
return _muxer->setupRecord(sender,type,start,custom_path); return _muxer->setupRecord(sender,type,start,custom_path);
} }
......
...@@ -132,6 +132,19 @@ public: ...@@ -132,6 +132,19 @@ public:
int totalReaderCount(MediaSource &sender) override; int totalReaderCount(MediaSource &sender) override;
/** /**
* 触发无人观看事件
* @param sender 触发者
*/
void onNoneReader(MediaSource &sender) override;
/**
* 媒体注册注销事件
* @param sender 触发者
* @param regist 是否为注册事件
*/
void onRegist(MediaSource &sender, bool regist) override;
/**
* 设置录制状态 * 设置录制状态
* @param type 录制类型 * @param type 录制类型
* @param start 开始或停止 * @param start 开始或停止
......
...@@ -176,6 +176,8 @@ bool DtsGenerator::getDts_l(uint32_t pts, uint32_t &dts){ ...@@ -176,6 +176,8 @@ bool DtsGenerator::getDts_l(uint32_t pts, uint32_t &dts){
_sorter_max_size = _frames_since_last_max_pts; _sorter_max_size = _frames_since_last_max_pts;
//我们记录P帧间时间间隔(也就是多个B帧时间戳增量累计) //我们记录P帧间时间间隔(也就是多个B帧时间戳增量累计)
_dts_pts_offset = (pts - _last_max_pts); _dts_pts_offset = (pts - _last_max_pts);
//除以2,防止dts大于pts
_dts_pts_offset /= 2;
} }
//遇到P帧或关键帧,连续B帧计数清零 //遇到P帧或关键帧,连续B帧计数清零
_frames_since_last_max_pts = 0; _frames_since_last_max_pts = 0;
......
...@@ -100,11 +100,15 @@ const string kCharSet = HTTP_FIELD"charSet"; ...@@ -100,11 +100,15 @@ const string kCharSet = HTTP_FIELD"charSet";
const string kRootPath = HTTP_FIELD"rootPath"; const string kRootPath = HTTP_FIELD"rootPath";
//http 404错误提示内容 //http 404错误提示内容
const string kNotFound = HTTP_FIELD"notFound"; const string kNotFound = HTTP_FIELD"notFound";
//是否显示文件夹菜单
const string kDirMenu = HTTP_FIELD"dirMenu";
onceToken token([](){ onceToken token([](){
mINI::Instance()[kSendBufSize] = 64 * 1024; mINI::Instance()[kSendBufSize] = 64 * 1024;
mINI::Instance()[kMaxReqSize] = 4*1024; mINI::Instance()[kMaxReqSize] = 4*1024;
mINI::Instance()[kKeepAliveSecond] = 15; mINI::Instance()[kKeepAliveSecond] = 15;
mINI::Instance()[kDirMenu] = true;
#if defined(_WIN32) #if defined(_WIN32)
mINI::Instance()[kCharSet] = "gb2312"; mINI::Instance()[kCharSet] = "gb2312";
#else #else
......
...@@ -111,7 +111,7 @@ extern const string kBroadcastFlowReport; ...@@ -111,7 +111,7 @@ extern const string kBroadcastFlowReport;
//未找到流后会广播该事件,请在监听该事件后去拉流或其他方式产生流,这样就能按需拉流了 //未找到流后会广播该事件,请在监听该事件后去拉流或其他方式产生流,这样就能按需拉流了
extern const string kBroadcastNotFoundStream; extern const string kBroadcastNotFoundStream;
#define BroadcastNotFoundStreamArgs const MediaInfo &args,SockInfo &sender #define BroadcastNotFoundStreamArgs const MediaInfo &args,SockInfo &sender, const function<void()> &closePlayer
//某个流无人消费时触发,目的为了实现无人观看时主动断开拉流等业务逻辑 //某个流无人消费时触发,目的为了实现无人观看时主动断开拉流等业务逻辑
extern const string kBroadcastStreamNoneReader; extern const string kBroadcastStreamNoneReader;
...@@ -193,6 +193,8 @@ extern const string kCharSet; ...@@ -193,6 +193,8 @@ extern const string kCharSet;
extern const string kRootPath; extern const string kRootPath;
//http 404错误提示内容 //http 404错误提示内容
extern const string kNotFound; extern const string kNotFound;
//是否显示文件夹菜单
extern const string kDirMenu;
}//namespace Http }//namespace Http
////////////SHELL配置/////////// ////////////SHELL配置///////////
......
...@@ -138,14 +138,18 @@ public: ...@@ -138,14 +138,18 @@ public:
if (_cfg.empty()) { if (_cfg.empty()) {
//未获取到aac_cfg信息 //未获取到aac_cfg信息
if (frame->prefixSize()) { if (frame->prefixSize()) {
//7个字节的adts头 //根据7个字节的adts头生成aac config
_cfg = makeAacConfig((uint8_t *) (frame->data()), frame->prefixSize()); _cfg = makeAacConfig((uint8_t *) (frame->data()), frame->prefixSize());
onReady(); onReady();
} else { } else {
WarnL << "无法获取adts头!"; WarnL << "无法获取adts头!";
} }
} }
AudioTrack::inputFrame(frame);
if (frame->size() > frame->prefixSize()) {
//除adts头外,有实际负载
AudioTrack::inputFrame(frame);
}
} }
private: private:
/** /**
......
...@@ -32,8 +32,10 @@ static string getAacCfg(const RtmpPacket &thiz) { ...@@ -32,8 +32,10 @@ static string getAacCfg(const RtmpPacket &thiz) {
bool AACRtmpDecoder::inputRtmp(const RtmpPacket::Ptr &pkt, bool) { bool AACRtmpDecoder::inputRtmp(const RtmpPacket::Ptr &pkt, bool) {
if (pkt->isCfgFrame()) { if (pkt->isCfgFrame()) {
_aac_cfg = getAacCfg(*pkt); _aac_cfg = getAacCfg(*pkt);
onGetAAC(nullptr, 0, 0);
return false; return false;
} }
if (!_aac_cfg.empty()) { if (!_aac_cfg.empty()) {
onGetAAC(pkt->strBuf.data() + 2, pkt->strBuf.size() - 2, pkt->timeStamp); onGetAAC(pkt->strBuf.data() + 2, pkt->strBuf.size() - 2, pkt->timeStamp);
} }
...@@ -42,7 +44,6 @@ bool AACRtmpDecoder::inputRtmp(const RtmpPacket::Ptr &pkt, bool) { ...@@ -42,7 +44,6 @@ bool AACRtmpDecoder::inputRtmp(const RtmpPacket::Ptr &pkt, bool) {
void AACRtmpDecoder::onGetAAC(const char* data, int len, uint32_t stamp) { void AACRtmpDecoder::onGetAAC(const char* data, int len, uint32_t stamp) {
auto frame = ResourcePoolHelper<AACFrame>::obtainObj(); auto frame = ResourcePoolHelper<AACFrame>::obtainObj();
//生成adts头 //生成adts头
char adts_header[32] = {0}; char adts_header[32] = {0};
auto size = dumpAacConfig(_aac_cfg, len, (uint8_t *) adts_header, sizeof(adts_header)); auto size = dumpAacConfig(_aac_cfg, len, (uint8_t *) adts_header, sizeof(adts_header));
...@@ -54,12 +55,16 @@ void AACRtmpDecoder::onGetAAC(const char* data, int len, uint32_t stamp) { ...@@ -54,12 +55,16 @@ void AACRtmpDecoder::onGetAAC(const char* data, int len, uint32_t stamp) {
frame->_prefix_size = 0; frame->_prefix_size = 0;
} }
//追加负载数据 if(len > 0){
frame->_buffer.append(data, len); //追加负载数据
frame->_dts = stamp; frame->_buffer.append(data, len);
frame->_dts = stamp;
}
//写入环形缓存 if(size > 0 || len > 0){
RtmpCodec::inputFrame(frame); //有adts头或者实际aac负载
RtmpCodec::inputFrame(frame);
}
} }
///////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////
......
...@@ -118,7 +118,7 @@ RtpCodec::Ptr Factory::getRtpEncoderBySdp(const Sdp::Ptr &sdp) { ...@@ -118,7 +118,7 @@ RtpCodec::Ptr Factory::getRtpEncoderBySdp(const Sdp::Ptr &sdp) {
case CodecH265 : return std::make_shared<H265RtpEncoder>(ssrc,mtu,sample_rate,pt,interleaved); case CodecH265 : return std::make_shared<H265RtpEncoder>(ssrc,mtu,sample_rate,pt,interleaved);
case CodecAAC : return std::make_shared<AACRtpEncoder>(ssrc,mtu,sample_rate,pt,interleaved); case CodecAAC : return std::make_shared<AACRtpEncoder>(ssrc,mtu,sample_rate,pt,interleaved);
case CodecG711A : case CodecG711A :
case CodecG711U : return std::make_shared<G711RtpEncoder>(ssrc, mtu, sample_rate, pt, interleaved); case CodecG711U : return std::make_shared<G711RtpEncoder>(codec_id, ssrc, mtu, sample_rate, pt, interleaved);
default : WarnL << "暂不支持该CodecId:" << codec_id; return nullptr; default : WarnL << "暂不支持该CodecId:" << codec_id; return nullptr;
} }
} }
...@@ -129,7 +129,7 @@ RtpCodec::Ptr Factory::getRtpDecoderByTrack(const Track::Ptr &track) { ...@@ -129,7 +129,7 @@ RtpCodec::Ptr Factory::getRtpDecoderByTrack(const Track::Ptr &track) {
case CodecH265 : return std::make_shared<H265RtpDecoder>(); case CodecH265 : return std::make_shared<H265RtpDecoder>();
case CodecAAC : return std::make_shared<AACRtpDecoder>(track->clone()); case CodecAAC : return std::make_shared<AACRtpDecoder>(track->clone());
case CodecG711A : case CodecG711A :
case CodecG711U : return std::make_shared<G711RtpDecoder>(track->clone()); case CodecG711U : return std::make_shared<G711RtpDecoder>(track->getCodecId());
default : WarnL << "暂不支持该CodecId:" << track->getCodecName(); return nullptr; default : WarnL << "暂不支持该CodecId:" << track->getCodecName(); return nullptr;
} }
} }
...@@ -243,9 +243,8 @@ RtmpCodec::Ptr Factory::getRtmpCodecByTrack(const Track::Ptr &track, bool is_enc ...@@ -243,9 +243,8 @@ RtmpCodec::Ptr Factory::getRtmpCodecByTrack(const Track::Ptr &track, bool is_enc
AMFValue Factory::getAmfByCodecId(CodecId codecId) { AMFValue Factory::getAmfByCodecId(CodecId codecId) {
switch (codecId){ switch (codecId){
//此处用string标明rtmp编码类型目的是为了兼容某些android系统 case CodecAAC: return AMFValue(FLV_CODEC_AAC);
case CodecAAC: return AMFValue("mp4a"); case CodecH264: return AMFValue(FLV_CODEC_H264);
case CodecH264: return AMFValue("avc1");
case CodecH265: return AMFValue(FLV_CODEC_H265); case CodecH265: return AMFValue(FLV_CODEC_H265);
case CodecG711A: return AMFValue(FLV_CODEC_G711A); case CodecG711A: return AMFValue(FLV_CODEC_G711A);
case CodecG711U: return AMFValue(FLV_CODEC_G711U); case CodecG711U: return AMFValue(FLV_CODEC_G711U);
......
...@@ -12,8 +12,8 @@ ...@@ -12,8 +12,8 @@
namespace mediakit{ namespace mediakit{
G711RtpDecoder::G711RtpDecoder(const Track::Ptr &track){ G711RtpDecoder::G711RtpDecoder(CodecId codecid){
_codecid = track->getCodecId(); _codecid = codecid;
_frame = obtainFrame(); _frame = obtainFrame();
} }
...@@ -59,16 +59,10 @@ void G711RtpDecoder::onGetG711(const G711Frame::Ptr &frame) { ...@@ -59,16 +59,10 @@ void G711RtpDecoder::onGetG711(const G711Frame::Ptr &frame) {
///////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////
G711RtpEncoder::G711RtpEncoder(uint32_t ui32Ssrc, G711RtpEncoder::G711RtpEncoder(CodecId codecid, uint32_t ui32Ssrc, uint32_t ui32MtuSize,
uint32_t ui32MtuSize, uint32_t ui32SampleRate, uint8_t ui8PayloadType, uint8_t ui8Interleaved) :
uint32_t ui32SampleRate, G711RtpDecoder(codecid),
uint8_t ui8PayloadType, RtpInfo(ui32Ssrc, ui32MtuSize, ui32SampleRate, ui8PayloadType, ui8Interleaved) {
uint8_t ui8Interleaved) :
RtpInfo(ui32Ssrc,
ui32MtuSize,
ui32SampleRate,
ui8PayloadType,
ui8Interleaved) {
} }
void G711RtpEncoder::inputFrame(const Frame::Ptr &frame) { void G711RtpEncoder::inputFrame(const Frame::Ptr &frame) {
......
...@@ -21,7 +21,7 @@ class G711RtpDecoder : public RtpCodec , public ResourcePoolHelper<G711Frame> { ...@@ -21,7 +21,7 @@ class G711RtpDecoder : public RtpCodec , public ResourcePoolHelper<G711Frame> {
public: public:
typedef std::shared_ptr<G711RtpDecoder> Ptr; typedef std::shared_ptr<G711RtpDecoder> Ptr;
G711RtpDecoder(const Track::Ptr &track); G711RtpDecoder(CodecId codecid);
~G711RtpDecoder() {} ~G711RtpDecoder() {}
/** /**
...@@ -35,9 +35,6 @@ public: ...@@ -35,9 +35,6 @@ public:
return _codecid; return _codecid;
} }
protected:
G711RtpDecoder() {}
private: private:
void onGetG711(const G711Frame::Ptr &frame); void onGetG711(const G711Frame::Ptr &frame);
G711Frame::Ptr obtainFrame(); G711Frame::Ptr obtainFrame();
...@@ -61,7 +58,8 @@ public: ...@@ -61,7 +58,8 @@ public:
* @param ui8PayloadType pt类型 * @param ui8PayloadType pt类型
* @param ui8Interleaved rtsp interleaved 值 * @param ui8Interleaved rtsp interleaved 值
*/ */
G711RtpEncoder(uint32_t ui32Ssrc, G711RtpEncoder(CodecId codecid,
uint32_t ui32Ssrc,
uint32_t ui32MtuSize, uint32_t ui32MtuSize,
uint32_t ui32SampleRate, uint32_t ui32SampleRate,
uint8_t ui8PayloadType = 0, uint8_t ui8PayloadType = 0,
......
...@@ -228,11 +228,6 @@ private: ...@@ -228,11 +228,6 @@ private:
} }
break; break;
case H264Frame::NAL_SEI:{
//忽略SEI
break;
}
default: default:
VideoTrack::inputFrame(frame); VideoTrack::inputFrame(frame);
break; break;
......
...@@ -166,6 +166,9 @@ void H264RtmpEncoder::inputFrame(const Frame::Ptr &frame) { ...@@ -166,6 +166,9 @@ void H264RtmpEncoder::inputFrame(const Frame::Ptr &frame) {
auto pcData = frame->data() + frame->prefixSize(); auto pcData = frame->data() + frame->prefixSize();
auto iLen = frame->size() - frame->prefixSize(); auto iLen = frame->size() - frame->prefixSize();
auto type = H264_TYPE(((uint8_t*)pcData)[0]); auto type = H264_TYPE(((uint8_t*)pcData)[0]);
if(type == H264Frame::NAL_SEI){
return;
}
if (!_gotSpsPps) { if (!_gotSpsPps) {
//尝试从frame中获取sps pps //尝试从frame中获取sps pps
...@@ -187,10 +190,6 @@ void H264RtmpEncoder::inputFrame(const Frame::Ptr &frame) { ...@@ -187,10 +190,6 @@ void H264RtmpEncoder::inputFrame(const Frame::Ptr &frame) {
} }
} }
if(type == H264Frame::NAL_SEI){
return;
}
if(_lastPacket && _lastPacket->timeStamp != frame->dts()) { if(_lastPacket && _lastPacket->timeStamp != frame->dts()) {
RtmpCodec::inputRtmp(_lastPacket, _lastPacket->isVideoKeyFrame()); RtmpCodec::inputRtmp(_lastPacket, _lastPacket->isVideoKeyFrame());
_lastPacket = nullptr; _lastPacket = nullptr;
......
...@@ -234,7 +234,7 @@ void H264RtpEncoder::inputFrame(const Frame::Ptr &frame) { ...@@ -234,7 +234,7 @@ void H264RtpEncoder::inputFrame(const Frame::Ptr &frame) {
bool mark = false; bool mark = false;
int nOffset = 1; int nOffset = 1;
while (!mark) { while (!mark) {
if (iLen < nOffset + iSize) { if (iLen <= nOffset + iSize) {
//已经拆分结束 //已经拆分结束
iSize = iLen - nOffset; iSize = iLen - nOffset;
mark = true; mark = true;
...@@ -274,4 +274,4 @@ void H264RtpEncoder::makeH264Rtp(int nal_type,const void* data, unsigned int len ...@@ -274,4 +274,4 @@ void H264RtpEncoder::makeH264Rtp(int nal_type,const void* data, unsigned int len
RtpCodec::inputRtp(makeRtp(getTrackType(),data,len,mark,uiStamp),first_packet && nal_type == H264Frame::NAL_IDR); RtpCodec::inputRtp(makeRtp(getTrackType(),data,len,mark,uiStamp),first_packet && nal_type == H264Frame::NAL_IDR);
} }
}//namespace mediakit }//namespace mediakit
\ No newline at end of file
...@@ -166,7 +166,7 @@ void H265RtpEncoder::inputFrame(const Frame::Ptr &frame) { ...@@ -166,7 +166,7 @@ void H265RtpEncoder::inputFrame(const Frame::Ptr &frame) {
bool mark = false; bool mark = false;
int nOffset = 2; int nOffset = 2;
while (!mark) { while (!mark) {
if (iLen < nOffset + maxSize) { //是否拆分结束 if (iLen <= nOffset + maxSize) { //是否拆分结束
maxSize = iLen - nOffset; maxSize = iLen - nOffset;
mark = true; mark = true;
//FU end //FU end
...@@ -208,4 +208,4 @@ void H265RtpEncoder::makeH265Rtp(int nal_type,const void* data, unsigned int len ...@@ -208,4 +208,4 @@ void H265RtpEncoder::makeH265Rtp(int nal_type,const void* data, unsigned int len
RtpCodec::inputRtp(makeRtp(getTrackType(),data,len,mark,uiStamp),first_packet && H265Frame::isKeyFrame(nal_type)); RtpCodec::inputRtp(makeRtp(getTrackType(),data,len,mark,uiStamp),first_packet && H265Frame::isKeyFrame(nal_type));
} }
}//namespace mediakit }//namespace mediakit
\ No newline at end of file
...@@ -195,6 +195,11 @@ static string searchIndexFile(const string &dir){ ...@@ -195,6 +195,11 @@ static string searchIndexFile(const string &dir){
} }
static bool makeFolderMenu(const string &httpPath, const string &strFullPath, string &strRet) { static bool makeFolderMenu(const string &httpPath, const string &strFullPath, string &strRet) {
GET_CONFIG(bool, dirMenu, Http::kDirMenu);
if(!dirMenu){
//不允许浏览文件夹
return false;
}
string strPathPrefix(strFullPath); string strPathPrefix(strFullPath);
string last_dir_name; string last_dir_name;
if(strPathPrefix.back() == '/'){ if(strPathPrefix.back() == '/'){
......
...@@ -93,6 +93,7 @@ void MP4File::openFile(const char *file,const char *mode) { ...@@ -93,6 +93,7 @@ void MP4File::openFile(const char *file,const char *mode) {
//创建智能指针 //创建智能指针
_file.reset(fp,[file_buf](FILE *fp) { _file.reset(fp,[file_buf](FILE *fp) {
fflush(fp);
fclose(fp); fclose(fp);
}); });
} }
......
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
#ifdef ENABLE_MP4 #ifdef ENABLE_MP4
#include "MP4Muxer.h" #include "MP4Muxer.h"
#include "Util/File.h" #include "Util/File.h"
#include "Extension/H264.h"
namespace mediakit{ namespace mediakit{
MP4Muxer::MP4Muxer(const char *file) { MP4Muxer::MP4Muxer(const char *file) {
...@@ -65,7 +66,12 @@ void MP4Muxer::inputFrame(const Frame::Ptr &frame) { ...@@ -65,7 +66,12 @@ void MP4Muxer::inputFrame(const Frame::Ptr &frame) {
int64_t dts_out, pts_out; int64_t dts_out, pts_out;
switch (frame->getCodecId()) { switch (frame->getCodecId()) {
case CodecH264: case CodecH264: {
int type = H264_TYPE(*((uint8_t *)frame->data() + frame->prefixSize()));
if(type == H264Frame::NAL_SEI){
break;
}
}
case CodecH265: { case CodecH265: {
//这里的代码逻辑是让SPS、PPS、IDR这些时间戳相同的帧打包到一起当做一个帧处理, //这里的代码逻辑是让SPS、PPS、IDR这些时间戳相同的帧打包到一起当做一个帧处理,
if (!_frameCached.empty() && _frameCached.back()->dts() != frame->dts()) { if (!_frameCached.empty() && _frameCached.back()->dts() != frame->dts()) {
......
...@@ -25,6 +25,8 @@ namespace mediakit{ ...@@ -25,6 +25,8 @@ namespace mediakit{
class MP4Muxer : public MediaSinkInterface, public MP4File{ class MP4Muxer : public MediaSinkInterface, public MP4File{
public: public:
typedef std::shared_ptr<MP4Muxer> Ptr;
MP4Muxer(const char *file); MP4Muxer(const char *file);
~MP4Muxer() override; ~MP4Muxer() override;
...@@ -42,9 +44,13 @@ public: ...@@ -42,9 +44,13 @@ public:
*/ */
void resetTracks() override ; void resetTracks() override ;
/**
* 手动关闭文件(对象析构时会自动关闭)
*/
void closeMP4();
private: private:
void openMP4(); void openMP4();
void closeMP4();
void stampSync(); void stampSync();
private: private:
......
...@@ -75,7 +75,7 @@ void MP4Recorder::asyncClose() { ...@@ -75,7 +75,7 @@ void MP4Recorder::asyncClose() {
//获取文件录制时间,放在关闭mp4之前是为了忽略关闭mp4执行时间 //获取文件录制时间,放在关闭mp4之前是为了忽略关闭mp4执行时间
const_cast<MP4Info&>(info).ui64TimeLen = ::time(NULL) - info.ui64StartedTime; const_cast<MP4Info&>(info).ui64TimeLen = ::time(NULL) - info.ui64StartedTime;
//关闭mp4非常耗时,所以要放在后台线程执行 //关闭mp4非常耗时,所以要放在后台线程执行
const_cast<MP4Muxer::Ptr &>(muxer).reset(); muxer->closeMP4();
//临时文件名改成正式文件名,防止mp4未完成时被访问 //临时文件名改成正式文件名,防止mp4未完成时被访问
rename(strFileTmp.data(),strFile.data()); rename(strFileTmp.data(),strFile.data());
//获取文件大小 //获取文件大小
......
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
#if defined(ENABLE_HLS) #if defined(ENABLE_HLS)
#include "mpeg-ts-proto.h" #include "mpeg-ts-proto.h"
#include "mpeg-ts.h" #include "mpeg-ts.h"
#include "Extension/H264.h"
namespace mediakit { namespace mediakit {
...@@ -89,8 +90,13 @@ void TsMuxer::inputFrame(const Frame::Ptr &frame) { ...@@ -89,8 +90,13 @@ void TsMuxer::inputFrame(const Frame::Ptr &frame) {
int64_t dts_out, pts_out; int64_t dts_out, pts_out;
_is_idr_fast_packet = !_have_video; _is_idr_fast_packet = !_have_video;
switch (frame->getCodecId()){ switch (frame->getCodecId()){
case CodecH265:
case CodecH264: { case CodecH264: {
int type = H264_TYPE(*((uint8_t *)frame->data() + frame->prefixSize()));
if(type == H264Frame::NAL_SEI){
break;
}
}
case CodecH265: {
//这里的代码逻辑是让SPS、PPS、IDR这些时间戳相同的帧打包到一起当做一个帧处理, //这里的代码逻辑是让SPS、PPS、IDR这些时间戳相同的帧打包到一起当做一个帧处理,
if (!_frameCached.empty() && _frameCached.back()->dts() != frame->dts()) { if (!_frameCached.empty() && _frameCached.back()->dts() != frame->dts()) {
Frame::Ptr back = _frameCached.back(); Frame::Ptr back = _frameCached.back();
......
...@@ -106,6 +106,7 @@ void RtmpDemuxer::makeVideoTrack(const AMFValue &videoCodec) { ...@@ -106,6 +106,7 @@ void RtmpDemuxer::makeVideoTrack(const AMFValue &videoCodec) {
//设置rtmp解码器代理,生成的frame写入该Track //设置rtmp解码器代理,生成的frame写入该Track
_videoRtmpDecoder->addDelegate(_videoTrack); _videoRtmpDecoder->addDelegate(_videoTrack);
onAddTrack(_videoTrack); onAddTrack(_videoTrack);
_tryedGetVideoTrack = true;
} else { } else {
//找不到相应的rtmp解码器,该track无效 //找不到相应的rtmp解码器,该track无效
_videoTrack.reset(); _videoTrack.reset();
...@@ -123,6 +124,7 @@ void RtmpDemuxer::makeAudioTrack(const AMFValue &audioCodec,int sample_rate, int ...@@ -123,6 +124,7 @@ void RtmpDemuxer::makeAudioTrack(const AMFValue &audioCodec,int sample_rate, int
//设置rtmp解码器代理,生成的frame写入该Track //设置rtmp解码器代理,生成的frame写入该Track
_audioRtmpDecoder->addDelegate(_audioTrack); _audioRtmpDecoder->addDelegate(_audioTrack);
onAddTrack(_audioTrack); onAddTrack(_audioTrack);
_tryedGetAudioTrack = true;
} else { } else {
//找不到相应的rtmp解码器,该track无效 //找不到相应的rtmp解码器,该track无效
_audioTrack.reset(); _audioTrack.reset();
......
...@@ -257,8 +257,8 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr ...@@ -257,8 +257,8 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr
invoke.clear(); invoke.clear();
invoke << "onMetaData" << metadata; invoke << "onMetaData" << metadata;
sendResponse(MSG_DATA, invoke.data()); sendResponse(MSG_DATA, invoke.data());
auto duration = metadata["duration"].as_number(); auto duration = metadata["duration"];
if(duration > 0){ if(duration && duration.as_number() > 0){
//这是点播,使用绝对时间戳 //这是点播,使用绝对时间戳
_stamp[0].setPlayBack(); _stamp[0].setPlayBack();
_stamp[1].setPlayBack(); _stamp[1].setPlayBack();
...@@ -380,7 +380,12 @@ string RtmpSession::getStreamId(const string &str){ ...@@ -380,7 +380,12 @@ string RtmpSession::getStreamId(const string &str){
//vlc和ffplay在播放 rtmp://127.0.0.1/record/0.mp4时, //vlc和ffplay在播放 rtmp://127.0.0.1/record/0.mp4时,
//传过来的url会是rtmp://127.0.0.1/record/mp4:0, //传过来的url会是rtmp://127.0.0.1/record/mp4:0,
//我们在这里还原成0.mp4 //我们在这里还原成0.mp4
stream_id = stream_id.substr(pos + 1) + "." + stream_id.substr(0,pos); //实际使用时发现vlc,mpv等会传过来rtmp://127.0.0.1/record/mp4:0.mp4,这里做个判断
auto ext = stream_id.substr(0,pos);
stream_id = stream_id.substr(pos + 1);
if(stream_id.find(ext) == string::npos){
stream_id = stream_id + "." + ext;
}
} }
if(params.empty()){ if(params.empty()){
......
...@@ -158,6 +158,11 @@ void DecoderImp::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d ...@@ -158,6 +158,11 @@ void DecoderImp::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d
} }
case PSI_STREAM_AAC: { case PSI_STREAM_AAC: {
uint8_t *ptr = (uint8_t *)data;
if(!(bytes > 7 && ptr[0] == 0xFF && (ptr[1] & 0xF0) == 0xF0)){
//这不是aac
break;
}
if (!_codecid_audio) { if (!_codecid_audio) {
//获取到音频 //获取到音频
_codecid_audio = codecid; _codecid_audio = codecid;
......
...@@ -14,7 +14,6 @@ ...@@ -14,7 +14,6 @@
#include <stdint.h> #include <stdint.h>
#include <memory> #include <memory>
#include <functional> #include <functional>
#include "Decoder.h"
#include "Common/MediaSink.h" #include "Common/MediaSink.h"
using namespace std; using namespace std;
......
...@@ -16,32 +16,15 @@ ...@@ -16,32 +16,15 @@
namespace mediakit{ namespace mediakit{
string printSSRC(uint32_t ui32Ssrc) {
char tmp[9] = { 0 };
ui32Ssrc = htonl(ui32Ssrc);
uint8_t *pSsrc = (uint8_t *) &ui32Ssrc;
for (int i = 0; i < 4; i++) {
sprintf(tmp + 2 * i, "%02X", pSsrc[i]);
}
return tmp;
}
static string printAddress(const struct sockaddr *addr){ static string printAddress(const struct sockaddr *addr){
return StrPrinter << SockUtil::inet_ntoa(((struct sockaddr_in *) addr)->sin_addr) << ":" << ntohs(((struct sockaddr_in *) addr)->sin_port); return StrPrinter << SockUtil::inet_ntoa(((struct sockaddr_in *) addr)->sin_addr) << ":" << ntohs(((struct sockaddr_in *) addr)->sin_port);
} }
RtpProcess::RtpProcess(uint32_t ssrc) { RtpProcess::RtpProcess(const string &stream_id) {
_ssrc = ssrc;
_track = std::make_shared<SdpTrack>();
_track->_interleaved = 0;
_track->_samplerate = 90000;
_track->_type = TrackVideo;
_track->_ssrc = _ssrc;
_media_info._schema = RTP_APP_NAME; _media_info._schema = RTP_APP_NAME;
_media_info._vhost = DEFAULT_VHOST; _media_info._vhost = DEFAULT_VHOST;
_media_info._app = RTP_APP_NAME; _media_info._app = RTP_APP_NAME;
_media_info._streamid = printSSRC(_ssrc); _media_info._streamid = stream_id;
GET_CONFIG(string,dump_dir,RtpProxy::kDumpDir); GET_CONFIG(string,dump_dir,RtpProxy::kDumpDir);
{ {
...@@ -73,11 +56,6 @@ RtpProcess::RtpProcess(uint32_t ssrc) { ...@@ -73,11 +56,6 @@ RtpProcess::RtpProcess(uint32_t ssrc) {
} }
RtpProcess::~RtpProcess() { RtpProcess::~RtpProcess() {
DebugP(this);
if (_addr) {
delete _addr;
}
uint64_t duration = (_last_rtp_time.createdTime() - _last_rtp_time.elapsedTime()) / 1000; uint64_t duration = (_last_rtp_time.createdTime() - _last_rtp_time.elapsedTime()) / 1000;
WarnP(this) << "RTP推流器(" WarnP(this) << "RTP推流器("
<< _media_info._vhost << "/" << _media_info._vhost << "/"
...@@ -90,6 +68,11 @@ RtpProcess::~RtpProcess() { ...@@ -90,6 +68,11 @@ RtpProcess::~RtpProcess() {
if (_total_bytes > iFlowThreshold * 1024) { if (_total_bytes > iFlowThreshold * 1024) {
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, false, static_cast<SockInfo &>(*this)); NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, false, static_cast<SockInfo &>(*this));
} }
if (_addr) {
delete _addr;
_addr = nullptr;
}
} }
bool RtpProcess::inputRtp(const Socket::Ptr &sock, const char *data, int data_len,const struct sockaddr *addr,uint32_t *dts_out) { bool RtpProcess::inputRtp(const Socket::Ptr &sock, const char *data, int data_len,const struct sockaddr *addr,uint32_t *dts_out) {
...@@ -115,7 +98,7 @@ bool RtpProcess::inputRtp(const Socket::Ptr &sock, const char *data, int data_le ...@@ -115,7 +98,7 @@ bool RtpProcess::inputRtp(const Socket::Ptr &sock, const char *data, int data_le
} }
_total_bytes += data_len; _total_bytes += data_len;
bool ret = handleOneRtp(0,_track,(unsigned char *)data,data_len); bool ret = handleOneRtp(0, TrackVideo, 90000, (unsigned char *) data, data_len);
if(dts_out){ if(dts_out){
*dts_out = _dts; *dts_out = _dts;
} }
...@@ -128,7 +111,7 @@ static inline bool checkTS(const uint8_t *packet, int bytes){ ...@@ -128,7 +111,7 @@ static inline bool checkTS(const uint8_t *packet, int bytes){
} }
void RtpProcess::onRtpSorted(const RtpPacket::Ptr &rtp, int) { void RtpProcess::onRtpSorted(const RtpPacket::Ptr &rtp, int) {
if(rtp->sequence != _sequence + 1 && _sequence != 0){ if(rtp->sequence != _sequence + (uint16_t)1 && _sequence != 0){
WarnP(this) << "rtp丢包:" << rtp->sequence << " != " << _sequence << "+1" << ",公网环境下请使用tcp方式推流"; WarnP(this) << "rtp丢包:" << rtp->sequence << " != " << _sequence << "+1" << ",公网环境下请使用tcp方式推流";
} }
_sequence = rtp->sequence; _sequence = rtp->sequence;
...@@ -188,6 +171,16 @@ bool RtpProcess::alive() { ...@@ -188,6 +171,16 @@ bool RtpProcess::alive() {
return false; return false;
} }
void RtpProcess::onDetach(){
if(_on_detach){
_on_detach();
}
}
void RtpProcess::setOnDetach(const function<void()> &cb) {
_on_detach = cb;
}
string RtpProcess::get_peer_ip() { string RtpProcess::get_peer_ip() {
if(_addr){ if(_addr){
return SockUtil::inet_ntoa(((struct sockaddr_in *) _addr)->sin_addr); return SockUtil::inet_ntoa(((struct sockaddr_in *) _addr)->sin_addr);
......
...@@ -22,15 +22,39 @@ using namespace mediakit; ...@@ -22,15 +22,39 @@ using namespace mediakit;
namespace mediakit{ namespace mediakit{
string printSSRC(uint32_t ui32Ssrc);
class RtpProcess : public RtpReceiver , public RtpDecoder, public SockInfo, public MediaSinkInterface, public std::enable_shared_from_this<RtpProcess>{ class RtpProcess : public RtpReceiver , public RtpDecoder, public SockInfo, public MediaSinkInterface, public std::enable_shared_from_this<RtpProcess>{
public: public:
typedef std::shared_ptr<RtpProcess> Ptr; typedef std::shared_ptr<RtpProcess> Ptr;
RtpProcess(uint32_t ssrc); RtpProcess(const string &stream_id);
~RtpProcess(); ~RtpProcess();
/**
* 输入rtp
* @param sock 本地监听的socket
* @param data rtp数据指针
* @param data_len rtp数据长度
* @param addr 数据源地址
* @param dts_out 解析出最新的dts
* @return 是否解析成功
*/
bool inputRtp(const Socket::Ptr &sock, const char *data,int data_len, const struct sockaddr *addr , uint32_t *dts_out = nullptr); bool inputRtp(const Socket::Ptr &sock, const char *data,int data_len, const struct sockaddr *addr , uint32_t *dts_out = nullptr);
/**
* 是否超时,用于超时移除对象
*/
bool alive(); bool alive();
/**
* 超时时被RtpSelector移除时触发
*/
void onDetach();
/**
* 设置onDetach事件回调
*/
void setOnDetach(const function<void()> &cb);
/// SockInfo override
string get_local_ip() override; string get_local_ip() override;
uint16_t get_local_port() override; uint16_t get_local_port() override;
string get_peer_ip() override; string get_peer_ip() override;
...@@ -54,8 +78,6 @@ private: ...@@ -54,8 +78,6 @@ private:
std::shared_ptr<FILE> _save_file_rtp; std::shared_ptr<FILE> _save_file_rtp;
std::shared_ptr<FILE> _save_file_ps; std::shared_ptr<FILE> _save_file_ps;
std::shared_ptr<FILE> _save_file_video; std::shared_ptr<FILE> _save_file_video;
uint32_t _ssrc;
SdpTrack::Ptr _track;
struct sockaddr *_addr = nullptr; struct sockaddr *_addr = nullptr;
uint16_t _sequence = 0; uint16_t _sequence = 0;
MultiMediaSourceMuxer::Ptr _muxer; MultiMediaSourceMuxer::Ptr _muxer;
...@@ -66,6 +88,7 @@ private: ...@@ -66,6 +88,7 @@ private:
MediaInfo _media_info; MediaInfo _media_info;
uint64_t _total_bytes = 0; uint64_t _total_bytes = 0;
Socket::Ptr _sock; Socket::Ptr _sock;
function<void()> _on_detach;
}; };
}//namespace mediakit }//namespace mediakit
......
...@@ -15,37 +15,41 @@ namespace mediakit{ ...@@ -15,37 +15,41 @@ namespace mediakit{
INSTANCE_IMP(RtpSelector); INSTANCE_IMP(RtpSelector);
bool RtpSelector::inputRtp(const Socket::Ptr &sock, const char *data, int data_len,const struct sockaddr *addr,uint32_t *dts_out) { bool RtpSelector::inputRtp(const Socket::Ptr &sock, const char *data, int data_len,
const struct sockaddr *addr,uint32_t *dts_out) {
//使用ssrc为流id
uint32_t ssrc = 0; uint32_t ssrc = 0;
if(!getSSRC(data,data_len,ssrc)){ if (!getSSRC(data, data_len, ssrc)) {
WarnL << "get ssrc from rtp failed:" << data_len; WarnL << "get ssrc from rtp failed:" << data_len;
return false; return false;
} }
auto process = getProcess(ssrc, true);
if(process){ //假定指定了流id,那么通过流id来区分是否为一路流(哪怕可能同时收到多路流)
return process->inputRtp(sock, data,data_len, addr,dts_out); auto process = getProcess(printSSRC(ssrc), true);
if (process) {
return process->inputRtp(sock, data, data_len, addr, dts_out);
} }
return false; return false;
} }
bool RtpSelector::getSSRC(const char *data,int data_len, uint32_t &ssrc){ bool RtpSelector::getSSRC(const char *data,int data_len, uint32_t &ssrc){
if(data_len < 12){ if (data_len < 12) {
return false; return false;
} }
uint32_t *ssrc_ptr = (uint32_t *)(data + 8); uint32_t *ssrc_ptr = (uint32_t *) (data + 8);
ssrc = ntohl(*ssrc_ptr); ssrc = ntohl(*ssrc_ptr);
return true; return true;
} }
RtpProcess::Ptr RtpSelector::getProcess(uint32_t ssrc,bool makeNew) { RtpProcess::Ptr RtpSelector::getProcess(const string &stream_id,bool makeNew) {
lock_guard<decltype(_mtx_map)> lck(_mtx_map); lock_guard<decltype(_mtx_map)> lck(_mtx_map);
auto it = _map_rtp_process.find(ssrc); auto it = _map_rtp_process.find(stream_id);
if(it == _map_rtp_process.end() && !makeNew){ if (it == _map_rtp_process.end() && !makeNew) {
return nullptr; return nullptr;
} }
RtpProcessHelper::Ptr &ref = _map_rtp_process[ssrc]; RtpProcessHelper::Ptr &ref = _map_rtp_process[stream_id];
if(!ref){ if (!ref) {
ref = std::make_shared<RtpProcessHelper>(ssrc,shared_from_this()); ref = std::make_shared<RtpProcessHelper>(stream_id, shared_from_this());
ref->attachEvent(); ref->attachEvent();
createTimer(); createTimer();
} }
...@@ -67,18 +71,18 @@ void RtpSelector::createTimer() { ...@@ -67,18 +71,18 @@ void RtpSelector::createTimer() {
} }
} }
void RtpSelector::delProcess(uint32_t ssrc,const RtpProcess *ptr) { void RtpSelector::delProcess(const string &stream_id,const RtpProcess *ptr) {
lock_guard<decltype(_mtx_map)> lck(_mtx_map); lock_guard<decltype(_mtx_map)> lck(_mtx_map);
auto it = _map_rtp_process.find(ssrc); auto it = _map_rtp_process.find(stream_id);
if(it == _map_rtp_process.end()){ if (it == _map_rtp_process.end()) {
return; return;
} }
if (it->second->getProcess().get() != ptr) {
if(it->second->getProcess().get() != ptr){
return; return;
} }
auto process = it->second->getProcess();
_map_rtp_process.erase(it); _map_rtp_process.erase(it);
process->onDetach();
} }
void RtpSelector::onManager() { void RtpSelector::onManager() {
...@@ -88,8 +92,10 @@ void RtpSelector::onManager() { ...@@ -88,8 +92,10 @@ void RtpSelector::onManager() {
++it; ++it;
continue; continue;
} }
WarnL << "RtpProcess timeout:" << printSSRC(it->first); WarnL << "RtpProcess timeout:" << it->first;
auto process = it->second->getProcess();
it = _map_rtp_process.erase(it); it = _map_rtp_process.erase(it);
process->onDetach();
} }
} }
...@@ -99,10 +105,10 @@ RtpSelector::RtpSelector() { ...@@ -99,10 +105,10 @@ RtpSelector::RtpSelector() {
RtpSelector::~RtpSelector() { RtpSelector::~RtpSelector() {
} }
RtpProcessHelper::RtpProcessHelper(uint32_t ssrc, const weak_ptr<RtpSelector> &parent) { RtpProcessHelper::RtpProcessHelper(const string &stream_id, const weak_ptr<RtpSelector> &parent) {
_ssrc = ssrc; _stream_id = stream_id;
_parent = parent; _parent = parent;
_process = std::make_shared<RtpProcess>(_ssrc); _process = std::make_shared<RtpProcess>(stream_id);
} }
RtpProcessHelper::~RtpProcessHelper() { RtpProcessHelper::~RtpProcessHelper() {
...@@ -114,14 +120,14 @@ void RtpProcessHelper::attachEvent() { ...@@ -114,14 +120,14 @@ void RtpProcessHelper::attachEvent() {
bool RtpProcessHelper::close(MediaSource &sender, bool force) { bool RtpProcessHelper::close(MediaSource &sender, bool force) {
//此回调在其他线程触发 //此回调在其他线程触发
if(!_process || (!force && _process->totalReaderCount())){ if (!_process || (!force && _process->totalReaderCount())) {
return false; return false;
} }
auto parent = _parent.lock(); auto parent = _parent.lock();
if(!parent){ if (!parent) {
return false; return false;
} }
parent->delProcess(_ssrc,_process.get()); parent->delProcess(_stream_id, _process.get());
WarnL << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force; WarnL << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
return true; return true;
} }
......
...@@ -24,19 +24,21 @@ class RtpSelector; ...@@ -24,19 +24,21 @@ class RtpSelector;
class RtpProcessHelper : public MediaSourceEvent , public std::enable_shared_from_this<RtpProcessHelper> { class RtpProcessHelper : public MediaSourceEvent , public std::enable_shared_from_this<RtpProcessHelper> {
public: public:
typedef std::shared_ptr<RtpProcessHelper> Ptr; typedef std::shared_ptr<RtpProcessHelper> Ptr;
RtpProcessHelper(uint32_t ssrc,const weak_ptr<RtpSelector > &parent); RtpProcessHelper(const string &stream_id, const weak_ptr<RtpSelector > &parent);
~RtpProcessHelper(); ~RtpProcessHelper();
void attachEvent(); void attachEvent();
RtpProcess::Ptr & getProcess(); RtpProcess::Ptr & getProcess();
protected: protected:
// 通知其停止推流 // 通知其停止推流
bool close(MediaSource &sender,bool force) override; bool close(MediaSource &sender,bool force) override;
// 观看总人数 // 观看总人数
int totalReaderCount(MediaSource &sender) override; int totalReaderCount(MediaSource &sender) override;
private: private:
weak_ptr<RtpSelector > _parent; weak_ptr<RtpSelector > _parent;
RtpProcess::Ptr _process; RtpProcess::Ptr _process;
uint32_t _ssrc = 0; string _stream_id;
}; };
class RtpSelector : public std::enable_shared_from_this<RtpSelector>{ class RtpSelector : public std::enable_shared_from_this<RtpSelector>{
...@@ -44,16 +46,42 @@ public: ...@@ -44,16 +46,42 @@ public:
RtpSelector(); RtpSelector();
~RtpSelector(); ~RtpSelector();
static RtpSelector &Instance();
bool inputRtp(const Socket::Ptr &sock, const char *data,int data_len,const struct sockaddr *addr ,uint32_t *dts_out = nullptr );
static bool getSSRC(const char *data,int data_len, uint32_t &ssrc); static bool getSSRC(const char *data,int data_len, uint32_t &ssrc);
RtpProcess::Ptr getProcess(uint32_t ssrc,bool makeNew); static RtpSelector &Instance();
void delProcess(uint32_t ssrc,const RtpProcess *ptr);
/**
* 输入多个rtp流,根据ssrc分流
* @param sock 本地socket
* @param data 收到的数据
* @param data_len 收到的数据长度
* @param addr rtp流源地址
* @param dts_out 解析出最新的dts
* @return 是否成功
*/
bool inputRtp(const Socket::Ptr &sock, const char *data, int data_len,
const struct sockaddr *addr, uint32_t *dts_out = nullptr);
/**
* 获取一个rtp处理器
* @param stream_id 流id
* @param makeNew 不存在时是否新建
* @return rtp处理器
*/
RtpProcess::Ptr getProcess(const string &stream_id, bool makeNew);
/**
* 删除rtp处理器
* @param stream_id 流id
* @param ptr rtp处理器指针
*/
void delProcess(const string &stream_id, const RtpProcess *ptr);
private: private:
void onManager(); void onManager();
void createTimer(); void createTimer();
private: private:
unordered_map<uint32_t,RtpProcessHelper::Ptr> _map_rtp_process; unordered_map<string,RtpProcessHelper::Ptr> _map_rtp_process;
recursive_mutex _mtx_map; recursive_mutex _mtx_map;
Timer::Ptr _timer; Timer::Ptr _timer;
}; };
......
/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#if defined(ENABLE_RTPPROXY)
#include "RtpServer.h"
#include "RtpSelector.h"
namespace mediakit{
RtpServer::RtpServer() {
}
RtpServer::~RtpServer() {
if(_on_clearup){
_on_clearup();
}
}
void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable_tcp, const char *local_ip) {
//创建udp服务器
Socket::Ptr udp_server = std::make_shared<Socket>(nullptr, false);
if (!udp_server->bindUdpSock(local_port, local_ip)) {
throw std::runtime_error(StrPrinter << "bindUdpSock on " << local_ip << ":" << local_port << " failed:" << get_uv_errmsg(true));
}
//设置udp socket读缓存
SockUtil::setRecvBuf(udp_server->rawFD(), 4 * 1024 * 1024);
TcpServer::Ptr tcp_server;
if (enable_tcp) {
try {
//创建tcp服务器
tcp_server = std::make_shared<TcpServer>(udp_server->getPoller());
(*tcp_server)[RtpSession::kStreamID] = stream_id;
tcp_server->start<RtpSession>(udp_server->get_local_port(), local_ip);
} catch (...) {
throw;
}
}
RtpProcess::Ptr process;
if (!stream_id.empty()) {
//指定了流id,那么一个端口一个流(不管是否包含多个ssrc的多个流,绑定rtp源后,会筛选掉ip端口不匹配的流)
process = RtpSelector::Instance().getProcess(stream_id, true);
udp_server->setOnRead([udp_server, process](const Buffer::Ptr &buf, struct sockaddr *addr, int) {
process->inputRtp(udp_server, buf->data(), buf->size(), addr);
});
} else {
//未指定流id,一个端口多个流,通过ssrc来分流
auto &ref = RtpSelector::Instance();
udp_server->setOnRead([&ref, udp_server](const Buffer::Ptr &buf, struct sockaddr *addr, int) {
ref.inputRtp(udp_server, buf->data(), buf->size(), addr);
});
}
_on_clearup = [udp_server, process, stream_id]() {
//去除循环引用
udp_server->setOnRead(nullptr);
if (process) {
//删除rtp处理器
RtpSelector::Instance().delProcess(stream_id, process.get());
}
};
_tcp_server = tcp_server;
_udp_server = udp_server;
_rtp_process = process;
}
void RtpServer::setOnDetach(const function<void()> &cb){
if(_rtp_process){
_rtp_process->setOnDetach(cb);
}
}
EventPoller::Ptr RtpServer::getPoller() {
return _udp_server->getPoller();
}
uint16_t RtpServer::getPort() {
return _udp_server ? _udp_server->get_local_port() : 0;
}
}//namespace mediakit
#endif//defined(ENABLE_RTPPROXY)
\ No newline at end of file
...@@ -8,33 +8,62 @@ ...@@ -8,33 +8,62 @@
* may be found in the AUTHORS file in the root of the source tree. * may be found in the AUTHORS file in the root of the source tree.
*/ */
#ifndef ZLMEDIAKIT_UDPRECVER_H #ifndef ZLMEDIAKIT_RTPSERVER_H
#define ZLMEDIAKIT_UDPRECVER_H #define ZLMEDIAKIT_RTPSERVER_H
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
#include <memory> #include <memory>
#include "Network/Socket.h" #include "Network/Socket.h"
#include "Network/TcpServer.h"
#include "RtpSession.h"
using namespace std; using namespace std;
using namespace toolkit; using namespace toolkit;
namespace mediakit{ namespace mediakit{
/** /**
* 组播接收器 * RTP服务器,支持UDP/TCP
*/ */
class UdpRecver { class RtpServer {
public: public:
typedef std::shared_ptr<UdpRecver> Ptr; typedef std::shared_ptr<RtpServer> Ptr;
typedef function<void(const Buffer::Ptr &buf)> onRecv; typedef function<void(const Buffer::Ptr &buf)> onRecv;
UdpRecver(); RtpServer();
virtual ~UdpRecver(); ~RtpServer();
bool initSock(uint16_t local_port,const char *local_ip = "0.0.0.0");
/**
* 开启服务器,可能抛异常
* @param local_port 本地端口,0时为随机端口
* @param stream_id 流id,置空则使用ssrc
* @param enable_tcp 是否启用tcp服务器
* @param local_ip 绑定的本地网卡ip
*/
void start(uint16_t local_port, const string &stream_id = "", bool enable_tcp = true, const char *local_ip = "0.0.0.0");
/**
* 获取绑定的本地端口
*/
uint16_t getPort();
/**
* 获取绑定的线程
*/
EventPoller::Ptr getPoller(); EventPoller::Ptr getPoller();
/**
* 设置RtpProcess onDetach事件回调
*/
void setOnDetach(const function<void()> &cb);
protected: protected:
Socket::Ptr _sock; Socket::Ptr _udp_server;
TcpServer::Ptr _tcp_server;
RtpProcess::Ptr _rtp_process;
function<void()> _on_clearup;
}; };
}//namespace mediakit }//namespace mediakit
#endif//defined(ENABLE_RTPPROXY) #endif//defined(ENABLE_RTPPROXY)
#endif //ZLMEDIAKIT_UDPRECVER_H #endif //ZLMEDIAKIT_RTPSERVER_H
...@@ -11,8 +11,15 @@ ...@@ -11,8 +11,15 @@
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
#include "RtpSession.h" #include "RtpSession.h"
#include "RtpSelector.h" #include "RtpSelector.h"
#include "Network/TcpServer.h"
namespace mediakit{ namespace mediakit{
const string RtpSession::kStreamID = "stream_id";
void RtpSession::attachServer(const TcpServer &server) {
_stream_id = const_cast<TcpServer &>(server)[kStreamID];
}
RtpSession::RtpSession(const Socket::Ptr &sock) : TcpSession(sock) { RtpSession::RtpSession(const Socket::Ptr &sock) : TcpSession(sock) {
DebugP(this); DebugP(this);
socklen_t addr_len = sizeof(addr); socklen_t addr_len = sizeof(addr);
...@@ -21,7 +28,7 @@ RtpSession::RtpSession(const Socket::Ptr &sock) : TcpSession(sock) { ...@@ -21,7 +28,7 @@ RtpSession::RtpSession(const Socket::Ptr &sock) : TcpSession(sock) {
RtpSession::~RtpSession() { RtpSession::~RtpSession() {
DebugP(this); DebugP(this);
if(_process){ if(_process){
RtpSelector::Instance().delProcess(_ssrc,_process.get()); RtpSelector::Instance().delProcess(_stream_id,_process.get());
} }
} }
...@@ -36,7 +43,7 @@ void RtpSession::onRecv(const Buffer::Ptr &data) { ...@@ -36,7 +43,7 @@ void RtpSession::onRecv(const Buffer::Ptr &data) {
} }
void RtpSession::onError(const SockException &err) { void RtpSession::onError(const SockException &err) {
WarnL << _ssrc << " " << err.what(); WarnL << _stream_id << " " << err.what();
} }
void RtpSession::onManager() { void RtpSession::onManager() {
...@@ -51,13 +58,19 @@ void RtpSession::onManager() { ...@@ -51,13 +58,19 @@ void RtpSession::onManager() {
void RtpSession::onRtpPacket(const char *data, uint64_t len) { void RtpSession::onRtpPacket(const char *data, uint64_t len) {
if (!_process) { if (!_process) {
if (!RtpSelector::getSSRC(data + 2, len - 2, _ssrc)) { uint32_t ssrc;
if (!RtpSelector::getSSRC(data + 2, len - 2, ssrc)) {
return; return;
} }
_process = RtpSelector::Instance().getProcess(_ssrc, true); if (_stream_id.empty()) {
//未指定流id就使用ssrc为流id
_stream_id = printSSRC(ssrc);
}
//tcp情况下,一个tcp链接只可能是一路流,不需要通过多个ssrc来区分,所以不需要频繁getProcess
_process = RtpSelector::Instance().getProcess(_stream_id, true);
_process->setListener(dynamic_pointer_cast<RtpSession>(shared_from_this())); _process->setListener(dynamic_pointer_cast<RtpSession>(shared_from_this()));
} }
_process->inputRtp(_sock,data + 2, len - 2, &addr); _process->inputRtp(_sock, data + 2, len - 2, &addr);
_ticker.resetTime(); _ticker.resetTime();
} }
......
...@@ -22,22 +22,26 @@ namespace mediakit{ ...@@ -22,22 +22,26 @@ namespace mediakit{
class RtpSession : public TcpSession , public RtpSplitter , public MediaSourceEvent{ class RtpSession : public TcpSession , public RtpSplitter , public MediaSourceEvent{
public: public:
static const string kStreamID;
RtpSession(const Socket::Ptr &sock); RtpSession(const Socket::Ptr &sock);
~RtpSession() override; ~RtpSession() override;
void onRecv(const Buffer::Ptr &) override; void onRecv(const Buffer::Ptr &) override;
void onError(const SockException &err) override; void onError(const SockException &err) override;
void onManager() override; void onManager() override;
void attachServer(const TcpServer &server) override;
protected: protected:
// 通知其停止推流 // 通知其停止推流
bool close(MediaSource &sender,bool force) override; bool close(MediaSource &sender,bool force) override;
// 观看总人数 // 观看总人数
int totalReaderCount(MediaSource &sender) override; int totalReaderCount(MediaSource &sender) override;
void onRtpPacket(const char *data,uint64_t len) override; void onRtpPacket(const char *data,uint64_t len) override;
private: private:
uint32_t _ssrc = 0;
RtpProcess::Ptr _process; RtpProcess::Ptr _process;
Ticker _ticker; Ticker _ticker;
struct sockaddr addr; struct sockaddr addr;
string _stream_id;
}; };
}//namespace mediakit }//namespace mediakit
......
/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#if defined(ENABLE_RTPPROXY)
#include "UdpRecver.h"
#include "RtpSelector.h"
namespace mediakit{
UdpRecver::UdpRecver() {
}
UdpRecver::~UdpRecver() {
if(_sock){
_sock->setOnRead(nullptr);
}
}
bool UdpRecver::initSock(uint16_t local_port,const char *local_ip) {
_sock.reset(new Socket(nullptr, false));
onceToken token(nullptr,[&](){
SockUtil::setRecvBuf(_sock->rawFD(),4 * 1024 * 1024);
});
auto &ref = RtpSelector::Instance();
auto sock = _sock;
_sock->setOnRead([&ref, sock](const Buffer::Ptr &buf, struct sockaddr *addr, int ){
ref.inputRtp(sock, buf->data(),buf->size(),addr);
});
return _sock->bindUdpSock(local_port,local_ip);
}
EventPoller::Ptr UdpRecver::getPoller() {
return _sock->getPoller();
}
}//namespace mediakit
#endif//defined(ENABLE_RTPPROXY)
\ No newline at end of file
...@@ -27,7 +27,7 @@ namespace mediakit { ...@@ -27,7 +27,7 @@ namespace mediakit {
RtpReceiver::RtpReceiver() {} RtpReceiver::RtpReceiver() {}
RtpReceiver::~RtpReceiver() {} RtpReceiver::~RtpReceiver() {}
bool RtpReceiver::handleOneRtp(int track_index,SdpTrack::Ptr &track, unsigned char *rtp_raw_ptr, unsigned int rtp_raw_len) { bool RtpReceiver::handleOneRtp(int track_index, TrackType type, int samplerate, unsigned char *rtp_raw_ptr, unsigned int rtp_raw_len) {
if(rtp_raw_len < 12){ if(rtp_raw_len < 12){
WarnL << "rtp包太小:" << rtp_raw_len; WarnL << "rtp包太小:" << rtp_raw_len;
return false; return false;
...@@ -46,8 +46,8 @@ bool RtpReceiver::handleOneRtp(int track_index,SdpTrack::Ptr &track, unsigned ch ...@@ -46,8 +46,8 @@ bool RtpReceiver::handleOneRtp(int track_index,SdpTrack::Ptr &track, unsigned ch
auto rtp_ptr = _rtp_pool.obtain(); auto rtp_ptr = _rtp_pool.obtain();
auto &rtp = *rtp_ptr; auto &rtp = *rtp_ptr;
rtp.type = track->_type; rtp.type = type;
rtp.interleaved = 2 * track->_type; rtp.interleaved = 2 * type;
rtp.mark = rtp_raw_ptr[1] >> 7; rtp.mark = rtp_raw_ptr[1] >> 7;
rtp.PT = rtp_raw_ptr[1] & 0x7F; rtp.PT = rtp_raw_ptr[1] & 0x7F;
...@@ -59,29 +59,29 @@ bool RtpReceiver::handleOneRtp(int track_index,SdpTrack::Ptr &track, unsigned ch ...@@ -59,29 +59,29 @@ bool RtpReceiver::handleOneRtp(int track_index,SdpTrack::Ptr &track, unsigned ch
memcpy(&rtp.timeStamp, rtp_raw_ptr + 4, 4); memcpy(&rtp.timeStamp, rtp_raw_ptr + 4, 4);
rtp.timeStamp = ntohl(rtp.timeStamp); rtp.timeStamp = ntohl(rtp.timeStamp);
if(!track->_samplerate){ if(!samplerate){
//无法把时间戳转换成毫秒 //无法把时间戳转换成毫秒
return false; return false;
} }
//时间戳转换成毫秒 //时间戳转换成毫秒
rtp.timeStamp = rtp.timeStamp * 1000LL / track->_samplerate; rtp.timeStamp = rtp.timeStamp * 1000LL / samplerate;
//ssrc,内存对齐 //ssrc,内存对齐
memcpy(&rtp.ssrc, rtp_raw_ptr + 8, 4); memcpy(&rtp.ssrc, rtp_raw_ptr + 8, 4);
rtp.ssrc = ntohl(rtp.ssrc); rtp.ssrc = ntohl(rtp.ssrc);
if (track->_ssrc != rtp.ssrc) { if (_ssrc[track_index] != rtp.ssrc) {
if (track->_ssrc == 0) { if (_ssrc[track_index] == 0) {
//保存SSRC至track对象 //保存SSRC至track对象
track->_ssrc = rtp.ssrc; _ssrc[track_index] = rtp.ssrc;
}else{ }else{
//ssrc错误 //ssrc错误
WarnL << "ssrc错误:" << rtp.ssrc << " != " << track->_ssrc; WarnL << "ssrc错误:" << rtp.ssrc << " != " << _ssrc[track_index];
if (_ssrc_err_count[track_index]++ > 10) { if (_ssrc_err_count[track_index]++ > 10) {
//ssrc切换后清除老数据 //ssrc切换后清除老数据
WarnL << "ssrc更换:" << track->_ssrc << " -> " << rtp.ssrc; WarnL << "ssrc更换:" << _ssrc[track_index] << " -> " << rtp.ssrc;
_rtp_sort_cache_map[track_index].clear(); _rtp_sort_cache_map[track_index].clear();
track->_ssrc = rtp.ssrc; _ssrc[track_index] = rtp.ssrc;
} }
return false; return false;
} }
...@@ -168,11 +168,12 @@ void RtpReceiver::sortRtp(const RtpPacket::Ptr &rtp,int track_index){ ...@@ -168,11 +168,12 @@ void RtpReceiver::sortRtp(const RtpPacket::Ptr &rtp,int track_index){
} }
void RtpReceiver::clear() { void RtpReceiver::clear() {
CLEAR_ARR(_last_seq) CLEAR_ARR(_last_seq);
CLEAR_ARR(_ssrc_err_count) CLEAR_ARR(_ssrc);
CLEAR_ARR(_seq_ok_count) CLEAR_ARR(_ssrc_err_count);
CLEAR_ARR(_sort_started) CLEAR_ARR(_seq_ok_count);
CLEAR_ARR(_seq_cycle_count) CLEAR_ARR(_sort_started);
CLEAR_ARR(_seq_cycle_count);
_rtp_sort_cache_map[0].clear(); _rtp_sort_cache_map[0].clear();
_rtp_sort_cache_map[1].clear(); _rtp_sort_cache_map[1].clear();
......
...@@ -32,12 +32,13 @@ protected: ...@@ -32,12 +32,13 @@ protected:
/** /**
* 输入数据指针生成并排序rtp包 * 输入数据指针生成并排序rtp包
* @param track_index track下标索引 * @param track_index track下标索引
* @param track sdp track相关信息 * @param type track类型
* @param samplerate rtp时间戳基准时钟,视频为90000,音频为采样率
* @param rtp_raw_ptr rtp数据指针 * @param rtp_raw_ptr rtp数据指针
* @param rtp_raw_len rtp数据指针长度 * @param rtp_raw_len rtp数据指针长度
* @return 解析成功返回true * @return 解析成功返回true
*/ */
bool handleOneRtp(int track_index,SdpTrack::Ptr &track, unsigned char *rtp_raw_ptr, unsigned int rtp_raw_len); bool handleOneRtp(int track_index, TrackType type, int samplerate, unsigned char *rtp_raw_ptr, unsigned int rtp_raw_len);
/** /**
* rtp数据包排序后输出 * rtp数据包排序后输出
...@@ -49,9 +50,12 @@ protected: ...@@ -49,9 +50,12 @@ protected:
void setPoolSize(int size); void setPoolSize(int size);
int getJitterSize(int track_index); int getJitterSize(int track_index);
int getCycleCount(int track_index); int getCycleCount(int track_index);
private: private:
void sortRtp(const RtpPacket::Ptr &rtp , int track_index); void sortRtp(const RtpPacket::Ptr &rtp , int track_index);
private: private:
uint32_t _ssrc[2] = { 0, 0 };
//ssrc不匹配计数 //ssrc不匹配计数
uint32_t _ssrc_err_count[2] = { 0, 0 }; uint32_t _ssrc_err_count[2] = { 0, 0 };
//上次seq //上次seq
......
...@@ -404,4 +404,14 @@ std::pair<Socket::Ptr, Socket::Ptr> makeSockPair(const EventPoller::Ptr &poller, ...@@ -404,4 +404,14 @@ std::pair<Socket::Ptr, Socket::Ptr> makeSockPair(const EventPoller::Ptr &poller,
} }
} }
string printSSRC(uint32_t ui32Ssrc) {
char tmp[9] = { 0 };
ui32Ssrc = htonl(ui32Ssrc);
uint8_t *pSsrc = (uint8_t *) &ui32Ssrc;
for (int i = 0; i < 4; i++) {
sprintf(tmp + 2 * i, "%02X", pSsrc[i]);
}
return tmp;
}
}//namespace mediakit }//namespace mediakit
\ No newline at end of file
...@@ -272,6 +272,7 @@ private: ...@@ -272,6 +272,7 @@ private:
}; };
std::pair<Socket::Ptr, Socket::Ptr> makeSockPair(const EventPoller::Ptr &poller, const string &local_ip); std::pair<Socket::Ptr, Socket::Ptr> makeSockPair(const EventPoller::Ptr &poller, const string &local_ip);
string printSSRC(uint32_t ui32Ssrc);
} //namespace mediakit } //namespace mediakit
#endif //RTSP_RTSP_H_ #endif //RTSP_RTSP_H_
...@@ -319,7 +319,8 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex) ...@@ -319,7 +319,8 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex)
WarnL << "收到其他地址的rtp数据:" << SockUtil::inet_ntoa(((struct sockaddr_in *) addr)->sin_addr); WarnL << "收到其他地址的rtp数据:" << SockUtil::inet_ntoa(((struct sockaddr_in *) addr)->sin_addr);
return; return;
} }
strongSelf->handleOneRtp(uiTrackIndex, strongSelf->_sdp_track[uiTrackIndex], (unsigned char *) buf->data(), buf->size()); strongSelf->handleOneRtp(uiTrackIndex, strongSelf->_sdp_track[uiTrackIndex]->_type,
strongSelf->_sdp_track[uiTrackIndex]->_samplerate, (unsigned char *) buf->data(), buf->size());
}); });
if(pRtcpSockRef) { if(pRtcpSockRef) {
...@@ -464,14 +465,10 @@ void RtspPlayer::onRtpPacket(const char *data, uint64_t len) { ...@@ -464,14 +465,10 @@ void RtspPlayer::onRtpPacket(const char *data, uint64_t len) {
uint8_t interleaved = data[1]; uint8_t interleaved = data[1];
if(interleaved %2 == 0){ if(interleaved %2 == 0){
trackIdx = getTrackIndexByInterleaved(interleaved); trackIdx = getTrackIndexByInterleaved(interleaved);
if (trackIdx != -1) { handleOneRtp(trackIdx, _sdp_track[trackIdx]->_type, _sdp_track[trackIdx]->_samplerate, (unsigned char *)data + 4, len - 4);
handleOneRtp(trackIdx, _sdp_track[trackIdx], (unsigned char *)data + 4, len - 4);
}
}else{ }else{
trackIdx = getTrackIndexByInterleaved(interleaved - 1); trackIdx = getTrackIndexByInterleaved(interleaved - 1);
if (trackIdx != -1) { onRtcpPacket(trackIdx, _sdp_track[trackIdx], (unsigned char *) data + 4, len - 4);
onRtcpPacket(trackIdx, _sdp_track[trackIdx], (unsigned char *) data + 4, len - 4);
}
} }
} }
...@@ -712,10 +709,7 @@ void RtspPlayer::onRecvRTP_l(const RtpPacket::Ptr &pkt, const SdpTrack::Ptr &tra ...@@ -712,10 +709,7 @@ void RtspPlayer::onRecvRTP_l(const RtpPacket::Ptr &pkt, const SdpTrack::Ptr &tra
_rtp_recv_ticker.resetTime(); _rtp_recv_ticker.resetTime();
onRecvRTP(pkt, track); onRecvRTP(pkt, track);
int iTrackIndex = getTrackIndexByInterleaved(pkt->interleaved); int iTrackIndex = getTrackIndexByTrackType(pkt->type);
if (iTrackIndex == -1) {
return;
}
RtcpCounter &counter = _rtcp_counter[iTrackIndex]; RtcpCounter &counter = _rtcp_counter[iTrackIndex];
counter.pktCnt = pkt->sequence; counter.pktCnt = pkt->sequence;
auto &ticker = _rtcp_send_ticker[iTrackIndex]; auto &ticker = _rtcp_send_ticker[iTrackIndex];
...@@ -788,7 +782,7 @@ int RtspPlayer::getTrackIndexByInterleaved(int interleaved) const { ...@@ -788,7 +782,7 @@ int RtspPlayer::getTrackIndexByInterleaved(int interleaved) const {
if (_sdp_track.size() == 1) { if (_sdp_track.size() == 1) {
return 0; return 0;
} }
return -1; throw SockException(Err_shutdown, StrPrinter << "no such track with interleaved:" << interleaved);
} }
int RtspPlayer::getTrackIndexByTrackType(TrackType trackType) const { int RtspPlayer::getTrackIndexByTrackType(TrackType trackType) const {
...@@ -800,7 +794,7 @@ int RtspPlayer::getTrackIndexByTrackType(TrackType trackType) const { ...@@ -800,7 +794,7 @@ int RtspPlayer::getTrackIndexByTrackType(TrackType trackType) const {
if (_sdp_track.size() == 1) { if (_sdp_track.size() == 1) {
return 0; return 0;
} }
return -1; throw SockException(Err_shutdown, StrPrinter << "no such track with type:" << (int) trackType);
} }
} /* namespace mediakit */ } /* namespace mediakit */
...@@ -347,7 +347,7 @@ inline int RtspPusher::getTrackIndexByTrackType(TrackType type) { ...@@ -347,7 +347,7 @@ inline int RtspPusher::getTrackIndexByTrackType(TrackType type) {
if(_aTrackInfo.size() == 1){ if(_aTrackInfo.size() == 1){
return 0; return 0;
} }
return -1; throw SockException(Err_shutdown, StrPrinter << "no such track with type:" << (int) type);
} }
void RtspPusher::sendRecord() { void RtspPusher::sendRecord() {
......
...@@ -59,51 +59,31 @@ public: ...@@ -59,51 +59,31 @@ public:
//在请求明文密码时如果提供md5密码者则会导致认证失败 //在请求明文密码时如果提供md5密码者则会导致认证失败
typedef std::function<void(bool encrypted,const string &pwd_or_md5)> onAuth; typedef std::function<void(bool encrypted,const string &pwd_or_md5)> onAuth;
RtspSession(const Socket::Ptr &pSock); RtspSession(const Socket::Ptr &sock);
virtual ~RtspSession(); virtual ~RtspSession();
////TcpSession override//// ////TcpSession override////
void onRecv(const Buffer::Ptr &pBuf) override; void onRecv(const Buffer::Ptr &buf) override;
void onError(const SockException &err) override; void onError(const SockException &err) override;
void onManager() override; void onManager() override;
protected: protected:
//RtspSplitter override /////RtspSplitter override/////
/** //收到完整的rtsp包回调,包括sdp等content数据
* 收到完整的rtsp包回调,包括sdp等content数据
* @param parser rtsp包
*/
void onWholeRtspPacket(Parser &parser) override; void onWholeRtspPacket(Parser &parser) override;
//收到rtp包回调
/** void onRtpPacket(const char *data, uint64_t len) override;
* 收到rtp包回调 //从rtsp头中获取Content长度
* @param data
* @param len
*/
void onRtpPacket(const char *data,uint64_t len) override;
/**
* 从rtsp头中获取Content长度
* @param parser
* @return
*/
int64_t getContentLength(Parser &parser) override; int64_t getContentLength(Parser &parser) override;
////RtpReceiver override////
//RtpReceiver override void onRtpSorted(const RtpPacket::Ptr &rtp, int track_idx) override;
void onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) override; ////MediaSourceEvent override////
//MediaSourceEvent override bool close(MediaSource &sender, bool force) override ;
bool close(MediaSource &sender,bool force) override ;
int totalReaderCount(MediaSource &sender) override; int totalReaderCount(MediaSource &sender) override;
/////TcpSession override////
//TcpSession override
int send(const Buffer::Ptr &pkt) override; int send(const Buffer::Ptr &pkt) override;
//收到RTCP包回调
virtual void onRtcpPacket(int track_idx, SdpTrack::Ptr &track, unsigned char *data, unsigned int len);
/**
* 收到RTCP包回调
* @param iTrackidx
* @param track
* @param pucData
* @param uiLen
*/
virtual void onRtcpPacket(int iTrackidx, SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen);
private: private:
//处理options方法,获取服务器能力 //处理options方法,获取服务器能力
void handleReq_Options(const Parser &parser); void handleReq_Options(const Parser &parser);
...@@ -127,103 +107,102 @@ private: ...@@ -127,103 +107,102 @@ private:
void handleReq_Post(const Parser &parser); void handleReq_Post(const Parser &parser);
//处理SET_PARAMETER、GET_PARAMETER方法,一般用于心跳 //处理SET_PARAMETER、GET_PARAMETER方法,一般用于心跳
void handleReq_SET_PARAMETER(const Parser &parser); void handleReq_SET_PARAMETER(const Parser &parser);
//rtsp资源未找到 //rtsp资源未找到
void inline send_StreamNotFound(); void send_StreamNotFound();
//不支持的传输模式 //不支持的传输模式
void inline send_UnsupportedTransport(); void send_UnsupportedTransport();
//会话id错误 //会话id错误
void inline send_SessionNotFound(); void send_SessionNotFound();
//一般rtsp服务器打开端口失败时触发 //一般rtsp服务器打开端口失败时触发
void inline send_NotAcceptable(); void send_NotAcceptable();
//ssrc转字符串
inline string printSSRC(uint32_t ui32Ssrc);
//获取track下标 //获取track下标
inline int getTrackIndexByTrackType(TrackType type); int getTrackIndexByTrackType(TrackType type);
inline int getTrackIndexByControlSuffix(const string &controlSuffix); int getTrackIndexByControlSuffix(const string &control_suffix);
inline int getTrackIndexByInterleaved(int interleaved); int getTrackIndexByInterleaved(int interleaved);
//一般用于接收udp打洞包,也用于rtsp推流 //一般用于接收udp打洞包,也用于rtsp推流
inline void onRcvPeerUdpData(int intervaled, const Buffer::Ptr &pBuf, const struct sockaddr &addr); void onRcvPeerUdpData(int interleaved, const Buffer::Ptr &buf, const struct sockaddr &addr);
//配合onRcvPeerUdpData使用 //配合onRcvPeerUdpData使用
inline void startListenPeerUdpData(int iTrackIdx); void startListenPeerUdpData(int track_idx);
////rtsp专有认证相关//// ////rtsp专有认证相关////
//认证成功 //认证成功
void onAuthSuccess(); void onAuthSuccess();
//认证失败 //认证失败
void onAuthFailed(const string &realm,const string &why,bool close = true); void onAuthFailed(const string &realm, const string &why, bool close = true);
//开始走rtsp专有认证流程 //开始走rtsp专有认证流程
void onAuthUser(const string &realm,const string &authorization); void onAuthUser(const string &realm, const string &authorization);
//校验base64方式的认证加密 //校验base64方式的认证加密
void onAuthBasic(const string &realm,const string &strBase64); void onAuthBasic(const string &realm, const string &auth_base64);
//校验md5方式的认证加密 //校验md5方式的认证加密
void onAuthDigest(const string &realm,const string &strMd5); void onAuthDigest(const string &realm, const string &auth_md5);
//触发url鉴权事件 //触发url鉴权事件
void emitOnPlay(); void emitOnPlay();
//发送rtp给客户端 //发送rtp给客户端
void sendRtpPacket(const RtspMediaSource::RingDataType &pkt); void sendRtpPacket(const RtspMediaSource::RingDataType &pkt);
//触发rtcp发送
void onSendRtpPacket(const RtpPacket::Ptr &rtp);
//回复客户端 //回复客户端
bool sendRtspResponse(const string &res_code,const std::initializer_list<string> &header, const string &sdp = "" , const char *protocol = "RTSP/1.0"); bool sendRtspResponse(const string &res_code, const std::initializer_list<string> &header, const string &sdp = "", const char *protocol = "RTSP/1.0");
bool sendRtspResponse(const string &res_code,const StrCaseMap &header = StrCaseMap(), const string &sdp = "",const char *protocol = "RTSP/1.0"); bool sendRtspResponse(const string &res_code, const StrCaseMap &header = StrCaseMap(), const string &sdp = "", const char *protocol = "RTSP/1.0");
//服务器发送rtcp //服务器发送rtcp
void sendSenderReport(bool overTcp,int iTrackIndex); void sendSenderReport(bool over_tcp, int track_idx);
//设置socket标志 //设置socket标志
void setSocketFlags(); void setSocketFlags();
private: private:
//用于判断客户端是否超时 //是否已经触发on_play事件
Ticker _ticker; bool _emit_on_play = false;
//是否开始发送rtp
bool _enable_send_rtp;
//推流或拉流客户端采用的rtp传输方式
Rtsp::eRtpType _rtp_type = Rtsp::RTP_Invalid;
//收到的seq,回复时一致 //收到的seq,回复时一致
int _iCseq = 0; int _cseq = 0;
//rtsp推流起始时间戳,目的是为了同步
int64_t _start_stamp[2] = {-1, -1};
//消耗的总流量
uint64_t _bytes_usage = 0;
//ContentBase //ContentBase
string _strContentBase; string _content_base;
//Session号 //Session号
string _strSession; string _sessionid;
//记录是否需要rtsp专属鉴权,防止重复触发事件 //记录是否需要rtsp专属鉴权,防止重复触发事件
string _rtsp_realm; string _rtsp_realm;
//是否已经触发on_play事件 //登录认证
bool _emit_on_play = false; string _auth_nonce;
//用于判断客户端是否超时
Ticker _alive_ticker;
//url解析后保存的相关信息 //url解析后保存的相关信息
MediaInfo _mediaInfo; MediaInfo _media_info;
//rtsp推流相关绑定的源
RtspMediaSourceImp::Ptr _push_src;
//rtsp播放器绑定的直播源 //rtsp播放器绑定的直播源
std::weak_ptr<RtspMediaSource> _pMediaSrc; std::weak_ptr<RtspMediaSource> _play_src;
//直播源读取器 //直播源读取器
RtspMediaSource::RingType::RingReader::Ptr _pRtpReader; RtspMediaSource::RingType::RingReader::Ptr _play_reader;
//推流或拉流客户端采用的rtp传输方式
Rtsp::eRtpType _rtpType = Rtsp::RTP_Invalid;
//sdp里面有效的track,包含音频或视频 //sdp里面有效的track,包含音频或视频
vector<SdpTrack::Ptr> _aTrackInfo; vector<SdpTrack::Ptr> _sdp_track;
//rtcp统计,trackid idx 为数组下标
RtcpCounter _rtcp_counter[2];
//rtcp发送时间,trackid idx 为数组下标
Ticker _rtcp_send_tickers[2];
////////RTP over udp//////// ////////RTP over udp////////
//RTP端口,trackid idx 为数组下标 //RTP端口,trackid idx 为数组下标
Socket::Ptr _apRtpSock[2]; Socket::Ptr _rtp_socks[2];
//RTCP端口,trackid idx 为数组下标 //RTCP端口,trackid idx 为数组下标
Socket::Ptr _apRtcpSock[2]; Socket::Ptr _rtcp_socks[2];
//标记是否收到播放的udp打洞包,收到播放的udp打洞包后才能知道其外网udp端口号 //标记是否收到播放的udp打洞包,收到播放的udp打洞包后才能知道其外网udp端口号
unordered_set<int> _udpSockConnected; unordered_set<int> _udp_connected_flags;
////////RTP over udp_multicast//////// ////////RTP over udp_multicast////////
//共享的rtp组播对象 //共享的rtp组播对象
RtpMultiCaster::Ptr _multicaster; RtpMultiCaster::Ptr _multicaster;
////////RTSP over HTTP ////////
//登录认证
string _strNonce;
//消耗的总流量
uint64_t _ui64TotalBytes = 0;
//RTSP over HTTP
//quicktime 请求rtsp会产生两次tcp连接, //quicktime 请求rtsp会产生两次tcp连接,
//一次发送 get 一次发送post,需要通过x-sessioncookie关联起来 //一次发送 get 一次发送post,需要通过x-sessioncookie关联起来
string _http_x_sessioncookie; string _http_x_sessioncookie;
function<void(const Buffer::Ptr &pBuf)> _onRecv; function<void(const Buffer::Ptr &)> _on_recv;
//是否开始发送rtp
bool _enableSendRtp;
//rtsp推流相关
RtspMediaSourceImp::Ptr _pushSrc;
//rtcp统计,trackid idx 为数组下标
RtcpCounter _aRtcpCnt[2];
//rtcp发送时间,trackid idx 为数组下标
Ticker _aRtcpTicker[2];
}; };
/** /**
......
...@@ -8,20 +8,38 @@ if (SDL2_FOUND) ...@@ -8,20 +8,38 @@ if (SDL2_FOUND)
message(STATUS "found library:${SDL2_LIBRARY}") message(STATUS "found library:${SDL2_LIBRARY}")
endif (SDL2_FOUND) endif (SDL2_FOUND)
find_package(PkgConfig QUIET)
#查找ffmpeg/libutil是否安装 #查找ffmpeg/libutil是否安装
find_package(AVUTIL QUIET) if(PKG_CONFIG_FOUND)
if(AVUTIL_FOUND) pkg_check_modules(AVUTIL QUIET IMPORTED_TARGET libavutil)
include_directories(${AVUTIL_INCLUDE_DIR}) if(AVUTIL_FOUND)
list(APPEND LINK_LIB_LIST ${AVUTIL_LIBRARIES}) list(APPEND LINK_LIB_LIST PkgConfig::AVUTIL)
message(STATUS "found library:${AVUTIL_LIBRARIES}") message(STATUS "found library:${AVUTIL_LIBRARY}")
endif()
else()
find_package(AVUTIL QUIET)
if(AVUTIL_FOUND)
include_directories(${AVUTIL_INCLUDE_DIR})
list(APPEND LINK_LIB_LIST ${AVUTIL_LIBRARIES})
message(STATUS "found library:${AVUTIL_LIBRARIES}")
endif()
endif() endif()
#查找ffmpeg/libavcodec是否安装 #查找ffmpeg/libavcodec是否安装
find_package(AVCODEC QUIET) if(PKG_CONFIG_FOUND)
if(AVCODEC_FOUND) pkg_check_modules(AVCODEC QUIET IMPORTED_TARGET libavcodec)
include_directories(${AVCODEC_INCLUDE_DIR}) if(AVCODEC_FOUND)
list(APPEND LINK_LIB_LIST ${AVCODEC_LIBRARIES}) list(APPEND LINK_LIB_LIST PkgConfig::AVCODEC)
message(STATUS "found library:${AVCODEC_LIBRARIES}") message(STATUS "found library:${AVCODEC_LIBRARY}")
endif()
else()
find_package(AVCODEC QUIET)
if(AVCODEC_FOUND)
include_directories(${AVCODEC_INCLUDE_DIR})
list(APPEND LINK_LIB_LIST ${AVCODEC_LIBRARIES})
message(STATUS "found library:${AVCODEC_LIBRARIES}")
endif()
endif() endif()
aux_source_directory(. TEST_SRC_LIST) aux_source_directory(. TEST_SRC_LIST)
......
...@@ -55,7 +55,7 @@ static bool loadFile(const char *path){ ...@@ -55,7 +55,7 @@ static bool loadFile(const char *path){
} }
uint32_t timeStamp; uint32_t timeStamp;
RtpSelector::Instance().inputRtp(nullptr,rtp,len, &addr,&timeStamp); RtpSelector::Instance().inputRtp(nullptr, rtp, len, &addr, &timeStamp);
if(timeStamp_last){ if(timeStamp_last){
auto diff = timeStamp - timeStamp_last; auto diff = timeStamp - timeStamp_last;
if(diff > 0 && diff < 500){ if(diff > 0 && diff < 500){
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论