Commit 9498b96b by xiongziliang

确保rtp推流线程安全性

parent 80eb6dc4
...@@ -66,18 +66,6 @@ RtpProcess::~RtpProcess() { ...@@ -66,18 +66,6 @@ RtpProcess::~RtpProcess() {
} }
bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data, size_t len, const struct sockaddr *addr, uint64_t *dts_out) { bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data, size_t len, const struct sockaddr *addr, uint64_t *dts_out) {
auto is_busy = _busy_flag.test_and_set();
if (is_busy) {
//其他线程正在执行本函数
WarnP(this) << "其他线程正在执行本函数";
return false;
}
//没有其他线程执行本函数
onceToken token(nullptr, [&]() {
//本函数执行完毕时,释放状态
_busy_flag.clear();
});
if (!_sock) { if (!_sock) {
//第一次运行本函数 //第一次运行本函数
_sock = sock; _sock = sock;
......
...@@ -94,7 +94,6 @@ private: ...@@ -94,7 +94,6 @@ private:
ProcessInterface::Ptr _process; ProcessInterface::Ptr _process;
MultiMediaSourceMuxer::Ptr _muxer; MultiMediaSourceMuxer::Ptr _muxer;
std::atomic_bool _stop_rtp_check{false}; std::atomic_bool _stop_rtp_check{false};
std::atomic_flag _busy_flag{false};
toolkit::Ticker _last_check_alive; toolkit::Ticker _last_check_alive;
std::recursive_mutex _func_mtx; std::recursive_mutex _func_mtx;
std::deque<std::function<void()> > _cached_func; std::deque<std::function<void()> > _cached_func;
......
...@@ -25,25 +25,6 @@ void RtpSelector::clear(){ ...@@ -25,25 +25,6 @@ void RtpSelector::clear(){
_map_rtp_process.clear(); _map_rtp_process.clear();
} }
bool RtpSelector::inputRtp(const Socket::Ptr &sock, const char *data, size_t data_len, const struct sockaddr *addr,
uint64_t *dts_out) {
uint32_t ssrc = 0;
if (!getSSRC(data, data_len, ssrc)) {
WarnL << "get ssrc from rtp failed:" << data_len;
return false;
}
auto process = getProcess(printSSRC(ssrc), true);
if (process) {
try {
return process->inputRtp(true, sock, data, data_len, addr, dts_out);
} catch (...) {
delProcess(printSSRC(ssrc), process.get());
throw;
}
}
return false;
}
bool RtpSelector::getSSRC(const char *data, size_t data_len, uint32_t &ssrc){ bool RtpSelector::getSSRC(const char *data, size_t data_len, uint32_t &ssrc){
if (data_len < 12) { if (data_len < 12) {
return false; return false;
...@@ -59,6 +40,10 @@ RtpProcess::Ptr RtpSelector::getProcess(const string &stream_id,bool makeNew) { ...@@ -59,6 +40,10 @@ RtpProcess::Ptr RtpSelector::getProcess(const string &stream_id,bool makeNew) {
if (it == _map_rtp_process.end() && !makeNew) { if (it == _map_rtp_process.end() && !makeNew) {
return nullptr; return nullptr;
} }
if (it != _map_rtp_process.end() && makeNew) {
//已经被其他线程持有了,不得再被持有,否则会存在线程安全的问题
throw std::runtime_error(StrPrinter << "RtpProcess(" << stream_id << ") already existed");
}
RtpProcessHelper::Ptr &ref = _map_rtp_process[stream_id]; RtpProcessHelper::Ptr &ref = _map_rtp_process[stream_id];
if (!ref) { if (!ref) {
ref = std::make_shared<RtpProcessHelper>(stream_id, shared_from_this()); ref = std::make_shared<RtpProcessHelper>(stream_id, shared_from_this());
......
...@@ -53,21 +53,9 @@ public: ...@@ -53,21 +53,9 @@ public:
void clear(); void clear();
/** /**
* 输入多个rtp流,根据ssrc分流
* @param sock 本地socket
* @param data 收到的数据
* @param data_len 收到的数据长度
* @param addr rtp流源地址
* @param dts_out 解析出最新的dts
* @return 是否成功
*/
bool inputRtp(const toolkit::Socket::Ptr &sock, const char *data, size_t data_len,
const struct sockaddr *addr, uint64_t *dts_out = nullptr);
/**
* 获取一个rtp处理器 * 获取一个rtp处理器
* @param stream_id 流id * @param stream_id 流id
* @param makeNew 不存在时是否新建 * @param makeNew 不存在时是否新建, 该参数为true时,必须确保之前未创建同名对象
* @return rtp处理器 * @return rtp处理器
*/ */
RtpProcess::Ptr getProcess(const std::string &stream_id, bool makeNew); RtpProcess::Ptr getProcess(const std::string &stream_id, bool makeNew);
......
...@@ -42,6 +42,8 @@ static bool loadFile(const char *path){ ...@@ -42,6 +42,8 @@ static bool loadFile(const char *path){
addr.ss_family = AF_INET; addr.ss_family = AF_INET;
auto sock = Socket::createSocket(); auto sock = Socket::createSocket();
size_t total_size = 0; size_t total_size = 0;
RtpProcess::Ptr process;
uint32_t ssrc = 0;
while (true) { while (true) {
if (2 != fread(&len, 1, 2, fp)) { if (2 != fread(&len, 1, 2, fp)) {
WarnL; WarnL;
...@@ -58,9 +60,24 @@ static bool loadFile(const char *path){ ...@@ -58,9 +60,24 @@ static bool loadFile(const char *path){
break; break;
} }
total_size += len; total_size += len;
uint64_t timeStamp; uint64_t timeStamp = 0;
if (!process) {
if (!RtpSelector::getSSRC(rtp, len, ssrc)) {
WarnL << "get ssrc from rtp failed:" << len;
return false;
}
process = RtpSelector::Instance().getProcess(printSSRC(ssrc), true);
}
if (process) {
try {
process->inputRtp(true, sock, rtp, len, (struct sockaddr *)&addr, &timeStamp);
} catch (...) {
RtpSelector::Instance().delProcess(printSSRC(ssrc), process.get());
throw;
}
}
RtpSelector::Instance().inputRtp(sock, rtp, len, (struct sockaddr *)&addr, &timeStamp);
auto diff = timeStamp - timeStamp_last; auto diff = timeStamp - timeStamp_last;
if (diff > 0 && diff < 500) { if (diff > 0 && diff < 500) {
usleep(diff * 1000); usleep(diff * 1000);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论