Commit 09e48479 by xiongziliang

实现ps-rtp的发送 #366

parent ac705972
...@@ -1048,6 +1048,108 @@ ...@@ -1048,6 +1048,108 @@
} }
}, },
"response": [] "response": []
},
{
"name": "开始发送rtp(startSendRtp)",
"request": {
"method": "GET",
"header": [],
"url": {
"raw": "{{ZLMediaKit_URL}}/index/api/startSendRtp?secret={{ZLMediaKit_secret}}&vhost={{defaultVhost}}&app=live&stream=obs&ssrc=1&dst_url=127.0.0.1&dst_port=10000&is_udp=0",
"host": [
"{{ZLMediaKit_URL}}"
],
"path": [
"index",
"api",
"startSendRtp"
],
"query": [
{
"key": "secret",
"value": "{{ZLMediaKit_secret}}",
"description": "api操作密钥(配置文件配置),如果操作ip是127.0.0.1,则不需要此参数"
},
{
"key": "vhost",
"value": "{{defaultVhost}}",
"description": "虚拟主机,例如__defaultVhost__"
},
{
"key": "app",
"value": "live",
"description": "应用名,例如 live"
},
{
"key": "stream",
"value": "obs",
"description": "流id,例如 obs"
},
{
"key": "ssrc",
"value": "1",
"description": "rtp的ssrc"
},
{
"key": "dst_url",
"value": "127.0.0.1",
"description": "目标ip或域名"
},
{
"key": "dst_port",
"value": "10000",
"description": "目标端口"
},
{
"key": "is_udp",
"value": "0",
"description": "是否为udp模式,否则为tcp模式"
}
]
}
},
"response": []
},
{
"name": "停止 发送rtp(stopSendRtp)",
"request": {
"method": "GET",
"header": [],
"url": {
"raw": "{{ZLMediaKit_URL}}/index/api/stopSendRtp?secret={{ZLMediaKit_secret}}&vhost={{defaultVhost}}&app=live&stream=obs",
"host": [
"{{ZLMediaKit_URL}}"
],
"path": [
"index",
"api",
"stopSendRtp"
],
"query": [
{
"key": "secret",
"value": "{{ZLMediaKit_secret}}",
"description": "api操作密钥(配置文件配置),如果操作ip是127.0.0.1,则不需要此参数"
},
{
"key": "vhost",
"value": "{{defaultVhost}}",
"description": "虚拟主机,例如__defaultVhost__"
},
{
"key": "app",
"value": "live",
"description": "应用名,例如 live"
},
{
"key": "stream",
"value": "obs",
"description": "流id,例如 obs"
}
]
}
},
"response": []
} }
], ],
"event": [ "event": [
...@@ -1074,17 +1176,17 @@ ...@@ -1074,17 +1176,17 @@
], ],
"variable": [ "variable": [
{ {
"id": "0e272976-965b-4f25-8b9e-5916c59234d7", "id": "ce426571-eb1e-4067-8901-01978c982fed",
"key": "ZLMediaKit_URL", "key": "ZLMediaKit_URL",
"value": "zlmediakit.com:8880" "value": "zlmediakit.com:8880"
}, },
{ {
"id": "321374c3-3357-4405-915e-9cb524d95fbc", "id": "2d3dfd4a-a39c-47d8-a3e9-37d80352ea5f",
"key": "ZLMediaKit_secret", "key": "ZLMediaKit_secret",
"value": "035c73f7-bb6b-4889-a715-d9eb2d1925cc" "value": "035c73f7-bb6b-4889-a715-d9eb2d1925cc"
}, },
{ {
"id": "468ce1f6-ec79-44d2-819e-5cb9f42cd396", "id": "0aacc473-3a2e-4ef9-b415-e86ce71e0c42",
"key": "defaultVhost", "key": "defaultVhost",
"value": "__defaultVhost__" "value": "__defaultVhost__"
} }
......
...@@ -810,6 +810,45 @@ void installWebApi() { ...@@ -810,6 +810,45 @@ void installWebApi() {
} }
}); });
static auto getMediaSource = [](const string &vhost, const string &app, const string &stream_id){
auto src = MediaSource::find(RTMP_SCHEMA, vhost, app, stream_id);
if(src){
return src;
}
return MediaSource::find(RTSP_SCHEMA, vhost, app, stream_id);
};
api_regist2("/index/api/startSendRtp",[](API_ARGS2){
CHECK_SECRET();
CHECK_ARGS("vhost", "app", "stream", "ssrc", "dst_url", "dst_port", "is_udp");
auto src = getMediaSource(allArgs["vhost"], allArgs["app"], allArgs["stream"]);
if (!src) {
throw ApiRetException("该媒体流不存在", API::OtherFailed);
}
src->startSendRtp(allArgs["dst_url"], allArgs["dst_port"], allArgs["ssrc"], allArgs["is_udp"], [val, headerOut, invoker](const SockException &ex){
if (ex) {
const_cast<Value &>(val)["code"] = API::OtherFailed;
const_cast<Value &>(val)["msg"] = ex.what();
}
invoker("200 OK", headerOut, val.toStyledString());
});
});
api_regist1("/index/api/stopSendRtp",[](API_ARGS1){
CHECK_SECRET();
CHECK_ARGS("vhost", "app", "stream");
auto src = getMediaSource(allArgs["vhost"], allArgs["app"], allArgs["stream"]);
if (!src) {
throw ApiRetException("该媒体流不存在", API::OtherFailed);
}
val["result"] = src->stopSendRtp();
});
#endif//ENABLE_RTPPROXY #endif//ENABLE_RTPPROXY
// 开始录制hls或MP4 // 开始录制hls或MP4
......
...@@ -120,6 +120,23 @@ bool MediaSource::isRecording(Recorder::type type){ ...@@ -120,6 +120,23 @@ bool MediaSource::isRecording(Recorder::type type){
return listener->isRecording(*this, type); return listener->isRecording(*this, type);
} }
void MediaSource::startSendRtp(const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function<void(const SockException &ex)> &cb){
auto listener = _listener.lock();
if (!listener) {
cb(SockException(Err_other, "尚未设置事件监听器"));
return;
}
return listener->startSendRtp(*this, dst_url, dst_port, ssrc, is_udp, cb);
}
bool MediaSource::stopSendRtp() {
auto listener = _listener.lock();
if (!listener) {
return false;
}
return listener->stopSendRtp(*this);
}
void MediaSource::for_each_media(const function<void(const MediaSource::Ptr &src)> &cb) { void MediaSource::for_each_media(const function<void(const MediaSource::Ptr &src)> &cb) {
decltype(s_media_source_map) copy; decltype(s_media_source_map) copy;
{ {
...@@ -553,6 +570,23 @@ vector<Track::Ptr> MediaSourceEventInterceptor::getTracks(MediaSource &sender, b ...@@ -553,6 +570,23 @@ vector<Track::Ptr> MediaSourceEventInterceptor::getTracks(MediaSource &sender, b
return listener->getTracks(sender, trackReady); return listener->getTracks(sender, trackReady);
} }
void MediaSourceEventInterceptor::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function<void(const SockException &ex)> &cb){
auto listener = _listener.lock();
if (listener) {
listener->startSendRtp(sender, dst_url, dst_port, ssrc, is_udp, cb);
} else {
MediaSourceEvent::startSendRtp(sender, dst_url, dst_port, ssrc, is_udp, cb);
}
}
bool MediaSourceEventInterceptor::stopSendRtp(MediaSource &sender){
auto listener = _listener.lock();
if (listener) {
return listener->stopSendRtp(sender);
}
return false;
}
/////////////////////////////////////FlushPolicy////////////////////////////////////// /////////////////////////////////////FlushPolicy//////////////////////////////////////
static bool isFlushAble_default(bool is_video, uint32_t last_stamp, uint32_t new_stamp, int cache_size) { static bool isFlushAble_default(bool is_video, uint32_t last_stamp, uint32_t new_stamp, int cache_size) {
......
...@@ -61,6 +61,10 @@ public: ...@@ -61,6 +61,10 @@ public:
virtual bool isRecording(MediaSource &sender, Recorder::type type) { return false; }; virtual bool isRecording(MediaSource &sender, Recorder::type type) { return false; };
// 获取所有track相关信息 // 获取所有track相关信息
virtual vector<Track::Ptr> getTracks(MediaSource &sender, bool trackReady = true) const { return vector<Track::Ptr>(); }; virtual vector<Track::Ptr> getTracks(MediaSource &sender, bool trackReady = true) const { return vector<Track::Ptr>(); };
// 开始发送ps-rtp
virtual void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function<void(const SockException &ex)> &cb) { cb(SockException(Err_other, "not implemented"));};
// 停止发送ps-rtp
virtual bool stopSendRtp(MediaSource &sender) {return false; }
private: private:
Timer::Ptr _async_close_timer; Timer::Ptr _async_close_timer;
...@@ -80,6 +84,8 @@ public: ...@@ -80,6 +84,8 @@ public:
bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) override; bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) override;
bool isRecording(MediaSource &sender, Recorder::type type) override; bool isRecording(MediaSource &sender, Recorder::type type) override;
vector<Track::Ptr> getTracks(MediaSource &sender, bool trackReady = true) const override; vector<Track::Ptr> getTracks(MediaSource &sender, bool trackReady = true) const override;
void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function<void(const SockException &ex)> &cb) override;
bool stopSendRtp(MediaSource &sender) override;
protected: protected:
std::weak_ptr<MediaSourceEvent> _listener; std::weak_ptr<MediaSourceEvent> _listener;
...@@ -160,6 +166,10 @@ public: ...@@ -160,6 +166,10 @@ public:
bool setupRecord(Recorder::type type, bool start, const string &custom_path); bool setupRecord(Recorder::type type, bool start, const string &custom_path);
// 获取录制状态 // 获取录制状态
bool isRecording(Recorder::type type); bool isRecording(Recorder::type type);
// 开始发送ps-rtp
void startSendRtp(const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function<void(const SockException &ex)> &cb);
// 停止发送ps-rtp
bool stopSendRtp();
////////////////static方法,查找或生成MediaSource//////////////// ////////////////static方法,查找或生成MediaSource////////////////
......
...@@ -255,6 +255,37 @@ bool MultiMediaSourceMuxer::isRecording(MediaSource &sender, Recorder::type type ...@@ -255,6 +255,37 @@ bool MultiMediaSourceMuxer::isRecording(MediaSource &sender, Recorder::type type
return _muxer->isRecording(sender,type); return _muxer->isRecording(sender,type);
} }
void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function<void(const SockException &ex)> &cb){
#if defined(ENABLE_RTPPROXY)
auto ps_rtp_sender = std::make_shared<PSRtpSender>(ssrc);
weak_ptr<MultiMediaSourceMuxer> weak_self = shared_from_this();
ps_rtp_sender->startSend(dst_url, dst_port, is_udp, [weak_self, ps_rtp_sender, cb](const SockException &ex) {
cb(ex);
auto strong_self = weak_self.lock();
if (!strong_self || ex) {
return;
}
for (auto &track : strong_self->_muxer->getTracks(false)) {
ps_rtp_sender->addTrack(track);
}
ps_rtp_sender->addTrackCompleted();
strong_self->_ps_rtp_sender = ps_rtp_sender;
});
#else
cb(SockException(Err_other, "该功能未启用,编译时请打开ENABLE_RTPPROXY宏"));
#endif//ENABLE_RTPPROXY
}
bool MultiMediaSourceMuxer::stopSendRtp(MediaSource &sender){
#if defined(ENABLE_RTPPROXY)
if (_ps_rtp_sender) {
_ps_rtp_sender = nullptr;
return true;
}
#endif//ENABLE_RTPPROXY
return false;
}
void MultiMediaSourceMuxer::addTrack(const Track::Ptr &track) { void MultiMediaSourceMuxer::addTrack(const Track::Ptr &track) {
_muxer->addTrack(track); _muxer->addTrack(track);
} }
...@@ -327,21 +358,26 @@ private: ...@@ -327,21 +358,26 @@ private:
Frame::Ptr _frame; Frame::Ptr _frame;
}; };
void MultiMediaSourceMuxer::inputFrame(const Frame::Ptr &frame) { void MultiMediaSourceMuxer::inputFrame(const Frame::Ptr &frame_in) {
GET_CONFIG(bool, modify_stamp, General::kModifyStamp); GET_CONFIG(bool, modify_stamp, General::kModifyStamp);
if (!modify_stamp) { auto frame = frame_in;
//未开启时间戳覆盖 if (modify_stamp) {
_muxer->inputFrame(frame);
} else {
//开启了时间戳覆盖 //开启了时间戳覆盖
FrameModifyStamp::Ptr new_frame = std::make_shared<FrameModifyStamp>(frame, _stamp[frame->getTrackType()]); frame = std::make_shared<FrameModifyStamp>(frame, _stamp[frame->getTrackType()]);
//输入时间戳覆盖后的帧
_muxer->inputFrame(new_frame);
} }
_muxer->inputFrame(frame);
#if defined(ENABLE_RTPPROXY)
auto ps_rtp_sender = _ps_rtp_sender;
if (ps_rtp_sender) {
ps_rtp_sender->inputFrame(frame);
}
#endif //ENABLE_RTPPROXY
} }
bool MultiMediaSourceMuxer::isEnabled(){ bool MultiMediaSourceMuxer::isEnabled(){
return _muxer->isEnabled(); return _muxer->isEnabled() || _ps_rtp_sender;
} }
......
...@@ -15,6 +15,8 @@ ...@@ -15,6 +15,8 @@
#include "Record/Recorder.h" #include "Record/Recorder.h"
#include "Record/HlsMediaSource.h" #include "Record/HlsMediaSource.h"
#include "Record/HlsRecorder.h" #include "Record/HlsRecorder.h"
#include "Rtp/PSRtpSender.h"
namespace mediakit{ namespace mediakit{
class MultiMuxerPrivate : public MediaSink, public std::enable_shared_from_this<MultiMuxerPrivate>{ class MultiMuxerPrivate : public MediaSink, public std::enable_shared_from_this<MultiMuxerPrivate>{
...@@ -126,6 +128,22 @@ public: ...@@ -126,6 +128,22 @@ public:
*/ */
bool isRecording(MediaSource &sender, Recorder::type type) override; bool isRecording(MediaSource &sender, Recorder::type type) override;
/**
* 开始发送ps-rtp流
* @param dst_url 目标ip或域名
* @param dst_port 目标端口
* @param ssrc rtp的ssrc
* @param is_udp 是否为udp
* @param cb 启动成功或失败回调
*/
void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function<void(const SockException &ex)> &cb) override;
/**
* 停止ps-rtp发送
* @return 是否成功
*/
bool stopSendRtp(MediaSource &sender) override;
/////////////////////////////////MediaSinkInterface override///////////////////////////////// /////////////////////////////////MediaSinkInterface override/////////////////////////////////
/** /**
...@@ -133,7 +151,7 @@ public: ...@@ -133,7 +151,7 @@ public:
* 只会克隆sps pps这些信息 ,而不会克隆Delegate相关关系 * 只会克隆sps pps这些信息 ,而不会克隆Delegate相关关系
* @param track 添加音频或视频轨道 * @param track 添加音频或视频轨道
*/ */
void addTrack(const Track::Ptr & track) override; void addTrack(const Track::Ptr &track) override;
/** /**
* 添加track完毕 * 添加track完毕
...@@ -162,6 +180,9 @@ private: ...@@ -162,6 +180,9 @@ private:
Stamp _stamp[2]; Stamp _stamp[2];
MultiMuxerPrivate::Ptr _muxer; MultiMuxerPrivate::Ptr _muxer;
std::weak_ptr<MultiMuxerPrivate::Listener> _track_listener; std::weak_ptr<MultiMuxerPrivate::Listener> _track_listener;
#if defined(ENABLE_RTPPROXY)
PSRtpSender::Ptr _ps_rtp_sender;
#endif //ENABLE_RTPPROXY
}; };
}//namespace mediakit }//namespace mediakit
......
/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#if defined(ENABLE_RTPPROXY)
#include "PSEncoder.h"
#include "Extension/H264.h"
namespace mediakit{
PSEncoder::PSEncoder() {
_buffer = std::make_shared<BufferRaw>();
init();
}
PSEncoder::~PSEncoder() {
}
void PSEncoder::init() {
static struct ps_muxer_func_t func = {
/*alloc*/
[](void *param, size_t bytes) {
PSEncoder *thiz = (PSEncoder *) param;
thiz->_buffer->setCapacity(bytes + 1);
return (void *) thiz->_buffer->data();
},
/*free*/
[](void *param, void *packet) {
//什么也不做
},
/*wtite*/
[](void *param, int stream, void *packet, size_t bytes) {
PSEncoder *thiz = (PSEncoder *) param;
thiz->onPS(thiz->_timestamp, packet, bytes);
}
};
_muxer.reset(ps_muxer_create(&func, this), [](struct ps_muxer_t *ptr) {
ps_muxer_destroy(ptr);
});
}
void PSEncoder::addTrack(const Track::Ptr &track) {
switch (track->getCodecId()) {
case CodecH264: {
_codec_to_trackid[track->getCodecId()].track_id = ps_muxer_add_stream(_muxer.get(), STREAM_VIDEO_H264, nullptr, 0);
break;
}
case CodecH265: {
_codec_to_trackid[track->getCodecId()].track_id = ps_muxer_add_stream(_muxer.get(), STREAM_VIDEO_H265, nullptr, 0);
break;
}
case CodecAAC: {
_codec_to_trackid[track->getCodecId()].track_id = ps_muxer_add_stream(_muxer.get(), STREAM_AUDIO_AAC, nullptr, 0);
break;
}
case CodecG711A: {
_codec_to_trackid[track->getCodecId()].track_id = ps_muxer_add_stream(_muxer.get(), STREAM_AUDIO_G711A, nullptr, 0);
break;
}
case CodecG711U: {
_codec_to_trackid[track->getCodecId()].track_id = ps_muxer_add_stream(_muxer.get(), STREAM_AUDIO_G711U, nullptr, 0);
break;
}
case CodecOpus: {
_codec_to_trackid[track->getCodecId()].track_id = ps_muxer_add_stream(_muxer.get(), STREAM_AUDIO_OPUS, nullptr, 0);
break;
}
default: WarnL << "mpeg-ps 不支持该编码格式,已忽略:" << track->getCodecName(); break;
}
//尝试音视频同步
stampSync();
}
void PSEncoder::stampSync(){
if(_codec_to_trackid.size() < 2){
return;
}
Stamp *audio = nullptr, *video = nullptr;
for(auto &pr : _codec_to_trackid){
switch (getTrackType((CodecId) pr.first)){
case TrackAudio : audio = &pr.second.stamp; break;
case TrackVideo : video = &pr.second.stamp; break;
default : break;
}
}
if(audio && video){
//音频时间戳同步于视频,因为音频时间戳被修改后不影响播放
audio->syncTo(*video);
}
}
void PSEncoder::resetTracks() {
init();
}
void PSEncoder::inputFrame(const Frame::Ptr &frame) {
auto it = _codec_to_trackid.find(frame->getCodecId());
if (it == _codec_to_trackid.end()) {
return;
}
auto &track_info = it->second;
int64_t dts_out, pts_out;
switch (frame->getCodecId()) {
case CodecH264: {
int type = H264_TYPE(*((uint8_t *) frame->data() + frame->prefixSize()));
if (type == H264Frame::NAL_SEI) {
break;
}
}
case CodecH265: {
//这里的代码逻辑是让SPS、PPS、IDR这些时间戳相同的帧打包到一起当做一个帧处理,
if (!_frameCached.empty() && _frameCached.back()->dts() != frame->dts()) {
Frame::Ptr back = _frameCached.back();
Buffer::Ptr merged_frame = back;
if (_frameCached.size() != 1) {
string merged;
_frameCached.for_each([&](const Frame::Ptr &frame) {
if (frame->prefixSize()) {
merged.append(frame->data(), frame->size());
} else {
merged.append("\x00\x00\x00\x01", 4);
merged.append(frame->data(), frame->size());
}
});
merged_frame = std::make_shared<BufferString>(std::move(merged));
}
track_info.stamp.revise(back->dts(), back->pts(), dts_out, pts_out);
_timestamp = dts_out;
ps_muxer_input(_muxer.get(), track_info.track_id, back->keyFrame() ? 0x0001 : 0, pts_out * 90LL,
dts_out * 90LL, merged_frame->data(), merged_frame->size());
_frameCached.clear();
}
_frameCached.emplace_back(Frame::getCacheAbleFrame(frame));
}
break;
case CodecAAC: {
if (frame->prefixSize() == 0) {
WarnL << "必须提供adts头才能mpeg-ps打包";
break;
}
}
default: {
track_info.stamp.revise(frame->dts(), frame->pts(), dts_out, pts_out);
_timestamp = dts_out;
ps_muxer_input(_muxer.get(), track_info.track_id, frame->keyFrame() ? 0x0001 : 0, pts_out * 90LL,
dts_out * 90LL, frame->data(), frame->size());
}
break;
}
}
}//namespace mediakit
#endif//defined(ENABLE_RTPPROXY)
/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#ifndef ZLMEDIAKIT_PSENCODER_H
#define ZLMEDIAKIT_PSENCODER_H
#if defined(ENABLE_RTPPROXY)
#include "mpeg-ps.h"
#include "Common/MediaSink.h"
#include "Common/Stamp.h"
namespace mediakit{
//该类实现mpeg-ps容器格式的打包
class PSEncoder : public MediaSinkInterface {
public:
PSEncoder();
~PSEncoder() override;
/**
* 添加音视频轨道
*/
void addTrack(const Track::Ptr &track) override;
/**
* 重置音视频轨道
*/
void resetTracks() override;
/**
* 输入帧数据
*/
void inputFrame(const Frame::Ptr &frame) override;
protected:
/**
* 输出mpeg-ps的回调函数
* @param stamp 时间戳,毫秒
* @param packet 数据指针
* @param bytes 数据长度
*/
virtual void onPS(uint32_t stamp, void *packet, size_t bytes) = 0;
private:
void init();
//音视频时间戳同步用
void stampSync();
private:
struct track_info {
int track_id = -1;
Stamp stamp;
};
private:
uint32_t _timestamp = 0;
BufferRaw::Ptr _buffer;
List<Frame::Ptr> _frameCached;
std::shared_ptr<struct ps_muxer_t> _muxer;
unordered_map<int, track_info> _codec_to_trackid;
};
}//namespace mediakit
#endif //ENABLE_RTPPROXY
#endif //ZLMEDIAKIT_PSENCODER_H
/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#include "PSRtpSender.h"
#include "Rtsp/RtspSession.h"
#include "Thread/WorkThreadPool.h"
namespace mediakit{
PSRtpSender::PSRtpSender(uint32_t ssrc, uint8_t payload_type) {
GET_CONFIG(uint32_t,video_mtu,Rtp::kVideoMtuSize);
_rtp_encoder = std::make_shared<CommonRtpEncoder>(CodecInvalid, ssrc, video_mtu, 90000, payload_type, 0);
_rtp_encoder->setRtpRing(std::make_shared<RtpRing::RingType>());
_rtp_encoder->getRtpRing()->setDelegate(std::make_shared<RingDelegateHelper>([this](const RtpPacket::Ptr &rtp, bool is_key){
onRtp(rtp, is_key);
}));
_poller = EventPollerPool::Instance().getPoller();
InfoL << this << " " << printSSRC(_rtp_encoder->getSsrc());
}
PSRtpSender::~PSRtpSender() {
InfoL << this << " " << printSSRC(_rtp_encoder->getSsrc());
}
void PSRtpSender::onPS(uint32_t stamp, void *packet, size_t bytes) {
_rtp_encoder->inputFrame(std::make_shared<FrameFromPtr>((char *) packet, bytes, stamp));
}
void PSRtpSender::startSend(const string &dst_url, uint16_t dst_port, bool is_udp, const function<void(const SockException &ex)> &cb){
_is_udp = is_udp;
_socket = std::make_shared<Socket>();
_dst_url = dst_url;
_dst_port = dst_port;
weak_ptr<PSRtpSender> weak_self = shared_from_this();
if (is_udp) {
_socket->bindUdpSock(0);
WorkThreadPool::Instance().getPoller()->async([cb, dst_url, dst_port, weak_self]() {
//切换线程目的是为了dns解析放在后台线程执行
struct sockaddr addr;
if (!SockUtil::getDomainIP(dst_url.data(), dst_port, addr)) {
cb(SockException(Err_dns, StrPrinter << "dns解析域名失败:" << dst_url));
return;
}
cb(SockException());
auto strong_self = weak_self.lock();
if (strong_self) {
//dns解析成功
strong_self->_socket->setSendPeerAddr(&addr);
strong_self->onConnect();
}
});
} else {
_socket->connect(dst_url, dst_port, [cb, weak_self](const SockException &err) {
cb(err);
auto strong_self = weak_self.lock();
if (strong_self && !err) {
//tcp连接成功
strong_self->onConnect();
}
});
}
}
void PSRtpSender::onConnect(){
_is_connect = true;
//加大发送缓存,防止udp丢包之类的问题
SockUtil::setSendBuf(_socket->rawFD(), 4 * 1024 * 1024);
if(!_is_udp){
//关闭tcp no_delay并开启MSG_MORE, 提高发送性能
SockUtil::setNoDelay(_socket->rawFD(), false);
_socket->setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
}
//连接建立成功事件
weak_ptr<PSRtpSender> weak_self = shared_from_this();
_socket->setOnErr([weak_self](const SockException &err) {
auto strong_self = weak_self.lock();
if (strong_self) {
strong_self->onErr(err);
}
});
InfoL << "开始发送 ps rtp:" << _socket->get_peer_ip() << ":" << _socket->get_peer_port() << ", 是否为udp方式:" << _is_udp;
}
void PSRtpSender::onRtp(const RtpPacket::Ptr &rtp, bool) {
if(!_is_connect){
return;
}
//开启合并写提高发送性能
PacketCache<RtpPacket>::inputPacket(true, rtp, false);
}
void PSRtpSender::onFlush(shared_ptr<List<RtpPacket::Ptr>> &rtp_list, bool key_pos) {
int i = 0;
int size = rtp_list->size();
rtp_list->for_each([&](const RtpPacket::Ptr &packet) {
if (_is_udp) {
//udp模式,rtp over tcp前4个字节可以忽略
_socket->send(std::make_shared<BufferRtp>(packet, 4), nullptr, 0, ++i == size);
} else {
//tcp模式, rtp over tcp前2个字节可以忽略,只保留后续rtp长度的2个字节
_socket->send(std::make_shared<BufferRtp>(packet, 2), nullptr, 0, ++i == size);
}
});
}
void PSRtpSender::onErr(const SockException &ex, bool is_connect) {
_is_connect = false;
//监听socket断开事件,方便重连
if (is_connect) {
WarnL << "重连" << _dst_url << ":" << _dst_port << "失败, 原因为:" << ex.what();
} else {
WarnL << "停止发送 ps rtp:" << _dst_url << ":" << _dst_port << ", 原因为:" << ex.what();
}
weak_ptr<PSRtpSender> weak_self = shared_from_this();
_connect_timer = std::make_shared<Timer>(10.0, [weak_self]() {
auto strong_self = weak_self.lock();
if (!strong_self) {
return false;
}
strong_self->startSend(strong_self->_dst_url, strong_self->_dst_port, strong_self->_is_udp, [weak_self](const SockException &ex){
auto strong_self = weak_self.lock();
if (strong_self && ex) {
//连接失败且本对象未销毁,那么重试连接
strong_self->onErr(ex, true);
}
});
return false;
}, _poller);
}
}//namespace mediakit
/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#ifndef ZLMEDIAKIT_PSRTPSENDER_H
#define ZLMEDIAKIT_PSRTPSENDER_H
#if defined(ENABLE_RTPPROXY)
#include "PSEncoder.h"
#include "Extension/CommonRtp.h"
namespace mediakit{
class RingDelegateHelper : public RingDelegate<RtpPacket::Ptr> {
public:
typedef function<void(const RtpPacket::Ptr &in, bool is_key)> onRtp;
~RingDelegateHelper() override{}
RingDelegateHelper(onRtp on_rtp){
_on_rtp = std::move(on_rtp);
}
void onWrite(const RtpPacket::Ptr &in, bool is_key) override{
_on_rtp(in, is_key);
}
private:
onRtp _on_rtp;
};
//该类在PSEncoder的基础上,实现了mpeg-ps的rtp打包以及发送
class PSRtpSender : public PSEncoder, public std::enable_shared_from_this<PSRtpSender>, public PacketCache<RtpPacket>{
public:
typedef std::shared_ptr<PSRtpSender> Ptr;
/**
* 构造函数
* @param ssrc rtp的ssrc
* @param payload_type 国标中ps-rtp的pt一般为96
*/
PSRtpSender(uint32_t ssrc, uint8_t payload_type = 96);
~PSRtpSender() override;
/**
* 开始发送ps-rtp包
* @param dst_url 目标ip或域名
* @param dst_port 目标端口
* @param is_udp 是否采用udp方式发送rtp
* @param cb 连接目标端口是否成功的回调
*/
void startSend(const string &dst_url, uint16_t dst_port, bool is_udp, const function<void(const SockException &ex)> &cb);
protected:
//mpeg-ps回调
void onPS(uint32_t stamp, void *packet, size_t bytes) override;
/**
* 批量flush rtp包时触发该函数
* @param rtp_list rtp包列表
* @param key_pos 是否包含关键帧
*/
void onFlush(std::shared_ptr<List<RtpPacket::Ptr> > &rtp_list, bool key_pos) override;
private:
//rtp打包后回调
void onRtp(const RtpPacket::Ptr &in, bool is_key);
//udp/tcp连接成功回调
void onConnect();
//异常断开socket事件
void onErr(const SockException &ex, bool is_connect = false);
private:
bool _is_udp;
bool _is_connect = false;
string _dst_url;
uint16_t _dst_port;
Socket::Ptr _socket;
EventPoller::Ptr _poller;
Timer::Ptr _connect_timer;
std::shared_ptr<CommonRtpEncoder> _rtp_encoder;
};
}//namespace mediakit
#endif// defined(ENABLE_RTPPROXY)
#endif //ZLMEDIAKIT_PSRTPSENDER_H
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论