Commit c7225dff by xiongziliang

整理rtsp客户端代码

parent 5d436d0a
......@@ -107,13 +107,13 @@ void RtspPlayer::onConnect(const SockException &err){
sendOptions();
}
void RtspPlayer::onRecv(const Buffer::Ptr& pBuf) {
void RtspPlayer::onRecv(const Buffer::Ptr& buf) {
if(_benchmark_mode && !_play_check_timer){
//在性能测试模式下,如果rtsp握手完毕后,不再解析rtp包
_rtp_recv_ticker.resetTime();
return;
}
input(pBuf->data(),pBuf->size());
input(buf->data(), buf->size());
}
void RtspPlayer::onErr(const SockException &ex) {
......@@ -218,9 +218,9 @@ void RtspPlayer::createUdpSockIfNecessary(int track_idx){
}
//发送SETUP命令
void RtspPlayer::sendSetup(unsigned int trackIndex) {
_on_response = std::bind(&RtspPlayer::handleResSETUP, this, placeholders::_1, trackIndex);
auto &track = _sdp_track[trackIndex];
void RtspPlayer::sendSetup(unsigned int track_idx) {
_on_response = std::bind(&RtspPlayer::handleResSETUP, this, placeholders::_1, track_idx);
auto &track = _sdp_track[track_idx];
auto baseUrl = _content_base + "/" + track->_control_surffix;
switch (_rtp_type) {
case Rtsp::RTP_TCP: {
......@@ -232,11 +232,11 @@ void RtspPlayer::sendSetup(unsigned int trackIndex) {
}
break;
case Rtsp::RTP_UDP: {
createUdpSockIfNecessary(trackIndex);
createUdpSockIfNecessary(track_idx);
sendRtspRequest("SETUP", baseUrl, {"Transport",
StrPrinter << "RTP/AVP;unicast;client_port="
<< _rtp_sock[trackIndex]->get_local_port() << "-"
<< _rtcp_sock[trackIndex]->get_local_port()});
<< _rtp_sock[track_idx]->get_local_port() << "-"
<< _rtcp_sock[track_idx]->get_local_port()});
}
break;
default:
......@@ -244,12 +244,12 @@ void RtspPlayer::sendSetup(unsigned int trackIndex) {
}
}
void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex) {
void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int track_idx) {
if (parser.Url() != "200") {
throw std::runtime_error(
StrPrinter << "SETUP:" << parser.Url() << " " << parser.Tail() << endl);
}
if (uiTrackIndex == 0) {
if (track_idx == 0) {
_session_id = parser["Session"];
_session_id.append(";");
_session_id = FindField(_session_id.data(), nullptr, ";");
......@@ -268,14 +268,14 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex)
if(_rtp_type == Rtsp::RTP_TCP) {
string interleaved = FindField( FindField((strTransport + ";").data(), "interleaved=", ";").data(), NULL, "-");
_sdp_track[uiTrackIndex]->_interleaved = atoi(interleaved.data());
_sdp_track[track_idx]->_interleaved = atoi(interleaved.data());
}else{
const char *strPos = (_rtp_type == Rtsp::RTP_MULTICAST ? "port=" : "server_port=") ;
auto port_str = FindField((strTransport + ";").data(), strPos, ";");
uint16_t rtp_port = atoi(FindField(port_str.data(), NULL, "-").data());
uint16_t rtcp_port = atoi(FindField(port_str.data(), "-",NULL).data());
auto &pRtpSockRef = _rtp_sock[uiTrackIndex];
auto &pRtcpSockRef = _rtcp_sock[uiTrackIndex];
auto &pRtpSockRef = _rtp_sock[track_idx];
auto &pRtcpSockRef = _rtcp_sock[track_idx];
if (_rtp_type == Rtsp::RTP_MULTICAST) {
//udp组播
......@@ -290,7 +290,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex)
SockUtil::joinMultiAddr(fd, multiAddr.data(),get_local_ip().data());
}
} else {
createUdpSockIfNecessary(uiTrackIndex);
createUdpSockIfNecessary(track_idx);
//udp单播
struct sockaddr_in rtpto;
rtpto.sin_port = ntohs(rtp_port);
......@@ -310,7 +310,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex)
auto srcIP = inet_addr(get_peer_ip().data());
weak_ptr<RtspPlayer> weakSelf = dynamic_pointer_cast<RtspPlayer>(shared_from_this());
//设置rtp over udp接收回调处理函数
pRtpSockRef->setOnRead([srcIP, uiTrackIndex, weakSelf](const Buffer::Ptr &buf, struct sockaddr *addr , int addr_len) {
pRtpSockRef->setOnRead([srcIP, track_idx, weakSelf](const Buffer::Ptr &buf, struct sockaddr *addr , int addr_len) {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
return;
......@@ -319,13 +319,13 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex)
WarnL << "收到其他地址的rtp数据:" << SockUtil::inet_ntoa(((struct sockaddr_in *) addr)->sin_addr);
return;
}
strongSelf->handleOneRtp(uiTrackIndex, strongSelf->_sdp_track[uiTrackIndex]->_type,
strongSelf->_sdp_track[uiTrackIndex]->_samplerate, (unsigned char *) buf->data(), buf->size());
strongSelf->handleOneRtp(track_idx, strongSelf->_sdp_track[track_idx]->_type,
strongSelf->_sdp_track[track_idx]->_samplerate, (unsigned char *) buf->data(), buf->size());
});
if(pRtcpSockRef) {
//设置rtcp over udp接收回调处理函数
pRtcpSockRef->setOnRead([srcIP, uiTrackIndex, weakSelf](const Buffer::Ptr &buf, struct sockaddr *addr , int addr_len) {
pRtcpSockRef->setOnRead([srcIP, track_idx, weakSelf](const Buffer::Ptr &buf, struct sockaddr *addr , int addr_len) {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
return;
......@@ -334,14 +334,14 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex)
WarnL << "收到其他地址的rtcp数据:" << SockUtil::inet_ntoa(((struct sockaddr_in *) addr)->sin_addr);
return;
}
strongSelf->onRtcpPacket(uiTrackIndex, strongSelf->_sdp_track[uiTrackIndex], (unsigned char *) buf->data(), buf->size());
strongSelf->onRtcpPacket(track_idx, strongSelf->_sdp_track[track_idx], (unsigned char *) buf->data(), buf->size());
});
}
}
if (uiTrackIndex < _sdp_track.size() - 1) {
if (track_idx < _sdp_track.size() - 1) {
//需要继续发送SETUP命令
sendSetup(uiTrackIndex + 1);
sendSetup(track_idx + 1);
return;
}
//所有setup命令发送完毕
......@@ -404,8 +404,8 @@ void RtspPlayer::sendPause(int type , uint32_t seekMS){
}
}
void RtspPlayer::pause(bool bPause) {
sendPause(bPause ? type_pause : type_seek, getProgressMilliSecond());
void RtspPlayer::pause(bool pause_flag) {
sendPause(pause_flag ? type_pause : type_seek, getProgressMilliSecond());
}
void RtspPlayer::handleResPAUSE(const Parser& parser,int type) {
......@@ -416,6 +416,7 @@ void RtspPlayer::handleResPAUSE(const Parser& parser,int type) {
break;
case type_play:
WarnL << "Play failed:" << parser.Url() << " " << parser.Tail() << endl;
onPlayResult_l(SockException(Err_shutdown, StrPrinter << "rtsp play failed:" << parser.Url() << " " << parser.Tail() ), !_play_check_timer);
break;
case type_seek:
WarnL << "Seek failed:" << parser.Url() << " " << parser.Tail() << endl;
......@@ -442,8 +443,8 @@ void RtspPlayer::handleResPAUSE(const Parser& parser,int type) {
iSeekTo = 1000 * atof(strStart.data());
DebugL << "seekTo(ms):" << iSeekTo;
}
//设置相对时间戳
onPlayResult_l(SockException(Err_success, type == type_seek ? "resum rtsp success" : "rtsp play success"), type == type_seek);
onPlayResult_l(SockException(Err_success, type == type_seek ? "resum rtsp success" : "rtsp play success"), !_play_check_timer);
}
void RtspPlayer::onWholeRtspPacket(Parser &parser) {
......@@ -473,7 +474,7 @@ void RtspPlayer::onRtpPacket(const char *data, uint64_t len) {
}
//此处预留rtcp处理函数
void RtspPlayer::onRtcpPacket(int iTrackidx, SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen){}
void RtspPlayer::onRtcpPacket(int track_idx, SdpTrack::Ptr &track, unsigned char *data, unsigned int len){}
#if 0
//改代码提取自FFmpeg,参考之
......@@ -533,12 +534,12 @@ void RtspPlayer::onRtcpPacket(int iTrackidx, SdpTrack::Ptr &track, unsigned char
avio_w8(pb, 0);
#endif
void RtspPlayer::sendReceiverReport(bool overTcp,int iTrackIndex){
void RtspPlayer::sendReceiverReport(bool over_tcp, int track_idx){
static const char s_cname[] = "ZLMediaKitRtsp";
uint8_t aui8Rtcp[4 + 32 + 10 + sizeof(s_cname) + 1] = {0};
uint8_t *pui8Rtcp_RR = aui8Rtcp + 4, *pui8Rtcp_SDES = pui8Rtcp_RR + 32;
auto &track = _sdp_track[iTrackIndex];
auto &counter = _rtcp_counter[iTrackIndex];
auto &track = _sdp_track[track_idx];
auto &counter = _rtcp_counter[track_idx];
aui8Rtcp[0] = '$';
aui8Rtcp[1] = track->_interleaved + 1;
......@@ -564,13 +565,13 @@ void RtspPlayer::sendReceiverReport(bool overTcp,int iTrackIndex){
pui8Rtcp_RR[15] = 0x00;
//FIXME: max sequence received
int cycleCount = getCycleCount(iTrackIndex);
int cycleCount = getCycleCount(track_idx);
pui8Rtcp_RR[16] = cycleCount >> 8;
pui8Rtcp_RR[17] = cycleCount & 0xFF;
pui8Rtcp_RR[18] = counter.pktCnt >> 8;
pui8Rtcp_RR[19] = counter.pktCnt & 0xFF;
uint32_t jitter = htonl(getJitterSize(iTrackIndex));
uint32_t jitter = htonl(getJitterSize(track_idx));
//FIXME: jitter
memcpy(pui8Rtcp_RR + 20, &jitter , 4);
/* last SR timestamp */
......@@ -592,10 +593,10 @@ void RtspPlayer::sendReceiverReport(bool overTcp,int iTrackIndex){
memcpy(&pui8Rtcp_SDES[10], s_cname, sizeof(s_cname));
pui8Rtcp_SDES[10 + sizeof(s_cname)] = 0x00;
if(overTcp){
if (over_tcp) {
send(obtainBuffer((char *) aui8Rtcp, sizeof(aui8Rtcp)));
}else if(_rtcp_sock[iTrackIndex]) {
_rtcp_sock[iTrackIndex]->send((char *) aui8Rtcp + 4, sizeof(aui8Rtcp) - 4);
} else if (_rtcp_sock[track_idx]) {
_rtcp_sock[track_idx]->send((char *) aui8Rtcp + 4, sizeof(aui8Rtcp) - 4);
}
}
......@@ -613,24 +614,24 @@ void RtspPlayer::onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx){
}
float RtspPlayer::getPacketLossRate(TrackType type) const{
int iTrackIdx = getTrackIndexByTrackType(type);
if(iTrackIdx == -1){
try {
auto track_idx = getTrackIndexByTrackType(type);
if (_rtp_seq_now[track_idx] - _rtp_seq_start[track_idx] + 1 == 0) {
return 0;
}
return 1.0 - (double) _rtp_recv_count[track_idx] / (_rtp_seq_now[track_idx] - _rtp_seq_start[track_idx] + 1);
} catch (...) {
uint64_t totalRecv = 0;
uint64_t totalSend = 0;
for (unsigned int i = 0; i < _sdp_track.size(); i++) {
totalRecv += _rtp_recv_count[i];
totalSend += (_rtp_seq_now[i] - _rtp_seq_start[i] + 1);
}
if(totalSend == 0){
if (totalSend == 0) {
return 0;
}
return 1.0 - (double)totalRecv / totalSend;
}
if(_rtp_seq_now[iTrackIdx] - _rtp_seq_start[iTrackIdx] + 1 == 0){
return 0;
return 1.0 - (double) totalRecv / totalSend;
}
return 1.0 - (double)_rtp_recv_count[iTrackIdx] / (_rtp_seq_now[iTrackIdx] - _rtp_seq_start[iTrackIdx] + 1);
}
uint32_t RtspPlayer::getProgressMilliSecond() const{
......@@ -705,26 +706,26 @@ void RtspPlayer::sendRtspRequest(const string &cmd, const string &url,const StrC
SockSender::send(printer << "\r\n");
}
void RtspPlayer::onRecvRTP_l(const RtpPacket::Ptr &pkt, const SdpTrack::Ptr &track) {
void RtspPlayer::onRecvRTP_l(const RtpPacket::Ptr &rtp, const SdpTrack::Ptr &track) {
_rtp_recv_ticker.resetTime();
onRecvRTP(pkt, track);
onRecvRTP(rtp, track);
int iTrackIndex = getTrackIndexByTrackType(pkt->type);
RtcpCounter &counter = _rtcp_counter[iTrackIndex];
counter.pktCnt = pkt->sequence;
auto &ticker = _rtcp_send_ticker[iTrackIndex];
int track_idx = getTrackIndexByTrackType(rtp->type);
RtcpCounter &counter = _rtcp_counter[track_idx];
counter.pktCnt = rtp->sequence;
auto &ticker = _rtcp_send_ticker[track_idx];
if (ticker.elapsedTime() > 5 * 1000) {
//send rtcp every 5 second
counter.lastTimeStamp = counter.timeStamp;
//直接保存网络字节序
memcpy(&counter.timeStamp, pkt->data() + 8, 4);
memcpy(&counter.timeStamp, rtp->data() + 8, 4);
if (counter.lastTimeStamp != 0) {
sendReceiverReport(_rtp_type == Rtsp::RTP_TCP, iTrackIndex);
sendReceiverReport(_rtp_type == Rtsp::RTP_TCP, track_idx);
ticker.resetTime();
}
//有些rtsp服务器需要rtcp保活,有些需要发送信令保活
if (iTrackIndex == 0) {
if (track_idx == 0) {
//只需要发送一次心跳信令包
sendKeepAlive();
}
......@@ -785,16 +786,16 @@ int RtspPlayer::getTrackIndexByInterleaved(int interleaved) const {
throw SockException(Err_shutdown, StrPrinter << "no such track with interleaved:" << interleaved);
}
int RtspPlayer::getTrackIndexByTrackType(TrackType trackType) const {
int RtspPlayer::getTrackIndexByTrackType(TrackType track_type) const {
for (unsigned int i = 0; i < _sdp_track.size(); i++) {
if (_sdp_track[i]->_type == trackType) {
if (_sdp_track[i]->_type == track_type) {
return i;
}
}
if (_sdp_track.size() == 1) {
return 0;
}
throw SockException(Err_shutdown, StrPrinter << "no such track with type:" << (int) trackType);
throw SockException(Err_shutdown, StrPrinter << "no such track with type:" << (int) track_type);
}
} /* namespace mediakit */
......@@ -36,16 +36,18 @@ class RtspPlayer: public PlayerBase,public TcpClient, public RtspSplitter, publi
public:
typedef std::shared_ptr<RtspPlayer> Ptr;
RtspPlayer(const EventPoller::Ptr &poller) ;
virtual ~RtspPlayer(void);
RtspPlayer(const EventPoller::Ptr &poller);
~RtspPlayer() override;
void play(const string &strUrl) override;
void pause(bool bPause) override;
void pause(bool pause_flag) override;
void teardown() override;
float getPacketLossRate(TrackType type) const override;
protected:
//派生类回调函数
virtual bool onCheckSDP(const string &strSdp) = 0;
virtual void onRecvRTP(const RtpPacket::Ptr &pRtppt, const SdpTrack::Ptr &track) = 0;
virtual bool onCheckSDP(const string &sdp) = 0;
virtual void onRecvRTP(const RtpPacket::Ptr &rtp, const SdpTrack::Ptr &track) = 0;
uint32_t getProgressMilliSecond() const;
void seekToMilliSecond(uint32_t ms);
......@@ -64,47 +66,48 @@ protected:
/**
* rtp数据包排序后输出
* @param rtppt rtp数据包
* @param trackidx track索引
* @param rtp rtp数据包
* @param track_idx track索引
*/
void onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) override;
void onRtpSorted(const RtpPacket::Ptr &rtp, int track_idx) override;
/**
* 收到RTCP包回调
* @param iTrackidx
* @param track
* @param pucData
* @param uiLen
* @param track_idx track索引
* @param track sdp相关信息
* @param data rtcp内容
* @param len rtcp内容长度
*/
virtual void onRtcpPacket(int iTrackidx, SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen);
virtual void onRtcpPacket(int track_idx, SdpTrack::Ptr &track, unsigned char *data, unsigned int len);
/////////////TcpClient override/////////////
void onConnect(const SockException &err) override;
void onRecv(const Buffer::Ptr &pBuf) override;
void onRecv(const Buffer::Ptr &buf) override;
void onErr(const SockException &ex) override;
private:
void onRecvRTP_l(const RtpPacket::Ptr &pRtppt, const SdpTrack::Ptr &track);
void onRecvRTP_l(const RtpPacket::Ptr &rtp, const SdpTrack::Ptr &track);
void onPlayResult_l(const SockException &ex , bool handshakeCompleted);
int getTrackIndexByInterleaved(int interleaved) const;
int getTrackIndexByTrackType(TrackType trackType) const;
int getTrackIndexByTrackType(TrackType track_type) const;
void handleResSETUP(const Parser &parser, unsigned int uiTrackIndex);
void handleResSETUP(const Parser &parser, unsigned int track_idx);
void handleResDESCRIBE(const Parser &parser);
bool handleAuthenticationFailure(const string &wwwAuthenticateParamsStr);
void handleResPAUSE(const Parser &parser, int type);
bool handleResponse(const string &cmd, const Parser &parser);
void sendOptions();
void sendSetup(unsigned int uiTrackIndex);
void sendSetup(unsigned int track_idx);
void sendPause(int type , uint32_t ms);
void sendDescribe();
void sendKeepAlive();
void sendRtspRequest(const string &cmd, const string &url ,const StrCaseMap &header = StrCaseMap());
void sendRtspRequest(const string &cmd, const string &url ,const std::initializer_list<string> &header);
void sendReceiverReport(bool overTcp,int iTrackIndex);
void sendReceiverReport(bool over_tcp, int track_idx);
void createUdpSockIfNecessary(int track_idx);
private:
string _play_url;
vector<SdpTrack::Ptr> _sdp_track;
......@@ -148,5 +151,4 @@ private:
};
} /* namespace mediakit */
#endif /* SRC_RTSPPLAYER_RTSPPLAYER_H_TXT_ */
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论