RtmpProtocol.cpp 26 KB
Newer Older
xiongziliang committed
1
/*
xiongziliang committed
2
 * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
xiongziliang committed
3
 *
4
 * This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit).
xiongziliang committed
5
 *
xiongziliang committed
6 7 8
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
xzl committed
9
 */
xiongziliang committed
10

xzl committed
11
#include "RtmpProtocol.h"
xiongzilaing committed
12
#include "Rtmp/utils.h"
xia-chu committed
13
#include "RtmpMediaSource.h"
xiongziliang committed
14
using namespace toolkit;
xzl committed
15

xiongziliang committed
16 17 18 19 20 21 22 23 24
// Sizes, in bytes, of the pieces of the RTMP handshake handled in this file.
// Length of the HMAC-SHA256 digest embedded in a C1/S1 digest section.
#define C1_DIGEST_SIZE 32
// Length of the key-data field inside a 764-byte key section.
#define C1_KEY_SIZE 128
// Length of one schema section (the key section or the digest section).
#define C1_SCHEMA_SIZE 764
// Length of one handshake packet (C1/S1/C2/S2); RANDOM_LEN is not defined in this file.
#define C1_HANDSHARK_SIZE (RANDOM_LEN + 8)
// Number of leading FPKey bytes used as the HMAC key when verifying the C1 digest.
#define C1_FPKEY_SIZE 30
// Number of leading FMSKey bytes used as the HMAC key for the S1 digest.
#define S1_FMS_KEY_SIZE 36
// Number of FMSKey bytes used as the HMAC key when deriving the S2 key.
#define S2_FMS_KEY_SIZE 68
// Length of the offset field inside a key/digest section.
#define C1_OFFSET_SIZE 4

xiongziliang committed
25 26 27 28 29
#ifdef ENABLE_OPENSSL
#include "Util/SSLBox.h"
#include <openssl/hmac.h>
#include <openssl/opensslv.h>

30
/**
 * Computes an HMAC-SHA256 over a buffer using OpenSSL.
 * @param key      HMAC key bytes
 * @param key_len  length of key in bytes
 * @param data     message bytes
 * @param data_len length of data in bytes
 * @return the raw (binary) digest
 * @throws std::runtime_error if the HMAC context cannot be allocated or the
 *         digest cannot be computed
 */
static string openssl_HMACsha256(const void *key, size_t key_len, const void *data, size_t data_len){
    // A stack buffer large enough for any digest OpenSSL can produce.
    unsigned char out[EVP_MAX_MD_SIZE];
    // Initialised so that a failed HMAC_Final never leaves an indeterminate length.
    unsigned int out_len = 0;
    bool ok = true;

#if defined(OPENSSL_VERSION_NUMBER) && (OPENSSL_VERSION_NUMBER > 0x10100000L)
    // OpenSSL 1.1.0 made HMAC_CTX opaque; it must be heap allocated through the API.
    HMAC_CTX *ctx = HMAC_CTX_new();
    if (!ctx) {
        throw std::runtime_error("HMAC_CTX_new failed");
    }
    ok = HMAC_Init_ex(ctx, key, (int) key_len, EVP_sha256(), NULL) == 1
         && HMAC_Update(ctx, (const unsigned char *) data, data_len) == 1
         && HMAC_Final(ctx, out, &out_len) == 1;
    HMAC_CTX_free(ctx);
#else
    // Legacy API: return values are left unchecked because very old OpenSSL
    // releases declare these functions as returning void.
    HMAC_CTX ctx;
    HMAC_CTX_init(&ctx);
    HMAC_Init_ex(&ctx, key, (int) key_len, EVP_sha256(), NULL);
    HMAC_Update(&ctx, (const unsigned char *) data, data_len);
    HMAC_Final(&ctx, out, &out_len);
    HMAC_CTX_cleanup(&ctx);
#endif //defined(OPENSSL_VERSION_NUMBER) && (OPENSSL_VERSION_NUMBER > 0x10100000L)

    if (!ok || out_len == 0) {
        throw std::runtime_error("HMAC-SHA256 computation failed");
    }
    return string((const char *) out, out_len);
}
#endif //ENABLE_OPENSSL

