Commit feaacf39 by 夏楚 Committed by GitHub

Merge pull request #1920 from custompal/cus_pr

增加获取媒体流播放器列表功能
parents 48d49a33 33e1e6b8
ZLToolKit @ 23575ba8
Subproject commit 658271fdf4fb497b0665c06544627789844a003a Subproject commit 23575ba82d00641f8f33851b1d85391da310c378
...@@ -741,6 +741,35 @@ void installWebApi() { ...@@ -741,6 +741,35 @@ void installWebApi() {
val["online"] = (bool) (MediaSource::find(allArgs["schema"],allArgs["vhost"],allArgs["app"],allArgs["stream"])); val["online"] = (bool) (MediaSource::find(allArgs["schema"],allArgs["vhost"],allArgs["app"],allArgs["stream"]));
}); });
api_regist("/index/api/getMediaPlayerList",[](API_ARGS_MAP_ASYNC){
CHECK_SECRET();
CHECK_ARGS("schema", "vhost", "app", "stream");
auto src = MediaSource::find(allArgs["schema"], allArgs["vhost"], allArgs["app"], allArgs["stream"]);
if (!src) {
throw ApiRetException("can not find the stream", API::NotFound);
}
src->getPlayerList(
[=](const std::list<std::shared_ptr<void>> &info_list) mutable {
val["code"] = API::Success;
auto &data = val["data"];
data = Value(arrayValue);
for (auto &info : info_list) {
auto obj = static_pointer_cast<Value>(info);
data.append(std::move(*obj));
}
invoker(200, headerOut, val.toStyledString());
},
[](std::shared_ptr<void> &&info) -> std::shared_ptr<void> {
auto obj = std::make_shared<Value>();
auto session = static_pointer_cast<Session>(info);
(*obj)["peer_ip"] = session->get_peer_ip();
(*obj)["peer_port"] = session->get_peer_port();
(*obj)["id"] = session->getIdentifier();
(*obj)["typeid"] = toolkit::demangle(typeid(*session).name());
return obj;
});
});
//测试url http://127.0.0.1/index/api/getMediaInfo?schema=rtsp&vhost=__defaultVhost__&app=live&stream=obs //测试url http://127.0.0.1/index/api/getMediaInfo?schema=rtsp&vhost=__defaultVhost__&app=live&stream=obs
api_regist("/index/api/getMediaInfo",[](API_ARGS_MAP_ASYNC){ api_regist("/index/api/getMediaInfo",[](API_ARGS_MAP_ASYNC){
CHECK_SECRET(); CHECK_SECRET();
......
...@@ -192,52 +192,6 @@ public: ...@@ -192,52 +192,6 @@ public:
std::string _param_strs; std::string _param_strs;
}; };
class BytesSpeed {
public:
BytesSpeed() = default;
~BytesSpeed() = default;
/**
* 添加统计字节
*/
BytesSpeed& operator += (size_t bytes) {
_bytes += bytes;
if (_bytes > 1024 * 1024) {
//数据大于1MB就计算一次网速
computeSpeed();
}
return *this;
}
/**
* 获取速度,单位bytes/s
*/
int getSpeed() {
if (_ticker.elapsedTime() < 1000) {
//获取频率小于1秒,那么返回上次计算结果
return _speed;
}
return computeSpeed();
}
private:
int computeSpeed() {
auto elapsed = _ticker.elapsedTime();
if (!elapsed) {
return _speed;
}
_speed = (int)(_bytes * 1000 / elapsed);
_ticker.resetTime();
_bytes = 0;
return _speed;
}
private:
int _speed = 0;
size_t _bytes = 0;
toolkit::Ticker _ticker;
};
/** /**
* 媒体源,任何rtsp/rtmp的直播流都源自该对象 * 媒体源,任何rtsp/rtmp的直播流都源自该对象
*/ */
...@@ -293,6 +247,12 @@ public: ...@@ -293,6 +247,12 @@ public:
virtual int readerCount() = 0; virtual int readerCount() = 0;
// 观看者个数,包括(hls/rtsp/rtmp) // 观看者个数,包括(hls/rtsp/rtmp)
virtual int totalReaderCount(); virtual int totalReaderCount();
// 获取播放器列表
virtual void getPlayerList(const std::function<void(const std::list<std::shared_ptr<void>> &info_list)> &cb,
const std::function<std::shared_ptr<void>(std::shared_ptr<void> &&info)> &on_change) {
assert(cb);
cb(std::list<std::shared_ptr<void>>());
}
// 获取媒体源类型 // 获取媒体源类型
MediaOriginType getOriginType() const; MediaOriginType getOriginType() const;
...@@ -350,7 +310,7 @@ private: ...@@ -350,7 +310,7 @@ private:
void emitEvent(bool regist); void emitEvent(bool regist);
protected: protected:
BytesSpeed _speed[TrackMax]; toolkit::BytesSpeed _speed[TrackMax];
private: private:
std::atomic_flag _owned { false }; std::atomic_flag _owned { false };
......
...@@ -51,6 +51,11 @@ public: ...@@ -51,6 +51,11 @@ public:
return _ring; return _ring;
} }
void getPlayerList(const std::function<void(const std::list<std::shared_ptr<void>> &info_list)> &cb,
const std::function<std::shared_ptr<void>(std::shared_ptr<void> &&info)> &on_change) override {
_ring->getInfoList(cb, on_change);
}
/** /**
* 获取fmp4 init segment * 获取fmp4 init segment
*/ */
......
...@@ -245,8 +245,8 @@ bool HttpSession::checkLiveStream(const string &schema, const string &url_suffi ...@@ -245,8 +245,8 @@ bool HttpSession::checkLiveStream(const string &schema, const string &url_suffi
}; };
Broadcast::AuthInvoker invoker = [weak_self, onRes](const string &err) { Broadcast::AuthInvoker invoker = [weak_self, onRes](const string &err) {
if (auto strongSelf = weak_self.lock()) { if (auto strong_self = weak_self.lock()) {
strongSelf->async([onRes, err]() { onRes(err); }); strong_self->async([onRes, err]() { onRes(err); });
} }
}; };
...@@ -277,6 +277,7 @@ bool HttpSession::checkLiveStreamFMP4(const function<void()> &cb){ ...@@ -277,6 +277,7 @@ bool HttpSession::checkLiveStreamFMP4(const function<void()> &cb){
weak_ptr<HttpSession> weak_self = dynamic_pointer_cast<HttpSession>(shared_from_this()); weak_ptr<HttpSession> weak_self = dynamic_pointer_cast<HttpSession>(shared_from_this());
fmp4_src->pause(false); fmp4_src->pause(false);
_fmp4_reader = fmp4_src->getRing()->attach(getPoller()); _fmp4_reader = fmp4_src->getRing()->attach(getPoller());
_fmp4_reader->setGetInfoCB([weak_self]() { return weak_self.lock(); });
_fmp4_reader->setDetachCB([weak_self]() { _fmp4_reader->setDetachCB([weak_self]() {
auto strong_self = weak_self.lock(); auto strong_self = weak_self.lock();
if (!strong_self) { if (!strong_self) {
...@@ -318,6 +319,7 @@ bool HttpSession::checkLiveStreamTS(const function<void()> &cb){ ...@@ -318,6 +319,7 @@ bool HttpSession::checkLiveStreamTS(const function<void()> &cb){
weak_ptr<HttpSession> weak_self = dynamic_pointer_cast<HttpSession>(shared_from_this()); weak_ptr<HttpSession> weak_self = dynamic_pointer_cast<HttpSession>(shared_from_this());
ts_src->pause(false); ts_src->pause(false);
_ts_reader = ts_src->getRing()->attach(getPoller()); _ts_reader = ts_src->getRing()->attach(getPoller());
_ts_reader->setGetInfoCB([weak_self]() { return weak_self.lock(); });
_ts_reader->setDetachCB([weak_self](){ _ts_reader->setDetachCB([weak_self](){
auto strong_self = weak_self.lock(); auto strong_self = weak_self.lock();
if (!strong_self) { if (!strong_self) {
...@@ -412,19 +414,19 @@ void HttpSession::Handle_Req_GET_l(ssize_t &content_len, bool sendBody) { ...@@ -412,19 +414,19 @@ void HttpSession::Handle_Req_GET_l(ssize_t &content_len, bool sendBody) {
} }
bool bClose = !strcasecmp(_parser["Connection"].data(),"close"); bool bClose = !strcasecmp(_parser["Connection"].data(),"close");
weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this()); weak_ptr<HttpSession> weak_self = dynamic_pointer_cast<HttpSession>(shared_from_this());
HttpFileManager::onAccessPath(*this, _parser, [weakSelf, bClose](int code, const string &content_type, HttpFileManager::onAccessPath(*this, _parser, [weak_self, bClose](int code, const string &content_type,
const StrCaseMap &responseHeader, const HttpBody::Ptr &body) { const StrCaseMap &responseHeader, const HttpBody::Ptr &body) {
auto strongSelf = weakSelf.lock(); auto strong_self = weak_self.lock();
if (!strongSelf) { if (!strong_self) {
return; return;
} }
strongSelf->async([weakSelf, bClose, code, content_type, responseHeader, body]() { strong_self->async([weak_self, bClose, code, content_type, responseHeader, body]() {
auto strongSelf = weakSelf.lock(); auto strong_self = weak_self.lock();
if (!strongSelf) { if (!strong_self) {
return; return;
} }
strongSelf->sendResponse(code, bClose, content_type.data(), responseHeader, body); strong_self->sendResponse(code, bClose, content_type.data(), responseHeader, body);
}); });
}); });
} }
...@@ -645,19 +647,19 @@ void HttpSession::urlDecode(Parser &parser){ ...@@ -645,19 +647,19 @@ void HttpSession::urlDecode(Parser &parser){
bool HttpSession::emitHttpEvent(bool doInvoke){ bool HttpSession::emitHttpEvent(bool doInvoke){
bool bClose = !strcasecmp(_parser["Connection"].data(),"close"); bool bClose = !strcasecmp(_parser["Connection"].data(),"close");
/////////////////////异步回复Invoker/////////////////////////////// /////////////////////异步回复Invoker///////////////////////////////
weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this()); weak_ptr<HttpSession> weak_self = dynamic_pointer_cast<HttpSession>(shared_from_this());
HttpResponseInvoker invoker = [weakSelf,bClose](int code, const KeyValue &headerOut, const HttpBody::Ptr &body){ HttpResponseInvoker invoker = [weak_self,bClose](int code, const KeyValue &headerOut, const HttpBody::Ptr &body){
auto strongSelf = weakSelf.lock(); auto strong_self = weak_self.lock();
if(!strongSelf) { if(!strong_self) {
return; return;
} }
strongSelf->async([weakSelf, bClose, code, headerOut, body]() { strong_self->async([weak_self, bClose, code, headerOut, body]() {
auto strongSelf = weakSelf.lock(); auto strong_self = weak_self.lock();
if (!strongSelf) { if (!strong_self) {
//本对象已经销毁 //本对象已经销毁
return; return;
} }
strongSelf->sendResponse(code, bClose, nullptr, headerOut, body); strong_self->sendResponse(code, bClose, nullptr, headerOut, body);
}); });
}; };
///////////////////广播HTTP事件/////////////////////////// ///////////////////广播HTTP事件///////////////////////////
......
...@@ -28,12 +28,12 @@ void FlvMuxer::start(const EventPoller::Ptr &poller, const RtmpMediaSource::Ptr ...@@ -28,12 +28,12 @@ void FlvMuxer::start(const EventPoller::Ptr &poller, const RtmpMediaSource::Ptr
throw std::runtime_error("RtmpMediaSource 无效"); throw std::runtime_error("RtmpMediaSource 无效");
} }
if (!poller->isCurrentThread()) { if (!poller->isCurrentThread()) {
weak_ptr<FlvMuxer> weakSelf = getSharedPtr(); weak_ptr<FlvMuxer> weak_self = getSharedPtr();
//延时两秒启动录制,目的是为了等待config帧收集完毕 //延时两秒启动录制,目的是为了等待config帧收集完毕
poller->doDelayTask(2000, [weakSelf, poller, media, start_pts]() { poller->doDelayTask(2000, [weak_self, poller, media, start_pts]() {
auto strongSelf = weakSelf.lock(); auto strong_self = weak_self.lock();
if (strongSelf) { if (strong_self) {
strongSelf->start(poller, media, start_pts); strong_self->start(poller, media, start_pts);
} }
return 0; return 0;
}); });
...@@ -42,21 +42,22 @@ void FlvMuxer::start(const EventPoller::Ptr &poller, const RtmpMediaSource::Ptr ...@@ -42,21 +42,22 @@ void FlvMuxer::start(const EventPoller::Ptr &poller, const RtmpMediaSource::Ptr
onWriteFlvHeader(media); onWriteFlvHeader(media);
std::weak_ptr<FlvMuxer> weakSelf = getSharedPtr(); std::weak_ptr<FlvMuxer> weak_self = getSharedPtr();
media->pause(false); media->pause(false);
_ring_reader = media->getRing()->attach(poller); _ring_reader = media->getRing()->attach(poller);
_ring_reader->setDetachCB([weakSelf]() { _ring_reader->setGetInfoCB([weak_self]() { return weak_self.lock(); });
auto strongSelf = weakSelf.lock(); _ring_reader->setDetachCB([weak_self]() {
if (!strongSelf) { auto strong_self = weak_self.lock();
if (!strong_self) {
return; return;
} }
strongSelf->onDetach(); strong_self->onDetach();
}); });
bool check = start_pts > 0; bool check = start_pts > 0;
_ring_reader->setReadCB([weakSelf, start_pts, check](const RtmpMediaSource::RingDataType &pkt) mutable { _ring_reader->setReadCB([weak_self, start_pts, check](const RtmpMediaSource::RingDataType &pkt) mutable {
auto strongSelf = weakSelf.lock(); auto strong_self = weak_self.lock();
if (!strongSelf) { if (!strong_self) {
return; return;
} }
...@@ -69,7 +70,7 @@ void FlvMuxer::start(const EventPoller::Ptr &poller, const RtmpMediaSource::Ptr ...@@ -69,7 +70,7 @@ void FlvMuxer::start(const EventPoller::Ptr &poller, const RtmpMediaSource::Ptr
} }
check = false; check = false;
} }
strongSelf->onWriteRtmp(rtmp, ++i == size); strong_self->onWriteRtmp(rtmp, ++i == size);
}); });
}); });
} }
......
...@@ -69,6 +69,11 @@ public: ...@@ -69,6 +69,11 @@ public:
return _ring; return _ring;
} }
void getPlayerList(const std::function<void(const std::list<std::shared_ptr<void>> &info_list)> &cb,
const std::function<std::shared_ptr<void>(std::shared_ptr<void> &&info)> &on_change) override {
_ring->getInfoList(cb, on_change);
}
/** /**
* 获取播放器个数 * 获取播放器个数
* @return * @return
......
...@@ -199,13 +199,13 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { ...@@ -199,13 +199,13 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
} }
Broadcast::PublishAuthInvoker invoker = [weak_self, on_res, pToken](const string &err, const ProtocolOption &option) { Broadcast::PublishAuthInvoker invoker = [weak_self, on_res, pToken](const string &err, const ProtocolOption &option) {
auto strongSelf = weak_self.lock(); auto strong_self = weak_self.lock();
if (!strongSelf) { if (!strong_self) {
return; return;
} }
strongSelf->async([weak_self, on_res, err, pToken, option]() { strong_self->async([weak_self, on_res, err, pToken, option]() {
auto strongSelf = weak_self.lock(); auto strong_self = weak_self.lock();
if (!strongSelf) { if (!strong_self) {
return; return;
} }
on_res(err, option); on_res(err, option);
...@@ -307,28 +307,29 @@ void RtmpSession::sendPlayResponse(const string &err, const RtmpMediaSource::Ptr ...@@ -307,28 +307,29 @@ void RtmpSession::sendPlayResponse(const string &err, const RtmpMediaSource::Ptr
src->pause(false); src->pause(false);
_ring_reader = src->getRing()->attach(getPoller()); _ring_reader = src->getRing()->attach(getPoller());
weak_ptr<RtmpSession> weakSelf = dynamic_pointer_cast<RtmpSession>(shared_from_this()); weak_ptr<RtmpSession> weak_self = dynamic_pointer_cast<RtmpSession>(shared_from_this());
_ring_reader->setReadCB([weakSelf](const RtmpMediaSource::RingDataType &pkt) { _ring_reader->setGetInfoCB([weak_self]() { return weak_self.lock(); });
auto strongSelf = weakSelf.lock(); _ring_reader->setReadCB([weak_self](const RtmpMediaSource::RingDataType &pkt) {
if (!strongSelf) { auto strong_self = weak_self.lock();
if (!strong_self) {
return; return;
} }
size_t i = 0; size_t i = 0;
auto size = pkt->size(); auto size = pkt->size();
strongSelf->setSendFlushFlag(false); strong_self->setSendFlushFlag(false);
pkt->for_each([&](const RtmpPacket::Ptr &rtmp){ pkt->for_each([&](const RtmpPacket::Ptr &rtmp){
if(++i == size){ if(++i == size){
strongSelf->setSendFlushFlag(true); strong_self->setSendFlushFlag(true);
} }
strongSelf->onSendMedia(rtmp); strong_self->onSendMedia(rtmp);
}); });
}); });
_ring_reader->setDetachCB([weakSelf]() { _ring_reader->setDetachCB([weak_self]() {
auto strongSelf = weakSelf.lock(); auto strong_self = weak_self.lock();
if (!strongSelf) { if (!strong_self) {
return; return;
} }
strongSelf->shutdown(SockException(Err_shutdown,"rtmp ring buffer detached")); strong_self->shutdown(SockException(Err_shutdown,"rtmp ring buffer detached"));
}); });
src->pause(false); src->pause(false);
_play_src = src; _play_src = src;
...@@ -360,9 +361,9 @@ void RtmpSession::doPlay(AMFDecoder &dec){ ...@@ -360,9 +361,9 @@ void RtmpSession::doPlay(AMFDecoder &dec){
std::shared_ptr<Ticker> ticker(new Ticker); std::shared_ptr<Ticker> ticker(new Ticker);
weak_ptr<RtmpSession> weak_self = dynamic_pointer_cast<RtmpSession>(shared_from_this()); weak_ptr<RtmpSession> weak_self = dynamic_pointer_cast<RtmpSession>(shared_from_this());
std::shared_ptr<onceToken> token(new onceToken(nullptr, [ticker,weak_self](){ std::shared_ptr<onceToken> token(new onceToken(nullptr, [ticker,weak_self](){
auto strongSelf = weak_self.lock(); auto strong_self = weak_self.lock();
if (strongSelf) { if (strong_self) {
DebugP(strongSelf.get()) << "play 回复时间:" << ticker->elapsedTime() << "ms"; DebugP(strong_self.get()) << "play 回复时间:" << ticker->elapsedTime() << "ms";
} }
})); }));
Broadcast::AuthInvoker invoker = [weak_self,token](const string &err){ Broadcast::AuthInvoker invoker = [weak_self,token](const string &err){
......
...@@ -36,7 +36,7 @@ protected: ...@@ -36,7 +36,7 @@ protected:
private: private:
std::string _stream_id; std::string _stream_id;
RtpProcess::Ptr _process; RtpProcess::Ptr _process;
std::weak_ptr<RtpSelector > _parent; std::weak_ptr<RtpSelector> _parent;
}; };
class RtpSelector : public std::enable_shared_from_this<RtpSelector>{ class RtpSelector : public std::enable_shared_from_this<RtpSelector>{
......
...@@ -65,6 +65,11 @@ public: ...@@ -65,6 +65,11 @@ public:
return _ring; return _ring;
} }
void getPlayerList(const std::function<void(const std::list<std::shared_ptr<void>> &info_list)> &cb,
const std::function<std::shared_ptr<void>(std::shared_ptr<void> &&info)> &on_change) override {
_ring->getInfoList(cb, on_change);
}
/** /**
* 获取播放器个数 * 获取播放器个数
*/ */
......
...@@ -51,6 +51,11 @@ public: ...@@ -51,6 +51,11 @@ public:
return _ring; return _ring;
} }
void getPlayerList(const std::function<void(const std::list<std::shared_ptr<void>> &info_list)> &cb,
const std::function<std::shared_ptr<void>(std::shared_ptr<void> &&info)> &on_change) override {
_ring->getInfoList(cb, on_change);
}
/** /**
* 获取播放器个数 * 获取播放器个数
*/ */
......
...@@ -226,6 +226,8 @@ void SrtTransportImp::doPlay() { ...@@ -226,6 +226,8 @@ void SrtTransportImp::doPlay() {
assert(ts_src); assert(ts_src);
ts_src->pause(false); ts_src->pause(false);
strong_self->_ts_reader = ts_src->getRing()->attach(strong_self->getPoller()); strong_self->_ts_reader = ts_src->getRing()->attach(strong_self->getPoller());
weak_ptr<Session> weak_session = strong_self->getSession();
strong_self->_ts_reader->setGetInfoCB([weak_session]() { return weak_session.lock(); });
strong_self->_ts_reader->setDetachCB([weak_self]() { strong_self->_ts_reader->setDetachCB([weak_self]() {
auto strong_self = weak_self.lock(); auto strong_self = weak_self.lock();
if (!strong_self) { if (!strong_self) {
......
...@@ -39,23 +39,25 @@ void WebRtcPlayer::onStartWebRTC() { ...@@ -39,23 +39,25 @@ void WebRtcPlayer::onStartWebRTC() {
_play_src->pause(false); _play_src->pause(false);
_reader = _play_src->getRing()->attach(getPoller(), true); _reader = _play_src->getRing()->attach(getPoller(), true);
weak_ptr<WebRtcPlayer> weak_self = static_pointer_cast<WebRtcPlayer>(shared_from_this()); weak_ptr<WebRtcPlayer> weak_self = static_pointer_cast<WebRtcPlayer>(shared_from_this());
weak_ptr<Session> weak_session = getSession();
_reader->setGetInfoCB([weak_session]() { return weak_session.lock(); });
_reader->setReadCB([weak_self](const RtspMediaSource::RingDataType &pkt) { _reader->setReadCB([weak_self](const RtspMediaSource::RingDataType &pkt) {
auto strongSelf = weak_self.lock(); auto strong_self = weak_self.lock();
if (!strongSelf) { if (!strong_self) {
return; return;
} }
size_t i = 0; size_t i = 0;
pkt->for_each([&](const RtpPacket::Ptr &rtp) { pkt->for_each([&](const RtpPacket::Ptr &rtp) {
//TraceL<<"send track type:"<<rtp->type<<" ts:"<<rtp->getStamp()<<" ntp:"<<rtp->ntp_stamp<<" size:"<<rtp->getPayloadSize()<<" i:"<<i; //TraceL<<"send track type:"<<rtp->type<<" ts:"<<rtp->getStamp()<<" ntp:"<<rtp->ntp_stamp<<" size:"<<rtp->getPayloadSize()<<" i:"<<i;
strongSelf->onSendRtp(rtp, ++i == pkt->size()); strong_self->onSendRtp(rtp, ++i == pkt->size());
}); });
}); });
_reader->setDetachCB([weak_self]() { _reader->setDetachCB([weak_self]() {
auto strongSelf = weak_self.lock(); auto strong_self = weak_self.lock();
if (!strongSelf) { if (!strong_self) {
return; return;
} }
strongSelf->onShutdown(SockException(Err_shutdown, "rtsp ring buffer detached")); strong_self->onShutdown(SockException(Err_shutdown, "rtsp ring buffer detached"));
}); });
} }
//使用完毕后,释放强引用,这样确保推流器断开后能及时注销媒体 //使用完毕后,释放强引用,这样确保推流器断开后能及时注销媒体
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论