HttpSession.cpp 26.9 KB
Newer Older
xiongziliang committed
1
/*
xiongziliang committed
2
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
xiongziliang committed
3
 *
4
 * This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit).
xiongziliang committed
5
 *
xiongziliang committed
6 7 8
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
xzl committed
9
 */
xiongziliang committed
10

xzl committed
11 12 13
#include <stdio.h>
#include <sys/stat.h>
#include <algorithm>
xiongziliang committed
14
#include "Common/config.h"
xzl committed
15 16
#include "strCoding.h"
#include "HttpSession.h"
17
#include "HttpConst.h"
xiongziliang committed
18 19
#include "Util/base64.h"
#include "Util/SHA1.h"
xiongziliang committed
20
using namespace toolkit;
21

xiongziliang committed
22
namespace mediakit {
xzl committed
23

xiongziliang committed
24
HttpSession::HttpSession(const Socket::Ptr &pSock) : TcpSession(pSock) {
xiongziliang committed
25
    TraceP(this);
26 27
    GET_CONFIG(uint32_t,keep_alive_sec,Http::kKeepAliveSecond);
    pSock->setSendTimeOutSecond(keep_alive_sec);
xzl committed
28 29 30
}

HttpSession::~HttpSession() {
xiongziliang committed
31
    TraceP(this);
xzl committed
32 33
}

34
void HttpSession::Handle_Req_HEAD(ssize_t &content_len){
35 36 37
    //暂时全部返回200 OK,因为HTTP GET存在按需生成流的操作,所以不能按照HTTP GET的流程返回
    //如果直接返回404,那么又会导致按需生成流的逻辑失效,所以HTTP HEAD在静态文件或者已存在资源时才有效
    //对于按需生成流的直播场景并不适用
38
    sendResponse(200, true);
39
}
ziyue committed
40 41 42 43 44 45 46 47 48

void HttpSession::Handle_Req_OPTIONS(ssize_t &content_len) {
    KeyValue header;
    header.emplace("Allow", "GET, POST, OPTIONS");
    header.emplace("Access-Control-Allow-Origin", "*");
    header.emplace("Access-Control-Allow-Credentials", "true");
    header.emplace("Access-Control-Request-Methods", "GET, POST, OPTIONS");
    header.emplace("Access-Control-Request-Headers", "Accept,Accept-Language,Content-Language,Content-Type");
    sendResponse(200, true, nullptr, header);
CharleyWangHZ committed
49
}
50

51 52
ssize_t HttpSession::onRecvHeader(const char *header,size_t len) {
    typedef void (HttpSession::*HttpCMDHandle)(ssize_t &);
53 54 55 56
    static unordered_map<string, HttpCMDHandle> s_func_map;
    static onceToken token([]() {
        s_func_map.emplace("GET",&HttpSession::Handle_Req_GET);
        s_func_map.emplace("POST",&HttpSession::Handle_Req_POST);
57
        s_func_map.emplace("HEAD",&HttpSession::Handle_Req_HEAD);
CharleyWangHZ committed
58
        s_func_map.emplace("OPTIONS",&HttpSession::Handle_Req_OPTIONS);
59
    }, nullptr);
xiongziliang committed
60

61 62 63 64 65
    _parser.Parse(header);
    urlDecode(_parser);
    string cmd = _parser.Method();
    auto it = s_func_map.find(cmd);
    if (it == s_func_map.end()) {
xiongziliang committed
66
        WarnP(this) << "不支持该命令:" << cmd;
67
        sendResponse(405, true);
xiongziliang committed
68
        return 0;
69
    }
70

xiongziliang committed
71 72 73 74
    //跨域
    _origin = _parser["Origin"];

    //默认后面数据不是content而是header
75
    ssize_t content_len = 0;
76
    auto &fun = it->second;
xiongziliang committed
77 78 79 80 81 82
    try {
        (this->*fun)(content_len);
    }catch (exception &ex){
        shutdown(SockException(Err_shutdown,ex.what()));
    }

83 84 85 86
    //清空解析器节省内存
    _parser.Clear();
    //返回content长度
    return content_len;
xzl committed
87
}
88

89
void HttpSession::onRecvContent(const char *data,size_t len) {
90 91 92 93 94
    if(_contentCallBack){
        if(!_contentCallBack(data,len)){
            _contentCallBack = nullptr;
        }
    }
95 96 97
}

void HttpSession::onRecv(const Buffer::Ptr &pBuf) {
98
    _ticker.resetTime();
99
    input(pBuf->data(),pBuf->size());
100 101
}

xzl committed
102
void HttpSession::onError(const SockException& err) {
103
    if(_is_live_stream){
104
        uint64_t duration = _ticker.createdTime()/1000;
105
        //flv/ts播放器
106
        WarnP(this) << "FLV/TS/FMP4播放器("
xiongziliang committed
107 108 109
                    << _mediaInfo._vhost << "/"
                    << _mediaInfo._app << "/"
                    << _mediaInfo._streamid
110 111
                    << ")断开:" << err.what()
                    << ",耗时(s):" << duration;
xiongziliang committed
112 113

        GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold);
114
        if(_total_bytes_usage >= iFlowThreshold * 1024){
115
            NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _total_bytes_usage, duration , true, static_cast<SockInfo &>(*this));
xiongziliang committed
116 117 118 119 120
        }
        return;
    }

    //http客户端