xiongziliang committed
55
namespace mediakit {
xzl committed
56 57

/**
 * Creates a protocol object whose parser starts out waiting for C0+C1.
 */
RtmpProtocol::RtmpProtocol() {
    // The first bytes handed to the parser are routed to handle_C0C1().
    _next_step_func = [this](const char *buf, size_t size) -> const char * {
        return handle_C0C1(buf, size);
    };
}
xiongziliang committed
62

xzl committed
63
/**
 * Resets all protocol state when the object is destroyed.
 */
RtmpProtocol::~RtmpProtocol() {
    this->reset();
}
xiongziliang committed
66

xiongziliang committed
67
void RtmpProtocol::reset() {
68
    ////////////ChunkSize////////////
xiongziliang committed
69 70
    _chunk_size_in = DEFAULT_CHUNK_LEN;
    _chunk_size_out = DEFAULT_CHUNK_LEN;
71
    ////////////Acknowledgement////////////
xiongziliang committed
72 73 74
    _bytes_sent = 0;
    _bytes_sent_last = 0;
    _windows_size = 0;
75
    ///////////PeerBandwidth///////////
xiongziliang committed
76 77
    _bandwidth = 2500000;
    _band_limit_type = 2;
78
    ////////////Chunk////////////
xiongziliang committed
79 80 81
    _map_chunk_data.clear();
    _now_stream_index = 0;
    _now_chunk_id = 0;
82
    //////////Invoke Request//////////
xiongziliang committed
83
    _send_req_id = 0;
84
    //////////Rtmp parser//////////
85
    HttpRequestSplitter::reset();
xiongziliang committed
86
    _stream_index = STREAM_CONTROL;
87
    _next_step_func = [this](const char *data, size_t len) {
88
        return handle_C0C1(data, len);
89
    };
xzl committed
90 91
}

xiongziliang committed
92 93 94 95
/**
 * Sends a MSG_ACK message.
 * @param size value carried in the message, written as 4 bytes in network byte order
 */
void RtmpProtocol::sendAcknowledgement(uint32_t size) {
    const uint32_t be_size = htonl(size);
    sendRequest(MSG_ACK, std::string(reinterpret_cast<const char *>(&be_size), sizeof(be_size)));
}

xiongziliang committed
98 99 100
void RtmpProtocol::sendAcknowledgementSize(uint32_t size) {
    size = htonl(size);
    std::string set_windowSize((char *) &size, 4);
101
    sendRequest(MSG_WIN_SIZE, set_windowSize);
xzl committed
102 103
}

xiongziliang committed
104 105 106
/**
 * Sends a MSG_SET_PEER_BW message.
 * @param size bandwidth value, written as 4 bytes in network byte order and
 *             followed by a single 0x02 byte
 */
void RtmpProtocol::sendPeerBandwidth(uint32_t size) {
    const uint32_t be_size = htonl(size);
    std::string payload(reinterpret_cast<const char *>(&be_size), sizeof(be_size));
    // Trailing one-byte field, always 0x02 here.
    payload += (char) 0x02;
    sendRequest(MSG_SET_PEER_BW, payload);
}

xiongziliang committed
111 112
void RtmpProtocol::sendChunkSize(uint32_t size) {
    uint32_t len = htonl(size);
113 114
    std::string set_chunk((char *) &len, 4);
    sendRequest(MSG_SET_CHUNK, set_chunk);
xiongziliang committed
115
    _chunk_size_out = size;
xzl committed
116 117
}

xiongziliang committed
118 119
void RtmpProtocol::sendPingRequest(uint32_t stamp) {
    sendUserControl(CONTROL_PING_REQUEST, stamp);
xzl committed
120 121
}

xiongziliang committed
122 123
void RtmpProtocol::sendPingResponse(uint32_t time_stamp) {
    sendUserControl(CONTROL_PING_RESPONSE, time_stamp);
xzl committed
124 125
}

xiongziliang committed
126
void RtmpProtocol::sendSetBufferLength(uint32_t stream_index, uint32_t len) {
127
    std::string control;
xiongziliang committed
128 129 130 131 132
    stream_index = htonl(stream_index);
    control.append((char *) &stream_index, 4);

    len = htonl(len);
    control.append((char *) &len, 4);
133
    sendUserControl(CONTROL_SETBUFFER, control);
xzl committed
134 135
}

xiongziliang committed
136
void RtmpProtocol::sendUserControl(uint16_t event_type, uint32_t event_data) {
137
    std::string control;
xiongziliang committed
138 139 140 141 142
    event_type = htons(event_type);
    control.append((char *) &event_type, 2);

    event_data = htonl(event_data);
    control.append((char *) &event_data, 4);
143
    sendRequest(MSG_USER_CONTROL, control);
xzl committed
144 145
}

xiongziliang committed
146
void RtmpProtocol::sendUserControl(uint16_t event_type, const string &event_data) {
147
    std::string control;
xiongziliang committed
148 149 150
    event_type = htons(event_type);
    control.append((char *) &event_type, 2);
    control.append(event_data);
151
    sendRequest(MSG_USER_CONTROL, control);
xzl committed
152 153
}

xiongziliang committed
154 155 156
void RtmpProtocol::sendResponse(int type, const string &str) {
    if(!_data_started && (type == MSG_DATA)){
        _data_started =  true;
157
    }
xiongziliang committed
158
    sendRtmp(type, _now_stream_index, str, 0, _data_started ? CHUNK_CLIENT_REQUEST_AFTER : CHUNK_CLIENT_REQUEST_BEFORE);
xzl committed
159 160
}

xiongziliang committed
161
void RtmpProtocol::sendInvoke(const string &cmd, const AMFValue &val) {
162
    AMFEncoder enc;
xiongziliang committed
163
    enc << cmd << ++_send_req_id << val;
164
    sendRequest(MSG_CMD, enc.data());
xzl committed
165 166
}

xiongziliang committed
167 168
/**
 * Sends a message on this object's own stream index with timestamp 0,
 * using the CHUNK_SERVER_REQUEST chunk id.
 * @param cmd message type id
 * @param str message body
 */
void RtmpProtocol::sendRequest(int cmd, const string& str) {
    sendRtmp(cmd,
             _stream_index,
             str,
             0,
             CHUNK_SERVER_REQUEST);
}

171 172
class BufferPartial : public Buffer {
public:
173
    BufferPartial(const Buffer::Ptr &buffer, size_t offset, size_t size){
174 175 176 177 178
        _buffer = buffer;
        _data = buffer->data() + offset;
        _size = size;
    }

xiongziliang committed
179
    ~BufferPartial() override{}
180 181 182 183

