Commit d611a189 by xiongguangjie

rtmp repush replace pre pusher

parent 67d5ca02
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
#include "RtmpSession.h" #include "RtmpSession.h"
#include "Common/config.h" #include "Common/config.h"
#include "Util/onceToken.h" #include "Util/onceToken.h"
#include "Network/Server.h"
using namespace std; using namespace std;
using namespace toolkit; using namespace toolkit;
...@@ -142,7 +143,24 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { ...@@ -142,7 +143,24 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
_media_info._schema = RTMP_SCHEMA; _media_info._schema = RTMP_SCHEMA;
auto now_stream_index = _now_stream_index; auto now_stream_index = _now_stream_index;
auto on_res = [this, token, now_stream_index](const string &err, const ProtocolOption &option) { auto on_res_push = [this](ProtocolOption option){
if (!_push_src) {
_push_src = std::make_shared<RtmpMediaSourceImp>(_media_info._vhost, _media_info._app, _media_info._streamid);
//获取所有权
_push_src_ownership = _push_src->getOwnership();
_push_src->setProtocolOption(option);
}
_push_src->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
_continue_push_ms = option.continue_push_ms;
sendStatus({"level", "status",
"code", "NetStream.Publish.Start",
"description", "Started publishing stream.",
"clientid", "0" });
setSocketFlags();
};
auto on_res = [this, on_res_push,token, now_stream_index](const string &err, const ProtocolOption &option) {
_now_stream_index = now_stream_index; _now_stream_index = now_stream_index;
if (!err.empty()) { if (!err.empty()) {
sendStatus({ "level", "error", sendStatus({ "level", "error",
...@@ -156,10 +174,10 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { ...@@ -156,10 +174,10 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
assert(!_push_src); assert(!_push_src);
auto src = MediaSource::find(RTMP_SCHEMA, _media_info._vhost, _media_info._app, _media_info._streamid); auto src = MediaSource::find(RTMP_SCHEMA, _media_info._vhost, _media_info._app, _media_info._streamid);
auto push_failed = (bool)src; auto push_failed = (bool)src;
RtmpMediaSourceImp::Ptr rtmp_src;
while (src) { while (src) {
//尝试断连后继续推流 //尝试断连后继续推流
auto rtmp_src = dynamic_pointer_cast<RtmpMediaSourceImp>(src); rtmp_src = dynamic_pointer_cast<RtmpMediaSourceImp>(src);
if (!rtmp_src) { if (!rtmp_src) {
//源不是rtmp推流产生的 //源不是rtmp推流产生的
break; break;
...@@ -175,7 +193,28 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { ...@@ -175,7 +193,28 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
break; break;
} }
if (push_failed) { if (push_failed && rtmp_src) {
auto sock = rtmp_src->getOriginSock();
if(sock){
auto session = SessionMap::Instance().get(sock->getIdentifier());
if(session){
session->getPoller()->async_first([this,rtmp_src,on_res_push,option,session](){
session->shutdown(SockException(Err_eof, "other pusher kick session"));
_push_src_ownership = rtmp_src->getOwnership();
if(_push_src_ownership){
_push_src = std::move(rtmp_src);
}else{
WarnL<<"not reach this";
}
this->getPoller()->async_first([option,on_res_push](){
on_res_push(option);
});
});
return;
}else{
TraceL;
}
}
sendStatus({"level", "error", sendStatus({"level", "error",
"code", "NetStream.Publish.BadName", "code", "NetStream.Publish.BadName",
"description", "Already publishing.", "description", "Already publishing.",
...@@ -184,21 +223,7 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { ...@@ -184,21 +223,7 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
return; return;
} }
if (!_push_src) { on_res_push(option);
_push_src = std::make_shared<RtmpMediaSourceImp>(_media_info._vhost, _media_info._app, _media_info._streamid);
//获取所有权
_push_src_ownership = _push_src->getOwnership();
_push_src->setProtocolOption(option);
}
_push_src->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
_continue_push_ms = option.continue_push_ms;
sendStatus({"level", "status",
"code", "NetStream.Publish.Start",
"description", "Started publishing stream.",
"clientid", "0" });
setSocketFlags();
}; };
if(_media_info._app.empty() || _media_info._streamid.empty()){ if(_media_info._app.empty() || _media_info._streamid.empty()){
......
...@@ -287,9 +287,13 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) { ...@@ -287,9 +287,13 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) {
if(session && rtsp_session){ if(session && rtsp_session){
rtsp_src_imp = rtsp_session->getPushSrc(); rtsp_src_imp = rtsp_session->getPushSrc();
session->getPoller()->async_first([this,rtsp_src,option,rtsp_src_imp,onResPushSrc,session](){ session->getPoller()->async_first([this,rtsp_src,option,rtsp_src_imp,onResPushSrc,session](){
session->shutdown(SockException(Err_shutdown, "other pusher kick session")); session->shutdown(SockException(Err_eof, "other pusher kick session"));
_push_src_ownership = rtsp_src_imp->getOwnership(); _push_src_ownership = rtsp_src_imp->getOwnership();
if(_push_src_ownership){
_push_src = std::move(rtsp_src_imp); _push_src = std::move(rtsp_src_imp);
}else{
WarnL<<"not reach this";
}
this->getPoller()->async_first([option,onResPushSrc](){ this->getPoller()->async_first([option,onResPushSrc](){
onResPushSrc(option); onResPushSrc(option);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论