xiongziliang committed
121 122 123 124 125
    if(_ticker.createdTime() < 10 * 1000){
        TraceP(this) << err.what();
    }else{
        WarnP(this) << err.what();
    }
xzl committed
126 127 128
}

void HttpSession::onManager() {
129
    GET_CONFIG(uint32_t,keepAliveSec,Http::kKeepAliveSecond);
130

131
    if(_ticker.elapsedTime() > keepAliveSec * 1000){
132 133 134
        //1分钟超时
        shutdown(SockException(Err_timeout,"session timeouted"));
    }
xzl committed
135
}
xiongziliang committed
136

137
bool HttpSession::checkWebSocket(){
138
    auto Sec_WebSocket_Key = _parser["Sec-WebSocket-Key"];
139
    if (Sec_WebSocket_Key.empty()) {
140 141
        return false;
    }
142
    auto Sec_WebSocket_Accept = encodeBase64(SHA1::encode_bin(Sec_WebSocket_Key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"));
143 144 145 146 147

    KeyValue headerOut;
    headerOut["Upgrade"] = "websocket";
    headerOut["Connection"] = "Upgrade";
    headerOut["Sec-WebSocket-Accept"] = Sec_WebSocket_Accept;
148
    if (!_parser["Sec-WebSocket-Protocol"].empty()) {
149 150
        headerOut["Sec-WebSocket-Protocol"] = _parser["Sec-WebSocket-Protocol"];
    }
151

152
    auto res_cb = [this, headerOut]() {
153
        _live_over_websocket = true;
154
        sendResponse(101, false, nullptr, headerOut, nullptr, true);
155 156 157
    };

    //判断是否为websocket-flv
158
    if (checkLiveStreamFlv(res_cb)) {
159 160 161 162
        //这里是websocket-flv直播请求
        return true;
    }

163 164 165 166 167 168
    //判断是否为websocket-ts
    if (checkLiveStreamTS(res_cb)) {
        //这里是websocket-ts直播请求
        return true;
    }

169 170 171 172 173 174
    //判断是否为websocket-fmp4
    if (checkLiveStreamFMP4(res_cb)) {
        //这里是websocket-fmp4直播请求
        return true;
    }

175
    //这是普通的websocket连接
176
    if (!onWebSocketConnect(_parser)) {
177
        sendResponse(501, true, nullptr, headerOut);
178 179
        return true;
    }
180
    sendResponse(101, false, nullptr, headerOut, nullptr, true);
181
    return true;
xiongziliang committed
182
}
183

184
bool HttpSession::checkLiveStream(const string &schema, const string  &url_suffix, const function<void(const MediaSource::Ptr &src)> &cb){
185 186
    auto pos = strcasestr(_parser.Url().data(), url_suffix.data());
    if (!pos || pos + url_suffix.size() != 1 + &_parser.Url().back()) {
187
        //未找到后缀
188 189 190
        return false;
    }

191 192 193
    //这是个符合后缀的直播的流
    _mediaInfo.parse(schema + "://" + _parser["Host"] + _parser.FullUrl());
    if (_mediaInfo._app.empty() || _mediaInfo._streamid.size() < url_suffix.size() + 1) {
194
        //url不合法
195
        return false;
196
    }
197 198 199 200 201
    //去除后缀
    bool close_flag = !strcasecmp(_parser["Connection"].data(), "close");
    //流id去除后缀
    _mediaInfo._streamid.erase(_mediaInfo._streamid.size() - url_suffix.size());
    weak_ptr<HttpSession> weak_self = dynamic_pointer_cast<HttpSession>(shared_from_this());
202 203

    //鉴权结果回调
204 205 206
    auto onRes = [cb, weak_self, close_flag](const string &err) {
        auto strong_self = weak_self.lock();
        if (!strong_self) {
207 208 209
            //本对象已经销毁
            return;
        }
210

211
        if (!err.empty()) {
212
            //播放鉴权失败
213
            strong_self->sendResponse(401, close_flag, nullptr, KeyValue(), std::make_shared<HttpStringBody>(err));
214 215
            return;
        }
216

217 218 219 220
        //异步查找直播流
        MediaSource::findAsync(strong_self->_mediaInfo, strong_self, [weak_self, close_flag, cb](const MediaSource::Ptr &src) {
            auto strong_self = weak_self.lock();
            if (!strong_self) {
221 222 223
                //本对象已经销毁
                return;
            }
224
            if (!src) {
225
                //未找到该流
226
                strong_self->sendNotFound(close_flag);
227
                return;
228
            }
229 230 231
            strong_self->_is_live_stream = true;
            //触发回调
            cb(src);
232 233
        });
    };
234

235 236
    Broadcast::AuthInvoker invoker = [weak_self, onRes](const string &err) {
        auto strongSelf = weak_self.lock();
237 238
        if (!strongSelf) {
            return;
239
        }
240 241 242 243 244 245 246 247 248 249
        strongSelf->async([onRes, err]() {
            onRes(err);
        });
    };

    auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed, _mediaInfo, invoker, static_cast<SockInfo &>(*this));
    if (!flag) {
        //该事件无人监听,默认不鉴权
        onRes("");
    }
250
    return true;
xiongziliang committed
251
}
252

253 254 255 256 257 258 259
//http-fmp4 链接格式:http://vhost-url:port/app/streamid.live.mp4?key1=value1&key2=value2
bool HttpSession::checkLiveStreamFMP4(const function<void()> &cb){
    return checkLiveStream(FMP4_SCHEMA, ".live.mp4", [this, cb](const MediaSource::Ptr &src) {
        auto fmp4_src = dynamic_pointer_cast<FMP4MediaSource>(src);
        assert(fmp4_src);
        if (!cb) {
            //找到源,发送http头,负载后续发送
260
            sendResponse(200, false, HttpFileManager::getContentType(".mp4").data(), KeyValue(), nullptr, true);
261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284
        } else {
            //自定义发送http头
            cb();
        }

        //直播牺牲延时提升发送性能
        setSocketFlags();
        onWrite(std::make_shared<BufferString>(fmp4_src->getInitSegment()), true);
        weak_ptr<HttpSession> weak_self = dynamic_pointer_cast<HttpSession>(shared_from_this());
        _fmp4_reader = fmp4_src->getRing()->attach(getPoller());
        _fmp4_reader->setDetachCB([weak_self]() {
            auto strong_self = weak_self.lock();
            if (!strong_self) {
                //本对象已经销毁
                return;
            }
            strong_self->shutdown(SockException(Err_shutdown, "fmp4 ring buffer detached"));
        });
        _fmp4_reader->setReadCB([weak_self](const FMP4MediaSource::RingDataType &fmp4_list) {
            auto strong_self = weak_self.lock();
            if (!strong_self) {
                //本对象已经销毁
                return;
            }
285 286
            size_t i = 0;
            auto size = fmp4_list->size();
287 288 289 290 291 292 293
            fmp4_list->for_each([&](const FMP4Packet::Ptr &ts) {
                strong_self->onWrite(ts, ++i == size);
            });
        });
    });
}

294
//http-ts 链接格式:http://vhost-url:port/app/streamid.live.ts?key1=value1&key2=value2
295
bool HttpSession::checkLiveStreamTS(const function<void()> &cb){
296
    return checkLiveStream(TS_SCHEMA, ".live.ts", [this, cb](const MediaSource::Ptr &src) {
297 298 299 300
        auto ts_src = dynamic_pointer_cast<TSMediaSource>(src);
        assert(ts_src);
        if (!cb) {
            //找到源,发送http头,负载后续发送
301
            sendResponse(200, false, HttpFileManager::getContentType(".ts").data(), KeyValue(), nullptr, true);
302 303 304 305 306 307 308
        } else {
            //自定义发送http头
            cb();
        }

        //直播牺牲延时提升发送性能
        setSocketFlags();
309
        weak_ptr<HttpSession> weak_self = dynamic_pointer_cast<HttpSession>(shared_from_this());
310
        _ts_reader = ts_src->getRing()->attach(getPoller());
311 312 313 314 315 316 317 318 319 320 321
        _ts_reader->setDetachCB([weak_self](){
            auto strong_self = weak_self.lock();
            if (!strong_self) {
                //本对象已经销毁
                return;
            }
            strong_self->shutdown(SockException(Err_shutdown,"ts ring buffer detached"));
        });
        _ts_reader->setReadCB([weak_self](const TSMediaSource::RingDataType &ts_list) {
            auto strong_self = weak_self.lock();
            if (!strong_self) {
322 323 324
                //本对象已经销毁
                return;
            }
325 326
            size_t i = 0;
            auto size = ts_list->size();
327
            ts_list->for_each([&](const TSPacket::Ptr &ts) {
328
                strong_self->onWrite(ts, ++i == size);
329 330 331 332 333 334 335 336 337 338 339 340
            });
        });
    });
}

//http-flv 链接格式:http://vhost-url:port/app/streamid.flv?key1=value1&key2=value2
bool HttpSession::checkLiveStreamFlv(const function<void()> &cb){
    return checkLiveStream(RTMP_SCHEMA, ".flv", [this, cb](const MediaSource::Ptr &src) {
        auto rtmp_src = dynamic_pointer_cast<RtmpMediaSource>(src);
        assert(rtmp_src);
        if (!cb) {
            //找到源,发送http头,负载后续发送
341
            sendResponse(200, false, HttpFileManager::getContentType(".flv").data(), KeyValue(), nullptr, true);
342 343 344 345 346 347
        } else {
            //自定义发送http头
            cb();
        }
        //直播牺牲延时提升发送性能
        setSocketFlags();
348 349 350 351 352 353 354 355 356 357 358 359 360 361 362

        //非H264/AAC时打印警告日志,防止用户提无效问题
        auto tracks = src->getTracks(false);
        for (auto &track : tracks) {
            switch (track->getCodecId()) {
                case CodecH264:
                case CodecAAC:
                    break;
                default: {
                    WarnP(this) << "flv播放器一般只支持H264和AAC编码,该编码格式可能不被播放器支持:" << track->getCodecName();
                    break;
                }
            }
        }

363 364 365 366
        start(getPoller(), rtmp_src);
    });
}

367
void HttpSession::Handle_Req_GET(ssize_t &content_len) {
368 369 370
    Handle_Req_GET_l(content_len, true);
}

371
void HttpSession::Handle_Req_GET_l(ssize_t &content_len, bool sendBody) {
372
    //先看看是否为WebSocket请求
373
    if (checkWebSocket()) {
374
        content_len = -1;
375
        _contentCallBack = [this](const char *data, size_t len) {
376
            WebSocketSplitter::decode((uint8_t *) data, len);
377 378 379 380 381
            //_contentCallBack是可持续的,后面还要处理后续数据
            return true;
        };
        return;
    }
xiongziliang committed
382

383
    if (emitHttpEvent(false)) {
384
        //拦截http api事件
385 386
        return;
    }
xiongziliang committed
387

388
    if (checkLiveStreamFlv()) {
389 390 391 392
        //拦截http-flv播放器
        return;
    }

393 394 395 396
    if (checkLiveStreamTS()) {
        //拦截http-ts播放器
        return;
    }
397

398 399 400 401 402
    if (checkLiveStreamFMP4()) {
        //拦截http-fmp4播放器
        return;
    }

403
    bool bClose = !strcasecmp(_parser["Connection"].data(),"close");
404
    weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this());
405
    HttpFileManager::onAccessPath(*this, _parser, [weakSelf, bClose](int code, const string &content_type,
406 407 408
                                                                     const StrCaseMap &responseHeader, const HttpBody::Ptr &body) {
        auto strongSelf = weakSelf.lock();
        if (!strongSelf) {
409
            return;
410
        }
411
        strongSelf->async([weakSelf, bClose, code, content_type, responseHeader, body]() {
412 413 414
            auto strongSelf = weakSelf.lock();
            if (!strongSelf) {
                return;
415
            }
416
            strongSelf->sendResponse(code, bClose, content_type.data(), responseHeader, body);
417
        });
418
    });
xzl committed
419 420
}

