Commit 75407391 by mtdxc Committed by GitHub

Header refactor (#2115)

* 优化MultiMediaSourceMuxer头文件包含

* 将MediaSinkDelegate和Demux移到MediaSink中

* MediaSource头文件重构, 独立出PacketCache.h
精简Frame和Track的头文件

* Rtmp头文件重构

* Rtsp头文件重构

* webrtc头文件重构

* 规范.h头文件包含,并将其移到.cpp中:
- 尽量不包含Common\config.h
- Util\File.h
- Rtsp/RtspPlayer.h
- Rtmp/RtmpPlayer.h

* 删除多余的Stamp.h和Base64包含
parent 59ecdd13
......@@ -13,6 +13,7 @@
#include <unordered_map>
#include "Util/logger.h"
#include "Util/SSLBox.h"
#include "Util/File.h"
#include "Network/TcpServer.h"
#include "Network/UdpServer.h"
#include "Thread/WorkThreadPool.h"
......@@ -37,6 +38,7 @@ static std::shared_ptr<RtpServer> rtpServer;
#ifdef ENABLE_WEBRTC
#include "../webrtc/WebRtcSession.h"
#include "../webrtc/WebRtcTransport.h"
static std::shared_ptr<UdpServer> rtcServer_udp;
static std::shared_ptr<TcpServer> rtcServer_tcp;
#endif
......
......@@ -11,6 +11,7 @@
#include <signal.h>
#include "Util/logger.h"
#include <iostream>
#include "Common/config.h"
#include "Rtsp/UDPServer.h"
#include "Player/MediaPlayer.h"
#include "Util/onceToken.h"
......
......@@ -16,7 +16,6 @@
#include <functional>
#include "Process.h"
#include "Util/TimeTicker.h"
#include "Network/Socket.h"
#include "Common/MediaSource.h"
namespace FFmpeg {
......
......@@ -17,6 +17,7 @@
#include "Util/logger.h"
#include "Util/onceToken.h"
#include "Util/NoticeCenter.h"
#include "Util/File.h"
#ifdef ENABLE_MYSQL
#include "Util/SqlPool.h"
#endif //ENABLE_MYSQL
......
......@@ -16,7 +16,7 @@
#include "Util/uv_errno.h"
#include "Transcode.h"
#include "Extension/AAC.h"
#include "Common/config.h"
#define MAX_DELAY_SECOND 3
using namespace std;
......
......@@ -14,7 +14,6 @@
#include <memory>
#include <string>
#include <functional>
#include "Util/util.h"
#include "Util/TimeTicker.h"
#include "Common/MultiMediaSourceMuxer.h"
......
......@@ -10,6 +10,7 @@
#include "MediaSink.h"
#include "Extension/AAC.h"
#include "Common/config.h"
using namespace std;
......@@ -271,4 +272,86 @@ void MediaSink::enableMuteAudio(bool flag) {
_add_mute_audio = flag;
}
///////////////////////////DemuxerSink//////////////////////////////
void MediaSinkDelegate::setTrackListener(TrackListener *listener) {
_listener = listener;
}
bool MediaSinkDelegate::onTrackReady(const Track::Ptr &track) {
if (_listener) {
_listener->addTrack(track);
}
return true;
}
void MediaSinkDelegate::onAllTrackReady() {
if (_listener) {
_listener->addTrackCompleted();
}
}
void MediaSinkDelegate::resetTracks() {
MediaSink::resetTracks();
if (_listener) {
_listener->resetTracks();
}
}
///////////////////////////Demuxer//////////////////////////////
void Demuxer::setTrackListener(TrackListener *listener, bool wait_track_ready) {
if (wait_track_ready) {
auto sink = std::make_shared<MediaSinkDelegate>();
sink->setTrackListener(listener);
_sink = std::move(sink);
}
_listener = listener;
}
bool Demuxer::addTrack(const Track::Ptr &track) {
if (!_sink) {
_origin_track.emplace_back(track);
return _listener ? _listener->addTrack(track) : false;
}
if (_sink->addTrack(track)) {
track->addDelegate([this](const Frame::Ptr &frame) {
return _sink->inputFrame(frame);
});
return true;
}
return false;
}
void Demuxer::addTrackCompleted() {
if (_sink) {
_sink->addTrackCompleted();
} else if (_listener) {
_listener->addTrackCompleted();
}
}
void Demuxer::resetTracks() {
if (_sink) {
_sink->resetTracks();
} else if (_listener) {
_listener->resetTracks();
}
}
vector<Track::Ptr> Demuxer::getTracks(bool ready) const {
if (_sink) {
return _sink->getTracks(ready);
}
vector<Track::Ptr> ret;
for (auto &track : _origin_track) {
if (ready && !track->ready()) {
continue;
}
ret.emplace_back(track);
}
return ret;
}
}//namespace mediakit
......@@ -168,6 +168,44 @@ private:
};
class MediaSinkDelegate : public MediaSink {
public:
MediaSinkDelegate() = default;
~MediaSinkDelegate() override = default;
/**
* 设置track监听器
*/
void setTrackListener(TrackListener *listener);
protected:
void resetTracks() override;
bool onTrackReady(const Track::Ptr & track) override;
void onAllTrackReady() override;
private:
TrackListener *_listener = nullptr;
};
class Demuxer : protected TrackListener, public TrackSource {
public:
Demuxer() = default;
~Demuxer() override = default;
void setTrackListener(TrackListener *listener, bool wait_track_ready = false);
std::vector<Track::Ptr> getTracks(bool trackReady = true) const override;
protected:
bool addTrack(const Track::Ptr &track) override;
void addTrackCompleted() override;
void resetTracks() override;
private:
MediaSink::Ptr _sink;
TrackListener *_listener = nullptr;
std::vector<Track::Ptr> _origin_track;
};
}//namespace mediakit
#endif //ZLMEDIAKIT_MEDIASINK_H
......@@ -7,15 +7,16 @@
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#include <mutex>
#include "Util/util.h"
#include "Util/NoticeCenter.h"
#include "Network/sockutil.h"
#include "Network/Session.h"
#include "MediaSource.h"
#include "Common/config.h"
#include "Common/Parser.h"
#include "Record/MP4Reader.h"
#include "PacketCache.h"
using namespace std;
using namespace toolkit;
......
......@@ -11,15 +11,10 @@
#ifndef ZLMEDIAKIT_MEDIASOURCE_H
#define ZLMEDIAKIT_MEDIASOURCE_H
#include <mutex>
#include <string>
#include <atomic>
#include <memory>
#include <functional>
#include <unordered_map>
#include "Common/config.h"
#include "Common/Parser.h"
#include "Util/List.h"
#include "Network/Socket.h"
#include "Extension/Track.h"
#include "Record/Recorder.h"
......@@ -410,84 +405,5 @@ private:
toolkit::ObjectStatistic<MediaSource> _statistic;
};
/// 缓存刷新策略类
class FlushPolicy {
public:
FlushPolicy() = default;
~FlushPolicy() = default;
bool isFlushAble(bool is_video, bool is_key, uint64_t new_stamp, size_t cache_size);
private:
// 音视频的最后时间戳
uint64_t _last_stamp[2] = { 0, 0 };
};
/// 合并写缓存模板
/// \tparam packet 包类型
/// \tparam policy 刷新缓存策略
/// \tparam packet_list 包缓存类型
template<typename packet, typename policy = FlushPolicy, typename packet_list = toolkit::List<std::shared_ptr<packet> > >
class PacketCache {
public:
PacketCache() { _cache = std::make_shared<packet_list>(); }
virtual ~PacketCache() = default;
void inputPacket(uint64_t stamp, bool is_video, std::shared_ptr<packet> pkt, bool key_pos) {
bool flag = flushImmediatelyWhenCloseMerge();
if (!flag && _policy.isFlushAble(is_video, key_pos, stamp, _cache->size())) {
flush();
}
//追加数据到最后
_cache->emplace_back(std::move(pkt));
if (key_pos) {
_key_pos = key_pos;
}
if (flag) {
flush();
}
}
void flush() {
if (_cache->empty()) {
return;
}
onFlush(std::move(_cache), _key_pos);
_cache = std::make_shared<packet_list>();
_key_pos = false;
}
virtual void clearCache() {
_cache->clear();
}
virtual void onFlush(std::shared_ptr<packet_list>, bool key_pos) = 0;
private:
bool flushImmediatelyWhenCloseMerge() {
// 一般的协议关闭合并写时,立即刷新缓存,这样可以减少一帧的延时,但是rtp例外
// 因为rtp的包很小,一个RtpPacket包中也不是完整的一帧图像,所以在关闭合并写时,
// 还是有必要缓冲一帧的rtp(也就是时间戳相同的rtp)再输出,这样虽然会增加一帧的延时
// 但是却对性能提升很大,这样做还是比较划算的
GET_CONFIG(int, mergeWriteMS, General::kMergeWriteMS);
GET_CONFIG(int, rtspLowLatency, Rtsp::kLowLatency);
if (std::is_same<packet, RtpPacket>::value && rtspLowLatency) {
return true;
}
return std::is_same<packet, RtpPacket>::value ? false : (mergeWriteMS <= 0);
}
private:
bool _key_pos = false;
policy _policy;
std::shared_ptr<packet_list> _cache;
};
} /* namespace mediakit */
#endif //ZLMEDIAKIT_MEDIASOURCE_H
......@@ -11,6 +11,13 @@
#include <math.h>
#include "Common/config.h"
#include "MultiMediaSourceMuxer.h"
#include "Rtp/RtpSender.h"
#include "Record/HlsRecorder.h"
#include "Record/HlsMediaSource.h"
#include "Rtsp/RtspMediaSourceMuxer.h"
#include "Rtmp/RtmpMediaSourceMuxer.h"
#include "TS/TSMediaSourceMuxer.h"
#include "FMP4/FMP4MediaSourceMuxer.h"
using namespace std;
using namespace toolkit;
......
......@@ -12,16 +12,17 @@
#define ZLMEDIAKIT_MULTIMEDIASOURCEMUXER_H
#include "Common/Stamp.h"
#include "Rtp/RtpSender.h"
#include "Common/MediaSource.h"
#include "Common/MediaSink.h"
#include "Record/Recorder.h"
#include "Record/HlsRecorder.h"
#include "Record/HlsMediaSource.h"
#include "Rtsp/RtspMediaSourceMuxer.h"
#include "Rtmp/RtmpMediaSourceMuxer.h"
#include "TS/TSMediaSourceMuxer.h"
#include "FMP4/FMP4MediaSourceMuxer.h"
namespace mediakit {
class HlsRecorder;
class RtspMediaSourceMuxer;
class RtmpMediaSourceMuxer;
class TSMediaSourceMuxer;
class FMP4MediaSourceMuxer;
class RtpSender;
class MultiMediaSourceMuxer : public MediaSourceEventInterceptor, public MediaSink, public std::enable_shared_from_this<MultiMediaSourceMuxer>{
public:
......@@ -160,17 +161,17 @@ private:
Stamp _stamp[2];
std::weak_ptr<Listener> _track_listener;
#if defined(ENABLE_RTPPROXY)
std::unordered_map<std::string, RtpSender::Ptr> _rtp_sender;
std::unordered_map<std::string, std::shared_ptr<RtpSender>> _rtp_sender;
#endif //ENABLE_RTPPROXY
#if defined(ENABLE_MP4)
FMP4MediaSourceMuxer::Ptr _fmp4;
std::shared_ptr<FMP4MediaSourceMuxer> _fmp4;
#endif
RtmpMediaSourceMuxer::Ptr _rtmp;
RtspMediaSourceMuxer::Ptr _rtsp;
TSMediaSourceMuxer::Ptr _ts;
std::shared_ptr<RtmpMediaSourceMuxer> _rtmp;
std::shared_ptr<RtspMediaSourceMuxer> _rtsp;
std::shared_ptr<TSMediaSourceMuxer> _ts;
MediaSinkInterface::Ptr _mp4;
HlsRecorder::Ptr _hls;
std::shared_ptr<HlsRecorder> _hls;
toolkit::EventPoller::Ptr _poller;
//对象个数统计
......
#ifndef _SRC_PACKET_CACHE_H_
#define _SRC_PACKET_CACHE_H_
#include "Common/config.h"
#include "Util/List.h"
#pragma once
namespace mediakit {
/// 缓存刷新策略类
class FlushPolicy {
public:
FlushPolicy() = default;
~FlushPolicy() = default;
bool isFlushAble(bool is_video, bool is_key, uint64_t new_stamp, size_t cache_size);
private:
// 音视频的最后时间戳
uint64_t _last_stamp[2] = { 0, 0 };
};
/// 合并写缓存模板
/// \tparam packet 包类型
/// \tparam policy 刷新缓存策略
/// \tparam packet_list 包缓存类型
template<typename packet, typename policy = FlushPolicy, typename packet_list = toolkit::List<std::shared_ptr<packet> > >
class PacketCache {
public:
PacketCache() { _cache = std::make_shared<packet_list>(); }
virtual ~PacketCache() = default;
void inputPacket(uint64_t stamp, bool is_video, std::shared_ptr<packet> pkt, bool key_pos) {
bool flag = flushImmediatelyWhenCloseMerge();
if (!flag && _policy.isFlushAble(is_video, key_pos, stamp, _cache->size())) {
flush();
}
//追加数据到最后
_cache->emplace_back(std::move(pkt));
if (key_pos) {
_key_pos = key_pos;
}
if (flag) {
flush();
}
}
void flush() {
if (_cache->empty()) {
return;
}
onFlush(std::move(_cache), _key_pos);
_cache = std::make_shared<packet_list>();
_key_pos = false;
}
virtual void clearCache() {
_cache->clear();
}
virtual void onFlush(std::shared_ptr<packet_list>, bool key_pos) = 0;
private:
bool flushImmediatelyWhenCloseMerge() {
// 一般的协议关闭合并写时,立即刷新缓存,这样可以减少一帧的延时,但是rtp例外
// 因为rtp的包很小,一个RtpPacket包中也不是完整的一帧图像,所以在关闭合并写时,
// 还是有必要缓冲一帧的rtp(也就是时间戳相同的rtp)再输出,这样虽然会增加一帧的延时
// 但是却对性能提升很大,这样做还是比较划算的
GET_CONFIG(int, mergeWriteMS, General::kMergeWriteMS);
GET_CONFIG(int, rtspLowLatency, Rtsp::kLowLatency);
if (std::is_same<packet, RtpPacket>::value && rtspLowLatency) {
return true;
}
return std::is_same<packet, RtpPacket>::value ? false : (mergeWriteMS <= 0);
}
private:
bool _key_pos = false;
policy _policy;
std::shared_ptr<packet_list> _cache;
};
}
#endif
......@@ -22,7 +22,9 @@
#include "Opus.h"
#include "G711.h"
#include "L16.h"
#include "Util/base64.h"
#include "Common/Parser.h"
#include "Common/config.h"
using namespace std;
......
......@@ -12,6 +12,8 @@
#include "H264.h"
#include "H265.h"
#include "Common/Parser.h"
#include "Common/Stamp.h"
using namespace std;
using namespace toolkit;
......@@ -29,6 +31,13 @@ Frame::Ptr Frame::getCacheAbleFrame(const Frame::Ptr &frame){
return std::make_shared<FrameCacheAble>(frame);
}
FrameStamp::FrameStamp(Frame::Ptr frame, Stamp &stamp, bool modify_stamp)
{
_frame = std::move(frame);
//覆盖时间戳
stamp.revise(_frame->dts(), _frame->pts(), _dts, _pts, modify_stamp);
}
TrackType getTrackType(CodecId codecId) {
switch (codecId) {
#define XX(name, type, value, str, mpeg_id) case name : return type;
......
......@@ -11,14 +11,14 @@
#ifndef ZLMEDIAKIT_FRAME_H
#define ZLMEDIAKIT_FRAME_H
#include <map>
#include <mutex>
#include <functional>
#include "Util/RingBuffer.h"
#include "Network/Socket.h"
#include "Common/Stamp.h"
#include "Util/List.h"
#include "Network/Buffer.h"
namespace mediakit {
class Stamp;
typedef enum {
TrackInvalid = -1,
TrackVideo = 0,
......@@ -441,11 +441,7 @@ private:
class FrameStamp : public Frame {
public:
using Ptr = std::shared_ptr<FrameStamp>;
FrameStamp(Frame::Ptr frame, Stamp &stamp, bool modify_stamp) {
_frame = std::move(frame);
//覆盖时间戳
stamp.revise(_frame->dts(), _frame->pts(), _dts, _pts, modify_stamp);
}
FrameStamp(Frame::Ptr frame, Stamp &stamp, bool modify_stamp);
~FrameStamp() override {}
uint64_t dts() const override { return (uint64_t)_dts; }
......
......@@ -11,6 +11,8 @@
#include "H264.h"
#include "SPSParser.h"
#include "Util/logger.h"
#include "Util/base64.h"
using namespace toolkit;
using namespace std;
......
......@@ -13,7 +13,6 @@
#include "Frame.h"
#include "Track.h"
#include "Util/base64.h"
#define H264_TYPE(v) ((uint8_t)(v) & 0x1F)
......
......@@ -13,7 +13,6 @@
#include "Rtmp/RtmpCodec.h"
#include "Extension/Track.h"
#include "Util/ResourcePool.h"
#include "Extension/H264.h"
namespace mediakit{
......
......@@ -9,6 +9,7 @@
*/
#include "H264Rtp.h"
#include "Common/config.h"
namespace mediakit{
......
......@@ -12,8 +12,8 @@
#define ZLMEDIAKIT_H264RTPCODEC_H
#include "Rtsp/RtpCodec.h"
#include "Util/ResourcePool.h"
#include "Extension/H264.h"
// for DtsGenerator
#include "Common/Stamp.h"
namespace mediakit{
......
......@@ -10,6 +10,7 @@
#include "H265.h"
#include "SPSParser.h"
#include "Util/base64.h"
using namespace std;
using namespace toolkit;
......
......@@ -13,7 +13,6 @@
#include "Frame.h"
#include "Track.h"
#include "Util/base64.h"
#include "H264.h"
#define H265_TYPE(v) (((uint8_t)(v) >> 1) & 0x3f)
......
......@@ -13,7 +13,6 @@
#include "Rtmp/RtmpCodec.h"
#include "Extension/Track.h"
#include "Util/ResourcePool.h"
#include "Extension/H265.h"
namespace mediakit{
......
......@@ -12,8 +12,8 @@
#define ZLMEDIAKIT_H265RTPCODEC_H
#include "Rtsp/RtpCodec.h"
#include "Util/ResourcePool.h"
#include "Extension/H265.h"
// for DtsGenerator
#include "Common/Stamp.h"
namespace mediakit{
......
......@@ -14,7 +14,6 @@
#include <memory>
#include <string>
#include "Frame.h"
#include "Util/RingBuffer.h"
#include "Rtsp/Rtsp.h"
namespace mediakit{
......
......@@ -12,6 +12,8 @@
#define ZLMEDIAKIT_FMP4MEDIASOURCE_H
#include "Common/MediaSource.h"
#include "Common/PacketCache.h"
#include "Util/RingBuffer.h"
#define FMP4_GOP_SIZE 512
......
......@@ -9,7 +9,7 @@
*/
#include "HlsPlayer.h"
#include "Common/config.h"
using namespace std;
using namespace toolkit;
......
......@@ -11,7 +11,6 @@
#ifndef HTTP_HLSPLAYER_H
#define HTTP_HLSPLAYER_H
#include "Common/Stamp.h"
#include "Player/PlayerBase.h"
#include "HttpTSPlayer.h"
#include "HlsParser.h"
......
......@@ -19,6 +19,7 @@
#include "HttpSession.h"
#include "Record/HlsMediaSource.h"
#include "Common/Parser.h"
#include "Common/config.h"
#include "strCoding.h"
using namespace std;
......
......@@ -13,7 +13,6 @@
#include <functional>
#include "Network/Session.h"
#include "Rtmp/RtmpMediaSource.h"
#include "Rtmp/FlvMuxer.h"
#include "HttpRequestSplitter.h"
#include "WebSocketSplitter.h"
......
......@@ -9,7 +9,7 @@
*/
#include "TsPlayer.h"
#include "Common/config.h"
using namespace std;
using namespace toolkit;
......
......@@ -10,6 +10,7 @@
#include "TsPlayerImp.h"
#include "HlsPlayer.h"
#include "Common/config.h"
using namespace std;
using namespace toolkit;
......
......@@ -10,8 +10,6 @@
#include <algorithm>
#include "MediaPlayer.h"
#include "Rtmp/RtmpPlayerImp.h"
#include "Rtsp/RtspPlayerImp.h"
using namespace std;
using namespace toolkit;
......
......@@ -14,9 +14,6 @@
#include <memory>
#include <string>
#include "PlayerBase.h"
#include "Rtsp/RtspPlayer.h"
#include "Rtmp/RtmpPlayer.h"
#include "Thread/TaskExecutor.h"
namespace mediakit {
......
......@@ -68,87 +68,4 @@ PlayerBase::PlayerBase() {
this->mINI::operator[](Client::kWaitTrackReady) = true;
}
///////////////////////////DemuxerSink//////////////////////////////
void MediaSinkDelegate::setTrackListener(TrackListener *listener) {
_listener = listener;
}
bool MediaSinkDelegate::onTrackReady(const Track::Ptr &track) {
if (_listener) {
_listener->addTrack(track);
}
return true;
}
void MediaSinkDelegate::onAllTrackReady() {
if (_listener) {
_listener->addTrackCompleted();
}
}
void MediaSinkDelegate::resetTracks() {
MediaSink::resetTracks();
if (_listener) {
_listener->resetTracks();
}
}
///////////////////////////Demuxer//////////////////////////////
void Demuxer::setTrackListener(TrackListener *listener, bool wait_track_ready) {
if (wait_track_ready) {
auto sink = std::make_shared<MediaSinkDelegate>();
sink->setTrackListener(listener);
_sink = std::move(sink);
}
_listener = listener;
}
bool Demuxer::addTrack(const Track::Ptr &track) {
if (!_sink) {
_origin_track.emplace_back(track);
return _listener ? _listener->addTrack(track) : false;
}
if (_sink->addTrack(track)) {
track->addDelegate([this](const Frame::Ptr &frame) {
return _sink->inputFrame(frame);
});
return true;
}
return false;
}
void Demuxer::addTrackCompleted() {
if (_sink) {
_sink->addTrackCompleted();
} else if (_listener) {
_listener->addTrackCompleted();
}
}
void Demuxer::resetTracks() {
if (_sink) {
_sink->resetTracks();
} else if (_listener) {
_listener->resetTracks();
}
}
vector<Track::Ptr> Demuxer::getTracks(bool ready) const {
if (_sink) {
return _sink->getTracks(ready);
}
vector<Track::Ptr> ret;
for (auto &track : _origin_track) {
if (ready && !track->ready()) {
continue;
}
ret.emplace_back(track);
}
return ret;
}
} /* namespace mediakit */
......@@ -17,7 +17,6 @@
#include <functional>
#include "Network/Socket.h"
#include "Util/mini.h"
#include "Util/RingBuffer.h"
#include "Common/MediaSource.h"
#include "Common/MediaSink.h"
#include "Extension/Frame.h"
......@@ -236,44 +235,6 @@ protected:
std::shared_ptr<Delegate> _delegate;
};
class MediaSinkDelegate : public MediaSink {
public:
MediaSinkDelegate() = default;
~MediaSinkDelegate() override = default;
/**
* 设置track监听器
*/
void setTrackListener(TrackListener *listener);
protected:
void resetTracks() override;
bool onTrackReady(const Track::Ptr & track) override;
void onAllTrackReady() override;
private:
TrackListener *_listener = nullptr;
};
class Demuxer : protected TrackListener, public TrackSource {
public:
Demuxer() = default;
~Demuxer() override = default;
void setTrackListener(TrackListener *listener, bool wait_track_ready = false);
std::vector<Track::Ptr> getTracks(bool trackReady = true) const override;
protected:
bool addTrack(const Track::Ptr &track) override;
void addTrackCompleted() override;
void resetTracks() override;
private:
MediaSink::Ptr _sink;
TrackListener *_listener = nullptr;
std::vector<Track::Ptr> _origin_track;
};
} /* namespace mediakit */
#endif /* SRC_PLAYER_PLAYERBASE_H_ */
......@@ -14,6 +14,10 @@
#include "Util/MD5.h"
#include "Util/logger.h"
#include "Extension/AAC.h"
#include "Rtmp/RtmpMediaSource.h"
#include "Rtsp/RtspMediaSource.h"
#include "Rtmp/RtmpPlayer.h"
#include "Rtsp/RtspPlayer.h"
using namespace toolkit;
using namespace std;
......
......@@ -12,9 +12,8 @@
#define SRC_DEVICE_PLAYERPROXY_H_
#include <memory>
#include "Common/Device.h"
#include "Common/MultiMediaSourceMuxer.h"
#include "Player/MediaPlayer.h"
#include "Util/TimeTicker.h"
namespace mediakit {
......@@ -72,7 +71,7 @@ private:
std::string _app;
std::string _stream_id;
std::string _pull_url;
toolkit::Timer::Ptr _timer;
std::shared_ptr<toolkit::Timer> _timer;
std::function<void(const toolkit::SockException &ex)> _on_close;
std::function<void(const toolkit::SockException &ex)> _on_play;
MultiMediaSourceMuxer::Ptr _muxer;
......
......@@ -14,7 +14,6 @@
#include <memory>
#include <string>
#include "PusherBase.h"
#include "Thread/TaskExecutor.h"
namespace mediakit {
......
......@@ -9,6 +9,7 @@
*/
#include "HlsMaker.h"
#include "Common/config.h"
using namespace std;
......
......@@ -11,13 +11,9 @@
#ifndef HLSMAKER_H
#define HLSMAKER_H
#include <string>
#include <deque>
#include <tuple>
#include "Common/config.h"
#include "Util/TimeTicker.h"
#include "Util/File.h"
#include "Util/util.h"
#include "Util/logger.h"
namespace mediakit {
......
......@@ -13,6 +13,8 @@
#include "HlsMakerImp.h"
#include "Util/util.h"
#include "Util/uv_errno.h"
#include "Util/File.h"
#include "Common/config.h"
using namespace std;
using namespace toolkit;
......
......@@ -9,6 +9,7 @@
*/
#include "HlsMediaSource.h"
#include "Common/config.h"
using namespace toolkit;
......@@ -64,4 +65,40 @@ HlsMediaSource::Ptr HlsCookieData::getMediaSource() const {
return _src.lock();
}
void HlsMediaSource::setIndexFile(std::string index_file)
{
if (!_ring) {
std::weak_ptr<HlsMediaSource> weakSelf = std::dynamic_pointer_cast<HlsMediaSource>(shared_from_this());
auto lam = [weakSelf](int size) {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
return;
}
strongSelf->onReaderChanged(size);
};
_ring = std::make_shared<RingType>(0, std::move(lam));
regist();
}
//赋值m3u8索引文件内容
std::lock_guard<std::mutex> lck(_mtx_index);
_index_file = std::move(index_file);
if (!_index_file.empty()) {
_list_cb.for_each([&](const std::function<void(const std::string& str)>& cb) { cb(_index_file); });
_list_cb.clear();
}
}
void HlsMediaSource::getIndexFile(std::function<void(const std::string& str)> cb)
{
std::lock_guard<std::mutex> lck(_mtx_index);
if (!_index_file.empty()) {
cb(_index_file);
return;
}
//等待生成m3u8文件
_list_cb.emplace_back(std::move(cb));
}
} // namespace mediakit
......@@ -13,6 +13,7 @@
#include "Common/MediaSource.h"
#include "Util/TimeTicker.h"
#include "Util/RingBuffer.h"
#include <atomic>
namespace mediakit {
......@@ -41,42 +42,12 @@ public:
/**
* 设置或清空m3u8索引文件内容
*/
void setIndexFile(std::string index_file) {
if (!_ring) {
std::weak_ptr<HlsMediaSource> weakSelf = std::dynamic_pointer_cast<HlsMediaSource>(shared_from_this());
auto lam = [weakSelf](int size) {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
return;
}
strongSelf->onReaderChanged(size);
};
_ring = std::make_shared<RingType>(0, std::move(lam));
regist();
}
//赋值m3u8索引文件内容
std::lock_guard<std::mutex> lck(_mtx_index);
_index_file = std::move(index_file);
if (!_index_file.empty()) {
_list_cb.for_each([&](const std::function<void(const std::string &str)> &cb) { cb(_index_file); });
_list_cb.clear();
}
}
void setIndexFile(std::string index_file);
/**
* 异步获取m3u8文件
*/
void getIndexFile(std::function<void(const std::string &str)> cb) {
std::lock_guard<std::mutex> lck(_mtx_index);
if (!_index_file.empty()) {
cb(_index_file);
return;
}
//等待生成m3u8文件
_list_cb.emplace_back(std::move(cb));
}
void getIndexFile(std::function<void(const std::string &str)> cb);
/**
* 同步获取m3u8文件
......
......@@ -13,6 +13,7 @@
#include "HlsMakerImp.h"
#include "MPEG.h"
#include "Common/config.h"
namespace mediakit {
......
......@@ -11,8 +11,11 @@
#ifdef ENABLE_MP4
#include "MP4Muxer.h"
#include "Util/File.h"
#include "Extension/AAC.h"
#include "Extension/G711.h"
#include "Extension/H264.h"
#include "Extension/H265.h"
#include "Common/config.h"
using namespace std;
using namespace toolkit;
......
......@@ -14,10 +14,6 @@
#ifdef ENABLE_MP4
#include "Common/MediaSink.h"
#include "Extension/AAC.h"
#include "Extension/G711.h"
#include "Extension/H264.h"
#include "Extension/H265.h"
#include "Common/Stamp.h"
#include "MP4.h"
......
......@@ -13,6 +13,7 @@
#include "MP4Reader.h"
#include "Common/config.h"
#include "Thread/WorkThreadPool.h"
#include "Util/File.h"
using namespace std;
using namespace toolkit;
......
......@@ -15,6 +15,7 @@
#include "Common/config.h"
#include "MP4Recorder.h"
#include "Thread/WorkThreadPool.h"
#include "MP4Muxer.h"
using namespace std;
using namespace toolkit;
......
......@@ -13,17 +13,14 @@
#include <mutex>
#include <memory>
#include "Player/PlayerBase.h"
#include "Util/util.h"
#include "Util/logger.h"
#include "Util/TimeTicker.h"
#include "Util/TimeTicker.h"
#include "Common/MediaSink.h"
#include "MP4Muxer.h"
#include "Record/Recorder.h"
namespace mediakit {
#ifdef ENABLE_MP4
class MP4Muxer;
class MP4Recorder final : public MediaSinkInterface {
public:
using Ptr = std::shared_ptr<MP4Recorder>;
......@@ -63,7 +60,7 @@ private:
std::string _full_path;
std::string _full_path_tmp;
RecordInfo _info;
MP4Muxer::Ptr _muxer;
std::shared_ptr<MP4Muxer> _muxer;
std::list<Track::Ptr> _tracks;
uint64_t _last_dts = 0;
};
......
......@@ -18,10 +18,8 @@
#include <unordered_map>
#include "Extension/Frame.h"
#include "Extension/Track.h"
#include "Util/File.h"
#include "Common/MediaSink.h"
#include "Common/Stamp.h"
#include "Util/ResourcePool.h"
namespace mediakit {
//该类用于产生MPEG-TS/MPEG-PS
......
......@@ -10,9 +10,11 @@
#include "Recorder.h"
#include "Common/config.h"
#include "Util/File.h"
#include "Common/MediaSource.h"
#include "MP4Recorder.h"
#include "HlsRecorder.h"
#include "Util/File.h"
using namespace std;
using namespace toolkit;
......
......@@ -11,7 +11,6 @@
#ifndef ZLMEDIAKIT_RTCPFCI_H
#define ZLMEDIAKIT_RTCPFCI_H
#include "Common/config.h"
#include "Rtcp.h"
namespace mediakit {
......
......@@ -13,8 +13,7 @@
#include "Rtmp/Rtmp.h"
#include "Rtmp/RtmpMediaSource.h"
#include "Network/Socket.h"
#include "Common/Stamp.h"
#include "Poller/EventPoller.h"
namespace mediakit {
......
......@@ -12,6 +12,16 @@
#include "Extension/Factory.h"
namespace mediakit{
TitleMeta::TitleMeta(float dur_sec, size_t fileSize, const std::map<std::string, std::string> &header)
{
_metadata.set("duration", dur_sec);
_metadata.set("fileSize", (int)fileSize);
_metadata.set("server", kServerName);
for (auto &pr : header) {
_metadata.set(pr.first, pr.second);
}
}
VideoMeta::VideoMeta(const VideoTrack::Ptr &video){
if(video->getVideoWidth() > 0 ){
_metadata.set("width", video->getVideoWidth());
......@@ -146,6 +156,108 @@ RtmpPacket::Ptr RtmpPacket::create(){
#endif
}
void RtmpPacket::clear()
{
is_abs_stamp = false;
time_stamp = 0;
ts_field = 0;
body_size = 0;
buffer.clear();
}
bool RtmpPacket::isVideoKeyFrame() const
{
return type_id == MSG_VIDEO && (uint8_t)buffer[0] >> 4 == FLV_KEY_FRAME && (uint8_t)buffer[1] == 1;
}
bool RtmpPacket::isCfgFrame() const
{
switch (type_id) {
case MSG_VIDEO: return buffer[1] == 0;
case MSG_AUDIO: {
switch (getMediaType()) {
case FLV_CODEC_AAC: return buffer[1] == 0;
default: return false;
}
}
default: return false;
}
}
int RtmpPacket::getMediaType() const
{
switch (type_id) {
case MSG_VIDEO: return (uint8_t)buffer[0] & 0x0F;
case MSG_AUDIO: return (uint8_t)buffer[0] >> 4;
default: return 0;
}
}
int RtmpPacket::getAudioSampleRate() const
{
if (type_id != MSG_AUDIO) {
return 0;
}
int flvSampleRate = ((uint8_t)buffer[0] & 0x0C) >> 2;
const static int sampleRate[] = { 5512, 11025, 22050, 44100 };
return sampleRate[flvSampleRate];
}
int RtmpPacket::getAudioSampleBit() const
{
if (type_id != MSG_AUDIO) {
return 0;
}
int flvSampleBit = ((uint8_t)buffer[0] & 0x02) >> 1;
const static int sampleBit[] = { 8, 16 };
return sampleBit[flvSampleBit];
}
int RtmpPacket::getAudioChannel() const
{
if (type_id != MSG_AUDIO) {
return 0;
}
int flvStereoOrMono = (uint8_t)buffer[0] & 0x01;
const static int channel[] = { 1, 2 };
return channel[flvStereoOrMono];
}
RtmpPacket & RtmpPacket::operator=(const RtmpPacket &that)
{
is_abs_stamp = that.is_abs_stamp;
stream_index = that.stream_index;
body_size = that.body_size;
type_id = that.type_id;
ts_field = that.ts_field;
time_stamp = that.time_stamp;
return *this;
}
RtmpHandshake::RtmpHandshake(uint32_t _time, uint8_t *_random /*= nullptr*/)
{
_time = htonl(_time);
memcpy(time_stamp, &_time, 4);
if (!_random) {
random_generate((char *)random, sizeof(random));
}
else {
memcpy(random, _random, sizeof(random));
}
}
void RtmpHandshake::random_generate(char *bytes, int size)
{
static char cdata[] = { 0x73, 0x69, 0x6d, 0x70, 0x6c, 0x65, 0x2d, 0x72,
0x74, 0x6d, 0x70, 0x2d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72,
0x2d, 0x77, 0x69, 0x6e, 0x6c, 0x69, 0x6e, 0x2d, 0x77, 0x69,
0x6e, 0x74, 0x65, 0x72, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72,
0x40, 0x31, 0x32, 0x36, 0x2e, 0x63, 0x6f, 0x6d };
for (int i = 0; i < size; i++) {
bytes[i] = cdata[rand() % (sizeof(cdata) - 1)];
}
}
}//namespace mediakit
namespace toolkit {
......
......@@ -14,11 +14,8 @@
#include <memory>
#include <string>
#include <cstdlib>
#include "Util/util.h"
#include "Util/logger.h"
#include "Network/Buffer.h"
#include "Network/sockutil.h"
#include "amf.h"
#include "Network/Buffer.h"
#include "Extension/Track.h"
#if !defined(_WIN32)
......@@ -86,30 +83,13 @@ namespace mediakit {
class RtmpHandshake {
public:
RtmpHandshake(uint32_t _time, uint8_t *_random = nullptr) {
_time = htonl(_time);
memcpy(time_stamp, &_time, 4);
if (!_random) {
random_generate((char *) random, sizeof(random));
} else {
memcpy(random, _random, sizeof(random));
}
}
RtmpHandshake(uint32_t _time, uint8_t *_random = nullptr);
uint8_t time_stamp[4];
uint8_t zero[4] = {0};
uint8_t random[RANDOM_LEN];
void random_generate(char *bytes, int size) {
static char cdata[] = {0x73, 0x69, 0x6d, 0x70, 0x6c, 0x65, 0x2d, 0x72,
0x74, 0x6d, 0x70, 0x2d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72,
0x2d, 0x77, 0x69, 0x6e, 0x6c, 0x69, 0x6e, 0x2d, 0x77, 0x69,
0x6e, 0x74, 0x65, 0x72, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72,
0x40, 0x31, 0x32, 0x36, 0x2e, 0x63, 0x6f, 0x6d};
for (int i = 0; i < size; i++) {
bytes[i] = cdata[rand() % (sizeof(cdata) - 1)];
}
}
void random_generate(char *bytes, int size);
void create_complex_c0c1();
......@@ -196,65 +176,16 @@ public:
return buffer.size();
}
void clear(){
is_abs_stamp = false;
time_stamp = 0;
ts_field = 0;
body_size = 0;
buffer.clear();
}
bool isVideoKeyFrame() const {
return type_id == MSG_VIDEO && (uint8_t) buffer[0] >> 4 == FLV_KEY_FRAME && (uint8_t) buffer[1] == 1;
}
bool isCfgFrame() const {
switch (type_id){
case MSG_VIDEO : return buffer[1] == 0;
case MSG_AUDIO : {
switch (getMediaType()){
case FLV_CODEC_AAC : return buffer[1] == 0;
default : return false;
}
}
default : return false;
}
}
int getMediaType() const {
switch (type_id) {
case MSG_VIDEO : return (uint8_t) buffer[0] & 0x0F;
case MSG_AUDIO : return (uint8_t) buffer[0] >> 4;
default : return 0;
}
}
void clear();
int getAudioSampleRate() const {
if (type_id != MSG_AUDIO) {
return 0;
}
int flvSampleRate = ((uint8_t) buffer[0] & 0x0C) >> 2;
const static int sampleRate[] = { 5512, 11025, 22050, 44100 };
return sampleRate[flvSampleRate];
}
bool isVideoKeyFrame() const;
bool isCfgFrame() const;
int getAudioSampleBit() const {
if (type_id != MSG_AUDIO) {
return 0;
}
int flvSampleBit = ((uint8_t) buffer[0] & 0x02) >> 1;
const static int sampleBit[] = { 8, 16 };
return sampleBit[flvSampleBit];
}
int getMediaType() const;
int getAudioChannel() const {
if (type_id != MSG_AUDIO) {
return 0;
}
int flvStereoOrMono = (uint8_t) buffer[0] & 0x01;
const static int channel[] = { 1, 2 };
return channel[flvStereoOrMono];
}
int getAudioSampleRate() const;
int getAudioSampleBit() const;
int getAudioChannel() const;
private:
friend class toolkit::ResourcePool_l<RtmpPacket>;
......@@ -262,15 +193,7 @@ private:
clear();
}
RtmpPacket &operator=(const RtmpPacket &that) {
is_abs_stamp = that.is_abs_stamp;
stream_index = that.stream_index;
body_size = that.body_size;
type_id = that.type_id;
ts_field = that.ts_field;
time_stamp = that.time_stamp;
return *this;
}
RtmpPacket &operator=(const RtmpPacket &that);
private:
//对象个数统计
......@@ -304,14 +227,7 @@ public:
TitleMeta(float dur_sec = 0,
size_t fileSize = 0,
const std::map<std::string, std::string> &header = std::map<std::string, std::string>()){
_metadata.set("duration", dur_sec);
_metadata.set("fileSize", (int)fileSize);
_metadata.set("server",kServerName);
for (auto &pr : header){
_metadata.set(pr.first, pr.second);
}
}
const std::map<std::string, std::string> &header = std::map<std::string, std::string>());
CodecId getCodecId() const override{
return CodecInvalid;
......
......@@ -53,7 +53,7 @@ protected:
RingType::Ptr _ring;
};
class RtmpCodec : public RtmpRing, public FrameDispatcher , public CodecInfo{
class RtmpCodec : public RtmpRing, public FrameDispatcher, public CodecInfo {
public:
typedef std::shared_ptr<RtmpCodec> Ptr;
RtmpCodec() = default;
......
......@@ -7,7 +7,7 @@
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#include "RtmpCodec.h"
#include "RtmpDemuxer.h"
#include "Extension/Factory.h"
......
......@@ -15,12 +15,10 @@
#include <unordered_map>
#include "Rtmp/amf.h"
#include "Rtmp/Rtmp.h"
#include "Player/PlayerBase.h"
#include "Util/TimeTicker.h"
#include "RtmpCodec.h"
#include "Common/MediaSink.h"
namespace mediakit {
class RtmpCodec;
class RtmpDemuxer : public Demuxer {
public:
using Ptr = std::shared_ptr<RtmpDemuxer>;
......@@ -54,8 +52,8 @@ private:
float _duration = 0;
AudioTrack::Ptr _audio_track;
VideoTrack::Ptr _video_track;
RtmpCodec::Ptr _audio_rtmp_decoder;
RtmpCodec::Ptr _video_rtmp_decoder;
std::shared_ptr<RtmpCodec> _audio_rtmp_decoder;
std::shared_ptr<RtmpCodec> _video_rtmp_decoder;
};
} /* namespace mediakit */
......
......@@ -18,16 +18,9 @@
#include <unordered_map>
#include "amf.h"
#include "Rtmp.h"
#include "RtmpDemuxer.h"
#include "Common/config.h"
#include "Common/MediaSource.h"
#include "Util/util.h"
#include "Util/logger.h"
#include "Common/PacketCache.h"
#include "Util/RingBuffer.h"
#include "Util/TimeTicker.h"
#include "Util/ResourcePool.h"
#include "Util/NoticeCenter.h"
#include "Thread/ThreadPool.h"
#define RTMP_GOP_SIZE 512
......@@ -126,66 +119,12 @@ public:
* 输入rtmp包
* @param pkt rtmp包
*/
void onWrite(RtmpPacket::Ptr pkt, bool = true) override {
bool is_video = pkt->type_id == MSG_VIDEO;
_speed[is_video ? TrackVideo : TrackAudio] += pkt->size();
//保存当前时间戳
switch (pkt->type_id) {
case MSG_VIDEO : _track_stamps[TrackVideo] = pkt->time_stamp, _have_video = true; break;
case MSG_AUDIO : _track_stamps[TrackAudio] = pkt->time_stamp, _have_audio = true; break;
default : break;
}
if (pkt->isCfgFrame()) {
std::lock_guard<std::recursive_mutex> lock(_mtx);
_config_frame_map[pkt->type_id] = pkt;
if (!_ring) {
//注册后收到config帧更新到各播放器
return;
}
}
if (!_ring) {
std::weak_ptr<RtmpMediaSource> weakSelf = std::dynamic_pointer_cast<RtmpMediaSource>(shared_from_this());
auto lam = [weakSelf](int size) {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
return;
}
strongSelf->onReaderChanged(size);
};
//GOP默认缓冲512组RTMP包,每组RTMP包时间戳相同(如果开启合并写了,那么每组为合并写时间内的RTMP包),
//每次遇到关键帧第一个RTMP包,则会清空GOP缓存(因为有新的关键帧了,同样可以实现秒开)
_ring = std::make_shared<RingType>(_ring_size,std::move(lam));
if(_metadata){
regist();
}
}
bool key = pkt->isVideoKeyFrame();
auto stamp = pkt->time_stamp;
PacketCache<RtmpPacket>::inputPacket(stamp, is_video, std::move(pkt), key);
}
void onWrite(RtmpPacket::Ptr pkt, bool = true) override;
/**
* 获取当前时间戳
*/
uint32_t getTimeStamp(TrackType trackType) override {
assert(trackType >= TrackInvalid && trackType < TrackMax);
if (trackType != TrackInvalid) {
//获取某track的时间戳
return _track_stamps[trackType];
}
//获取所有track的最小时间戳
uint32_t ret = UINT32_MAX;
for (auto &stamp : _track_stamps) {
if (stamp > 0 && stamp < ret) {
ret = stamp;
}
}
return ret;
}
uint32_t getTimeStamp(TrackType trackType) override;
void clearCache() override{
PacketCache<RtmpPacket>::clearCache();
......
#include "RtmpDemuxer.h"
#include "RtmpMediaSourceImp.h"
namespace mediakit {
uint32_t RtmpMediaSource::getTimeStamp(TrackType trackType)
{
assert(trackType >= TrackInvalid && trackType < TrackMax);
if (trackType != TrackInvalid) {
//获取某track的时间戳
return _track_stamps[trackType];
}
//获取所有track的最小时间戳
uint32_t ret = UINT32_MAX;
for (auto &stamp : _track_stamps) {
if (stamp > 0 && stamp < ret) {
ret = stamp;
}
}
return ret;
}
void RtmpMediaSource::onWrite(RtmpPacket::Ptr pkt, bool /*= true*/)
{
bool is_video = pkt->type_id == MSG_VIDEO;
_speed[is_video ? TrackVideo : TrackAudio] += pkt->size();
//保存当前时间戳
switch (pkt->type_id) {
case MSG_VIDEO: _track_stamps[TrackVideo] = pkt->time_stamp, _have_video = true; break;
case MSG_AUDIO: _track_stamps[TrackAudio] = pkt->time_stamp, _have_audio = true; break;
default: break;
}
if (pkt->isCfgFrame()) {
std::lock_guard<std::recursive_mutex> lock(_mtx);
_config_frame_map[pkt->type_id] = pkt;
if (!_ring) {
//注册后收到config帧更新到各播放器
return;
}
}
if (!_ring) {
std::weak_ptr<RtmpMediaSource> weakSelf = std::dynamic_pointer_cast<RtmpMediaSource>(shared_from_this());
auto lam = [weakSelf](int size) {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
return;
}
strongSelf->onReaderChanged(size);
};
//GOP默认缓冲512组RTMP包,每组RTMP包时间戳相同(如果开启合并写了,那么每组为合并写时间内的RTMP包),
//每次遇到关键帧第一个RTMP包,则会清空GOP缓存(因为有新的关键帧了,同样可以实现秒开)
_ring = std::make_shared<RingType>(_ring_size, std::move(lam));
if (_metadata) {
regist();
}
}
bool key = pkt->isVideoKeyFrame();
auto stamp = pkt->time_stamp;
PacketCache<RtmpPacket>::inputPacket(stamp, is_video, std::move(pkt), key);
}
RtmpMediaSourceImp::RtmpMediaSourceImp(const std::string &vhost, const std::string &app, const std::string &id, int ringSize) : RtmpMediaSource(vhost, app, id, ringSize)
{
_demuxer = std::make_shared<RtmpDemuxer>();
_demuxer->setTrackListener(this);
}
void RtmpMediaSourceImp::setMetaData(const AMFValue &metadata)
{
if (!_demuxer->loadMetaData(metadata)) {
//该metadata无效,需要重新生成
_metadata = metadata;
_recreate_metadata = true;
}
RtmpMediaSource::setMetaData(metadata);
}
void RtmpMediaSourceImp::onWrite(RtmpPacket::Ptr pkt, bool /*= true*/)
{
if (!_all_track_ready || _muxer->isEnabled()) {
//未获取到所有Track后,或者开启转协议,那么需要解复用rtmp
_demuxer->inputRtmp(pkt);
}
RtmpMediaSource::onWrite(std::move(pkt));
}
int RtmpMediaSourceImp::totalReaderCount()
{
return readerCount() + (_muxer ? _muxer->totalReaderCount() : 0);
}
void RtmpMediaSourceImp::setProtocolOption(const ProtocolOption &option)
{
//不重复生成rtmp
_option = option;
//不重复生成rtmp协议
_option.enable_rtmp = false;
_muxer = std::make_shared<MultiMediaSourceMuxer>(getVhost(), getApp(), getId(), _demuxer->getDuration(), _option);
_muxer->setMediaListener(getListener());
_muxer->setTrackListener(std::static_pointer_cast<RtmpMediaSourceImp>(shared_from_this()));
//让_muxer对象拦截一部分事件(比如说录像相关事件)
MediaSource::setListener(_muxer);
for (auto &track : _demuxer->getTracks(false)) {
_muxer->addTrack(track);
track->addDelegate(_muxer);
}
}
bool RtmpMediaSourceImp::addTrack(const Track::Ptr &track)
{
if (_muxer) {
if (_muxer->addTrack(track)) {
track->addDelegate(_muxer);
return true;
}
}
return false;
}
void RtmpMediaSourceImp::addTrackCompleted()
{
if (_muxer) {
_muxer->addTrackCompleted();
}
}
void RtmpMediaSourceImp::resetTracks()
{
if (_muxer) {
_muxer->resetTracks();
}
}
void RtmpMediaSourceImp::onAllTrackReady()
{
_all_track_ready = true;
if (_recreate_metadata) {
//更新metadata
for (auto &track : _muxer->getTracks()) {
Metadata::addTrack(_metadata, track);
}
RtmpMediaSource::updateMetaData(_metadata);
}
}
void RtmpMediaSourceImp::setListener(const std::weak_ptr<MediaSourceEvent> &listener)
{
if (_muxer) {
//_muxer对象不能处理的事件再给listener处理
_muxer->setMediaListener(listener);
}
else {
//未创建_muxer对象,事件全部给listener处理
MediaSource::setListener(listener);
}
}
}
......@@ -16,16 +16,13 @@
#include <memory>
#include <functional>
#include <unordered_map>
#include "Util/util.h"
#include "Util/logger.h"
#include "amf.h"
#include "Rtmp.h"
#include "RtmpMediaSource.h"
#include "RtmpDemuxer.h"
#include "Common/MultiMediaSourceMuxer.h"
namespace mediakit {
class RtmpDemuxer;
class RtmpMediaSourceImp final : public RtmpMediaSource, private TrackListener, public MultiMediaSourceMuxer::Listener {
public:
using Ptr = std::shared_ptr<RtmpMediaSourceImp>;
......@@ -37,62 +34,29 @@ public:
* @param id 流id
* @param ringSize 环形缓存大小
*/
RtmpMediaSourceImp(const std::string &vhost, const std::string &app, const std::string &id, int ringSize = RTMP_GOP_SIZE) : RtmpMediaSource(vhost, app, id, ringSize) {
_demuxer = std::make_shared<RtmpDemuxer>();
_demuxer->setTrackListener(this);
}
RtmpMediaSourceImp(const std::string &vhost, const std::string &app, const std::string &id, int ringSize = RTMP_GOP_SIZE);
~RtmpMediaSourceImp() override = default;
/**
* 设置metadata
*/
void setMetaData(const AMFValue &metadata) override{
if(!_demuxer->loadMetaData(metadata)){
//该metadata无效,需要重新生成
_metadata = metadata;
_recreate_metadata = true;
}
RtmpMediaSource::setMetaData(metadata);
}
void setMetaData(const AMFValue &metadata) override;
/**
* 输入rtmp并解析
*/
void onWrite(RtmpPacket::Ptr pkt, bool = true) override {
if (!_all_track_ready || _muxer->isEnabled()) {
//未获取到所有Track后,或者开启转协议,那么需要解复用rtmp
_demuxer->inputRtmp(pkt);
}
RtmpMediaSource::onWrite(std::move(pkt));
}
void onWrite(RtmpPacket::Ptr pkt, bool = true) override;
/**
* 获取观看总人数,包括(hls/rtsp/rtmp)
*/
int totalReaderCount() override{
return readerCount() + (_muxer ? _muxer->totalReaderCount() : 0);
}
int totalReaderCount() override;
/**
* 设置协议转换
*/
void setProtocolOption(const ProtocolOption &option) {
//不重复生成rtmp
_option = option;
//不重复生成rtmp协议
_option.enable_rtmp = false;
_muxer = std::make_shared<MultiMediaSourceMuxer>(getVhost(), getApp(), getId(), _demuxer->getDuration(), _option);
_muxer->setMediaListener(getListener());
_muxer->setTrackListener(std::static_pointer_cast<RtmpMediaSourceImp>(shared_from_this()));
//让_muxer对象拦截一部分事件(比如说录像相关事件)
MediaSource::setListener(_muxer);
for (auto &track : _demuxer->getTracks(false)) {
_muxer->addTrack(track);
track->addDelegate(_muxer);
}
}
void setProtocolOption(const ProtocolOption &option);
const ProtocolOption &getProtocolOption() const {
return _option;
......@@ -101,66 +65,32 @@ public:
/**
* _demuxer触发的添加Track事件
*/
bool addTrack(const Track::Ptr &track) override {
if (_muxer) {
if (_muxer->addTrack(track)) {
track->addDelegate(_muxer);
return true;
}
}
return false;
}
bool addTrack(const Track::Ptr &track) override;
/**
* _demuxer触发的Track添加完毕事件
*/
void addTrackCompleted() override {
if (_muxer) {
_muxer->addTrackCompleted();
}
}
void addTrackCompleted() override;
void resetTracks() override {
if (_muxer) {
_muxer->resetTracks();
}
}
void resetTracks() override;
/**
* _muxer触发的所有Track就绪的事件
*/
void onAllTrackReady() override{
_all_track_ready = true;
if (_recreate_metadata) {
//更新metadata
for (auto &track : _muxer->getTracks()) {
Metadata::addTrack(_metadata, track);
}
RtmpMediaSource::updateMetaData(_metadata);
}
}
void onAllTrackReady() override;
/**
* 设置事件监听器
* @param listener 监听器
*/
void setListener(const std::weak_ptr<MediaSourceEvent> &listener) override{
if (_muxer) {
//_muxer对象不能处理的事件再给listener处理
_muxer->setMediaListener(listener);
} else {
//未创建_muxer对象,事件全部给listener处理
MediaSource::setListener(listener);
}
}
void setListener(const std::weak_ptr<MediaSourceEvent> &listener) override;
private:
bool _all_track_ready = false;
bool _recreate_metadata = false;
ProtocolOption _option;
AMFValue _metadata;
RtmpDemuxer::Ptr _demuxer;
std::shared_ptr<RtmpDemuxer> _demuxer;
MultiMediaSourceMuxer::Ptr _muxer;
};
......
......@@ -13,6 +13,12 @@
#include "Util/util.h"
#include "Util/onceToken.h"
#include "Thread/ThreadPool.h"
#include "Common/config.h"
#include "Common/Parser.h"
#include "RtmpDemuxer.h"
#include "RtmpPlayerImp.h"
using namespace toolkit;
using namespace std;
......@@ -171,7 +177,7 @@ void RtmpPlayer::speed(float speed) {
//todo
}
inline void RtmpPlayer::send_connect() {
void RtmpPlayer::send_connect() {
AMFValue obj(AMF_OBJECT);
obj.set("app", _app);
obj.set("tcUrl", _tc_url);
......@@ -199,7 +205,7 @@ inline void RtmpPlayer::send_connect() {
});
}
inline void RtmpPlayer::send_createStream() {
void RtmpPlayer::send_createStream() {
AMFValue obj(AMF_NULL);
sendInvoke("createStream", obj);
addOnResultCB([this](AMFDecoder &dec) {
......@@ -210,7 +216,7 @@ inline void RtmpPlayer::send_createStream() {
});
}
inline void RtmpPlayer::send_play() {
void RtmpPlayer::send_play() {
AMFEncoder enc;
enc << "play" << ++_send_req_id << nullptr << _stream_id << -2000;
sendRequest(MSG_CMD, enc.data());
......@@ -226,7 +232,7 @@ inline void RtmpPlayer::send_play() {
addOnStatusCB(fun);
}
inline void RtmpPlayer::send_pause(bool pause) {
void RtmpPlayer::send_pause(bool pause) {
AMFEncoder enc;
enc << "pause" << ++_send_req_id << nullptr << pause;
sendRequest(MSG_CMD, enc.data());
......@@ -414,4 +420,49 @@ void RtmpPlayer::seekToMilliSecond(uint32_t seekMS){
});
}
////////////////////////////////////////////
float RtmpPlayerImp::getDuration() const
{
return _demuxer ? _demuxer->getDuration() : 0;
}
std::vector<mediakit::Track::Ptr> RtmpPlayerImp::getTracks(bool ready /*= true*/) const
{
return _demuxer ? _demuxer->getTracks(ready) : Super::getTracks(ready);
}
bool RtmpPlayerImp::onCheckMeta(const AMFValue &val)
{
//无metadata或metadata中无track信息时,需要从数据包中获取track
_wait_track_ready = (*this)[Client::kWaitTrackReady].as<bool>() || RtmpDemuxer::trackCount(val) == 0;
onCheckMeta_l(val);
return true;
}
void RtmpPlayerImp::onMediaData(RtmpPacket::Ptr chunkData)
{
if (!_demuxer) {
//有些rtmp流没metadata
onCheckMeta_l(TitleMeta().getMetadata());
}
_demuxer->inputRtmp(chunkData);
if (_rtmp_src) {
_rtmp_src->onWrite(std::move(chunkData));
}
}
void RtmpPlayerImp::onCheckMeta_l(const AMFValue &val)
{
_rtmp_src = std::dynamic_pointer_cast<RtmpMediaSource>(_media_src);
if (_rtmp_src) {
_rtmp_src->setMetaData(val);
}
if (_demuxer) {
return;
}
_demuxer = std::make_shared<RtmpDemuxer>();
//TraceL<<" _wait_track_ready "<<_wait_track_ready;
_demuxer->setTrackListener(this, _wait_track_ready);
_demuxer->loadMetaData(val);
}
} /* namespace mediakit */
......@@ -18,8 +18,6 @@
#include "Rtmp.h"
#include "RtmpProtocol.h"
#include "Player/PlayerBase.h"
#include "Util/util.h"
#include "Util/logger.h"
#include "Util/TimeTicker.h"
#include "Network/Socket.h"
#include "Network/TcpClient.h"
......
......@@ -13,15 +13,11 @@
#include <memory>
#include <functional>
#include "Common/config.h"
#include "RtmpPlayer.h"
#include "RtmpMediaSource.h"
#include "RtmpDemuxer.h"
#include "Poller/Timer.h"
#include "Util/TimeTicker.h"
namespace mediakit {
class RtmpDemuxer;
class RtmpPlayerImp: public PlayerImp<RtmpPlayer,PlayerBase>, private TrackListener {
public:
using Ptr = std::shared_ptr<RtmpPlayerImp>;
......@@ -50,33 +46,15 @@ public:
seekToMilliSecond(pos);
}
float getDuration() const override {
return _demuxer ? _demuxer->getDuration() : 0;
}
float getDuration() const override;
std::vector<Track::Ptr> getTracks(bool ready = true) const override {
return _demuxer ? _demuxer->getTracks(ready) : Super::getTracks(ready);
}
std::vector<Track::Ptr> getTracks(bool ready = true) const override;
private:
//派生类回调函数
bool onCheckMeta(const AMFValue &val) override {
//无metadata或metadata中无track信息时,需要从数据包中获取track
_wait_track_ready = (*this)[Client::kWaitTrackReady].as<bool>() || RtmpDemuxer::trackCount(val) == 0;
onCheckMeta_l(val);
return true;
}
bool onCheckMeta(const AMFValue &val) override;
void onMediaData(RtmpPacket::Ptr chunkData) override {
if (!_demuxer) {
//有些rtmp流没metadata
onCheckMeta_l(TitleMeta().getMetadata());
}
_demuxer->inputRtmp(chunkData);
if (_rtmp_src) {
_rtmp_src->onWrite(std::move(chunkData));
}
}
void onMediaData(RtmpPacket::Ptr chunkData) override;
void onPlayResult(const toolkit::SockException &ex) override {
if (!_wait_track_ready || ex) {
......@@ -94,23 +72,11 @@ private:
}
private:
void onCheckMeta_l(const AMFValue &val) {
_rtmp_src = std::dynamic_pointer_cast<RtmpMediaSource>(_media_src);
if (_rtmp_src) {
_rtmp_src->setMetaData(val);
}
if(_demuxer){
return;
}
_demuxer = std::make_shared<RtmpDemuxer>();
//TraceL<<" _wait_track_ready "<<_wait_track_ready;
_demuxer->setTrackListener(this, _wait_track_ready);
_demuxer->loadMetaData(val);
}
void onCheckMeta_l(const AMFValue &val);
private:
bool _wait_track_ready = true;
RtmpDemuxer::Ptr _demuxer;
std::shared_ptr<RtmpDemuxer> _demuxer;
RtmpMediaSource::Ptr _rtmp_src;
};
......
......@@ -12,6 +12,7 @@
#include "RtmpProtocol.h"
#include "Rtmp/utils.h"
#include "RtmpMediaSource.h"
#include "Util/util.h"
using namespace std;
using namespace toolkit;
......
......@@ -14,12 +14,9 @@
#include <memory>
#include <string>
#include <functional>
#include <unordered_map>
#include "amf.h"
#include "Rtmp.h"
#include "Util/util.h"
#include "Util/logger.h"
#include "Util/TimeTicker.h"
#include "Network/Socket.h"
#include "Util/ResourcePool.h"
#include "Http/HttpRequestSplitter.h"
......
......@@ -13,6 +13,8 @@
#include "Util/util.h"
#include "Util/onceToken.h"
#include "Thread/ThreadPool.h"
#include "Common/Parser.h"
#include "Common/config.h"
using namespace std;
using namespace toolkit;
......@@ -127,7 +129,7 @@ void RtmpPusher::onRecv(const Buffer::Ptr &buf){
}
}
inline void RtmpPusher::send_connect() {
void RtmpPusher::send_connect() {
AMFValue obj(AMF_OBJECT);
obj.set("app", _app);
obj.set("type", "nonprivate");
......@@ -147,7 +149,7 @@ inline void RtmpPusher::send_connect() {
});
}
inline void RtmpPusher::send_createStream() {
void RtmpPusher::send_createStream() {
AMFValue obj(AMF_NULL);
sendInvoke("createStream", obj);
addOnResultCB([this](AMFDecoder &dec) {
......@@ -159,7 +161,7 @@ inline void RtmpPusher::send_createStream() {
}
#define RTMP_STREAM_LIVE "live"
inline void RtmpPusher::send_publish() {
void RtmpPusher::send_publish() {
AMFEncoder enc;
enc << "publish" << ++_send_req_id << nullptr << _stream_id << RTMP_STREAM_LIVE;
sendRequest(MSG_CMD, enc.data());
......@@ -175,7 +177,7 @@ inline void RtmpPusher::send_publish() {
});
}
inline void RtmpPusher::send_metaData(){
void RtmpPusher::send_metaData(){
auto src = _publish_src.lock();
if (!src) {
throw std::runtime_error("the media source was released");
......
......@@ -15,13 +15,10 @@
#include "amf.h"
#include "Rtmp.h"
#include "utils.h"
#include "Common/config.h"
#include "RtmpProtocol.h"
#include "RtmpMediaSourceImp.h"
#include "Util/util.h"
#include "Util/TimeTicker.h"
#include "Network/Session.h"
#include "Common/Stamp.h"
namespace mediakit {
......
......@@ -15,7 +15,7 @@
#include "Util/util.h"
#include "Util/logger.h"
#include "Network/sockutil.h"
#include "Util/util.h"
#include "Network/Buffer.h"
using namespace std;
using namespace toolkit;
......
......@@ -14,12 +14,11 @@
#include <assert.h>
#include <string>
#include <vector>
#include <unordered_map>
#include <map>
#include <stdexcept>
#include <functional>
#include "Network/Buffer.h"
namespace toolkit {
class BufferLikeString;
}
enum AMFType {
AMF_NUMBER,
AMF_INTEGER,
......
......@@ -18,6 +18,9 @@
#include "Extension/Opus.h"
#include "Http/HttpTSPlayer.h"
#include "Util/File.h"
#include "Common/config.h"
#include "Rtsp/RtpCodec.h"
#include "Rtsp/RtpReceiver.h"
using namespace std;
using namespace toolkit;
......
......@@ -15,12 +15,12 @@
#include "Decoder.h"
#include "ProcessInterface.h"
#include "Rtsp/RtpCodec.h"
#include "Rtsp/RtpReceiver.h"
#include "Http/HttpRequestSplitter.h"
// for MediaInfo
#include "Common/MediaSource.h"
namespace mediakit{
class RtpCodec;
class RtpReceiverImp;
class GB28181Process : public ProcessInterface {
public:
......
......@@ -11,7 +11,9 @@
#if defined(ENABLE_RTPPROXY)
#include "PSEncoder.h"
#include "Common/config.h"
#include "Extension/H264.h"
#include "Extension/CommonRtp.h"
#include "Rtsp/RtspMuxer.h"
using namespace toolkit;
......
......@@ -15,11 +15,9 @@
#include "Record/MPEG.h"
#include "Common/MediaSink.h"
#include "Common/Stamp.h"
#include "Extension/CommonRtp.h"
namespace mediakit{
class CommonRtpEncoder;
class PSEncoderImp : public MpegMuxer{
public:
PSEncoderImp(uint32_t ssrc, uint8_t payload_type = 96);
......
......@@ -14,11 +14,9 @@
#if defined(ENABLE_RTPPROXY)
#include "Common/MediaSink.h"
#include "Common/Stamp.h"
#include "Extension/CommonRtp.h"
namespace mediakit {
class RtpCodec;
class RawEncoderImp : public MediaSinkInterface {
public:
RawEncoderImp(uint32_t ssrc, uint8_t payload_type = 96, bool send_audio = true);
......@@ -44,13 +42,13 @@ protected:
virtual void onRTP(toolkit::Buffer::Ptr rtp, bool is_key = false) = 0;
private:
RtpCodec::Ptr createRtpEncoder(const Track::Ptr &track);
std::shared_ptr<RtpCodec> createRtpEncoder(const Track::Ptr &track);
private:
bool _send_audio;
uint8_t _payload_type;
uint32_t _ssrc;
RtpCodec::Ptr _rtp_encoder;
std::shared_ptr<RtpCodec> _rtp_encoder;
};
} // namespace mediakit
......
......@@ -15,7 +15,7 @@
#include "PSEncoder.h"
#include "RawEncoder.h"
#include "Extension/CommonRtp.h"
#include "Common/PacketCache.h"
namespace mediakit{
......
......@@ -12,6 +12,8 @@
#include "GB28181Process.h"
#include "RtpProcess.h"
#include "Http/HttpTSPlayer.h"
#include "Util/File.h"
#include "Common/config.h"
using namespace std;
using namespace toolkit;
......
......@@ -14,6 +14,7 @@
#include "Thread/WorkThreadPool.h"
#include "Util/uv_errno.h"
#include "RtpCache.h"
#include "Rtcp/RtcpContext.h"
using namespace std;
using namespace toolkit;
......
......@@ -14,6 +14,8 @@
#include "PSEncoder.h"
#include "Extension/CommonRtp.h"
#include "Rtcp/RtcpContext.h"
#include "Common/MediaSource.h"
#include "Common/MediaSink.h"
namespace mediakit{
......
......@@ -13,6 +13,7 @@
#include "RtpServer.h"
#include "RtpSelector.h"
#include "Rtcp/RtcpContext.h"
#include "Common/config.h"
using namespace std;
using namespace toolkit;
......
......@@ -13,6 +13,7 @@
#include "RtpSelector.h"
#include "Network/TcpServer.h"
#include "Rtsp/RtpReceiver.h"
#include "Common/config.h"
using namespace std;
using namespace toolkit;
......
......@@ -12,8 +12,9 @@
#define ZLMEDIAKIT_RTPCODEC_H
#include <memory>
#include "Extension/Frame.h"
#include "Util/RingBuffer.h"
#include "Player/PlayerBase.h"
#include "Rtsp/Rtsp.h"
namespace mediakit {
......@@ -58,7 +59,7 @@ protected:
RingType::Ptr _ring;
};
class RtpInfo{
class RtpInfo {
public:
using Ptr = std::shared_ptr<RtpInfo>;
......
......@@ -14,6 +14,7 @@
#include "Util/util.h"
#include "Network/sockutil.h"
#include "RtspSession.h"
#include "Common/config.h"
using namespace std;
using namespace toolkit;
......
......@@ -15,9 +15,7 @@
#include <memory>
#include <unordered_set>
#include <unordered_map>
#include "Common/config.h"
#include "RtspMediaSource.h"
#include "Util/mini.h"
#include "Network/Socket.h"
namespace mediakit{
......
......@@ -14,8 +14,9 @@
#include <map>
#include <string>
#include <memory>
#include "RtpCodec.h"
#include "RtspMediaSource.h"
#include "Rtsp/Rtsp.h"
#include "Extension/Frame.h"
// for NtpStamp
#include "Common/Stamp.h"
namespace mediakit {
......@@ -166,7 +167,7 @@ private:
std::function<void(SEQ seq, T &packet)> _cb;
};
class RtpTrack : private PacketSortor<RtpPacket::Ptr>{
class RtpTrack : private PacketSortor<RtpPacket::Ptr> {
public:
class BadRtpException : public std::invalid_argument {
public:
......
......@@ -12,6 +12,8 @@
#include <cinttypes>
#include "Rtsp.h"
#include "Common/Parser.h"
#include "Common/config.h"
#include "Network/Socket.h"
using namespace std;
using namespace toolkit;
......@@ -589,6 +591,41 @@ RtpPacket::Ptr RtpPacket::create() {
#endif
}
/**
* 构造title类型sdp
* @param dur_sec rtsp点播时长,0代表直播,单位秒
* @param header 自定义sdp描述
* @param version sdp版本
*/
TitleSdp::TitleSdp(float dur_sec, const std::map<std::string, std::string>& header, int version) : Sdp(0, 0) {
_printer << "v=" << version << "\r\n";
if (!header.empty()) {
for (auto &pr : header) {
_printer << pr.first << "=" << pr.second << "\r\n";
}
}
else {
_printer << "o=- 0 0 IN IP4 0.0.0.0\r\n";
_printer << "s=Streamed by " << kServerName << "\r\n";
_printer << "c=IN IP4 0.0.0.0\r\n";
_printer << "t=0 0\r\n";
}
if (dur_sec <= 0) {
//直播
_printer << "a=range:npt=now-\r\n";
}
else {
//点播
_dur_sec = dur_sec;
_printer << "a=range:npt=0-" << dur_sec << "\r\n";
}
_printer << "a=control:*\r\n";
}
}//namespace mediakit
namespace toolkit {
......
......@@ -15,10 +15,11 @@
#include <string>
#include <memory>
#include <unordered_map>
#include "Util/util.h"
#include "Common/config.h"
#include "Common/macros.h"
#include "Extension/Frame.h"
namespace toolkit {
class Socket;
}
namespace mediakit {
......@@ -312,30 +313,7 @@ public:
*/
TitleSdp(float dur_sec = 0,
const std::map<std::string, std::string> &header = std::map<std::string, std::string>(),
int version = 0) : Sdp(0, 0) {
_printer << "v=" << version << "\r\n";
if (!header.empty()) {
for (auto &pr : header) {
_printer << pr.first << "=" << pr.second << "\r\n";
}
} else {
_printer << "o=- 0 0 IN IP4 0.0.0.0\r\n";
_printer << "s=Streamed by " << kServerName << "\r\n";
_printer << "c=IN IP4 0.0.0.0\r\n";
_printer << "t=0 0\r\n";
}
if (dur_sec <= 0) {
//直播
_printer << "a=range:npt=now-\r\n";
} else {
//点播
_dur_sec = dur_sec;
_printer << "a=range:npt=0-" << dur_sec << "\r\n";
}
_printer << "a=control:*\r\n";
}
int version = 0);
std::string getSdp() const override {
return _printer;
......@@ -357,7 +335,7 @@ private:
//创建rtp over tcp4个字节的头
toolkit::Buffer::Ptr makeRtpOverTcpPrefix(uint16_t size, uint8_t interleaved);
//创建rtp-rtcp端口对
void makeSockPair(std::pair<toolkit::Socket::Ptr, toolkit::Socket::Ptr> &pair, const std::string &local_ip, bool re_use_port = false, bool is_udp = true);
void makeSockPair(std::pair<std::shared_ptr<toolkit::Socket>, std::shared_ptr<toolkit::Socket>> &pair, const std::string &local_ip, bool re_use_port = false, bool is_udp = true);
//十六进制方式打印ssrc
std::string printSSRC(uint32_t ui32Ssrc);
......
......@@ -10,6 +10,7 @@
#include <cctype>
#include <algorithm>
#include "RtpCodec.h"
#include "RtspDemuxer.h"
#include "Util/base64.h"
#include "Extension/Factory.h"
......
......@@ -12,12 +12,10 @@
#define SRC_RTP_RTSPDEMUXER_H_
#include <unordered_map>
#include "Player/PlayerBase.h"
#include "Util/TimeTicker.h"
#include "RtpCodec.h"
#include "Common/MediaSink.h"
namespace mediakit {
class RtpCodec;
class RtspDemuxer : public Demuxer {
public:
typedef std::shared_ptr<RtspDemuxer> Ptr;
......@@ -51,8 +49,8 @@ private:
float _duration = 0;
AudioTrack::Ptr _audio_track;
VideoTrack::Ptr _video_track;
RtpCodec::Ptr _audio_rtp_decoder;
RtpCodec::Ptr _video_rtp_decoder;
std::shared_ptr<RtpCodec> _audio_rtp_decoder;
std::shared_ptr<RtpCodec> _video_rtp_decoder;
};
} /* namespace mediakit */
......
......@@ -15,16 +15,9 @@
#include <string>
#include <memory>
#include <functional>
#include <unordered_map>
#include "Common/config.h"
#include "Common/MediaSource.h"
#include "RtpCodec.h"
#include "Util/logger.h"
#include "Common/PacketCache.h"
#include "Util/RingBuffer.h"
#include "Util/TimeTicker.h"
#include "Util/ResourcePool.h"
#include "Util/NoticeCenter.h"
#include "Thread/ThreadPool.h"
#define RTP_GOP_SIZE 512
......@@ -38,7 +31,6 @@ namespace mediakit {
*/
class RtspMediaSource : public MediaSource, public toolkit::RingDelegate<RtpPacket::Ptr>, private PacketCache<RtpPacket> {
public:
using PoolType = toolkit::ResourcePool<RtpPacket>;
using Ptr = std::shared_ptr<RtspMediaSource>;
using RingDataType = std::shared_ptr<toolkit::List<RtpPacket::Ptr> >;
using RingType = toolkit::RingBuffer<RingDataType>;
......@@ -111,85 +103,24 @@ public:
/**
* 获取相应轨道的时间戳,单位毫秒
*/
uint32_t getTimeStamp(TrackType trackType) override {
assert(trackType >= TrackInvalid && trackType < TrackMax);
if (trackType != TrackInvalid) {
//获取某track的时间戳
auto &track = _tracks[trackType];
if (track) {
return track->_time_stamp;
}
}
//获取所有track的最小时间戳
uint32_t ret = UINT32_MAX;
for (auto &track : _tracks) {
if (track && track->_time_stamp < ret) {
ret = track->_time_stamp;
}
}
return ret;
}
uint32_t getTimeStamp(TrackType trackType) override;
/**
* 更新时间戳
*/
void setTimeStamp(uint32_t stamp) override {
for (auto &track : _tracks) {
if (track) {
track->_time_stamp = stamp;
}
}
}
void setTimeStamp(uint32_t stamp) override;
/**
* 设置sdp
*/
virtual void setSdp(const std::string &sdp) {
SdpParser sdp_parser(sdp);
_tracks[TrackVideo] = sdp_parser.getTrack(TrackVideo);
_tracks[TrackAudio] = sdp_parser.getTrack(TrackAudio);
_have_video = (bool) _tracks[TrackVideo];
_sdp = sdp_parser.toString();
if (_ring) {
regist();
}
}
virtual void setSdp(const std::string &sdp);
/**
* 输入rtp
* @param rtp rtp包
* @param keyPos 该包是否为关键帧的第一个包
*/
void onWrite(RtpPacket::Ptr rtp, bool keyPos) override {
_speed[rtp->type] += rtp->size();
assert(rtp->type >= 0 && rtp->type < TrackMax);
auto &track = _tracks[rtp->type];
auto stamp = rtp->getStampMS();
if (track) {
track->_seq = rtp->getSeq();
track->_time_stamp = rtp->getStamp() * uint64_t(1000) / rtp->sample_rate;
track->_ssrc = rtp->getSSRC();
}
if (!_ring) {
std::weak_ptr<RtspMediaSource> weakSelf = std::dynamic_pointer_cast<RtspMediaSource>(shared_from_this());
auto lam = [weakSelf](int size) {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
return;
}
strongSelf->onReaderChanged(size);
};
//GOP默认缓冲512组RTP包,每组RTP包时间戳相同(如果开启合并写了,那么每组为合并写时间内的RTP包),
//每次遇到关键帧第一个RTP包,则会清空GOP缓存(因为有新的关键帧了,同样可以实现秒开)
_ring = std::make_shared<RingType>(_ring_size, std::move(lam));
if (!_sdp.empty()) {
regist();
}
}
bool is_video = rtp->type == TrackVideo;
PacketCache<RtpPacket>::inputPacket(stamp, is_video, std::move(rtp), keyPos);
}
void onWrite(RtpPacket::Ptr rtp, bool keyPos) override;
void clearCache() override{
PacketCache<RtpPacket>::clearCache();
......
#include "RtspMediaSourceImp.h"
#include "RtspDemuxer.h"
#include "Common/config.h"
namespace mediakit {
void RtspMediaSource::setSdp(const std::string &sdp) {
SdpParser sdp_parser(sdp);
_tracks[TrackVideo] = sdp_parser.getTrack(TrackVideo);
_tracks[TrackAudio] = sdp_parser.getTrack(TrackAudio);
_have_video = (bool)_tracks[TrackVideo];
_sdp = sdp_parser.toString();
if (_ring) {
regist();
}
}
uint32_t RtspMediaSource::getTimeStamp(TrackType trackType) {
assert(trackType >= TrackInvalid && trackType < TrackMax);
if (trackType != TrackInvalid) {
//获取某track的时间戳
auto &track = _tracks[trackType];
if (track) {
return track->_time_stamp;
}
}
//获取所有track的最小时间戳
uint32_t ret = UINT32_MAX;
for (auto &track : _tracks) {
if (track && track->_time_stamp < ret) {
ret = track->_time_stamp;
}
}
return ret;
}
/**
* 更新时间戳
*/
void RtspMediaSource::setTimeStamp(uint32_t stamp) {
for (auto &track : _tracks) {
if (track) {
track->_time_stamp = stamp;
}
}
}
void RtspMediaSource::onWrite(RtpPacket::Ptr rtp, bool keyPos) {
_speed[rtp->type] += rtp->size();
assert(rtp->type >= 0 && rtp->type < TrackMax);
auto &track = _tracks[rtp->type];
auto stamp = rtp->getStampMS();
if (track) {
track->_seq = rtp->getSeq();
track->_time_stamp = rtp->getStamp() * uint64_t(1000) / rtp->sample_rate;
track->_ssrc = rtp->getSSRC();
}
if (!_ring) {
std::weak_ptr<RtspMediaSource> weakSelf = std::dynamic_pointer_cast<RtspMediaSource>(shared_from_this());
auto lam = [weakSelf](int size) {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
return;
}
strongSelf->onReaderChanged(size);
};
//GOP默认缓冲512组RTP包,每组RTP包时间戳相同(如果开启合并写了,那么每组为合并写时间内的RTP包),
//每次遇到关键帧第一个RTP包,则会清空GOP缓存(因为有新的关键帧了,同样可以实现秒开)
_ring = std::make_shared<RingType>(_ring_size, std::move(lam));
if (!_sdp.empty()) {
regist();
}
}
bool is_video = rtp->type == TrackVideo;
PacketCache<RtpPacket>::inputPacket(stamp, is_video, std::move(rtp), keyPos);
}
RtspMediaSourceImp::RtspMediaSourceImp(const std::string &vhost, const std::string &app, const std::string &id, int ringSize)
: RtspMediaSource(vhost, app, id, ringSize)
{
_demuxer = std::make_shared<RtspDemuxer>();
_demuxer->setTrackListener(this);
}
void RtspMediaSourceImp::setSdp(const std::string &strSdp)
{
if (!getSdp().empty()) {
return;
}
_demuxer->loadSdp(strSdp);
RtspMediaSource::setSdp(strSdp);
}
void RtspMediaSourceImp::onWrite(RtpPacket::Ptr rtp, bool key_pos)
{
if (_all_track_ready && !_muxer->isEnabled()) {
//获取到所有Track后,并且未开启转协议,那么不需要解复用rtp
//在关闭rtp解复用后,无法知道是否为关键帧,这样会导致无法秒开,或者开播花屏
key_pos = rtp->type == TrackVideo;
} else {
//需要解复用rtp
key_pos = _demuxer->inputRtp(rtp);
}
GET_CONFIG(bool, directProxy, Rtsp::kDirectProxy);
if (directProxy) {
//直接代理模式才直接使用原始rtp
RtspMediaSource::onWrite(std::move(rtp), key_pos);
}
}
void RtspMediaSourceImp::setProtocolOption(const ProtocolOption &option)
{
GET_CONFIG(bool, direct_proxy, Rtsp::kDirectProxy);
//开启直接代理模式时,rtsp直接代理,不重复产生;但是有些rtsp推流端,由于sdp中已有sps pps,rtp中就不再包括sps pps,
//导致rtc无法播放,所以在rtsp推流rtc播放时,建议关闭直接代理模式
_option = option;
_option.enable_rtsp = !direct_proxy;
_muxer = std::make_shared<MultiMediaSourceMuxer>(getVhost(), getApp(), getId(), _demuxer->getDuration(), _option);
_muxer->setMediaListener(getListener());
_muxer->setTrackListener(std::static_pointer_cast<RtspMediaSourceImp>(shared_from_this()));
//让_muxer对象拦截一部分事件(比如说录像相关事件)
MediaSource::setListener(_muxer);
for (auto &track : _demuxer->getTracks(false)) {
_muxer->addTrack(track);
track->addDelegate(_muxer);
}
}
}
......@@ -11,12 +11,11 @@
#ifndef SRC_RTSP_RTSPTORTMPMEDIASOURCE_H_
#define SRC_RTSP_RTSPTORTMPMEDIASOURCE_H_
#include "Rtmp/amf.h"
#include "RtspMediaSource.h"
#include "RtspDemuxer.h"
#include "Common/MultiMediaSourceMuxer.h"
namespace mediakit {
class RtspDemuxer;
class RtspMediaSourceImp final : public RtspMediaSource, private TrackListener, public MultiMediaSourceMuxer::Listener {
public:
using Ptr = std::shared_ptr<RtspMediaSourceImp>;
......@@ -28,70 +27,31 @@ public:
* @param id 流id
* @param ringSize 环形缓存大小
*/
RtspMediaSourceImp(const std::string &vhost, const std::string &app, const std::string &id, int ringSize = RTP_GOP_SIZE) : RtspMediaSource(vhost, app, id,ringSize) {
_demuxer = std::make_shared<RtspDemuxer>();
_demuxer->setTrackListener(this);
}
RtspMediaSourceImp(const std::string &vhost, const std::string &app, const std::string &id, int ringSize = RTP_GOP_SIZE);
~RtspMediaSourceImp() override = default;
/**
* 设置sdp
*/
void setSdp(const std::string &strSdp) override {
if (!getSdp().empty()) {
return;
}
_demuxer->loadSdp(strSdp);
RtspMediaSource::setSdp(strSdp);
}
void setSdp(const std::string &strSdp) override;
/**
* 输入rtp并解析
*/
void onWrite(RtpPacket::Ptr rtp, bool key_pos) override {
if (_all_track_ready && !_muxer->isEnabled()) {
//获取到所有Track后,并且未开启转协议,那么不需要解复用rtp
//在关闭rtp解复用后,无法知道是否为关键帧,这样会导致无法秒开,或者开播花屏
key_pos = rtp->type == TrackVideo;
} else {
//需要解复用rtp
key_pos = _demuxer->inputRtp(rtp);
}
GET_CONFIG(bool, directProxy, Rtsp::kDirectProxy);
if (directProxy) {
//直接代理模式才直接使用原始rtp
RtspMediaSource::onWrite(std::move(rtp), key_pos);
}
}
void onWrite(RtpPacket::Ptr rtp, bool key_pos) override;
/**
* 获取观看总人数,包括(hls/rtsp/rtmp)
*/
int totalReaderCount() override{
int totalReaderCount() override {
return readerCount() + (_muxer ? _muxer->totalReaderCount() : 0);
}
/**
* 设置协议转换选项
*/
void setProtocolOption(const ProtocolOption &option) {
GET_CONFIG(bool, direct_proxy, Rtsp::kDirectProxy);
//开启直接代理模式时,rtsp直接代理,不重复产生;但是有些rtsp推流端,由于sdp中已有sps pps,rtp中就不再包括sps pps,
//导致rtc无法播放,所以在rtsp推流rtc播放时,建议关闭直接代理模式
_option = option;
_option.enable_rtsp = !direct_proxy;
_muxer = std::make_shared<MultiMediaSourceMuxer>(getVhost(), getApp(), getId(), _demuxer->getDuration(), _option);
_muxer->setMediaListener(getListener());
_muxer->setTrackListener(std::static_pointer_cast<RtspMediaSourceImp>(shared_from_this()));
//让_muxer对象拦截一部分事件(比如说录像相关事件)
MediaSource::setListener(_muxer);
for (auto &track : _demuxer->getTracks(false)) {
_muxer->addTrack(track);
track->addDelegate(_muxer);
}
}
void setProtocolOption(const ProtocolOption &option);
const ProtocolOption &getProtocolOption() const {
return _option;
......@@ -149,7 +109,7 @@ public:
private:
bool _all_track_ready = false;
ProtocolOption _option;
RtspDemuxer::Ptr _demuxer;
std::shared_ptr<RtspDemuxer> _demuxer;
MultiMediaSourceMuxer::Ptr _muxer;
};
} /* namespace mediakit */
......
......@@ -17,6 +17,11 @@
#include "Util/MD5.h"
#include "Util/base64.h"
#include "Rtcp/Rtcp.h"
#include "Rtcp/RtcpContext.h"
#include "RtspMediaSource.h"
#include "RtspDemuxer.h"
#include "RtspPlayerImp.h"
using namespace toolkit;
using namespace std;
......@@ -745,4 +750,49 @@ int RtspPlayer::getTrackIndexByTrackType(TrackType track_type) const {
throw SockException(Err_shutdown, StrPrinter << "no such track with type:" << getTrackString(track_type));
}
///////////////////////////////////////////////////
// RtspPlayerImp
float RtspPlayerImp::getDuration() const
{
return _demuxer ? _demuxer->getDuration() : 0;
}
void RtspPlayerImp::onPlayResult(const toolkit::SockException &ex) {
if (!(*this)[Client::kWaitTrackReady].as<bool>() || ex) {
Super::onPlayResult(ex);
return;
}
}
void RtspPlayerImp::addTrackCompleted() {
if ((*this)[Client::kWaitTrackReady].as<bool>()) {
Super::onPlayResult(toolkit::SockException(toolkit::Err_success, "play success"));
}
}
std::vector<Track::Ptr> RtspPlayerImp::getTracks(bool ready /*= true*/) const
{
return _demuxer ? _demuxer->getTracks(ready) : Super::getTracks(ready);
}
bool RtspPlayerImp::onCheckSDP(const std::string &sdp)
{
_rtsp_media_src = std::dynamic_pointer_cast<RtspMediaSource>(_media_src);
if (_rtsp_media_src) {
_rtsp_media_src->setSdp(sdp);
}
_demuxer = std::make_shared<RtspDemuxer>();
_demuxer->setTrackListener(this, (*this)[Client::kWaitTrackReady].as<bool>());
_demuxer->loadSdp(sdp);
return true;
}
void RtspPlayerImp::onRecvRTP(RtpPacket::Ptr rtp, const SdpTrack::Ptr &track) {
//rtp解复用时可以判断是否为关键帧起始位置
auto key_pos = _demuxer->inputRtp(rtp);
if (_rtsp_media_src) {
_rtsp_media_src->onWrite(std::move(rtp), key_pos);
}
}
} /* namespace mediakit */
......@@ -13,22 +13,16 @@
#include <string>
#include <memory>
#include "RtspSession.h"
#include "RtspMediaSource.h"
#include "Player/PlayerBase.h"
#include "Util/util.h"
#include "Util/logger.h"
#include "Util/TimeTicker.h"
#include "Poller/Timer.h"
#include "Network/Socket.h"
#include "Player/PlayerBase.h"
#include "Network/TcpClient.h"
#include "RtspSplitter.h"
#include "RtpReceiver.h"
#include "Common/Stamp.h"
#include "Rtcp/RtcpContext.h"
namespace mediakit {
class RtcpContext;
//实现了rtsp播放器协议部分的功能,及数据接收功能
class RtspPlayer : public PlayerBase, public toolkit::TcpClient, public RtspSplitter, public RtpReceiver {
public:
......@@ -149,7 +143,7 @@ private:
//rtcp发送时间,trackid idx 为数组下标
toolkit::Ticker _rtcp_send_ticker[2];
//统计rtp并发送rtcp
std::vector<RtcpContext::Ptr> _rtcp_context;
std::vector<std::shared_ptr<RtcpContext>> _rtcp_context;
};
} /* namespace mediakit */
......
......@@ -14,14 +14,11 @@
#include <memory>
#include <algorithm>
#include <functional>
#include "Common/config.h"
#include "RtspPlayer.h"
#include "RtspDemuxer.h"
#include "Poller/Timer.h"
#include "Util/TimeTicker.h"
namespace mediakit {
class RtspDemuxer;
class RtspMediaSource;
class RtspPlayerImp : public PlayerImp<RtspPlayer, PlayerBase> ,private TrackListener {
public:
using Ptr = std::shared_ptr<RtspPlayerImp>;
......@@ -57,53 +54,25 @@ public:
seekToMilliSecond(pos);
}
float getDuration() const override {
return _demuxer ? _demuxer->getDuration() : 0;
}
float getDuration() const override;
std::vector<Track::Ptr> getTracks(bool ready = true) const override {
return _demuxer ? _demuxer->getTracks(ready) : Super::getTracks(ready);
}
std::vector<Track::Ptr> getTracks(bool ready = true) const override;
private:
//派生类回调函数
bool onCheckSDP(const std::string &sdp) override {
_rtsp_media_src = std::dynamic_pointer_cast<RtspMediaSource>(_media_src);
if (_rtsp_media_src) {
_rtsp_media_src->setSdp(sdp);
}
_demuxer = std::make_shared<RtspDemuxer>();
_demuxer->setTrackListener(this, (*this)[Client::kWaitTrackReady].as<bool>());
_demuxer->loadSdp(sdp);
return true;
}
bool onCheckSDP(const std::string &sdp) override;
void onRecvRTP(RtpPacket::Ptr rtp, const SdpTrack::Ptr &track) override {
//rtp解复用时可以判断是否为关键帧起始位置
auto key_pos = _demuxer->inputRtp(rtp);
if (_rtsp_media_src) {
_rtsp_media_src->onWrite(std::move(rtp), key_pos);
}
}
void onRecvRTP(RtpPacket::Ptr rtp, const SdpTrack::Ptr &track) override;
void onPlayResult(const toolkit::SockException &ex) override {
if (!(*this)[Client::kWaitTrackReady].as<bool>() || ex) {
Super::onPlayResult(ex);
return;
}
}
void onPlayResult(const toolkit::SockException &ex) override;
bool addTrack(const Track::Ptr &track) override { return true; }
void addTrackCompleted() override {
if ((*this)[Client::kWaitTrackReady].as<bool>()) {
Super::onPlayResult(toolkit::SockException(toolkit::Err_success, "play success"));
}
}
void addTrackCompleted() override;
private:
RtspDemuxer::Ptr _demuxer;
RtspMediaSource::Ptr _rtsp_media_src;
std::shared_ptr<RtspDemuxer> _demuxer;
std::shared_ptr<RtspMediaSource> _rtsp_media_src;
};
} /* namespace mediakit */
......
......@@ -12,6 +12,8 @@
#include "Util/base64.h"
#include "RtspPusher.h"
#include "RtspSession.h"
#include "Rtcp/RtcpContext.h"
#include "Common/config.h"
using namespace std;
using namespace toolkit;
......
......@@ -14,17 +14,14 @@
#include <string>
#include <memory>
#include "RtspMediaSource.h"
#include "Util/util.h"
#include "Util/logger.h"
#include "Poller/Timer.h"
#include "Network/Socket.h"
#include "Network/TcpClient.h"
#include "RtspSplitter.h"
#include "Pusher/PusherBase.h"
#include "Rtcp/RtcpContext.h"
namespace mediakit {
class RtcpContext;
class RtspPusher : public toolkit::TcpClient, public RtspSplitter, public PusherBase {
public:
typedef std::shared_ptr<RtspPusher> Ptr;
......@@ -96,7 +93,7 @@ private:
//rtcp发送时间,trackid idx 为数组下标
toolkit::Ticker _rtcp_send_ticker[2];
//统计rtp并发送rtcp
std::vector<RtcpContext::Ptr> _rtcp_context;
std::vector<std::shared_ptr<RtcpContext>> _rtcp_context;
};
using RtspPusherImp = PusherImp<RtspPusher, PusherBase>;
......
......@@ -15,6 +15,8 @@
#include "RtspSession.h"
#include "Util/MD5.h"
#include "Util/base64.h"
#include "RtpMultiCaster.h"
#include "Rtcp/RtcpContext.h"
using namespace std;
using namespace toolkit;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论