Commit 4792f621 by Alex

新增Ts拉流

parent a4411680
...@@ -15,7 +15,8 @@ ...@@ -15,7 +15,8 @@
namespace mediakit { namespace mediakit {
void HttpClient::sendRequest(const string &url, float timeout_sec) { void HttpClient::sendRequest(const string &url, float timeout_sec, float recv_timeout_sec) {
_recv_timeout_second = recv_timeout_sec;
clearResponse(); clearResponse();
_url = url; _url = url;
auto protocol = FindField(url.data(), NULL, "://"); auto protocol = FindField(url.data(), NULL, "://");
...@@ -188,7 +189,7 @@ ssize_t HttpClient::onRecvHeader(const char *data, size_t len) { ...@@ -188,7 +189,7 @@ ssize_t HttpClient::onRecvHeader(const char *data, size_t len) {
} }
if (onRedirectUrl(new_url, _parser.Url() == "302")) { if (onRedirectUrl(new_url, _parser.Url() == "302")) {
setMethod("GET"); setMethod("GET");
HttpClient::sendRequest(new_url, _timeout_second); HttpClient::sendRequest(new_url, _timeout_second, _recv_timeout_second);
return 0; return 0;
} }
} }
...@@ -278,7 +279,7 @@ void HttpClient::onFlush() { ...@@ -278,7 +279,7 @@ void HttpClient::onFlush() {
} }
void HttpClient::onManager() { void HttpClient::onManager() {
if (_recv_timeout_ticker.elapsedTime() > 3 * 1000 && _total_body_size < 0 && !_chunked_splitter) { if (_recv_timeout_ticker.elapsedTime() > _recv_timeout_second * 1000 && _total_body_size < 0 && !_chunked_splitter) {
//如果Content-Length未指定 但接收数据超时 //如果Content-Length未指定 但接收数据超时
//则认为本次http请求完成 //则认为本次http请求完成
onResponseCompleted_l(); onResponseCompleted_l();
......
...@@ -63,7 +63,7 @@ public: ...@@ -63,7 +63,7 @@ public:
* @param url 请求url * @param url 请求url
* @param timeout_sec 超时时间 * @param timeout_sec 超时时间
*/ */
virtual void sendRequest(const string &url, float timeout_sec); virtual void sendRequest(const string &url, float timeout_sec, float recv_timeout_sec = 3);
/** /**
* 重置对象 * 重置对象
......
...@@ -49,7 +49,14 @@ void HttpTSPlayer::onResponseBody(const char *buf, size_t size, size_t recved_si ...@@ -49,7 +49,14 @@ void HttpTSPlayer::onResponseBody(const char *buf, size_t size, size_t recved_si
} }
if (_split_ts) { if (_split_ts) {
_segment.input(buf, size); try {
_segment.input(buf, size);
}catch (std::exception &ex) {
WarnL << ex.what();
//ts解析失败,清空缓存数据
_segment.reset();
throw;
}
} else { } else {
onPacket(buf, size); onPacket(buf, size);
} }
......
//
// Created by alex on 2021/4/6.
//
#include "TsPlayer.h"
namespace mediakit {
TsPlayer::TsPlayer(const EventPoller::Ptr &poller):HttpTSPlayer(poller, true) {}
TsPlayer::~TsPlayer() {}
void TsPlayer::play(const string &strUrl) {
_ts_url.append(strUrl);
playTs();
}
void TsPlayer::teardown_l(const SockException &ex) {
HttpClient::clear();
shutdown(ex);
}
void TsPlayer::teardown() {
teardown_l(SockException(Err_shutdown, "teardown"));
}
void TsPlayer::playTs() {
if (waitResponse()) {
//播放器目前还存活,正在下载中
return;
}
WarnL << "fetch:" << _ts_url;
_request_complete = false;
weak_ptr<TsPlayer> weak_self = dynamic_pointer_cast<TsPlayer>(shared_from_this());
setMethod("GET");
sendRequest(_ts_url, 3600 * 2, 60);
}
void TsPlayer::onResponseCompleted() {
//接收完毕
teardown_l(SockException(Err_success, StrPrinter << _ts_url << ": play completed"));
}
void TsPlayer::onDisconnect(const SockException &ex) {
WarnL << _ts_url << " :" << ex.getErrCode() << " " << ex.what();
if (_first) {
//第一次失败,则播放失败
_first = false;
onPlayResult(ex);
return;
}
if (ex.getErrCode() == Err_shutdown) {
onShutdown(ex);
}else{
onResponseCompleted();
onShutdown(ex);
}
}
ssize_t TsPlayer::onResponseHeader(const string &status, const HttpClient::HttpHeader &header) {
ssize_t ret = HttpTSPlayer::onResponseHeader(status, header);
if (_first) {
_first = false;
onPlayResult(SockException(Err_success, "play success"));
}
return ret;
}
}//namespace mediakit
\ No newline at end of file
//
// Created by alex on 2021/4/6.
//
/*
* Copyright (c) 2020 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#ifndef HTTP_TSPLAYER_H
#define HTTP_TSPLAYER_H
#include <unordered_set>
#include "Util/util.h"
#include "Poller/Timer.h"
#include "Http/HttpDownloader.h"
#include "Player/MediaPlayer.h"
#include "Rtp/Decoder.h"
#include "Rtp/TSDecoder.h"
#include "HttpTSPlayer.h"
using namespace toolkit;
namespace mediakit {
class TsPlayer : public HttpTSPlayer , public PlayerBase {
public:
TsPlayer(const EventPoller::Ptr &poller);
~TsPlayer() override;
/**
* 开始播放
* @param strUrl
*/
void play(const string &strUrl) override;
/**
* 停止播放
*/
void teardown() override;
private:
void playTs();
void teardown_l(const SockException &ex);
protected:
virtual void onResponseCompleted() override;
virtual void onDisconnect(const SockException &ex) override;
virtual ssize_t onResponseHeader(const string &status, const HttpHeader &header) override;
private:
bool _first = true;
string _ts_url;
};
}//namespace mediakit
#endif //HTTP_TSPLAYER_H
//
// Created by alex on 2021/7/5.
//
#ifndef HTTP_TSPLAYERIMP_H
#define HTTP_TSPLAYERIMP_H
#include <unordered_set>
#include "Util/util.h"
#include "Poller/Timer.h"
#include "Http/HttpDownloader.h"
#include "Player/MediaPlayer.h"
#include "Rtp/Decoder.h"
#include "Rtp/TSDecoder.h"
#include "TsPlayer.h"
using namespace toolkit;
namespace mediakit {
class TsDemuxer : public MediaSinkInterface, public TrackSource, public std::enable_shared_from_this<TsDemuxer> {
public:
TsDemuxer() = default;
~TsDemuxer() override { _timer = nullptr; }
void start(const EventPoller::Ptr &poller, TrackListener *listener);
bool inputFrame(const Frame::Ptr &frame) override;
bool addTrack(const Track::Ptr &track) override {
return _delegate.addTrack(track);
}
void addTrackCompleted() override {
_delegate.addTrackCompleted();
}
void resetTracks() override {
((MediaSink &) _delegate).resetTracks();
}
vector<Track::Ptr> getTracks(bool ready = true) const override {
return _delegate.getTracks(ready);
}
private:
void onTick();
int64_t getBufferMS();
int64_t getPlayPosition();
void setPlayPosition(int64_t pos);
private:
int64_t _ticker_offset = 0;
Ticker _ticker;
Stamp _stamp[2];
Timer::Ptr _timer;
MediaSinkDelegate _delegate;
multimap<int64_t, Frame::Ptr> _frame_cache;
};
// class TsPlayerImp : public PlayerImp<TsPlayer, PlayerBase>, private TrackListener {
// public:
// typedef std::shared_ptr<TsPlayerImp> Ptr;
// TsPlayerImp(const EventPoller::Ptr &poller = nullptr);
// ~TsPlayerImp() override = default;
//
// private:
// //// TsPlayer override////
// void onPacket(const char *data, size_t len) override{
//
// if (!_decoder) {
// _decoder = DecoderImp::createDecoder(DecoderImp::decoder_ts, _demuxer.get());
// }
//
// if (_decoder && _demuxer) {
// _decoder->input((uint8_t *) data, len);
// }
//
// }
// //// PlayerBase override////
// void onPlayResult(const SockException &ex) override{
// WarnL << ex.getErrCode() << " " << ex.what();
// if (ex) {
// PlayerImp<TsPlayer, PlayerBase>::onPlayResult(ex);
// } else {
// auto demuxer = std::make_shared<HlsDemuxer>();
// demuxer->start(getPoller(), this);
// _demuxer = std::move(demuxer);
// }
// }
//
// bool inputFrame(const Frame::Ptr &frame) override{
// //计算相对时间戳
// int64_t dts, pts;
//// _stamp[frame->getTrackType()].revise(frame->dts(), frame->pts(), dts, pts);
//// WarnL << getTrackString(frame->getTrackType()) << "当前dts/pts[" << frame->dts() << "/" << frame->pts() << "]["<<dts<<"/"<<pts<< "]最后dts[" << last_dts << "]";
// // 加速trackReady速度, 避免网络抖动时由于超时而放弃video track, 同时加速首屏显示速度
// if(!MediaSink::isTrackReady()){
// MediaSink::inputFrame(Frame::getCacheAbleFrame(frame));
// return true;
// }
// // 根据时间戳缓存frame
//// _frame_cache.emplace(dts, Frame::getCacheAbleFrame(frame));
// _frame_cache.emplace(frame->dts(), Frame::getCacheAbleFrame(frame));
//// WarnL << "缓存时间[" << getBufferMS() << "]";
// if (getBufferMS() > 180 * 1000) {
// // 缓存限制最大180秒, 超过180秒强制消费60秒(减少延时或内存占用)
// // 拉取http-ts流时, 部分的上游输出并不是平滑的, 也就是可能一秒输出几mb数据, 然后会间隔几秒甚至十来秒再次输出,
// // 这种情况特别会出现在跨国家地域的拉流情况中, 一部分是上游源输出的问题, 一部分是由于国际出口网络或者isp导致的失速问题.
// // 对于输出http-ts/rtmp/rtsp等流媒体影响并不大, 但是如果输出hls, 那么就会导致m3u8文件中更新segment并不是平滑的,
// // 而是会出现跳段的现象.
// // 所以如果主要是输出hls, 那么最好是不强制消费, 只通过tick周期性平滑消费帧.因为输出是hls, 等于一直在进行消费.
// // 因此不会存在内存占用过大的问题.
// // 所以比较好的解决方法应该是判断是否存在hls输出, 如果存在hls输出则跳过这个限制缓存时间的逻辑.否则限制缓存时间为30秒,
// // 超过缓存时间则强制消费15秒. 但是目前并没有找到一个低成本的判断是否存在hls输出的方法, 所以只有增加缓存时间.
// while (getBufferMS() > 60 * 1000) {
// MediaSink::inputFrame(_frame_cache.begin()->second);
// _frame_cache.erase(_frame_cache.begin());
// }
// //接着播放缓存中最早的帧
// setPlayPosition(_frame_cache.begin()->first);
// }
// return true;
// }
//
// void onShutdown(const SockException &ex) override {
// PlayerImp<TsPlayer, PlayerBase>::onShutdown(ex);
// _demuxer = nullptr;
// }
//
// void onTick(){
// auto it = _frame_cache.begin();
// while (it != _frame_cache.end()) {
// if (it->first > getPlayPosition()) {
// //这些帧还未到时间播放
// break;
// }
// if (getBufferMS() < 3 * 1000) {
// //缓存小于3秒,那么降低定时器消费速度(让剩余的数据在3秒后消费完毕)
// //目的是为了防止定时器长时间干等后,数据瞬间消费完毕
// setPlayPosition(_frame_cache.begin()->first);
// }
// //消费掉已经到期的帧
// MediaSink::inputFrame(it->second);
// it = _frame_cache.erase(it);
// }
// }
// //// TrackListener override////
// bool addTrack(const Track::Ptr &track) override { return true; };
//
// void addTrackCompleted() override{
// PlayerImp<TsPlayer, PlayerBase>::onPlayResult(SockException(Err_success, "play hls success"));
// };
//
// private:
// DecoderImp::Ptr _decoder;
// MediaSinkInterface::Ptr _demuxer;
// };
//
class TsPlayerImp : public PlayerImp<TsPlayer, PlayerBase>, private TrackListener {
public:
typedef std::shared_ptr<TsPlayerImp> Ptr;
TsPlayerImp(const EventPoller::Ptr &poller = nullptr);
~TsPlayerImp() override = default;
private:
//// HlsPlayer override////
void onPacket(const char *data, size_t len) override;
private:
//// PlayerBase override////
void onPlayResult(const SockException &ex) override;
vector<Track::Ptr> getTracks(bool ready = true) const override;
void onShutdown(const SockException &ex) override;
private:
//// TrackListener override////
bool addTrack(const Track::Ptr &track) override { return true; };
void addTrackCompleted() override;
private:
DecoderImp::Ptr _decoder;
MediaSinkInterface::Ptr _demuxer;
};
}//namespace mediakit
#endif //HTTP_TSPLAYERIMP_H
#include "TsPlayerImp.h"
namespace mediakit {
void TsDemuxer::start(const EventPoller::Ptr &poller, TrackListener *listener) {
_frame_cache.clear();
_stamp[TrackAudio].setRelativeStamp(0);
_stamp[TrackVideo].setRelativeStamp(0);
_stamp[TrackAudio].syncTo(_stamp[TrackVideo]);
setPlayPosition(0);
_delegate.setTrackListener(listener);
//每50毫秒执行一次
weak_ptr<TsDemuxer> weak_self = shared_from_this();
_timer = std::make_shared<Timer>(0.05f, [weak_self]() {
auto strong_self = weak_self.lock();
if (!strong_self) {
return false;
}
strong_self->onTick();
return true;
}, poller);
}
bool TsDemuxer::inputFrame(const Frame::Ptr &frame) {
//为了避免track准备时间过长, 因此在没准备好之前, 直接消费掉所有的帧
if (!_delegate.isAllTrackReady()) {
_delegate.inputFrame(frame);
return true;
}
//计算相对时间戳
int64_t dts, pts;
//根据时间戳缓存frame
_stamp[frame->getTrackType()].revise(frame->dts(), frame->pts(), dts, pts);
_frame_cache.emplace(dts, Frame::getCacheAbleFrame(frame));
//根据时间戳缓存frame
// _frame_cache.emplace(dts, Frame::getCacheAbleFrame(frame));
if (getBufferMS() > 30 * 1000) {
//缓存超过30秒,强制消费至15秒(减少延时或内存占用)
while (getBufferMS() > 15 * 1000) {
_delegate.inputFrame(_frame_cache.begin()->second);
_frame_cache.erase(_frame_cache.begin());
}
//接着播放缓存中最早的帧
setPlayPosition(_frame_cache.begin()->first);
}
return true;
}
int64_t TsDemuxer::getPlayPosition() {
return _ticker.elapsedTime() + _ticker_offset;
}
int64_t TsDemuxer::getBufferMS() {
if (_frame_cache.empty()) {
return 0;
}
return _frame_cache.rbegin()->first - _frame_cache.begin()->first;
}
void TsDemuxer::setPlayPosition(int64_t pos) {
_ticker.resetTime();
_ticker_offset = pos;
}
void TsDemuxer::onTick() {
auto it = _frame_cache.begin();
while (it != _frame_cache.end()) {
if (it->first > getPlayPosition()) {
//这些帧还未到时间播放
break;
}
if (getBufferMS() < 3 * 1000) {
//缓存小于3秒,那么降低定时器消费速度(让剩余的数据在3秒后消费完毕)
//目的是为了防止定时器长时间干等后,数据瞬间消费完毕
setPlayPosition(_frame_cache.begin()->first);
}
//消费掉已经到期的帧
_delegate.inputFrame(it->second);
it = _frame_cache.erase(it);
}
}
//////////////////////////////////////////////////////////////////////////
TsPlayerImp::TsPlayerImp(const EventPoller::Ptr &poller) : PlayerImp<TsPlayer, PlayerBase>(poller) {}
void TsPlayerImp::onPacket(const char *data, size_t len) {
if (!_decoder) {
_decoder = DecoderImp::createDecoder(DecoderImp::decoder_ts, _demuxer.get());
}
if (_decoder && _demuxer) {
_decoder->input((uint8_t *) data, len);
}
}
void TsPlayerImp::addTrackCompleted() {
PlayerImp<TsPlayer, PlayerBase>::onPlayResult(SockException(Err_success, "play hls success"));
}
void TsPlayerImp::onPlayResult(const SockException &ex) {
WarnL << ex.getErrCode() << " " << ex.what();
if (ex) {
PlayerImp<TsPlayer, PlayerBase>::onPlayResult(ex);
} else {
auto demuxer = std::make_shared<TsDemuxer>();
demuxer->start(getPoller(), this);
_demuxer = std::move(demuxer);
}
}
void TsPlayerImp::onShutdown(const SockException &ex) {
PlayerImp<TsPlayer, PlayerBase>::onShutdown(ex);
_demuxer = nullptr;
}
vector<Track::Ptr> TsPlayerImp::getTracks(bool ready) const {
return static_pointer_cast<TsDemuxer>(_demuxer)->getTracks(ready);
}
}//namespace mediakit
\ No newline at end of file
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
#include "Rtsp/RtspPlayerImp.h" #include "Rtsp/RtspPlayerImp.h"
#include "Rtmp/RtmpPlayerImp.h" #include "Rtmp/RtmpPlayerImp.h"
#include "Http/HlsPlayer.h" #include "Http/HlsPlayer.h"
#include "Http/TsPlayerImp.h"
using namespace toolkit; using namespace toolkit;
namespace mediakit { namespace mediakit {
...@@ -48,9 +48,14 @@ PlayerBase::Ptr PlayerBase::createPlayer(const EventPoller::Ptr &poller, const s ...@@ -48,9 +48,14 @@ PlayerBase::Ptr PlayerBase::createPlayer(const EventPoller::Ptr &poller, const s
if (strcasecmp("rtmp", prefix.data()) == 0) { if (strcasecmp("rtmp", prefix.data()) == 0) {
return PlayerBase::Ptr(new RtmpPlayerImp(poller), releasePlayer); return PlayerBase::Ptr(new RtmpPlayerImp(poller), releasePlayer);
} }
if ((strcasecmp("http",prefix.data()) == 0 || strcasecmp("https",prefix.data()) == 0)) {
if ((strcasecmp("http", prefix.data()) == 0 || strcasecmp("https", prefix.data()) == 0) && end_with(url, ".m3u8")) { if (end_with(url, ".m3u8") || end_with(url_in, ".m3u8")) {
return PlayerBase::Ptr(new HlsPlayerImp(poller), releasePlayer); return PlayerBase::Ptr(new HlsPlayerImp(poller),releasePlayer);
}
else if (end_with(url, ".ts") || end_with(url_in, ".ts")) {
return PlayerBase::Ptr(new TsPlayerImp(poller),releasePlayer);
}
return PlayerBase::Ptr(new TsPlayerImp(poller),releasePlayer);
} }
return PlayerBase::Ptr(new RtspPlayerImp(poller), releasePlayer); return PlayerBase::Ptr(new RtspPlayerImp(poller), releasePlayer);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论