421 422 423 424 425
static string dateStr() {
    char buf[64];
    time_t tt = time(NULL);
    strftime(buf, sizeof buf, "%a, %b %d %Y %H:%M:%S GMT", gmtime(&tt));
    return buf;
xzl committed
426
}
427

428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500
class AsyncSenderData {
public:
    friend class AsyncSender;
    typedef std::shared_ptr<AsyncSenderData> Ptr;
    AsyncSenderData(const TcpSession::Ptr &session, const HttpBody::Ptr &body, bool close_when_complete) {
        _session = dynamic_pointer_cast<HttpSession>(session);
        _body = body;
        _close_when_complete = close_when_complete;
    }
    ~AsyncSenderData() = default;
private:
    std::weak_ptr<HttpSession> _session;
    HttpBody::Ptr _body;
    bool _close_when_complete;
    bool _read_complete = false;
};

class AsyncSender {
public:
    typedef std::shared_ptr<AsyncSender> Ptr;
    static bool onSocketFlushed(const AsyncSenderData::Ptr &data) {
        if (data->_read_complete) {
            if (data->_close_when_complete) {
                //发送完毕需要关闭socket
                shutdown(data->_session.lock());
            }
            return false;
        }

        GET_CONFIG(uint32_t, sendBufSize, Http::kSendBufSize);
        data->_body->readDataAsync(sendBufSize, [data](const Buffer::Ptr &sendBuf) {
            auto session = data->_session.lock();
            if (!session) {
                //本对象已经销毁
                return;
            }
            session->async([data, sendBuf]() {
                auto session = data->_session.lock();
                if (!session) {
                    //本对象已经销毁
                    return;
                }
                onRequestData(data, session, sendBuf);
            }, false);
        });
        return true;
    }

private:
    static void onRequestData(const AsyncSenderData::Ptr &data, const std::shared_ptr<HttpSession> &session, const Buffer::Ptr &sendBuf) {
        session->_ticker.resetTime();
        if (sendBuf && session->send(sendBuf) != -1) {
            //文件还未读完,还需要继续发送
            if (!session->isSocketBusy()) {
                //socket还可写,继续请求数据
                onSocketFlushed(data);
            }
            return;
        }
        //文件写完了
        data->_read_complete = true;
        if (!session->isSocketBusy() && data->_close_when_complete) {
            shutdown(session);
        }
    }

