Commit b1666eb6 by xiongziliang

实现媒体源pause/speed接口:#1129

parent 507eadf2
...@@ -269,6 +269,7 @@ bool HttpSession::checkLiveStreamFMP4(const function<void()> &cb){ ...@@ -269,6 +269,7 @@ bool HttpSession::checkLiveStreamFMP4(const function<void()> &cb){
setSocketFlags(); setSocketFlags();
onWrite(std::make_shared<BufferString>(fmp4_src->getInitSegment()), true); onWrite(std::make_shared<BufferString>(fmp4_src->getInitSegment()), true);
weak_ptr<HttpSession> weak_self = dynamic_pointer_cast<HttpSession>(shared_from_this()); weak_ptr<HttpSession> weak_self = dynamic_pointer_cast<HttpSession>(shared_from_this());
fmp4_src->pause(false);
_fmp4_reader = fmp4_src->getRing()->attach(getPoller()); _fmp4_reader = fmp4_src->getRing()->attach(getPoller());
_fmp4_reader->setDetachCB([weak_self]() { _fmp4_reader->setDetachCB([weak_self]() {
auto strong_self = weak_self.lock(); auto strong_self = weak_self.lock();
...@@ -309,6 +310,7 @@ bool HttpSession::checkLiveStreamTS(const function<void()> &cb){ ...@@ -309,6 +310,7 @@ bool HttpSession::checkLiveStreamTS(const function<void()> &cb){
//直播牺牲延时提升发送性能 //直播牺牲延时提升发送性能
setSocketFlags(); setSocketFlags();
weak_ptr<HttpSession> weak_self = dynamic_pointer_cast<HttpSession>(shared_from_this()); weak_ptr<HttpSession> weak_self = dynamic_pointer_cast<HttpSession>(shared_from_this());
ts_src->pause(false);
_ts_reader = ts_src->getRing()->attach(getPoller()); _ts_reader = ts_src->getRing()->attach(getPoller());
_ts_reader->setDetachCB([weak_self](){ _ts_reader->setDetachCB([weak_self](){
auto strong_self = weak_self.lock(); auto strong_self = weak_self.lock();
......
...@@ -47,6 +47,12 @@ MP4Reader::MP4Reader(const string &strVhost,const string &strApp, const string & ...@@ -47,6 +47,12 @@ MP4Reader::MP4Reader(const string &strVhost,const string &strApp, const string &
} }
bool MP4Reader::readSample() { bool MP4Reader::readSample() {
if (_paused) {
//确保暂停时,时间轴不走动
_seek_ticker.resetTime();
return true;
}
bool keyFrame = false; bool keyFrame = false;
bool eof = false; bool eof = false;
while (!eof) { while (!eof) {
...@@ -89,20 +95,53 @@ void MP4Reader::startReadMP4() { ...@@ -89,20 +95,53 @@ void MP4Reader::startReadMP4() {
} }
uint32_t MP4Reader::getCurrentStamp() { uint32_t MP4Reader::getCurrentStamp() {
return (uint32_t)(_seek_to + _seek_ticker.elapsedTime()); return (uint32_t)(_seek_to + !_paused * _speed * _seek_ticker.elapsedTime());
} }
void MP4Reader::setCurrentStamp(uint32_t ui32Stamp){ void MP4Reader::setCurrentStamp(uint32_t new_stamp){
_seek_to = ui32Stamp; auto old_stamp = getCurrentStamp();
_seek_to = new_stamp;
_seek_ticker.resetTime(); _seek_ticker.resetTime();
_mediaMuxer->setTimeStamp(ui32Stamp); if (old_stamp != new_stamp) {
//时间轴未拖动时不操作
_mediaMuxer->setTimeStamp(new_stamp);
}
}
bool MP4Reader::seekTo(MediaSource &sender, uint32_t stamp) {
//拖动进度条后应该恢复播放
pause(sender, false);
TraceL << getOriginUrl(sender) << ",stamp:" << stamp;
return seekTo(stamp);
}
bool MP4Reader::pause(MediaSource &sender, bool pause) {
if (_paused == pause) {
return true;
}
//_seek_ticker重新计时,不管是暂停还是seek都不影响总的播放进度
setCurrentStamp(getCurrentStamp());
_paused = pause;
TraceL << getOriginUrl(sender) << ",pause:" << pause;
return true;
} }
bool MP4Reader::seekTo(MediaSource &sender,uint32_t ui32Stamp){ bool MP4Reader::speed(MediaSource &sender, float speed) {
return seekTo(ui32Stamp); if (speed < 0.1 && speed > 20) {
WarnL << "播放速度取值范围非法:" << speed;
return false;
}
//设置播放速度后应该恢复播放
pause(sender, false);
if (_speed == speed) {
return true;
}
_speed = speed;
TraceL << getOriginUrl(sender) << ",speed:" << speed;
return true;
} }
bool MP4Reader::seekTo(uint32_t ui32Stamp){ bool MP4Reader::seekTo(uint32_t ui32Stamp) {
lock_guard<recursive_mutex> lck(_mtx); lock_guard<recursive_mutex> lck(_mtx);
if (ui32Stamp > _demuxer->getDurationMS()) { if (ui32Stamp > _demuxer->getDurationMS()) {
//超过文件长度 //超过文件长度
......
...@@ -38,7 +38,10 @@ public: ...@@ -38,7 +38,10 @@ public:
private: private:
//MediaSourceEvent override //MediaSourceEvent override
bool seekTo(MediaSource &sender,uint32_t ui32Stamp) override; bool seekTo(MediaSource &sender,uint32_t stamp) override;
bool pause(MediaSource &sender, bool pause) override;
bool speed(MediaSource &sender, float speed) override;
bool close(MediaSource &sender,bool force) override; bool close(MediaSource &sender,bool force) override;
int totalReaderCount(MediaSource &sender) override; int totalReaderCount(MediaSource &sender) override;
MediaOriginType getOriginType(MediaSource &sender) const override; MediaOriginType getOriginType(MediaSource &sender) const override;
...@@ -51,6 +54,8 @@ private: ...@@ -51,6 +54,8 @@ private:
private: private:
bool _have_video = false; bool _have_video = false;
bool _paused = false;
float _speed = 1.0;
uint32_t _seek_to; uint32_t _seek_to;
string _file_path; string _file_path;
recursive_mutex _mtx; recursive_mutex _mtx;
......
...@@ -36,6 +36,7 @@ void FlvMuxer::start(const EventPoller::Ptr &poller, const RtmpMediaSource::Ptr ...@@ -36,6 +36,7 @@ void FlvMuxer::start(const EventPoller::Ptr &poller, const RtmpMediaSource::Ptr
onWriteFlvHeader(media); onWriteFlvHeader(media);
std::weak_ptr<FlvMuxer> weakSelf = getSharedPtr(); std::weak_ptr<FlvMuxer> weakSelf = getSharedPtr();
media->pause(false);
_ring_reader = media->getRing()->attach(poller); _ring_reader = media->getRing()->attach(poller);
_ring_reader->setDetachCB([weakSelf]() { _ring_reader->setDetachCB([weakSelf]() {
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
......
...@@ -204,6 +204,7 @@ inline void RtmpPusher::send_metaData(){ ...@@ -204,6 +204,7 @@ inline void RtmpPusher::send_metaData(){
sendRtmp(pkt->type_id, _stream_index, pkt, pkt->time_stamp, pkt->chunk_id); sendRtmp(pkt->type_id, _stream_index, pkt, pkt->time_stamp, pkt->chunk_id);
}); });
src->pause(false);
_rtmp_reader = src->getRing()->attach(getPoller()); _rtmp_reader = src->getRing()->attach(getPoller());
weak_ptr<RtmpPusher> weak_self = dynamic_pointer_cast<RtmpPusher>(shared_from_this()); weak_ptr<RtmpPusher> weak_self = dynamic_pointer_cast<RtmpPusher>(shared_from_this());
_rtmp_reader->setReadCB([weak_self](const RtmpMediaSource::RingDataType &pkt) { _rtmp_reader->setReadCB([weak_self](const RtmpMediaSource::RingDataType &pkt) {
......
...@@ -268,6 +268,7 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr ...@@ -268,6 +268,7 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr
//音频同步于视频 //音频同步于视频
_stamp[0].syncTo(_stamp[1]); _stamp[0].syncTo(_stamp[1]);
src->pause(false);
_ring_reader = src->getRing()->attach(getPoller()); _ring_reader = src->getRing()->attach(getPoller());
weak_ptr<RtmpSession> weakSelf = dynamic_pointer_cast<RtmpSession>(shared_from_this()); weak_ptr<RtmpSession> weakSelf = dynamic_pointer_cast<RtmpSession>(shared_from_this());
_ring_reader->setReadCB([weakSelf](const RtmpMediaSource::RingDataType &pkt) { _ring_reader->setReadCB([weakSelf](const RtmpMediaSource::RingDataType &pkt) {
...@@ -275,9 +276,6 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr ...@@ -275,9 +276,6 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr
if (!strongSelf) { if (!strongSelf) {
return; return;
} }
if(strongSelf->_paused){
return;
}
size_t i = 0; size_t i = 0;
auto size = pkt->size(); auto size = pkt->size();
strongSelf->setSendFlushFlag(false); strongSelf->setSendFlushFlag(false);
...@@ -295,10 +293,8 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr ...@@ -295,10 +293,8 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr
} }
strongSelf->shutdown(SockException(Err_shutdown,"rtmp ring buffer detached")); strongSelf->shutdown(SockException(Err_shutdown,"rtmp ring buffer detached"));
}); });
src->pause(false);
_player_src = src; _player_src = src;
if (src->totalReaderCount() == 1) {
src->seekTo(0);
}
//提高服务器发送性能 //提高服务器发送性能
setSocketFlags(); setSocketFlags();
} }
...@@ -411,8 +407,6 @@ void RtmpSession::onCmd_pause(AMFDecoder &dec) { ...@@ -411,8 +407,6 @@ void RtmpSession::onCmd_pause(AMFDecoder &dec) {
sendReply("onStatus", nullptr, status); sendReply("onStatus", nullptr, status);
//streamBegin //streamBegin
sendUserControl(paused ? CONTROL_STREAM_EOF : CONTROL_STREAM_BEGIN, STREAM_MEDIA); sendUserControl(paused ? CONTROL_STREAM_EOF : CONTROL_STREAM_BEGIN, STREAM_MEDIA);
_paused = paused;
auto strongSrc = _player_src.lock(); auto strongSrc = _player_src.lock();
if (strongSrc) { if (strongSrc) {
strongSrc->pause(paused); strongSrc->pause(paused);
......
...@@ -87,7 +87,6 @@ private: ...@@ -87,7 +87,6 @@ private:
void dumpMetadata(const AMFValue &metadata); void dumpMetadata(const AMFValue &metadata);
private: private:
bool _paused = false;
bool _set_meta_data = false; bool _set_meta_data = false;
double _recv_req_id = 0; double _recv_req_id = 0;
//消耗的总流量 //消耗的总流量
......
...@@ -132,6 +132,7 @@ RtpMultiCaster::RtpMultiCaster(SocketHelper &helper, const string &local_ip, con ...@@ -132,6 +132,7 @@ RtpMultiCaster::RtpMultiCaster(SocketHelper &helper, const string &local_ip, con
_udp_sock[i]->bindPeerAddr((struct sockaddr *) &peer); _udp_sock[i]->bindPeerAddr((struct sockaddr *) &peer);
} }
src->pause(false);
_rtp_reader = src->getRing()->attach(helper.getPoller()); _rtp_reader = src->getRing()->attach(helper.getPoller());
_rtp_reader->setReadCB([this](const RtspMediaSource::RingDataType &pkt) { _rtp_reader->setReadCB([this](const RtspMediaSource::RingDataType &pkt) {
size_t i = 0; size_t i = 0;
......
...@@ -334,7 +334,7 @@ private: ...@@ -334,7 +334,7 @@ private:
*/ */
class TitleSdp : public Sdp{ class TitleSdp : public Sdp{
public: public:
using Ptr = std::shared_ptr<TitleSdp>;
/** /**
* 构造title类型sdp * 构造title类型sdp
* @param dur_sec rtsp点播时长,0代表直播,单位秒 * @param dur_sec rtsp点播时长,0代表直播,单位秒
...@@ -342,12 +342,12 @@ public: ...@@ -342,12 +342,12 @@ public:
* @param version sdp版本 * @param version sdp版本
*/ */
TitleSdp(float dur_sec = 0, TitleSdp(float dur_sec = 0,
const map<string,string> &header = map<string,string>(), const map<string, string> &header = map<string, string>(),
int version = 0) : Sdp(0,0){ int version = 0) : Sdp(0, 0) {
_printer << "v=" << version << "\r\n"; _printer << "v=" << version << "\r\n";
if(!header.empty()){ if (!header.empty()) {
for (auto &pr : header){ for (auto &pr : header) {
_printer << pr.first << "=" << pr.second << "\r\n"; _printer << pr.first << "=" << pr.second << "\r\n";
} }
} else { } else {
...@@ -357,23 +357,31 @@ public: ...@@ -357,23 +357,31 @@ public:
_printer << "t=0 0\r\n"; _printer << "t=0 0\r\n";
} }
if(dur_sec <= 0){ if (dur_sec <= 0) {
//直播 //直播
_printer << "a=range:npt=now-\r\n"; _printer << "a=range:npt=now-\r\n";
}else{ } else {
//点播 //点播
_printer << "a=range:npt=0-" << dur_sec << "\r\n"; _dur_sec = dur_sec;
_printer << "a=range:npt=0-" << dur_sec << "\r\n";
} }
_printer << "a=control:*\r\n"; _printer << "a=control:*\r\n";
} }
string getSdp() const override { string getSdp() const override {
return _printer; return _printer;
} }
CodecId getCodecId() const override{ CodecId getCodecId() const override {
return CodecInvalid; return CodecInvalid;
} }
float getDuration() const {
return _dur_sec;
}
private: private:
float _dur_sec = 0;
_StrPrinter _printer; _StrPrinter _printer;
}; };
......
...@@ -14,18 +14,23 @@ ...@@ -14,18 +14,23 @@
namespace mediakit { namespace mediakit {
void RtspMuxer::onRtp(RtpPacket::Ptr in, bool is_key) { void RtspMuxer::onRtp(RtpPacket::Ptr in, bool is_key) {
if (_rtp_stamp[in->type] != in->getHeader()->stamp) { if (_live) {
//rtp时间戳变化才计算ntp,节省cpu资源 if (_rtp_stamp[in->type] != in->getHeader()->stamp) {
int64_t stamp_ms = in->getStamp() * uint64_t(1000) / in->sample_rate; //rtp时间戳变化才计算ntp,节省cpu资源
int64_t stamp_ms_inc; int64_t stamp_ms = in->getStamp() * uint64_t(1000) / in->sample_rate;
//求rtp时间戳增量 int64_t stamp_ms_inc;
_stamp[in->type].revise(stamp_ms, stamp_ms, stamp_ms_inc, stamp_ms_inc); //求rtp时间戳增量
_rtp_stamp[in->type] = in->getHeader()->stamp; _stamp[in->type].revise(stamp_ms, stamp_ms, stamp_ms_inc, stamp_ms_inc);
_ntp_stamp[in->type] = stamp_ms_inc + _ntp_stamp_start; _rtp_stamp[in->type] = in->getHeader()->stamp;
_ntp_stamp[in->type] = stamp_ms_inc + _ntp_stamp_start;
}
//rtp拦截入口,此处统一赋值ntp
in->ntp_stamp = _ntp_stamp[in->type];
} else {
//点播情况下设置ntp时间戳为rtp时间戳
in->ntp_stamp = in->getStamp() * uint64_t(1000) / in->sample_rate;
} }
//rtp拦截入口,此处统一赋值ntp
in->ntp_stamp = _ntp_stamp[in->type];
_rtpRing->write(std::move(in), is_key); _rtpRing->write(std::move(in), is_key);
} }
...@@ -33,6 +38,7 @@ RtspMuxer::RtspMuxer(const TitleSdp::Ptr &title) { ...@@ -33,6 +38,7 @@ RtspMuxer::RtspMuxer(const TitleSdp::Ptr &title) {
if (!title) { if (!title) {
_sdp = std::make_shared<TitleSdp>()->getSdp(); _sdp = std::make_shared<TitleSdp>()->getSdp();
} else { } else {
_live = title->getDuration() == 0;
_sdp = title->getSdp(); _sdp = title->getSdp();
} }
_rtpRing = std::make_shared<RtpRing::RingType>(); _rtpRing = std::make_shared<RtpRing::RingType>();
......
...@@ -81,6 +81,7 @@ private: ...@@ -81,6 +81,7 @@ private:
void trySyncTrack(); void trySyncTrack();
private: private:
bool _live = true;
uint32_t _rtp_stamp[TrackMax]{0}; uint32_t _rtp_stamp[TrackMax]{0};
uint64_t _ntp_stamp[TrackMax]{0}; uint64_t _ntp_stamp[TrackMax]{0};
uint64_t _ntp_stamp_start; uint64_t _ntp_stamp_start;
......
...@@ -452,6 +452,7 @@ void RtspPusher::sendRecord() { ...@@ -452,6 +452,7 @@ void RtspPusher::sendRecord() {
throw std::runtime_error("the media source was released"); throw std::runtime_error("the media source was released");
} }
src->pause(false);
_rtsp_reader = src->getRing()->attach(getPoller()); _rtsp_reader = src->getRing()->attach(getPoller());
weak_ptr<RtspPusher> weak_self = dynamic_pointer_cast<RtspPusher>(shared_from_this()); weak_ptr<RtspPusher> weak_self = dynamic_pointer_cast<RtspPusher>(shared_from_this());
_rtsp_reader->setReadCB([weak_self](const RtspMediaSource::RingDataType &pkt) { _rtsp_reader->setReadCB([weak_self](const RtspMediaSource::RingDataType &pkt) {
......
...@@ -105,7 +105,7 @@ void RtspSession::onManager() { ...@@ -105,7 +105,7 @@ void RtspSession::onManager() {
return; return;
} }
if (!_push_src && _rtp_type == Rtsp::RTP_UDP && _enable_send_rtp && _alive_ticker.elapsedTime() > keep_alive_sec * 4000) { if (!_push_src && _rtp_type == Rtsp::RTP_UDP && _alive_ticker.elapsedTime() > keep_alive_sec * 4000) {
//rtp over udp播放器超时 //rtp over udp播放器超时
shutdown(SockException(Err_timeout, "rtp over udp player timeout")); shutdown(SockException(Err_timeout, "rtp over udp player timeout"));
} }
...@@ -774,12 +774,12 @@ void RtspSession::handleReq_Play(const Parser &parser) { ...@@ -774,12 +774,12 @@ void RtspSession::handleReq_Play(const Parser &parser) {
} }
bool useGOP = true; bool useGOP = true;
float iStartTime = 0;
auto &strRange = parser["Range"];
auto &strScale = parser["Scale"]; auto &strScale = parser["Scale"];
auto &strRange = parser["Range"];
StrCaseMap res_header;
if (!strScale.empty()) { if (!strScale.empty()) {
//这是设置播放速度 //这是设置播放速度
res_header.emplace("Scale", strScale);
auto speed = atof(strScale.data()); auto speed = atof(strScale.data());
play_src->speed(speed); play_src->speed(speed);
InfoP(this) << "rtsp set play speed:" << speed; InfoP(this) << "rtsp set play speed:" << speed;
...@@ -787,12 +787,12 @@ void RtspSession::handleReq_Play(const Parser &parser) { ...@@ -787,12 +787,12 @@ void RtspSession::handleReq_Play(const Parser &parser) {
if (!strRange.empty()) { if (!strRange.empty()) {
//这是seek操作 //这是seek操作
_enable_send_rtp = false; res_header.emplace("Range", strRange);
auto strStart = FindField(strRange.data(), "npt=", "-"); auto strStart = FindField(strRange.data(), "npt=", "-");
if (strStart == "now") { if (strStart == "now") {
strStart = "0"; strStart = "0";
} }
iStartTime = 1000 * (float) atof(strStart.data()); auto iStartTime = 1000 * (float) atof(strStart.data());
useGOP = !play_src->seekTo((uint32_t) iStartTime); useGOP = !play_src->seekTo((uint32_t) iStartTime);
InfoP(this) << "rtsp seekTo(ms):" << iStartTime; InfoP(this) << "rtsp seekTo(ms):" << iStartTime;
} }
...@@ -814,13 +814,13 @@ void RtspSession::handleReq_Play(const Parser &parser) { ...@@ -814,13 +814,13 @@ void RtspSession::handleReq_Play(const Parser &parser) {
} }
rtp_info.pop_back(); rtp_info.pop_back();
sendRtspResponse("200 OK",
{"Range", StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) << (useGOP ? play_src->getTimeStamp(TrackInvalid) / 1000.0 : iStartTime / 1000), res_header.emplace("RTP-Info", rtp_info);
"RTP-Info",rtp_info //已存在Range时不覆盖
}); res_header.emplace("Range", StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) << play_src->getTimeStamp(TrackInvalid) / 1000.0);
sendRtspResponse("200 OK", res_header);
//在回复rtsp信令后再恢复播放 //在回复rtsp信令后再恢复播放
_enable_send_rtp = true;
play_src->pause(false); play_src->pause(false);
setSocketFlags(); setSocketFlags();
...@@ -840,9 +840,7 @@ void RtspSession::handleReq_Play(const Parser &parser) { ...@@ -840,9 +840,7 @@ void RtspSession::handleReq_Play(const Parser &parser) {
if (!strongSelf) { if (!strongSelf) {
return; return;
} }
if (strongSelf->_enable_send_rtp) { strongSelf->sendRtpPacket(pack);
strongSelf->sendRtpPacket(pack);
}
}); });
} }
} }
...@@ -854,8 +852,6 @@ void RtspSession::handleReq_Pause(const Parser &parser) { ...@@ -854,8 +852,6 @@ void RtspSession::handleReq_Pause(const Parser &parser) {
} }
sendRtspResponse("200 OK"); sendRtspResponse("200 OK");
_enable_send_rtp = false;
auto play_src = _play_src.lock(); auto play_src = _play_src.lock();
if (play_src) { if (play_src) {
play_src->pause(true); play_src->pause(true);
......
...@@ -165,8 +165,6 @@ private: ...@@ -165,8 +165,6 @@ private:
private: private:
//是否已经触发on_play事件 //是否已经触发on_play事件
bool _emit_on_play = false; bool _emit_on_play = false;
//是否开始发送rtp
bool _enable_send_rtp;
//推流或拉流客户端采用的rtp传输方式 //推流或拉流客户端采用的rtp传输方式
Rtsp::eRtpType _rtp_type = Rtsp::RTP_Invalid; Rtsp::eRtpType _rtp_type = Rtsp::RTP_Invalid;
//收到的seq,回复时一致 //收到的seq,回复时一致
......
...@@ -499,6 +499,7 @@ void WebRtcTransportImp::onStartWebRTC() { ...@@ -499,6 +499,7 @@ void WebRtcTransportImp::onStartWebRTC() {
} }
} }
_play_src->pause(false);
_reader = _play_src->getRing()->attach(getPoller(), true); _reader = _play_src->getRing()->attach(getPoller(), true);
weak_ptr<WebRtcTransportImp> weak_self = static_pointer_cast<WebRtcTransportImp>(shared_from_this()); weak_ptr<WebRtcTransportImp> weak_self = static_pointer_cast<WebRtcTransportImp>(shared_from_this());
_reader->setReadCB([weak_self](const RtspMediaSource::RingDataType &pkt) { _reader->setReadCB([weak_self](const RtspMediaSource::RingDataType &pkt) {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论