    char *data() const override {
        return _data;
    }
xiongziliang committed
184

185
    size_t size() const override{
186 187
        return _size;
    }
xiongziliang committed
188

189 190
private:
    char *_data;
191
    size_t _size;
xiongziliang committed
192
    Buffer::Ptr _buffer;
193 194
};

xiongziliang committed
195 196
void RtmpProtocol::sendRtmp(uint8_t type, uint32_t stream_index, const std::string &buffer, uint32_t stamp, int chunk_id) {
    sendRtmp(type, stream_index, std::make_shared<BufferString>(buffer), stamp, chunk_id);
197
}
xzl committed
198

xiongziliang committed
199 200 201
/**
 * Splits one message into chunks and hands them to onSendRawData().
 *
 * A full RtmpHeader is sent first. The body then follows in pieces of at most
 * _chunk_size_out bytes; every piece after the first is preceded by a
 * one-byte continuation header, and every piece (including the first) is
 * preceded by the 4-byte extended timestamp when stamp >= 0xFFFFFF.
 * Afterwards the sent-byte counter is updated and an acknowledgement is
 * emitted if the window size has been reached.
 *
 * @param type         message type id
 * @param stream_index message stream index
 * @param buf          message body
 * @param stamp        message timestamp
 * @param chunk_id     chunk stream id, must be within [2, 63]
 * @throws std::runtime_error if chunk_id is outside [2, 63]
 */
void RtmpProtocol::sendRtmp(uint8_t type, uint32_t stream_index, const Buffer::Ptr &buf, uint32_t stamp, int chunk_id){
    const bool id_supported = chunk_id >= 2 && chunk_id <= 63;
    if (!id_supported) {
        auto err_msg = StrPrinter << "不支持发送该类型的块流 ID:" << chunk_id << endl;
        throw std::runtime_error(err_msg);
    }

    // Whether the timestamp needs the extended timestamp field.
    const bool need_ext_stamp = stamp >= 0xFFFFFF;
    const size_t body_len = buf->size();

    // Build the message header. Fields are written bytewise through the
    // set_* helpers: plain integer stores may cause bus errors from
    // misalignment on ARM Android.
    BufferRaw::Ptr head_buf = obtainBuffer();
    head_buf->setCapacity(sizeof(RtmpHeader));
    head_buf->setSize(sizeof(RtmpHeader));
    RtmpHeader *head = (RtmpHeader *) head_buf->data();
    head->flags = (chunk_id & 0x3f) | (0 << 6);
    head->type_id = type;
    set_be24(head->time_stamp, need_ext_stamp ? 0xFFFFFF : stamp);
    set_be24(head->body_size, (uint32_t) body_len);
    set_le32(head->stream_index, stream_index);
    // Send the header.
    onSendRawData(std::move(head_buf));

    // Extended timestamp field, only built when needed.
    BufferRaw::Ptr ext_buf;
    if (need_ext_stamp) {
        ext_buf = obtainBuffer();
        ext_buf->setCapacity(4);
        ext_buf->setSize(4);
        set_be32(ext_buf->data(), stamp);
    }

    // One-byte header that introduces each continuation chunk of this chunk id.
    BufferRaw::Ptr flag_buf = obtainBuffer();
    flag_buf->setCapacity(1);
    flag_buf->setSize(1);
    flag_buf->data()[0] = (chunk_id & 0x3f) | (3 << 6);

    size_t total = sizeof(RtmpHeader);
    for (size_t sent = 0; sent < body_len;) {
        if (sent != 0) {
            onSendRawData(flag_buf);
            total += 1;
        }
        if (need_ext_stamp) {
            onSendRawData(ext_buf);
            total += 4;
        }
        const size_t piece = min(_chunk_size_out, body_len - sent);
        onSendRawData(std::make_shared<BufferPartial>(buf, sent, piece));
        total += piece;
        sent += piece;
    }

    _bytes_sent += (uint32_t) total;
    const bool window_reached = _windows_size > 0 && _bytes_sent - _bytes_sent_last >= _windows_size;
    if (window_reached) {
        _bytes_sent_last = _bytes_sent;
        sendAcknowledgement(_bytes_sent);
    }
}

261
void RtmpProtocol::onParseRtmp(const char *data, size_t size) {
262 263 264
    input(data, size);
}

265
const char *RtmpProtocol::onSearchPacketTail(const char *data,size_t len){
xiongziliang committed
266
    //移动拷贝提高性能
267
    auto next_step_func(std::move(_next_step_func));
xiongziliang committed
268
    //执行下一步
269
    auto ret = next_step_func(data, len);
xiongziliang committed
270 271 272 273
    if (!_next_step_func) {
        //为设置下一步,恢复之
        next_step_func.swap(_next_step_func);
    }
274
    return ret;
xzl committed
275 276 277
}

////for client////
xiongziliang committed
278
void RtmpProtocol::startClientSession(const function<void()> &func) {
279 280 281 282 283
    //发送 C0C1
    char handshake_head = HANDSHAKE_PLAINTEXT;
    onSendRawData(obtainBuffer(&handshake_head, 1));
    RtmpHandshake c1(0);
    onSendRawData(obtainBuffer((char *) (&c1), sizeof(c1)));
284
    _next_step_func = [this, func](const char *data, size_t len) {
285
        //等待 S0+S1+S2
286
        return handle_S0S1S2(data, len, func);
287
    };
xzl committed
288
}
xiongziliang committed
289

290
const char* RtmpProtocol::handle_S0S1S2(const char *data, size_t len, const function<void()> &func) {
291
    if (len < 1 + 2 * C1_HANDSHARK_SIZE) {
292
        //数据不够
293
        return nullptr;
294
    }
295
    if (data[0] != HANDSHAKE_PLAINTEXT) {
296 297 298
        throw std::runtime_error("only plaintext[0x03] handshake supported");
    }
    //发送 C2
299
    const char *pcC2 = data + 1;
300 301
    onSendRawData(obtainBuffer(pcC2, C1_HANDSHARK_SIZE));
    //握手结束
302
    _next_step_func = [this](const char *data, size_t len) {
303
        //握手结束并且开始进入解析命令模式
304
        return handle_rtmp(data, len);
305
    };
xiongziliang committed
306
    func();
307
    return data + 1 + 2 * C1_HANDSHARK_SIZE;
xzl committed
308
}
xiongziliang committed
309

xzl committed
310
////for server ////
311
const char * RtmpProtocol::handle_C0C1(const char *data, size_t len) {
312
    if (len < 1 + C1_HANDSHARK_SIZE) {
313
        //need more data!
314
        return nullptr;
315
    }
316
    if (data[0] != HANDSHAKE_PLAINTEXT) {
317 318
        throw std::runtime_error("only plaintext[0x03] handshake supported");
    }
319
    if (memcmp(data + 5, "\x00\x00\x00\x00", 4) == 0) {
320
        //simple handsharke
321
        handle_C1_simple(data);
xiongziliang committed
322
    } else {
323
#ifdef ENABLE_OPENSSL
324
        //complex handsharke
325
        handle_C1_complex(data);
326
#else
327
        WarnL << "未打开ENABLE_OPENSSL宏,复杂握手采用简单方式处理,flash播放器可能无法播放!";
328
        handle_C1_simple(data);
329
#endif//ENABLE_OPENSSL
330
    }
331
    return data + 1 + C1_HANDSHARK_SIZE;
332
}
xiongziliang committed
333

334
void RtmpProtocol::handle_C1_simple(const char *data){
335 336 337 338 339 340 341
    //发送S0
    char handshake_head = HANDSHAKE_PLAINTEXT;
    onSendRawData(obtainBuffer(&handshake_head, 1));
    //发送S1
    RtmpHandshake s1(0);
    onSendRawData(obtainBuffer((char *) &s1, C1_HANDSHARK_SIZE));
    //发送S2
342
    onSendRawData(obtainBuffer(data + 1, C1_HANDSHARK_SIZE));
343
    //等待C2
344
    _next_step_func = [this](const char *data, size_t len) {
345 346
        //握手结束并且开始进入解析命令模式
        return handle_C2(data, len);
347
    };
xzl committed
348
}
xiongziliang committed
349

350
#ifdef ENABLE_OPENSSL
351
void RtmpProtocol::handle_C1_complex(const char *data){
352 353
    //参考自:http://blog.csdn.net/win_lin/article/details/13006803
    //skip c0,time,version
354
    const char *c1_start = data + 1;
355 356
    const char *schema_start = c1_start + 8;
    char *digest_start;
xiongziliang committed
357
    try {
358 359 360 361 362 363
        /* c1s1 schema0
        time: 4bytes
        version: 4bytes
        key: 764bytes
        digest: 764bytes
         */
xiongziliang committed
364 365 366 367
        auto digest = get_C1_digest((uint8_t *) schema_start + C1_SCHEMA_SIZE, &digest_start);
        string c1_joined(c1_start, C1_HANDSHARK_SIZE);
        c1_joined.erase(digest_start - c1_start, C1_DIGEST_SIZE);
        check_C1_Digest(digest, c1_joined);
368

xiongziliang committed
369
        send_complex_S0S1S2(0, digest);
xiongziliang committed
370
//		InfoL << "schema0";
371
    } catch (std::exception &) {
372
        //貌似flash从来都不用schema1
xiongziliang committed
373
//		WarnL << "try rtmp complex schema0 failed:" <<  ex.what();
xiongziliang committed
374
        try {
375 376 377 378 379 380
            /* c1s1 schema1
            time: 4bytes
            version: 4bytes
            digest: 764bytes
            key: 764bytes
             */
xiongziliang committed
381 382 383 384
            auto digest = get_C1_digest((uint8_t *) schema_start, &digest_start);
            string c1_joined(c1_start, C1_HANDSHARK_SIZE);
            c1_joined.erase(digest_start - c1_start, C1_DIGEST_SIZE);
            check_C1_Digest(digest, c1_joined);
385

xiongziliang committed
386
            send_complex_S0S1S2(1, digest);
xiongziliang committed
387
//			InfoL << "schema1";
388
        } catch (std::exception &) {
xiongziliang committed
389
//			WarnL << "try rtmp complex schema1 failed:" <<  ex.what();
390
            handle_C1_simple(data);
391 392
        }
    }
393 394
}

395 396 397
// Fallback definition of u_int8_t, the element type of the key tables below.
// NOTE(review): defined() only detects macros, so this also triggers when
// u_int8_t already exists as a typedef — confirm that is intended.
#if !defined(u_int8_t)
#define u_int8_t unsigned char
#endif // !defined(u_int8_t)
398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420

// HMAC key material used when building S1 and S2: the first S1_FMS_KEY_SIZE
// bytes (the ASCII text below) key the S1 digest, and S2_FMS_KEY_SIZE bytes
// key the derivation of the S2 key.
static u_int8_t FMSKey[] = {
    0x47, 0x65, 0x6e, 0x75, 0x69, 0x6e, 0x65, 0x20,
    0x41, 0x64, 0x6f, 0x62, 0x65, 0x20, 0x46, 0x6c,
    0x61, 0x73, 0x68, 0x20, 0x4d, 0x65, 0x64, 0x69,
    0x61, 0x20, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72,
    0x20, 0x30, 0x30, 0x31, // Genuine Adobe Flash Media Server 001
    0xf0, 0xee, 0xc2, 0x4a, 0x80, 0x68, 0xbe, 0xe8,
    0x2e, 0x00, 0xd0, 0xd1, 0x02, 0x9e, 0x7e, 0x57,
    0x6e, 0xec, 0x5d, 0x2d, 0x29, 0x80, 0x6f, 0xab,
    0x93, 0xb8, 0xe6, 0x36, 0xcf, 0xeb, 0x31, 0xae
}; // 68 bytes in total

// HMAC key material used when verifying the C1 digest: only the first
// C1_FPKEY_SIZE bytes (the ASCII text below) are used as the key.
static u_int8_t FPKey[] = {
    0x47, 0x65, 0x6E, 0x75, 0x69, 0x6E, 0x65, 0x20,
    0x41, 0x64, 0x6F, 0x62, 0x65, 0x20, 0x46, 0x6C,
    0x61, 0x73, 0x68, 0x20, 0x50, 0x6C, 0x61, 0x79,
    0x65, 0x72, 0x20, 0x30, 0x30, 0x31, // Genuine Adobe Flash Player 001
    0xF0, 0xEE, 0xC2, 0x4A, 0x80, 0x68, 0xBE, 0xE8,
    0x2E, 0x00, 0xD0, 0xD1, 0x02, 0x9E, 0x7E, 0x57,
    0x6E, 0xEC, 0x5D, 0x2D, 0x29, 0x80, 0x6F, 0xAB,
    0x93, 0xB8, 0xE6, 0x36, 0xCF, 0xEB, 0x31, 0xAE
}; // 62 bytes in total
xiongziliang committed
421

422
void RtmpProtocol::check_C1_Digest(const string &digest,const string &data){
xiongziliang committed
423 424
    auto sha256 = openssl_HMACsha256(FPKey, C1_FPKEY_SIZE, data.data(), data.size());
    if (sha256 != digest) {
425
        throw std::runtime_error("digest mismatched");
xiongziliang committed
426
    } else {
427 428
        InfoL << "check rtmp complex handshark success!";
    }
429
}
xiongziliang committed
430

431
/**
 * Locates and copies the digest inside a 764-byte digest section.
 *
 * Section layout:
 *   offset:      4 bytes
 *   random-data: (offset) bytes
 *   digest-data: 32 bytes
 *   random-data: (764 - 4 - offset - 32) bytes
 * The effective offset is the sum of the four offset bytes modulo
 * (C1_SCHEMA_SIZE - C1_DIGEST_SIZE - C1_OFFSET_SIZE).
 *
 * @param ptr       start of the digest section
 * @param digestPos receives the position of the digest inside the section
 * @return a copy of the C1_DIGEST_SIZE digest bytes
 */
string RtmpProtocol::get_C1_digest(const uint8_t *ptr,char **digestPos){
    int byte_sum = 0;
    for (const uint8_t *p = ptr; p != ptr + C1_OFFSET_SIZE; ++p) {
        byte_sum += *p;
    }
    const int offset = byte_sum % (C1_SCHEMA_SIZE - C1_DIGEST_SIZE - C1_OFFSET_SIZE);

    char *pos = (char *) ptr + C1_OFFSET_SIZE + offset;
    *digestPos = pos;
    return string(pos, C1_DIGEST_SIZE);
}
xiongziliang committed
448

449
/**
 * Copies the key out of a 764-byte key section.
 *
 * Section layout:
 *   random-data: (offset) bytes
 *   key-data:    128 bytes
 *   random-data: (764 - offset - 128 - 4) bytes
 *   offset:      4 bytes
 * The effective offset is the sum of the four trailing offset bytes modulo
 * (C1_SCHEMA_SIZE - C1_KEY_SIZE - C1_OFFSET_SIZE).
 *
 * @param ptr start of the key section
 * @return a copy of the C1_KEY_SIZE key bytes
 */
string RtmpProtocol::get_C1_key(const uint8_t *ptr){
    const uint8_t *offset_field = ptr + (C1_SCHEMA_SIZE - C1_OFFSET_SIZE);
    int byte_sum = 0;
    for (int i = 0; i < C1_OFFSET_SIZE; ++i) {
        byte_sum += offset_field[i];
    }
    const int offset = byte_sum % (C1_SCHEMA_SIZE - C1_KEY_SIZE - C1_OFFSET_SIZE);
    return string((char *) ptr + offset, C1_KEY_SIZE);
}
xiongziliang committed
465

466
void RtmpProtocol::send_complex_S0S1S2(int schemeType,const string &digest){
467 468 469 470 471 472
    //S1S2计算参考自:https://github.com/hitYangfei/golang/blob/master/rtmpserver.go
    //发送S0
    char handshake_head = HANDSHAKE_PLAINTEXT;
    onSendRawData(obtainBuffer(&handshake_head, 1));
    //S1
    RtmpHandshake s1(0);
xiongziliang committed
473
    memcpy(s1.zero, "\x04\x05\x00\x01", 4);
474
    char *digestPos;
xiongziliang committed
475
    if (schemeType == 0) {
476 477 478 479 480 481
        /* c1s1 schema0
        time: 4bytes
        version: 4bytes
        key: 764bytes
        digest: 764bytes
         */
xiongziliang committed
482 483
        get_C1_digest(s1.random + C1_SCHEMA_SIZE, &digestPos);
    } else {
484 485 486 487 488 489
        /* c1s1 schema1
        time: 4bytes
        version: 4bytes
        digest: 764bytes
        key: 764bytes
         */
xiongziliang committed
490
        get_C1_digest(s1.random, &digestPos);
491
    }
xiongziliang committed
492 493 494 495 496
    char *s1_start = (char *) &s1;
    string s1_joined(s1_start, sizeof(s1));
    s1_joined.erase(digestPos - s1_start, C1_DIGEST_SIZE);
    string s1_digest = openssl_HMACsha256(FMSKey, S1_FMS_KEY_SIZE, s1_joined.data(), s1_joined.size());
    memcpy(digestPos, s1_digest.data(), s1_digest.size());
497 498 499
    onSendRawData(obtainBuffer((char *) &s1, sizeof(s1)));

    //S2
xiongziliang committed
500
    string s2_key = openssl_HMACsha256(FMSKey, S2_FMS_KEY_SIZE, digest.data(), digest.size());
501
    RtmpHandshake s2(0);
xiongziliang committed
502 503 504 505
    s2.random_generate((char *) &s2, 8);
    string s2_digest = openssl_HMACsha256(s2_key.data(), s2_key.size(), &s2, sizeof(s2) - C1_DIGEST_SIZE);
    memcpy((char *) &s2 + C1_HANDSHARK_SIZE - C1_DIGEST_SIZE, s2_digest.data(), C1_DIGEST_SIZE);
    onSendRawData(obtainBuffer((char *) &s2, sizeof(s2)));
506
    //等待C2
507
    _next_step_func = [this](const char *data, size_t len) {
508
        return handle_C2(data, len);
509
    };
510
}
511
#endif //ENABLE_OPENSSL
xiongziliang committed
512

513
const char* RtmpProtocol::handle_C2(const char *data, size_t len) {
514
    if (len < C1_HANDSHARK_SIZE) {
515
        //need more data!
516
        return nullptr;
517
    }
518
    _next_step_func = [this](const char *data, size_t len) {
519
        return handle_rtmp(data, len);
520
    };
521 522 523

    //握手结束,进入命令模式
    return handle_rtmp(data + C1_HANDSHARK_SIZE, len - C1_HANDSHARK_SIZE);
xzl committed
524 525
}

xiongziliang committed
526 527
// Chunk header length in bytes, indexed by the top two bits of the first
// chunk byte. The length includes that first byte but not the extra bytes of
// a 2- or 3-byte chunk id, nor the extended timestamp.
static const size_t HEADER_LENGTH[] = {12, 8, 4, 1};

528
const char* RtmpProtocol::handle_rtmp(const char *data, size_t len) {
529 530
    auto ptr = data;
    while (len) {
xiongziliang committed
531
        int offset = 0;
532
        uint8_t flags = ptr[0];
xiongziliang committed
533 534 535
        size_t header_len = HEADER_LENGTH[flags >> 6];
        _now_chunk_id = flags & 0x3f;
        switch (_now_chunk_id) {
536 537 538
            case 0: {
                //0 值表示二字节形式,并且 ID 范围 64 - 319
                //(第二个字节 + 64)。
539
                if (len < 2) {
540
                    //need more data
541
                    return ptr;
542
                }
543
                _now_chunk_id = 64 + (uint8_t) (ptr[1]);
xiongziliang committed
544
                offset = 1;
545
                break;
xiongziliang committed
546 547
            }

548 549 550
            case 1: {
                //1 值表示三字节形式,并且 ID 范围为 64 - 65599
                //((第三个字节) * 256 + 第二个字节 + 64)。
551
                if (len < 3) {
552
                    //need more data
553
                    return ptr;
554
                }
555
                _now_chunk_id = 64 + ((uint8_t) (ptr[2]) << 8) + (uint8_t) (ptr[1]);
xiongziliang committed
556
                offset = 2;
557
                break;
xiongziliang committed
558 559 560 561
            }

            //带有 2 值的块流 ID 被保留,用于下层协议控制消息和命令。
            default : break;
562 563
        }

564
        if (len < header_len + offset) {
565
            //need more data
566
            return ptr;
567
        }
568
        RtmpHeader &header = *((RtmpHeader *) (ptr + offset));
xia-chu committed
569 570 571 572 573 574 575 576 577 578 579
        auto &pr = _map_chunk_data[_now_chunk_id];
        auto &now_packet = pr.first;
        auto &last_packet = pr.second;
        if (!now_packet) {
            now_packet = RtmpPacket::create();
            if (last_packet) {
                //恢复chunk上下文
                *now_packet = *last_packet;
            }
            //绝对时间戳标记复位
            now_packet->is_abs_stamp = false;
xia-chu committed
580
        }
xia-chu committed
581
        auto &chunk_data = *now_packet;
xiongziliang committed
582 583
        chunk_data.chunk_id = _now_chunk_id;
        switch (header_len) {
584
            case 12:
xiongziliang committed
585 586
                chunk_data.is_abs_stamp = true;
                chunk_data.stream_index = load_le32(header.stream_index);
587
            case 8:
xiongziliang committed
588 589
                chunk_data.body_size = load_be24(header.body_size);
                chunk_data.type_id = header.type_id;
590
            case 4:
xiongziliang committed
591
                chunk_data.ts_field = load_be24(header.time_stamp);
592
        }
593

xiongziliang committed
594 595
        auto time_stamp = chunk_data.ts_field;
        if (chunk_data.ts_field == 0xFFFFFF) {
596
            if (len < header_len + offset + 4) {
597
                //need more data
598
                return ptr;
599
            }
600
            time_stamp = load_be32(ptr + offset + header_len);
xiongziliang committed
601
            offset += 4;
602
        }
603

xiongziliang committed
604
        if (chunk_data.body_size < chunk_data.buffer.size()) {
605 606
            throw std::runtime_error("非法的bodySize");
        }
607

xia-chu committed
608
        auto more = min(_chunk_size_in, (size_t) (chunk_data.body_size - chunk_data.buffer.size()));
609
        if (len < header_len + offset + more) {
610
            //need more data
611
            return ptr;
612
        }
613 614 615
        if (more) {
            chunk_data.buffer.append(ptr + header_len + offset, more);
        }
616 617
        ptr += header_len + offset + more;
        len -= header_len + offset + more;
xiongziliang committed
618
        if (chunk_data.buffer.size() == chunk_data.body_size) {
xzl committed
619
            //frame is ready
xiongziliang committed
620 621
            _now_stream_index = chunk_data.stream_index;
            chunk_data.time_stamp = time_stamp + (chunk_data.is_abs_stamp ? 0 : chunk_data.time_stamp);
xia-chu committed
622 623
            //保存chunk上下文
            last_packet = now_packet;
xiongziliang committed
624
            if (chunk_data.body_size) {
xia-chu committed
625
                handle_chunk(std::move(now_packet));
xia-chu committed
626
            } else {
xia-chu committed
627
                now_packet = nullptr;
628 629 630
            }
        }
    }
631
    return ptr;
xzl committed
632 633
}

xia-chu committed
634 635
/**
 * Dispatches one fully assembled message.
 *
 * Protocol control messages (acknowledgement, chunk size, user control,
 * window size, peer bandwidth) are handled here. Aggregate messages are split
 * into their sub-messages, each of which is dispatched through this function
 * again. Everything else is forwarded to onRtmpChunk().
 *
 * @param packet the assembled message
 * @throws std::runtime_error if a control message body is shorter than the
 *         fields read from it, or if the peer announces a chunk size of 0
 */
void RtmpProtocol::handle_chunk(RtmpPacket::Ptr packet) {
    auto &chunk_data = *packet;

    // Throws with the given message unless the body holds at least `need` bytes.
    auto require_bytes = [&chunk_data](size_t need, const char *err) {
        if (chunk_data.buffer.size() < need) {
            throw std::runtime_error(err);
        }
    };

    switch (chunk_data.type_id) {
        case MSG_ACK: {
            require_bytes(4, "MSG_ACK: Not enough data");
            //auto bytePeerRecv = load_be32(&chunk_data.buffer[0]);
            //TraceL << "MSG_ACK:" << bytePeerRecv;
            break;
        }

        case MSG_SET_CHUNK: {
            require_bytes(4, "MSG_SET_CHUNK :Not enough data");
            auto chunk_size = load_be32(&chunk_data.buffer[0]);
            if (chunk_size == 0) {
                // With a chunk size of 0 no payload byte would ever be consumed
                // and every following message would be misparsed.
                throw std::runtime_error("MSG_SET_CHUNK: invalid chunk size 0");
            }
            _chunk_size_in = chunk_size;
            TraceL << "MSG_SET_CHUNK:" << _chunk_size_in;
            break;
        }

        case MSG_USER_CONTROL: {
            //user control message
            require_bytes(2, "MSG_USER_CONTROL: Not enough data.");
            uint16_t event_type = load_be16(&chunk_data.buffer[0]);
            // Drop the event type so the event data starts at index 0.
            chunk_data.buffer.erase(0, 2);
            switch (event_type) {
                case CONTROL_PING_REQUEST: {
                    require_bytes(4, "CONTROL_PING_REQUEST: Not enough data.");
                    uint32_t timeStamp = load_be32(&chunk_data.buffer[0]);
                    //TraceL << "CONTROL_PING_REQUEST:" << time_stamp;
                    sendUserControl(CONTROL_PING_RESPONSE, timeStamp);
                    break;
                }

                case CONTROL_PING_RESPONSE: {
                    require_bytes(4, "CONTROL_PING_RESPONSE: Not enough data.");
                    //uint32_t time_stamp = load_be32(&chunk_data.buffer[0]);
                    //TraceL << "CONTROL_PING_RESPONSE:" << time_stamp;
                    break;
                }

                case CONTROL_STREAM_BEGIN: {
                    require_bytes(4, "CONTROL_STREAM_BEGIN: Not enough data.");
                    uint32_t stream_index = load_be32(&chunk_data.buffer[0]);
                    onStreamBegin(stream_index);
                    TraceL << "CONTROL_STREAM_BEGIN:" << stream_index;
                    break;
                }

                case CONTROL_STREAM_EOF: {
                    require_bytes(4, "CONTROL_STREAM_EOF: Not enough data.");
                    uint32_t stream_index = load_be32(&chunk_data.buffer[0]);
                    onStreamEof(stream_index);
                    TraceL << "CONTROL_STREAM_EOF:" << stream_index;
                    break;
                }

                case CONTROL_STREAM_DRY: {
                    require_bytes(4, "CONTROL_STREAM_DRY: Not enough data.");
                    uint32_t stream_index = load_be32(&chunk_data.buffer[0]);
                    onStreamDry(stream_index);
                    TraceL << "CONTROL_STREAM_DRY:" << stream_index;
                    break;
                }

                default: /*WarnL << "unhandled user control:" << event_type; */ break;
            }
            break;
        }

        case MSG_WIN_SIZE: {
            // The body is read as a 4-byte integer; reject shorter bodies
            // instead of reading past the end of the buffer.
            require_bytes(4, "MSG_WIN_SIZE: Not enough data");
            _windows_size = load_be32(&chunk_data.buffer[0]);
            TraceL << "MSG_WIN_SIZE:" << _windows_size;
            break;
        }

        case MSG_SET_PEER_BW: {
            // 4-byte bandwidth followed by a 1-byte limit type.
            require_bytes(5, "MSG_SET_PEER_BW: Not enough data");
            _bandwidth = load_be32(&chunk_data.buffer[0]);
            _band_limit_type =  chunk_data.buffer[4];
            TraceL << "MSG_SET_PEER_BW:" << _bandwidth;
            break;
        }

        case MSG_AGGREGATE: {
            auto ptr = (uint8_t *) chunk_data.buffer.data();
            auto ptr_tail = ptr + chunk_data.buffer.size();
            // Each sub-message starts with an 11-byte header:
            // type(1) size(3) timestamp(3) timestamp-high-byte(1) and 3 bytes that are skipped.
            while (ptr + 8 + 3 < ptr_tail) {
                auto type = *ptr;
                ptr += 1;
                auto size = load_be24(ptr);
                ptr += 3;
                auto ts = load_be24(ptr);
                ptr += 3;
                // Widen before shifting: shifting a promoted int by 24 would
                // overflow for byte values >= 0x80.
                ts |= ((uint32_t) *ptr << 24);
                ptr += 1;
                ptr += 3;
                // Following FFmpeg, 4 extra bytes are copied along with the body.
                size += 4;
                if (ptr + size > ptr_tail) {
                    break;
                }
                auto sub_packet_ptr = RtmpPacket::create();
                auto &sub_packet = *sub_packet_ptr;
                sub_packet.buffer.assign((char *)ptr, size);
                sub_packet.type_id = type;
                sub_packet.body_size = size;
                sub_packet.time_stamp = ts;
                sub_packet.stream_index = chunk_data.stream_index;
                sub_packet.chunk_id = chunk_data.chunk_id;
                handle_chunk(std::move(sub_packet_ptr));
                ptr += size;
            }
            break;
        }

        default: onRtmpChunk(std::move(packet)); break;
    }
}

769
/**
 * Creates a BufferRaw, optionally filled with a copy of the given bytes.
 * @param data bytes to copy; the buffer is left unassigned when null
 * @param len  number of bytes to copy; the buffer is left unassigned when 0
 * @return the newly created buffer
 */
BufferRaw::Ptr RtmpProtocol::obtainBuffer(const void *data, size_t len) {
    auto result = BufferRaw::create();
    const bool has_payload = data != nullptr && len != 0;
    if (has_payload) {
        result->assign(static_cast<const char *>(data), len);
    }
    return result;
}

xiongziliang committed
777
} /* namespace mediakit */