    static void shutdown(const std::shared_ptr<HttpSession> &session) {
        if(session){
            session->shutdown(SockException(Err_shutdown, StrPrinter << "close connection after send http body completed."));
        }
    }
};

501 502 503 504 505 506 507 508 509
static const string kDate = "Date";
static const string kServer = "Server";
static const string kConnection = "Connection";
static const string kKeepAlive = "Keep-Alive";
static const string kContentType = "Content-Type";
static const string kContentLength = "Content-Length";
static const string kAccessControlAllowOrigin = "Access-Control-Allow-Origin";
static const string kAccessControlAllowCredentials = "Access-Control-Allow-Credentials";

510
void HttpSession::sendResponse(int code,
xiongziliang committed
511 512 513 514
                               bool bClose,
                               const char *pcContentType,
                               const HttpSession::KeyValue &header,
                               const HttpBody::Ptr &body,
515
                               bool no_content_length ){
xiongziliang committed
516 517 518 519
    GET_CONFIG(string,charSet,Http::kCharSet);
    GET_CONFIG(uint32_t,keepAliveSec,Http::kKeepAliveSecond);

    //body默认为空
520
    ssize_t size = 0;
xiongziliang committed
521 522 523 524 525
    if (body && body->remainSize()) {
        //有body,获取body大小
        size = body->remainSize();
    }

526
    if(no_content_length){
527
        //http-flv直播是Keep-Alive类型
xiongziliang committed
528
        bClose = false;
xia-chu committed
529
    }else if((size_t) size >= SIZE_MAX || size < 0 ){
530 531
        //不固定长度的body,那么发送完body后应该关闭socket,以便浏览器做下载完毕的判断
        bClose = true;
xiongziliang committed
532 533 534
    }

    HttpSession::KeyValue &headerOut = const_cast<HttpSession::KeyValue &>(header);
535
    headerOut.emplace(kDate, dateStr());
xiongziliang committed
536
    headerOut.emplace(kServer, SERVER_NAME);
537
    headerOut.emplace(kConnection, bClose ? "close" : "keep-alive");
xiongziliang committed
538
    if(!bClose){
539 540 541 542
        string keepAliveString = "timeout=";
        keepAliveString += to_string(keepAliveSec);
        keepAliveString += ", max=100";
        headerOut.emplace(kKeepAlive,std::move(keepAliveString));
xiongziliang committed
543 544 545 546
    }

    if(!_origin.empty()){
        //设置跨域
547 548
        headerOut.emplace(kAccessControlAllowOrigin,_origin);
        headerOut.emplace(kAccessControlAllowCredentials, "true");
xiongziliang committed
549 550
    }

xia-chu committed
551
    if(!no_content_length && size >= 0 && (size_t)size < SIZE_MAX){
552
        //文件长度为固定值,且不是http-flv强制设置Content-Length
553
        headerOut[kContentLength] = to_string(size);
xiongziliang committed
554 555 556 557 558 559 560
    }

    if(size && !pcContentType){
        //有body时,设置缺省类型
        pcContentType = "text/plain";
    }

561
    if((size || no_content_length) && pcContentType){
xiongziliang committed
562
        //有body时,设置文件类型
563 564 565 566
        string strContentType = pcContentType;
        strContentType += "; charset=";
        strContentType += charSet;
        headerOut.emplace(kContentType,std::move(strContentType));
xiongziliang committed
567 568
    }

569
    //发送http头
570 571 572
    string str;
    str.reserve(256);
    str += "HTTP/1.1 " ;
xiongziliang committed
573 574
    str += to_string(code);
    str += ' ';
575
    str += getHttpStatusMessage(code) ;
576
    str += "\r\n";
577
    for (auto &pr : header) {
578 579 580 581
        str += pr.first ;
        str += ": ";
        str += pr.second;
        str += "\r\n";
582
    }
583
    str += "\r\n";
xiongziliang committed
584
    SockSender::send(std::move(str));
585 586
    _ticker.resetTime();

xiongziliang committed
587
    if(!size){
588 589
        //没有body
        if(bClose){
590
            shutdown(SockException(Err_shutdown,StrPrinter << "close connection after send http header completed with status code:" << code));
591 592 593 594
        }
        return;
    }

595
    GET_CONFIG(uint32_t, sendBufSize, Http::kSendBufSize);
596 597 598 599
    if(body->remainSize() > sendBufSize){
        //文件下载提升发送性能
        setSocketFlags();
    }
600 601 602

    //发送http body
    AsyncSenderData::Ptr data = std::make_shared<AsyncSenderData>(shared_from_this(),body,bClose);
603
    getSock()->setOnFlush([data](){
604 605 606
        return AsyncSender::onSocketFlushed(data);
    });
    AsyncSender::onSocketFlushed(data);
607 608
}

xiongziliang committed
609
string HttpSession::urlDecode(const string &str){
610
    auto ret = strCoding::UrlDecode(str);
611
#ifdef _WIN32
612
    GET_CONFIG(string,charSet,Http::kCharSet);
613 614 615 616
    bool isGb2312 = !strcasecmp(charSet.data(), "gb2312");
    if (isGb2312) {
        ret = strCoding::UTF8ToGB2312(ret);
    }
617
#endif // _WIN32
xiongziliang committed
618 619
    return ret;
}
620

621
void HttpSession::urlDecode(Parser &parser){
622 623 624 625
    parser.setUrl(urlDecode(parser.Url()));
    for(auto &pr : _parser.getUrlArgs()){
        const_cast<string &>(pr.second) = urlDecode(pr.second);
    }
xiongziliang committed
626
}
xzl committed
627

628
bool HttpSession::emitHttpEvent(bool doInvoke){
629
    bool bClose = !strcasecmp(_parser["Connection"].data(),"close");
630 631
    /////////////////////异步回复Invoker///////////////////////////////
    weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this());
632
    HttpResponseInvoker invoker = [weakSelf,bClose](int code, const KeyValue &headerOut, const HttpBody::Ptr &body){
633 634 635 636
        auto strongSelf = weakSelf.lock();
        if(!strongSelf) {
            return;
        }
637
        strongSelf->async([weakSelf, bClose, code, headerOut, body]() {
638
            auto strongSelf = weakSelf.lock();
639
            if (!strongSelf) {
640
                //本对象已经销毁
641 642
                return;
            }
643
            strongSelf->sendResponse(code, bClose, nullptr, headerOut, body);
644 645 646 647
        });
    };
    ///////////////////广播HTTP事件///////////////////////////
    bool consumed = false;//该事件是否被消费
648
    NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastHttpRequest,_parser,invoker,consumed,static_cast<SockInfo &>(*this));
649 650
    if(!consumed && doInvoke){
        //该事件无人消费,所以返回404
651
        invoker(404,KeyValue(), HttpBody::Ptr());
652 653
    }
    return consumed;
xiongziliang committed
654
}
655

656
void HttpSession::Handle_Req_POST(ssize_t &content_len) {
657
    GET_CONFIG(size_t,maxReqSize,Http::kMaxReqSize);
658

659
    ssize_t totalContentLen = _parser["Content-Length"].empty() ? -1 : atoll(_parser["Content-Length"].data());
660

661 662 663 664 665 666
    if(totalContentLen == 0){
        //content为空
        //emitHttpEvent内部会选择是否关闭连接
        emitHttpEvent(true);
        return;
    }
667

668
    if(totalContentLen > 0 && (size_t)totalContentLen < maxReqSize ){
669 670 671
        //返回固定长度的content
        content_len = totalContentLen;
        auto parserCopy = _parser;
672
        _contentCallBack = [this,parserCopy](const char *data,size_t len){
673 674 675 676 677 678 679 680 681 682 683 684
            //恢复http头
            _parser = parserCopy;
            //设置content
            _parser.setContent(string(data,len));
            //触发http事件,emitHttpEvent内部会选择是否关闭连接
            emitHttpEvent(true);
            //清空数据,节省内存
            _parser.Clear();
            //content已经接收完毕
            return false;
        };
    }else{
685
        //返回不固定长度的content或者超过长度限制的content
686 687
        content_len = -1;
        auto parserCopy = _parser;
xiongziliang committed
688
        std::shared_ptr<size_t> recvedContentLen = std::make_shared<size_t>(0);
689 690
        bool bClose = !strcasecmp(_parser["Connection"].data(),"close");

691
        _contentCallBack = [this,parserCopy,totalContentLen,recvedContentLen,bClose](const char *data,size_t len){
692
            *(recvedContentLen) += len;
693 694 695 696 697
            if (totalContentLen < 0) {
                //不固定长度的content,源源不断接收数据
                onRecvUnlimitedContent(parserCopy, data, len, SIZE_MAX, *(recvedContentLen));
                return true;
            }
698

699
            //长度超过限制的content
700 701
            onRecvUnlimitedContent(parserCopy,data,len,totalContentLen,*(recvedContentLen));

702
            if(*(recvedContentLen) < (size_t)totalContentLen){
703
                //数据还没接收完毕
704
                //_contentCallBack是可持续的,后面还要处理后续content数据
705
                return true;
706
            }
707

708
            //数据接收完毕
709
            if(!bClose){
710 711 712
                //keep-alive类型连接
                //content接收完毕,后续都是http header
                setContentLen(0);
713
                //content已经接收完毕
714 715 716 717
                return false;
            }

            //连接类型是close类型,收完content就关闭连接
xiongziliang committed
718
            shutdown(SockException(Err_shutdown,"recv http content completed"));
719
            //content已经接收完毕
720
            return false ;
721 722 723
        };
    }
    //有后续content数据要处理,暂时不关闭连接
724
}
725 726

void HttpSession::sendNotFound(bool bClose) {
727
    GET_CONFIG(string,notFound,Http::kNotFound);
728
    sendResponse(404, bClose,"text/html",KeyValue(),std::make_shared<HttpStringBody>(notFound));
xzl committed
729 730
}

731
void HttpSession::setSocketFlags(){
732 733
    GET_CONFIG(int, mergeWriteMS, General::kMergeWriteMS);
    if(mergeWriteMS > 0) {
734
        //推流模式下,关闭TCP_NODELAY会增加推流端的延时,但是服务器性能将提高
735
        SockUtil::setNoDelay(getSock()->rawFD(), false);
736
        //播放模式下,开启MSG_MORE会增加延时,但是能提高发送性能
xiongziliang committed
737
        setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
738 739
    }
}
xiongziliang committed
740

xiongziliang committed
741 742 743 744 745 746
void HttpSession::onWrite(const Buffer::Ptr &buffer, bool flush) {
    if(flush){
        //需要flush那么一次刷新缓存
        HttpSession::setSendFlushFlag(true);
    }

747
    _ticker.resetTime();
748 749
    if (!_live_over_websocket) {
        _total_bytes_usage += buffer->size();
750
        send(buffer);
751
    } else {
xiongziliang committed
752 753 754 755 756
        WebSocketHeader header;
        header._fin = true;
        header._reserved = 0;
        header._opcode = WebSocketHeader::BINARY;
        header._mask_flag = false;
757
        WebSocketSplitter::encode(header, buffer);
758 759
    }

760
    if (flush) {
xiongziliang committed
761 762 763
        //本次刷新缓存后,下次不用刷新缓存
        HttpSession::setSendFlushFlag(false);
    }
764 765
}

766
void HttpSession::onWebSocketEncodeData(Buffer::Ptr buffer){
767
    _total_bytes_usage += buffer->size();
768
    send(std::move(buffer));
xiongziliang committed
769 770
}

771 772 773 774 775 776 777 778 779 780 781 782 783 784 785
void HttpSession::onWebSocketDecodeComplete(const WebSocketHeader &header_in){
    WebSocketHeader& header = const_cast<WebSocketHeader&>(header_in);
    header._mask_flag = false;

    switch (header._opcode) {
        case WebSocketHeader::CLOSE: {
            encode(header, nullptr);
            shutdown(SockException(Err_shutdown, "recv close request from client"));
            break;
        }

        default : break;
    }
}

xiongziliang committed
786
void HttpSession::onDetach() {
787
    shutdown(SockException(Err_shutdown,"rtmp ring buffer detached"));
xiongziliang committed
788 789 790
}

std::shared_ptr<FlvMuxer> HttpSession::getSharedPtr(){
791
    return dynamic_pointer_cast<FlvMuxer>(shared_from_this());
792 793
}

xiongziliang committed
794
} /* namespace mediakit */