Coverage Report

Created: 2024-09-11 06:42

/src/brpc/src/brpc/rtmp.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
19
#include <gflags/gflags.h>
20
#include <google/protobuf/io/zero_copy_stream_impl_lite.h> // StringOutputStream
21
#include "bthread/bthread.h"                      // bthread_id_xx
22
#include "bthread/unstable.h"                     // bthread_timer_del
23
#include "brpc/log.h"
24
#include "brpc/callback.h"                   // Closure
25
#include "brpc/channel.h"                    // Channel
26
#include "brpc/socket_map.h"                 // SocketMap
27
#include "brpc/socket.h"                     // Socket
28
#include "brpc/policy/rtmp_protocol.h"       // policy::*
29
#include "brpc/rtmp.h"
30
#include "brpc/details/rtmp_utils.h"
31
32
33
namespace brpc {
34
35
DEFINE_bool(rtmp_server_close_connection_on_error, true,
36
            "Close the client connection on play/publish errors, clients setting"
37
            " RtmpConnectRequest.stream_multiplexing to true are not affected"
38
            " by this flag");
39
40
struct RtmpBvars {
41
    bvar::Adder<int> client_count;
42
    bvar::Adder<int> client_stream_count;
43
    bvar::Adder<int> retrying_client_stream_count;
44
    bvar::Adder<int> server_stream_count;
45
46
    RtmpBvars()
47
        : client_count("rtmp_client_count")
48
        , client_stream_count("rtmp_client_stream_count")
49
        , retrying_client_stream_count("rtmp_retrying_client_stream_count")
50
0
        , server_stream_count("rtmp_server_stream_count") {
51
0
    }
52
};
53
0
inline RtmpBvars* get_rtmp_bvars() {
54
0
    return butil::get_leaky_singleton<RtmpBvars>();
55
0
}
56
57
namespace policy {
58
int SendC0C1(int fd, bool* is_simple_handshake);
59
int WriteWithoutOvercrowded(Socket*, SocketMessagePtr<>& msg);
60
}
61
62
FlvWriter::FlvWriter(butil::IOBuf* buf)
63
0
    : _write_header(false), _buf(buf), _options() {
64
0
}
65
66
FlvWriter::FlvWriter(butil::IOBuf* buf, const FlvWriterOptions& options)
67
0
    : _write_header(false), _buf(buf), _options(options) {
68
0
}
69
70
0
butil::Status FlvWriter::Write(const RtmpVideoMessage& msg) {
71
0
    char buf[32];
72
0
    char* p = buf;
73
0
    if (!_write_header) {
74
0
        _write_header = true;
75
0
        const char flags_bit = static_cast<char>(_options.flv_content_type);
76
0
        const char header[9] = { 'F', 'L', 'V', 0x01, flags_bit, 0, 0, 0, 0x09 };
77
0
        memcpy(p, header, sizeof(header));
78
0
        p += sizeof(header);
79
0
        policy::WriteBigEndian4Bytes(&p, 0); // PreviousTagSize0
80
0
    }
81
    // FLV tag
82
0
    *p++ = FLV_TAG_VIDEO;
83
0
    policy::WriteBigEndian3Bytes(&p, msg.size());
84
0
    policy::WriteBigEndian3Bytes(&p, (msg.timestamp & 0xFFFFFF));
85
0
    *p++ = (msg.timestamp >> 24) & 0xFF;
86
0
    policy::WriteBigEndian3Bytes(&p, 0); // StreamID
87
    // header of VIDEODATA
88
0
    *p++ = ((msg.frame_type & 0xF) << 4) | (msg.codec & 0xF);
89
0
    _buf->append(buf, p - buf);
90
0
    _buf->append(msg.data);
91
    // PreviousTagSize
92
0
    p = buf;
93
0
    policy::WriteBigEndian4Bytes(&p, 11 + msg.size());
94
0
    _buf->append(buf, p - buf);
95
0
    return butil::Status::OK();
96
0
}
97
98
0
butil::Status FlvWriter::Write(const RtmpAudioMessage& msg) {
99
0
    char buf[32];
100
0
    char* p = buf;
101
0
    if (!_write_header) {
102
0
        _write_header = true;
103
0
        const char flags_bit = static_cast<char>(_options.flv_content_type);
104
0
        const char header[9] = { 'F', 'L', 'V', 0x01, flags_bit, 0, 0, 0, 0x09 };
105
0
        memcpy(p, header, sizeof(header));
106
0
        p += sizeof(header);
107
0
        policy::WriteBigEndian4Bytes(&p, 0); // PreviousTagSize0
108
0
    }
109
    // FLV tag
110
0
    *p++ = FLV_TAG_AUDIO;
111
0
    policy::WriteBigEndian3Bytes(&p, msg.size());
112
0
    policy::WriteBigEndian3Bytes(&p, (msg.timestamp & 0xFFFFFF));
113
0
    *p++ = (msg.timestamp >> 24) & 0xFF;
114
0
    policy::WriteBigEndian3Bytes(&p, 0); // StreamID
115
    // header of AUDIODATA
116
0
    *p++ = ((msg.codec & 0xF) << 4)
117
0
        | ((msg.rate & 0x3) << 2)
118
0
        | ((msg.bits & 0x1) << 1)
119
0
        | (msg.type & 0x1);
120
0
    _buf->append(buf, p - buf);
121
0
    _buf->append(msg.data);
122
    // PreviousTagSize
123
0
    p = buf;
124
0
    policy::WriteBigEndian4Bytes(&p, 11 + msg.size());
125
0
    _buf->append(buf, p - buf);
126
0
    return butil::Status::OK();
127
0
}
128
129
0
butil::Status FlvWriter::WriteScriptData(const butil::IOBuf& req_buf, uint32_t timestamp) {
130
0
    char buf[32];
131
0
    char* p = buf;
132
0
    if (!_write_header) {
133
0
        _write_header = true;
134
0
        const char flags_bit = static_cast<char>(_options.flv_content_type);
135
0
        const char header[9] = { 'F', 'L', 'V', 0x01, flags_bit, 0, 0, 0, 0x09 };
136
0
        memcpy(p, header, sizeof(header));
137
0
        p += sizeof(header);
138
0
        policy::WriteBigEndian4Bytes(&p, 0); // PreviousTagSize0
139
0
    }
140
    // FLV tag
141
0
    *p++ = FLV_TAG_SCRIPT_DATA;
142
0
    policy::WriteBigEndian3Bytes(&p, req_buf.size());
143
0
    policy::WriteBigEndian3Bytes(&p, (timestamp & 0xFFFFFF));
144
0
    *p++ = (timestamp >> 24) & 0xFF;
145
0
    policy::WriteBigEndian3Bytes(&p, 0); // StreamID
146
0
    _buf->append(buf, p - buf);
147
0
    _buf->append(req_buf);
148
    // PreviousTagSize
149
0
    p = buf;
150
0
    policy::WriteBigEndian4Bytes(&p, 11 + req_buf.size());
151
0
    _buf->append(buf, p - buf);
152
0
    return butil::Status::OK();
153
0
}
154
155
0
butil::Status FlvWriter::Write(const RtmpCuePoint& cuepoint) {
156
0
    butil::IOBuf req_buf;
157
0
    {
158
0
        butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
159
0
        AMFOutputStream ostream(&zc_stream);
160
0
        WriteAMFString(RTMP_AMF0_SET_DATAFRAME, &ostream);
161
0
        WriteAMFString(RTMP_AMF0_ON_CUE_POINT, &ostream);
162
0
        WriteAMFObject(cuepoint.data, &ostream);
163
0
        if (!ostream.good()) {
164
0
            return butil::Status(EINVAL, "Fail to serialize cuepoint");
165
0
        }
166
0
    }
167
0
    return WriteScriptData(req_buf, cuepoint.timestamp);
168
0
}
169
170
0
butil::Status FlvWriter::Write(const RtmpMetaData& metadata) {
171
0
    butil::IOBuf req_buf;
172
0
    {
173
0
        butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
174
0
        AMFOutputStream ostream(&zc_stream);
175
0
        WriteAMFString(RTMP_AMF0_ON_META_DATA, &ostream);
176
0
        WriteAMFObject(metadata.data, &ostream);
177
0
        if (!ostream.good()) {
178
0
            return butil::Status(EINVAL, "Fail to serialize metadata");
179
0
        }
180
0
    }
181
0
    return WriteScriptData(req_buf, metadata.timestamp);
182
0
}
183
184
FlvReader::FlvReader(butil::IOBuf* buf)
185
0
    : _read_header(false), _buf(buf) {
186
0
}
187
188
0
butil::Status FlvReader::ReadHeader() {
189
0
    if (!_read_header) {
190
        // 9 is the size of FlvHeader, which is usually composed of
191
        // { 'F', 'L', 'V', 0x01, 0x05, 0, 0, 0, 0x09 }.
192
0
        char header_buf[9 + 4/* PreviousTagSize0 */];
193
0
        const char* p = (const char*)_buf->fetch(header_buf, sizeof(header_buf));
194
0
        if (p == NULL) {
195
0
            return butil::Status(EAGAIN, "Fail to read, not enough data");
196
0
        }
197
0
        const char flv_header_signature[3] = { 'F', 'L', 'V' };
198
0
        if (memcmp(p, flv_header_signature, sizeof(flv_header_signature)) != 0) {
199
0
            LOG(FATAL) << "Fail to parse FLV header";
200
0
            return butil::Status(EINVAL, "Fail to parse FLV header");
201
0
        }
202
0
        _buf->pop_front(sizeof(header_buf));
203
0
        _read_header = true;
204
0
    }
205
0
    return butil::Status::OK();
206
0
}
207
208
0
butil::Status FlvReader::PeekMessageType(FlvTagType* type_out) {
209
0
    butil::Status st = ReadHeader();
210
0
    if (!st.ok()) {
211
0
        return st;
212
0
    }
213
0
    const char* p = (const char*)_buf->fetch1();
214
0
    if (p == NULL) {
215
0
        return butil::Status(EAGAIN, "Fail to read, not enough data");
216
0
    }
217
0
    FlvTagType type = (FlvTagType)*p;
218
0
    if (type != FLV_TAG_AUDIO && type != FLV_TAG_VIDEO &&
219
0
        type != FLV_TAG_SCRIPT_DATA) {
220
0
        return butil::Status(EINVAL, "Fail to parse FLV tag");
221
0
    }
222
0
    if (type_out) {
223
0
        *type_out = type;
224
0
    }
225
0
    return butil::Status::OK();
226
0
}
227
228
0
butil::Status FlvReader::Read(RtmpVideoMessage* msg) {
229
0
    char tags[11];
230
0
    const unsigned char* p = (const unsigned char*)_buf->fetch(tags, sizeof(tags));
231
0
    if (p == NULL) {
232
0
        return butil::Status(EAGAIN, "Fail to read, not enough data");
233
0
    }
234
0
    if (*p != FLV_TAG_VIDEO) {
235
0
        return butil::Status(EINVAL, "Fail to parse RtmpVideoMessage");
236
0
    }
237
0
    uint32_t msg_size = policy::ReadBigEndian3Bytes(p + 1);
238
0
    uint32_t timestamp = policy::ReadBigEndian3Bytes(p + 4);
239
0
    timestamp |= (*(p + 7) << 24);
240
0
    if (_buf->length() < 11 + msg_size + 4/*PreviousTagSize*/) {
241
0
        return butil::Status(EAGAIN, "Fail to read, not enough data");
242
0
    }
243
0
    _buf->pop_front(11);
244
0
    char first_byte = 0;
245
0
    CHECK(_buf->cut1(&first_byte));
246
0
    msg->timestamp = timestamp;
247
0
    msg->frame_type = (FlvVideoFrameType)((first_byte >> 4) & 0xF);
248
0
    msg->codec = (FlvVideoCodec)(first_byte & 0xF);
249
    // TODO(zhujiashun): check the validation of frame_type and codec
250
0
    _buf->cutn(&msg->data, msg_size - 1);
251
0
    _buf->pop_front(4/* PreviousTagSize0 */);
252
253
0
    return butil::Status::OK();
254
0
}
255
256
0
butil::Status FlvReader::Read(RtmpAudioMessage* msg) {
257
0
    char tags[11];
258
0
    const unsigned char* p = (const unsigned char*)_buf->fetch(tags, sizeof(tags));
259
0
    if (p == NULL) {
260
0
        return butil::Status(EAGAIN, "Fail to read, not enough data");
261
0
    }
262
0
    if (*p != FLV_TAG_AUDIO) {
263
0
        return butil::Status(EINVAL, "Fail to parse RtmpAudioMessage");
264
0
    }
265
0
    uint32_t msg_size = policy::ReadBigEndian3Bytes(p + 1);
266
0
    uint32_t timestamp = policy::ReadBigEndian3Bytes(p + 4);
267
0
    timestamp |= (*(p + 7) << 24);
268
0
    if (_buf->length() < 11 + msg_size + 4/*PreviousTagSize*/) {
269
0
        return butil::Status(EAGAIN, "Fail to read, not enough data");
270
0
    }
271
0
    _buf->pop_front(11);
272
0
    char first_byte = 0;
273
0
    CHECK(_buf->cut1(&first_byte));
274
0
    msg->timestamp = timestamp;
275
0
    msg->codec = (FlvAudioCodec)((first_byte >> 4) & 0xF);
276
0
    msg->rate = (FlvSoundRate)((first_byte >> 2) & 0x3);
277
0
    msg->bits = (FlvSoundBits)((first_byte >> 1) & 0x1);
278
0
    msg->type = (FlvSoundType)(first_byte & 0x1);
279
0
    _buf->cutn(&msg->data, msg_size - 1);
280
0
    _buf->pop_front(4/* PreviousTagSize0 */);
281
282
0
    return butil::Status::OK();
283
0
}
284
285
0
butil::Status FlvReader::Read(RtmpMetaData* msg, std::string* name) {
286
0
    char tags[11];
287
0
    const unsigned char* p = (const unsigned char*)_buf->fetch(tags, sizeof(tags));
288
0
    if (p == NULL) {
289
0
        return butil::Status(EAGAIN, "Fail to read, not enough data");
290
0
    }
291
0
    if (*p != FLV_TAG_SCRIPT_DATA) {
292
0
        return butil::Status(EINVAL, "Fail to parse RtmpScriptMessage");
293
0
    }
294
0
    uint32_t msg_size = policy::ReadBigEndian3Bytes(p + 1);
295
0
    uint32_t timestamp = policy::ReadBigEndian3Bytes(p + 4);
296
0
    timestamp |= (*(p + 7) << 24);
297
0
    if (_buf->length() < 11 + msg_size + 4/*PreviousTagSize*/) {
298
0
        return butil::Status(EAGAIN, "Fail to read, not enough data");
299
0
    }
300
0
    _buf->pop_front(11);
301
0
    butil::IOBuf req_buf;
302
0
    _buf->cutn(&req_buf, msg_size);
303
0
    _buf->pop_front(4/* PreviousTagSize0 */);
304
0
    {
305
0
        butil::IOBufAsZeroCopyInputStream zc_stream(req_buf);
306
0
        AMFInputStream istream(&zc_stream);
307
0
        if (!ReadAMFString(name, &istream)) {
308
0
            return butil::Status(EINVAL, "Fail to read AMF string");
309
0
        }
310
0
        if (!ReadAMFObject(&msg->data, &istream)) {
311
0
            return butil::Status(EINVAL, "Fail to read AMF object");
312
0
        }
313
0
    }
314
0
    msg->timestamp = timestamp;
315
0
    return butil::Status::OK();
316
0
}
317
318
0
const char* FlvVideoFrameType2Str(FlvVideoFrameType t) {
319
0
    switch (t) {
320
0
    case FLV_VIDEO_FRAME_KEYFRAME:              return "keyframe";
321
0
    case FLV_VIDEO_FRAME_INTERFRAME:            return "interframe";
322
0
    case FLV_VIDEO_FRAME_DISPOSABLE_INTERFRAME: return "disposable interframe";
323
0
    case FLV_VIDEO_FRAME_GENERATED_KEYFRAME:    return "generated keyframe";
324
0
    case FLV_VIDEO_FRAME_INFOFRAME:             return "info/command frame";
325
0
    } 
326
0
    return "Unknown FlvVideoFrameType";
327
0
}
328
329
0
const char* FlvVideoCodec2Str(FlvVideoCodec id) {
330
0
    switch (id) {
331
0
    case FLV_VIDEO_JPEG:            return "JPEG";
332
0
    case FLV_VIDEO_SORENSON_H263:   return "Sorenson H.263";
333
0
    case FLV_VIDEO_SCREEN_VIDEO:    return "Screen video";
334
0
    case FLV_VIDEO_ON2_VP6:         return "On2 VP6";
335
0
    case FLV_VIDEO_ON2_VP6_WITH_ALPHA_CHANNEL:
336
0
        return "On2 VP6 with alpha channel";
337
0
    case FLV_VIDEO_SCREEN_VIDEO_V2: return "Screen video version 2";
338
0
    case FLV_VIDEO_AVC:             return "AVC";
339
0
    case FLV_VIDEO_HEVC:            return "H.265";
340
0
    }
341
0
    return "Unknown FlvVideoCodec";
342
0
}
343
344
0
const char* FlvAudioCodec2Str(FlvAudioCodec codec) {
345
0
    switch (codec) {
346
0
    case FLV_AUDIO_LINEAR_PCM_PLATFORM_ENDIAN:
347
0
        return "Linear PCM, platform endian";
348
0
    case FLV_AUDIO_ADPCM: return "ADPCM";
349
0
    case FLV_AUDIO_MP3: return "MP3";
350
0
    case FLV_AUDIO_LINEAR_PCM_LITTLE_ENDIAN:
351
0
        return "Linear PCM, little endian";
352
0
    case FLV_AUDIO_NELLYMOSER_16KHZ_MONO:
353
0
        return "Nellymoser 16-kHz mono";
354
0
    case FLV_AUDIO_NELLYMOSER_8KHZ_MONO:
355
0
        return "Nellymoser 8-kHz mono";
356
0
    case FLV_AUDIO_NELLYMOSER:
357
0
        return "Nellymoser";
358
0
    case FLV_AUDIO_G711_ALAW_LOGARITHMIC_PCM:
359
0
        return "G.711 A-law logarithmic PCM";
360
0
    case FLV_AUDIO_G711_MULAW_LOGARITHMIC_PCM:
361
0
        return "G.711 mu-law logarithmic PCM";
362
0
    case FLV_AUDIO_RESERVED:
363
0
        return "reserved";
364
0
    case FLV_AUDIO_AAC: return "AAC";
365
0
    case FLV_AUDIO_SPEEX: return "Speex";
366
0
    case FLV_AUDIO_MP3_8KHZ: return "MP3 8-Khz";
367
0
    case FLV_AUDIO_DEVICE_SPECIFIC_SOUND:
368
0
        return "Device-specific sound";
369
0
    }
370
0
    return "Unknown FlvAudioCodec";
371
0
}
372
373
0
const char* FlvSoundRate2Str(FlvSoundRate rate) {
374
0
    switch (rate) {
375
0
    case FLV_SOUND_RATE_5512HZ:  return "5512";
376
0
    case FLV_SOUND_RATE_11025HZ: return "11025";
377
0
    case FLV_SOUND_RATE_22050HZ: return "22050";
378
0
    case FLV_SOUND_RATE_44100HZ: return "44100";
379
0
    }
380
0
    return "Unknown FlvSoundRate";
381
0
}
382
383
0
const char* FlvSoundBits2Str(FlvSoundBits size) {
384
0
    switch (size) {
385
0
    case FLV_SOUND_8BIT:  return "8";
386
0
    case FLV_SOUND_16BIT: return "16";
387
0
    }
388
0
    return "Unknown FlvSoundBits";
389
0
}
390
391
0
const char* FlvSoundType2Str(FlvSoundType t) {
392
0
    switch (t) {
393
0
    case FLV_SOUND_MONO:   return "mono";
394
0
    case FLV_SOUND_STEREO: return "stereo";
395
0
    }
396
0
    return "Unknown FlvSoundType";
397
0
}
398
399
0
std::ostream& operator<<(std::ostream& os, const RtmpAudioMessage& msg) {
400
0
    return os << "AudioMessage{timestamp=" << msg.timestamp
401
0
              << " codec=" << FlvAudioCodec2Str(msg.codec)
402
0
              << " rate=" << FlvSoundRate2Str(msg.rate)
403
0
              << " bits=" << FlvSoundBits2Str(msg.bits)
404
0
              << " type=" << FlvSoundType2Str(msg.type)
405
0
              << " data=" << butil::ToPrintable(msg.data) << '}';
406
0
}
407
408
0
std::ostream& operator<<(std::ostream& os, const RtmpVideoMessage& msg) {
409
0
    return os << "VideoMessage{timestamp=" << msg.timestamp
410
0
              << " type=" << FlvVideoFrameType2Str(msg.frame_type)
411
0
              << " codec=" << FlvVideoCodec2Str(msg.codec)
412
0
              << " data=" << butil::ToPrintable(msg.data) << '}';
413
0
}
414
415
0
butil::Status RtmpAACMessage::Create(const RtmpAudioMessage& msg) {
416
0
    if (msg.codec != FLV_AUDIO_AAC) {
417
0
        return butil::Status(EINVAL, "codec=%s is not AAC",
418
0
                            FlvAudioCodec2Str(msg.codec));
419
0
    }
420
0
    const uint8_t* p = (const uint8_t*)msg.data.fetch1();
421
0
    if (p == NULL) {
422
0
        return butil::Status(EINVAL, "Not enough data in AudioMessage");
423
0
    }
424
0
    if (*p > FLV_AAC_PACKET_RAW) {
425
0
        return butil::Status(EINVAL, "Invalid AAC packet_type=%d", (int)*p);
426
0
    }
427
0
    this->timestamp = msg.timestamp;
428
0
    this->rate = msg.rate;
429
0
    this->bits = msg.bits;
430
0
    this->type = msg.type;
431
0
    this->packet_type = (FlvAACPacketType)*p;
432
0
    msg.data.append_to(&data, msg.data.size() - 1, 1);
433
0
    return butil::Status::OK();
434
0
}
435
436
AudioSpecificConfig::AudioSpecificConfig()
437
    : aac_object(AAC_OBJECT_UNKNOWN)
438
    , aac_sample_rate(0)
439
0
    , aac_channels(0) {
440
0
}
441
442
0
butil::Status AudioSpecificConfig::Create(const butil::IOBuf& buf) {
443
0
    if (buf.size() < 2u) {
444
0
        return butil::Status(EINVAL, "data_size=%" PRIu64 " is too short",
445
0
                             (uint64_t)buf.size());
446
0
    }
447
0
    char tmpbuf[2];
448
0
    buf.copy_to(tmpbuf, arraysize(tmpbuf));
449
0
    return Create(tmpbuf, arraysize(tmpbuf));
450
0
}
451
452
0
butil::Status AudioSpecificConfig::Create(const void* data, size_t len) {
453
0
    if (len < 2u) {
454
0
        return butil::Status(EINVAL, "data_size=%" PRIu64 " is too short", (uint64_t)len);
455
0
    }
456
0
    uint8_t profile_ObjectType = ((const char*)data)[0];
457
0
    uint8_t samplingFrequencyIndex = ((const char*)data)[1];
458
0
    aac_channels = (samplingFrequencyIndex >> 3) & 0x0f;
459
0
    aac_sample_rate = ((profile_ObjectType << 1) & 0x0e) | ((samplingFrequencyIndex >> 7) & 0x01);
460
0
    aac_object = (AACObjectType)((profile_ObjectType >> 3) & 0x1f);
461
0
    if (aac_object == AAC_OBJECT_UNKNOWN) {
462
0
        return butil::Status(EINVAL, "Invalid object type");
463
0
    }
464
0
    return butil::Status::OK();
465
0
}
466
467
0
bool RtmpAudioMessage::IsAACSequenceHeader() const {
468
0
    if (codec != FLV_AUDIO_AAC) {
469
0
        return false;
470
0
    }
471
0
    const uint8_t* p = (const uint8_t*)data.fetch1();
472
0
    if (p == NULL) {
473
0
        return false;
474
0
    }
475
0
    return *p == FLV_AAC_PACKET_SEQUENCE_HEADER;
476
0
}
477
478
0
butil::Status RtmpAVCMessage::Create(const RtmpVideoMessage& msg) {
479
0
    if (msg.codec != FLV_VIDEO_AVC) {
480
0
        return butil::Status(EINVAL, "codec=%s is not AVC",
481
0
                            FlvVideoCodec2Str(msg.codec));
482
0
    }
483
0
    uint8_t buf[4];
484
0
    const uint8_t* p = (const uint8_t*)msg.data.fetch(buf, sizeof(buf));
485
0
    if (p == NULL) {
486
0
        return butil::Status(EINVAL, "Not enough data in VideoMessage");
487
0
    }
488
0
    if (*p > FLV_AVC_PACKET_END_OF_SEQUENCE) {
489
0
        return butil::Status(EINVAL, "Invalid AVC packet_type=%d", (int)*p);
490
0
    }
491
0
    this->timestamp = msg.timestamp;
492
0
    this->frame_type = msg.frame_type;
493
0
    this->packet_type = (FlvAVCPacketType)*p;
494
0
    this->composition_time = policy::ReadBigEndian3Bytes(p + 1);
495
0
    msg.data.append_to(&data, msg.data.size() - 4, 4);
496
0
    return butil::Status::OK();
497
0
}
498
499
0
bool RtmpVideoMessage::IsAVCSequenceHeader() const {
500
0
    if (codec != FLV_VIDEO_AVC || frame_type != FLV_VIDEO_FRAME_KEYFRAME) {
501
0
        return false;
502
0
    }
503
0
    const uint8_t* p = (const uint8_t*)data.fetch1();
504
0
    if (p == NULL) {
505
0
        return false;
506
0
    }
507
0
    return *p == FLV_AVC_PACKET_SEQUENCE_HEADER;
508
0
}
509
510
0
bool RtmpVideoMessage::IsHEVCSequenceHeader() const {
511
0
    if (codec != FLV_VIDEO_HEVC || frame_type != FLV_VIDEO_FRAME_KEYFRAME) {
512
0
        return false;
513
0
    }
514
0
    const uint8_t* p = (const uint8_t*)data.fetch1();
515
0
    if (p == NULL) {
516
0
        return false;
517
0
    }
518
0
    return *p == FLV_AVC_PACKET_SEQUENCE_HEADER;
519
0
}
520
521
0
const char* AVCProfile2Str(AVCProfile p) {
522
0
    switch (p) {
523
0
    case AVC_PROFILE_BASELINE: return "Baseline";
524
0
    case AVC_PROFILE_CONSTRAINED_BASELINE: return "ConstrainedBaseline";
525
0
    case AVC_PROFILE_MAIN: return "Main";
526
0
    case AVC_PROFILE_EXTENDED: return "Extended";
527
0
    case AVC_PROFILE_HIGH: return "High";
528
0
    case AVC_PROFILE_HIGH10: return "High10";
529
0
    case AVC_PROFILE_HIGH10_INTRA: return "High10Intra";
530
0
    case AVC_PROFILE_HIGH422: return "High422";
531
0
    case AVC_PROFILE_HIGH422_INTRA: return "High422Intra";
532
0
    case AVC_PROFILE_HIGH444: return "High444";
533
0
    case AVC_PROFILE_HIGH444_PREDICTIVE: return "High444Predictive";
534
0
    case AVC_PROFILE_HIGH444_INTRA: return "High444Intra";
535
0
    }
536
0
    return "Unknown";
537
0
}
538
539
AVCDecoderConfigurationRecord::AVCDecoderConfigurationRecord()
540
    : width(0)
541
    , height(0)
542
    , avc_profile((AVCProfile)0)
543
    , avc_level((AVCLevel)0)
544
0
    , length_size_minus1(-1) {
545
0
}
546
547
std::ostream& operator<<(std::ostream& os,
548
0
                         const AVCDecoderConfigurationRecord& r) {
549
0
    os << "{profile=" << AVCProfile2Str(r.avc_profile)
550
0
       << " level=" << (int)r.avc_level
551
0
       << " length_size_minus1=" << (int)r.length_size_minus1
552
0
       << " width=" << r.width
553
0
       << " height=" << r.height
554
0
       << " sps=[";
555
0
    for (size_t i = 0; i < r.sps_list.size(); ++i) {
556
0
        if (i) {
557
0
            os << ' ';
558
0
        }
559
0
        os << r.sps_list[i].size();
560
0
    }
561
0
    os << "] pps=[";
562
0
    for (size_t i = 0; i < r.pps_list.size(); ++i) {
563
0
        if (i) {
564
0
            os << ' ';
565
0
        }
566
0
        os << r.pps_list[i].size();
567
0
    }
568
0
    os << "]}";
569
0
    return os;
570
0
}
571
572
0
butil::Status AVCDecoderConfigurationRecord::Create(const butil::IOBuf& buf) {
573
    // the buf should be short generally, copy it out to continuous memory
574
    // to simplify parsing.
575
0
    DEFINE_SMALL_ARRAY(char, cont_buf, buf.size(), 64);
576
0
    buf.copy_to(cont_buf, buf.size());
577
0
    return Create(cont_buf, buf.size());
578
0
}
579
580
0
butil::Status AVCDecoderConfigurationRecord::Create(const void* data, size_t len) {
581
0
    butil::StringPiece buf((const char*)data, len);
582
0
    if (buf.size() < 6) {
583
0
        return butil::Status(EINVAL, "Length=%lu is not long enough",
584
0
                            (unsigned long)buf.size());
585
0
    }
586
    // skip configurationVersion at buf[0]
587
0
    avc_profile = (AVCProfile)buf[1];
588
    // skip profile_compatibility at buf[2]
589
0
    avc_level = (AVCLevel)buf[3];
590
    
591
    // 5.3.4.2.1 Syntax, H.264-AVC-ISO_IEC_14496-15.pdf, page 16
592
    // 5.2.4.1 AVC decoder configuration record
593
    // 5.2.4.1.2 Semantics
594
    // The value of this field shall be one of 0, 1, or 3 corresponding to a
595
    // length encoded with 1, 2, or 4 bytes, respectively.
596
0
    length_size_minus1 = buf[4] & 0x03;
597
0
    if (length_size_minus1 == 2) {
598
0
        return butil::Status(EINVAL, "lengthSizeMinusOne should never be 2");
599
0
    }
600
601
    // Parsing SPS
602
0
    const int num_sps = (int)(buf[5] & 0x1f);
603
0
    buf.remove_prefix(6);
604
0
    sps_list.clear();
605
0
    sps_list.reserve(num_sps);
606
0
    for (int i = 0; i < num_sps; ++i) {
607
0
        if (buf.size() < 2) {
608
0
            return butil::Status(EINVAL, "Not enough data to decode SPS-length");
609
0
        }
610
0
        const uint16_t sps_length = policy::ReadBigEndian2Bytes(buf.data());
611
0
        if (buf.size() < 2u + sps_length) {
612
0
            return butil::Status(EINVAL, "Not enough data to decode SPS");
613
0
        }
614
0
        if (sps_length > 0) {
615
0
            butil::Status st = ParseSPS(buf.data() + 2, sps_length);
616
0
            if (!st.ok()) {
617
0
                return st;
618
0
            }
619
0
            sps_list.push_back(buf.substr(2, sps_length).as_string());
620
0
        }
621
0
        buf.remove_prefix(2 + sps_length);
622
0
    }
623
    // Parsing PPS
624
0
    pps_list.clear();
625
0
    if (buf.empty()) {
626
0
        return butil::Status(EINVAL, "Not enough data to decode PPS");
627
0
    }
628
0
    const int num_pps = (int)buf[0];
629
0
    buf.remove_prefix(1);
630
0
    for (int i = 0; i < num_pps; ++i) {
631
0
        if (buf.size() < 2) {
632
0
            return butil::Status(EINVAL, "Not enough data to decode PPS-length");
633
0
        }
634
0
        const uint16_t pps_length = policy::ReadBigEndian2Bytes(buf.data());
635
0
        if (buf.size() < 2u + pps_length) {
636
0
            return butil::Status(EINVAL, "Not enough data to decode PPS");
637
0
        }
638
0
        if (pps_length > 0) {
639
0
            pps_list.push_back(buf.substr(2, pps_length).as_string());
640
0
        }
641
0
        buf.remove_prefix(2 + pps_length);
642
0
    }
643
0
    return butil::Status::OK();
644
0
}
645
646
butil::Status AVCDecoderConfigurationRecord::ParseSPS(
647
0
    const butil::StringPiece& buf, size_t sps_length) {
648
    // for NALU, 7.3.1 NAL unit syntax
649
    // H.264-AVC-ISO_IEC_14496-10-2012.pdf, page 61.
650
0
    if (buf.empty()) {
651
0
        return butil::Status(EINVAL, "SPS is empty");
652
0
    }
653
0
    const int8_t nutv = buf[0];
654
0
    const int8_t forbidden_zero_bit = (nutv >> 7) & 0x01;
655
0
    if (forbidden_zero_bit) {
656
0
        return butil::Status(EINVAL, "forbidden_zero_bit shall equal 0");
657
0
    }
658
    // nal_ref_idc not equal to 0 specifies that the content of the NAL unit
659
    // contains:
660
    //    a sequence parameter set
661
    // or a picture parameter set
662
    // or a slice of a reference picture
663
    // or a slice data partition of a reference picture.
664
0
    int8_t nal_ref_idc = (nutv >> 5) & 0x03;
665
0
    if (!nal_ref_idc) {
666
0
        return butil::Status(EINVAL, "nal_ref_idc is 0");
667
0
    }
668
    // 7.4.1 NAL unit semantics
669
    // H.264-AVC-ISO_IEC_14496-10-2012.pdf, page 61.
670
    // nal_unit_type specifies the type of RBSP data structure contained in
671
    // the NAL unit as specified in Table 7-1.
672
0
    const AVCNaluType nal_unit_type = (AVCNaluType)(nutv & 0x1f);
673
0
    if (nal_unit_type != AVC_NALU_SPS) {
674
0
        return butil::Status(EINVAL, "nal_unit_type is not %d", (int)AVC_NALU_SPS);
675
0
    }
676
    // Extract the rbsp from sps.
677
0
    DEFINE_SMALL_ARRAY(char, rbsp, sps_length - 1, 64);
678
0
    buf.copy(rbsp, sps_length - 1, 1);
679
0
    size_t rbsp_len = 0;    
680
0
    for (size_t i = 1; i < sps_length; ++i) {
681
        // XX 00 00 03 XX, the 03 byte should be dropped.
682
0
        if (!(i >= 3 && buf[i - 2] == 0 && buf[i - 1] == 0 && buf[i] == 3)) {
683
0
            rbsp[rbsp_len++] = buf[i];
684
0
        }
685
0
    }
686
    // for SPS, 7.3.2.1.1 Sequence parameter set data syntax
687
    // H.264-AVC-ISO_IEC_14496-10-2012.pdf, page 62.
688
0
    if (rbsp_len < 3) {
689
0
        return butil::Status(EINVAL, "rbsp must be at least 3 bytes");
690
0
    }
691
    // Decode rbsp.
692
0
    const char* p = rbsp;
693
0
    uint8_t profile_idc = *p++;
694
0
    if (!profile_idc) {
695
0
        return butil::Status(EINVAL, "profile_idc is 0");
696
0
    }
697
0
    int8_t flags = *p++;
698
0
    if (flags & 0x03) {
699
0
        return butil::Status(EINVAL, "Invalid flags=%d", (int)flags);
700
0
    }
701
0
    uint8_t level_idc = *p++;
702
0
    if (!level_idc) {
703
0
        return butil::Status(EINVAL, "level_idc is 0");
704
0
    }
705
0
    BitStream bs(p, rbsp + rbsp_len - p);
706
0
    int32_t seq_parameter_set_id = -1;
707
0
    if (avc_nalu_read_uev(&bs, &seq_parameter_set_id) != 0) {
708
0
        return butil::Status(EINVAL, "Fail to read seq_parameter_set_id");
709
0
    }
710
0
    if (seq_parameter_set_id < 0) {
711
0
        return butil::Status(EINVAL, "Invalid seq_parameter_set_id=%d",
712
0
                            (int)seq_parameter_set_id);
713
0
    }
714
0
    int32_t chroma_format_idc = -1;
715
0
    if (profile_idc == 100 || profile_idc == 110 || profile_idc == 122 ||
716
0
        profile_idc == 244 || profile_idc == 44 || profile_idc == 83 ||
717
0
        profile_idc == 86 || profile_idc == 118 || profile_idc == 128) {
718
0
        if (avc_nalu_read_uev(&bs, &chroma_format_idc) != 0) {
719
0
            return butil::Status(EINVAL, "Fail to read chroma_format_idc");
720
0
        }
721
0
        if (chroma_format_idc == 3) {
722
0
            int8_t separate_colour_plane_flag = -1;
723
0
            if (avc_nalu_read_bit(&bs, &separate_colour_plane_flag) != 0) {
724
0
                return butil::Status(EINVAL, "Fail to read separate_colour_plane_flag");
725
0
            }
726
0
        }
727
0
        int32_t bit_depth_luma_minus8 = -1;
728
0
        if (avc_nalu_read_uev(&bs, &bit_depth_luma_minus8) != 0) {
729
0
            return butil::Status(EINVAL, "Fail to read bit_depth_luma_minus8");
730
0
        }
731
0
        int32_t bit_depth_chroma_minus8 = -1;
732
0
        if (avc_nalu_read_uev(&bs, &bit_depth_chroma_minus8) != 0) {
733
0
            return butil::Status(EINVAL, "Fail to read bit_depth_chroma_minus8");
734
0
        }
735
0
        int8_t qpprime_y_zero_transform_bypass_flag = -1;
736
0
        if (avc_nalu_read_bit(&bs, &qpprime_y_zero_transform_bypass_flag) != 0) {
737
0
            return butil::Status(EINVAL, "Fail to read qpprime_y_zero_transform_bypass_flag");
738
0
        }
739
0
        int8_t seq_scaling_matrix_present_flag = -1;
740
0
        if (avc_nalu_read_bit(&bs, &seq_scaling_matrix_present_flag) != 0) {
741
0
            return butil::Status(EINVAL, "Fail to read seq_scaling_matrix_present_flag");
742
0
        }
743
0
        if (seq_scaling_matrix_present_flag) {
744
0
            int nb_scmpfs = (chroma_format_idc != 3 ? 8 : 12);
745
0
            for (int i = 0; i < nb_scmpfs; i++) {
746
0
                int8_t seq_scaling_matrix_present_flag_i = -1;
747
0
                if (avc_nalu_read_bit(&bs, &seq_scaling_matrix_present_flag_i)) {
748
0
                    return butil::Status(EINVAL, "Fail to read seq_scaling_"
749
0
                                        "matrix_present_flag[%d]", i);
750
0
                }
751
0
                if (seq_scaling_matrix_present_flag_i) {
752
0
                    return butil::Status(EINVAL, "Invalid seq_scaling_matrix_"
753
0
                                        "present_flag[%d]=%d nb_scmpfs=%d",
754
0
                                        i, (int)seq_scaling_matrix_present_flag_i,
755
0
                                        nb_scmpfs);
756
0
                }
757
0
            }
758
0
        }
759
0
    }
760
0
    int32_t log2_max_frame_num_minus4 = -1;
761
0
    if (avc_nalu_read_uev(&bs, &log2_max_frame_num_minus4) != 0) {
762
0
        return butil::Status(EINVAL, "Fail to read log2_max_frame_num_minus4");
763
0
    }
764
0
    int32_t pic_order_cnt_type = -1;
765
0
    if (avc_nalu_read_uev(&bs, &pic_order_cnt_type) != 0) {
766
0
        return butil::Status(EINVAL, "Fail to read pic_order_cnt_type");
767
0
    }
768
0
    if (pic_order_cnt_type == 0) {
769
0
        int32_t log2_max_pic_order_cnt_lsb_minus4 = -1;
770
0
        if (avc_nalu_read_uev(&bs, &log2_max_pic_order_cnt_lsb_minus4) != 0) {
771
0
            return butil::Status(EINVAL, "Fail to read log2_max_pic_order_cnt_lsb_minus4");
772
0
        }
773
0
    } else if (pic_order_cnt_type == 1) {
774
0
        int8_t delta_pic_order_always_zero_flag = -1;
775
0
        if (avc_nalu_read_bit(&bs, &delta_pic_order_always_zero_flag) != 0) {
776
0
            return butil::Status(EINVAL, "Fail to read delta_pic_order_always_zero_flag");
777
0
        }
778
0
        int32_t offset_for_non_ref_pic = -1;
779
0
        if (avc_nalu_read_uev(&bs, &offset_for_non_ref_pic) != 0) {
780
0
            return butil::Status(EINVAL, "Fail to read offset_for_non_ref_pic");
781
0
        }
782
0
        int32_t offset_for_top_to_bottom_field = -1;
783
0
        if (avc_nalu_read_uev(&bs, &offset_for_top_to_bottom_field) != 0) {
784
0
            return butil::Status(EINVAL, "Fail to read offset_for_top_to_bottom_field");
785
0
        }
786
0
        int32_t num_ref_frames_in_pic_order_cnt_cycle = -1;
787
0
        if (avc_nalu_read_uev(&bs, &num_ref_frames_in_pic_order_cnt_cycle) != 0) {
788
0
            return butil::Status(EINVAL, "Fail to read num_ref_frames_in_pic_order_cnt_cycle");
789
0
        }
790
0
        if (num_ref_frames_in_pic_order_cnt_cycle) {
791
0
            return butil::Status(EINVAL, "Invalid num_ref_frames_in_pic_order_cnt_cycle=%d",
792
0
                                num_ref_frames_in_pic_order_cnt_cycle);
793
0
        }
794
0
    }
795
0
    int32_t max_num_ref_frames = -1;
796
0
    if (avc_nalu_read_uev(&bs, &max_num_ref_frames) != 0) {
797
0
        return butil::Status(EINVAL, "Fail to read max_num_ref_frames");
798
0
    }
799
0
    int8_t gaps_in_frame_num_value_allowed_flag = -1;
800
0
    if (avc_nalu_read_bit(&bs, &gaps_in_frame_num_value_allowed_flag) != 0) {
801
0
        return butil::Status(EINVAL, "Fail to read gaps_in_frame_num_value_allowed_flag");
802
0
    }
803
0
    int32_t pic_width_in_mbs_minus1 = -1;
804
0
    if (avc_nalu_read_uev(&bs, &pic_width_in_mbs_minus1) != 0) {
805
0
        return butil::Status(EINVAL, "Fail to read pic_width_in_mbs_minus1");
806
0
    }
807
0
    int32_t pic_height_in_map_units_minus1 = -1;
808
0
    if (avc_nalu_read_uev(&bs, &pic_height_in_map_units_minus1) != 0) {
809
0
        return butil::Status(EINVAL, "Fail to read pic_height_in_map_units_minus1");
810
0
    }
811
0
    width = (int)(pic_width_in_mbs_minus1 + 1) * 16;
812
0
    height = (int)(pic_height_in_map_units_minus1 + 1) * 16;
813
0
    return butil::Status::OK();
814
0
}
815
816
static bool find_avc_annexb_nalu_start_code(const butil::IOBuf& buf,
817
0
                                            size_t* start_code_length) {
818
0
    size_t consecutive_zero_count = 0;
819
0
    for (butil::IOBufBytesIterator it(buf); it != NULL; ++it) {
820
0
        char c = *it;
821
0
        if (c == 0) {
822
0
            ++consecutive_zero_count;
823
0
        } else if (c == 1) {
824
0
            if (consecutive_zero_count >= 2) {
825
0
                if (start_code_length) {
826
0
                    *start_code_length = consecutive_zero_count + 1;
827
0
                }
828
0
                return true;
829
0
            }
830
0
            return false;
831
0
        } else {
832
0
            return false;
833
0
        }
834
0
    }
835
0
    return false;
836
0
}
837
838
static void find_avc_annexb_nalu_stop_code(const butil::IOBuf& buf,
839
                                           size_t* nalu_length_out,
840
0
                                           size_t* stop_code_length) {
841
0
    size_t nalu_length = 0;
842
0
    size_t consecutive_zero_count = 0;
843
0
    for (butil::IOBufBytesIterator it(buf); it != NULL; ++it) {
844
0
        unsigned char c = (unsigned char)*it;
845
0
        if (c > 1) { // most frequent
846
0
            ++nalu_length;
847
0
            consecutive_zero_count = 0;
848
0
            continue;
849
0
        }
850
0
        if (c == 0) {
851
0
            ++consecutive_zero_count;
852
0
        } else { // c == 1
853
0
            if (consecutive_zero_count >= 2) {
854
0
                if (nalu_length_out) {
855
0
                    *nalu_length_out = nalu_length;
856
0
                }
857
0
                if (stop_code_length) {
858
0
                    *stop_code_length = consecutive_zero_count + 1;
859
0
                }
860
0
                return;
861
0
            } 
862
0
            ++nalu_length;
863
0
            consecutive_zero_count = 0;
864
0
        }
865
0
    }
866
0
    if (nalu_length_out) {
867
0
        *nalu_length_out = nalu_length + consecutive_zero_count;
868
0
    }
869
0
    if (stop_code_length) {
870
0
        *stop_code_length = 0;
871
0
    }
872
0
}
873
874
AVCNaluIterator::AVCNaluIterator(butil::IOBuf* data, uint32_t length_size_minus1,
875
                                 AVCNaluFormat* format)
876
    : _data(data)
877
    , _format(format)
878
    , _length_size_minus1(length_size_minus1)
879
0
    , _nalu_type(AVC_NALU_EMPTY) {
880
0
    if (_data) {
881
0
        ++*this;
882
0
    }
883
0
}
884
885
0
AVCNaluIterator::~AVCNaluIterator() {
886
0
}
887
888
0
void AVCNaluIterator::operator++() {
889
0
    if (*_format == AVC_NALU_FORMAT_ANNEXB) {
890
0
        if (!next_as_annexb()) {
891
0
            return set_end();
892
0
        }
893
0
    } else if (*_format == AVC_NALU_FORMAT_IBMF) {
894
0
        if (!next_as_ibmf()) {
895
0
            return set_end();
896
0
        }
897
0
    } else {
898
0
        size_t start_code_length = 0;
899
0
        if (find_avc_annexb_nalu_start_code(*_data, &start_code_length) &&
900
0
            _data->size() > start_code_length) {
901
0
            if (start_code_length > 0) {
902
0
                _data->pop_front(start_code_length);
903
0
            }
904
0
            *_format = AVC_NALU_FORMAT_ANNEXB;
905
0
            if (!next_as_annexb()) {
906
0
                return set_end();
907
0
            }
908
0
        } else if (next_as_ibmf()) {
909
0
            *_format = AVC_NALU_FORMAT_IBMF;
910
0
        } else {
911
0
            set_end();
912
0
        }
913
0
    }
914
0
}
915
916
0
bool AVCNaluIterator::next_as_annexb() {
917
0
    if (_data->empty()) {
918
0
        return false;
919
0
    }
920
0
    size_t nalu_length = 0;
921
0
    size_t stop_code_length = 0;
922
0
    find_avc_annexb_nalu_stop_code(*_data, &nalu_length, &stop_code_length);
923
0
    _cur_nalu.clear();
924
0
    _nalu_type = AVC_NALU_EMPTY;
925
0
    if (nalu_length) {
926
0
        _data->cutn(&_cur_nalu, nalu_length);
927
0
        const uint8_t byte0 = *(const uint8_t*)_cur_nalu.fetch1();
928
0
        _nalu_type = (AVCNaluType)(byte0 & 0x1f);
929
0
    }
930
0
    if (stop_code_length) {
931
0
        _data->pop_front(stop_code_length);
932
0
    }
933
0
    return true;
934
0
}
935
936
0
bool AVCNaluIterator::next_as_ibmf() {
937
    // The value of this field shall be one of 0, 1, or 3 corresponding to a
938
    // length encoded with 1, 2, or 4 bytes, respectively.
939
0
    CHECK_NE(_length_size_minus1, 2u);
940
941
0
    if (_data->empty()) {
942
0
        return false;
943
0
    }
944
0
    if (_data->size() < _length_size_minus1 + 1) {
945
0
        LOG(ERROR) << "Not enough data to decode length of NALU";
946
0
        return false;
947
0
    }
948
0
    int32_t nalu_length = 0;
949
0
    char buf[4];
950
0
    if (_length_size_minus1 == 3) {
951
0
        _data->copy_to(buf, 4);
952
0
        nalu_length = policy::ReadBigEndian4Bytes(buf);
953
0
    } else if (_length_size_minus1 == 1) {
954
0
        _data->copy_to(buf, 2);
955
0
        nalu_length = policy::ReadBigEndian2Bytes(buf);
956
0
    } else {
957
0
        _data->copy_to(buf, 1);
958
0
        nalu_length = *buf;
959
0
    }
960
    // maybe stream is invalid format.
961
    // see: https://github.com/ossrs/srs/issues/183
962
0
    if (nalu_length < 0) {
963
0
        LOG(ERROR) << "Invalid nalu_length=" << nalu_length;
964
0
        return false;
965
0
    }
966
0
    if (_data->size() < _length_size_minus1 + 1 + nalu_length) {
967
0
        LOG(ERROR) << "Not enough data to decode NALU";
968
0
        return false;
969
0
    }
970
0
    _data->pop_front(_length_size_minus1 + 1);
971
0
    _cur_nalu.clear();
972
0
    _nalu_type = AVC_NALU_EMPTY;
973
0
    if (nalu_length) {
974
0
        _data->cutn(&_cur_nalu, nalu_length);
975
0
        const uint8_t byte0 = *(const uint8_t*)_cur_nalu.fetch1();
976
0
        _nalu_type = (AVCNaluType)(byte0 & 0x1f);
977
0
    }
978
0
    return true;
979
0
}
980
981
RtmpClientOptions::RtmpClientOptions()
982
    : fpad(false)
983
    , audioCodecs((RtmpAudioCodec)3575) // Copy from SRS
984
    , videoCodecs((RtmpVideoCodec)252)  // Copy from SRS
985
    , videoFunction(RTMP_VIDEO_FUNCTION_CLIENT_SEEK)
986
    , timeout_ms(1000)
987
    , connect_timeout_ms(500)
988
    , buffer_length_ms(1000)
989
    , chunk_size(policy::RTMP_DEFAULT_CHUNK_SIZE)
990
    , window_ack_size(policy::RTMP_DEFAULT_WINDOW_ACK_SIZE)
991
0
    , simplified_rtmp(false) {
992
0
}
993
994
// Shared by RtmpClient and RtmpClientStream(s)
995
class RtmpClientImpl : public SharedObject {
996
friend class RtmpClientStream;
997
public:
998
0
    RtmpClientImpl() {
999
0
        get_rtmp_bvars()->client_count << 1;
1000
0
    }
1001
0
    ~RtmpClientImpl() {
1002
0
        get_rtmp_bvars()->client_count << -1;
1003
0
        RPC_VLOG << "Destroying RtmpClientImpl=" << this;
1004
0
    }
1005
1006
    // Specify the servers to connect.
1007
    int Init(butil::EndPoint server_addr_and_port,
1008
             const RtmpClientOptions& options);
1009
    int Init(const char* server_addr_and_port,
1010
             const RtmpClientOptions& options);
1011
    int Init(const char* server_addr, int port,
1012
             const RtmpClientOptions& options);
1013
    int Init(const char* naming_service_url, 
1014
             const char* load_balancer_name,
1015
             const RtmpClientOptions& options);
1016
    
1017
0
    const RtmpClientOptions& options() const { return _connect_options; }
1018
0
    SocketMap& socket_map() { return _socket_map; }
1019
1020
    int CreateSocket(const butil::EndPoint& pt, SocketId* id);
1021
1022
private:
1023
    DISALLOW_COPY_AND_ASSIGN(RtmpClientImpl);
1024
    int CommonInit(const RtmpClientOptions& options);
1025
    
1026
    Channel _chan;
1027
    RtmpClientOptions _connect_options;
1028
    SocketMap _socket_map;
1029
};
1030
1031
class RtmpConnect : public AppConnect {
1032
public:
1033
    // @AppConnect
1034
    void StartConnect(const Socket* s, void (*done)(int, void*), void* data) override;
1035
    void StopConnect(Socket* s) override;
1036
};
1037
1038
void RtmpConnect::StartConnect(
1039
0
    const Socket* s, void (*done)(int, void*), void* data) {
1040
0
    RPC_VLOG << "Establish rtmp-level connection on " << *s;
1041
0
    policy::RtmpContext* ctx =
1042
0
        static_cast<policy::RtmpContext*>(s->parsing_context());
1043
0
    if (ctx == NULL) {
1044
0
        LOG(FATAL) << "RtmpContext of " << *s << " is NULL";
1045
0
        return done(EINVAL, data);
1046
0
    }
1047
1048
0
    const RtmpClientOptions* _client_options = ctx->client_options();
1049
0
    if (_client_options && _client_options->simplified_rtmp) {
1050
0
        ctx->set_simplified_rtmp(true);
1051
0
        if (ctx->SendConnectRequest(s->remote_side(), s->fd(), true) != 0) {
1052
0
            LOG(ERROR) << s->remote_side() << ": Fail to send simple connect";
1053
0
            return done(EINVAL, data);
1054
0
        }
1055
0
        ctx->SetState(s->remote_side(), policy::RtmpContext::STATE_RECEIVED_S2);
1056
0
        ctx->set_create_stream_with_play_or_publish(true);
1057
0
        return done(0, data);
1058
0
    }
1059
1060
    // Save to callback to call when RTMP connect is done.
1061
0
    ctx->SetConnectCallback(done, data);
1062
        
1063
    // Initiate the rtmp handshake.
1064
0
    bool is_simple_handshake = false;
1065
0
    if (policy::SendC0C1(s->fd(), &is_simple_handshake) != 0) {
1066
0
        LOG(ERROR) << s->remote_side() << ": Fail to send C0 C1";
1067
0
        return done(EINVAL, data);
1068
0
    }
1069
0
    if (is_simple_handshake) {
1070
0
        ctx->only_check_simple_s0s1();
1071
0
    }
1072
0
}
1073
1074
0
void RtmpConnect::StopConnect(Socket* s) {
1075
0
    policy::RtmpContext* ctx =
1076
0
        static_cast<policy::RtmpContext*>(s->parsing_context());
1077
0
    if (ctx == NULL) {
1078
0
        LOG(FATAL) << "RtmpContext of " << *s << " is NULL";
1079
0
    } else {
1080
0
        ctx->OnConnected(EFAILEDSOCKET);
1081
0
    }
1082
0
}
1083
1084
class RtmpSocketCreator : public SocketCreator {
1085
public:
1086
    RtmpSocketCreator(const RtmpClientOptions& connect_options)
1087
0
        : _connect_options(connect_options) {
1088
0
    }
1089
1090
0
    int CreateSocket(const SocketOptions& opt, SocketId* id) override {
1091
0
        SocketOptions sock_opt = opt;
1092
0
        sock_opt.app_connect = std::make_shared<RtmpConnect>();
1093
0
        sock_opt.initial_parsing_context = new policy::RtmpContext(&_connect_options, NULL);
1094
0
        return get_client_side_messenger()->Create(sock_opt, id);
1095
0
    }
1096
    
1097
private:
1098
    RtmpClientOptions _connect_options;
1099
};
1100
1101
0
int RtmpClientImpl::CreateSocket(const butil::EndPoint& pt, SocketId* id) {
1102
0
    SocketOptions sock_opt;
1103
0
    sock_opt.remote_side = pt;
1104
0
    sock_opt.app_connect = std::make_shared<RtmpConnect>();
1105
0
    sock_opt.initial_parsing_context = new policy::RtmpContext(&_connect_options, NULL);
1106
0
    return get_client_side_messenger()->Create(sock_opt, id);
1107
0
}
1108
1109
0
int RtmpClientImpl::CommonInit(const RtmpClientOptions& options) {
1110
0
    _connect_options = options;
1111
0
    SocketMapOptions sm_options;
1112
0
    sm_options.socket_creator = new RtmpSocketCreator(_connect_options);
1113
0
    if (_socket_map.Init(sm_options) != 0) {
1114
0
        LOG(ERROR) << "Fail to init _socket_map";
1115
0
        return -1;
1116
0
    }
1117
0
    return 0;
1118
0
}
1119
1120
int RtmpClientImpl::Init(butil::EndPoint server_addr_and_port,
1121
0
                         const RtmpClientOptions& options) {
1122
0
    if (CommonInit(options) != 0) {
1123
0
        return -1;
1124
0
    }
1125
0
    ChannelOptions copts;
1126
0
    copts.connect_timeout_ms = options.connect_timeout_ms;
1127
0
    copts.timeout_ms = options.timeout_ms;
1128
0
    copts.protocol = PROTOCOL_RTMP;
1129
0
    return _chan.Init(server_addr_and_port, &copts);
1130
0
}
1131
int RtmpClientImpl::Init(const char* server_addr_and_port,
1132
0
                         const RtmpClientOptions& options) {
1133
0
    if (CommonInit(options) != 0) {
1134
0
        return -1;
1135
0
    }
1136
0
    ChannelOptions copts;
1137
0
    copts.connect_timeout_ms = options.connect_timeout_ms;
1138
0
    copts.timeout_ms = options.timeout_ms;
1139
0
    copts.protocol = PROTOCOL_RTMP;
1140
0
    return _chan.Init(server_addr_and_port, &copts);
1141
0
}
1142
int RtmpClientImpl::Init(const char* server_addr, int port,
1143
0
                         const RtmpClientOptions& options) {
1144
0
    if (CommonInit(options) != 0) {
1145
0
        return -1;
1146
0
    }
1147
0
    ChannelOptions copts;
1148
0
    copts.connect_timeout_ms = options.connect_timeout_ms;
1149
0
    copts.timeout_ms = options.timeout_ms;
1150
0
    copts.protocol = PROTOCOL_RTMP;
1151
0
    return _chan.Init(server_addr, port, &copts);
1152
0
}
1153
int RtmpClientImpl::Init(const char* naming_service_url, 
1154
                         const char* load_balancer_name,
1155
0
                         const RtmpClientOptions& options) {
1156
0
    if (CommonInit(options) != 0) {
1157
0
        return -1;
1158
0
    }
1159
0
    ChannelOptions copts;
1160
0
    copts.connect_timeout_ms = options.connect_timeout_ms;
1161
0
    copts.timeout_ms = options.timeout_ms;
1162
0
    copts.protocol = PROTOCOL_RTMP;
1163
0
    return _chan.Init(naming_service_url, load_balancer_name, &copts);
1164
0
}
1165
1166
0
RtmpClient::RtmpClient() {}
1167
0
RtmpClient::~RtmpClient() {}
1168
0
RtmpClient::RtmpClient(const RtmpClient& rhs) : _impl(rhs._impl) {}
1169
1170
0
RtmpClient& RtmpClient::operator=(const RtmpClient& rhs) {
1171
0
    _impl = rhs._impl;
1172
0
    return *this;
1173
0
}
1174
1175
0
const RtmpClientOptions& RtmpClient::options() const {
1176
0
    if (_impl) {
1177
0
        return _impl->options();
1178
0
    } else {
1179
0
        static RtmpClientOptions dft_opt;
1180
0
        return dft_opt;
1181
0
    }
1182
0
}
1183
1184
int RtmpClient::Init(butil::EndPoint server_addr_and_port,
1185
0
                     const RtmpClientOptions& options) {
1186
0
    butil::intrusive_ptr<RtmpClientImpl> tmp(new (std::nothrow) RtmpClientImpl);
1187
0
    if (tmp == NULL) {
1188
0
        LOG(FATAL) << "Fail to new RtmpClientImpl";
1189
0
        return -1;
1190
0
    }
1191
0
    if (tmp->Init(server_addr_and_port, options) != 0) {
1192
0
        return -1;
1193
0
    }
1194
0
    tmp.swap(_impl);
1195
0
    return 0;
1196
0
}
1197
1198
int RtmpClient::Init(const char* server_addr_and_port,
1199
0
                     const RtmpClientOptions& options) {
1200
0
    butil::intrusive_ptr<RtmpClientImpl> tmp(new (std::nothrow) RtmpClientImpl);
1201
0
    if (tmp == NULL) {
1202
0
        LOG(FATAL) << "Fail to new RtmpClientImpl";
1203
0
        return -1;
1204
0
    }
1205
0
    if (tmp->Init(server_addr_and_port, options) != 0) {
1206
0
        return -1;
1207
0
    }
1208
0
    tmp.swap(_impl);
1209
0
    return 0;
1210
0
}
1211
1212
int RtmpClient::Init(const char* server_addr, int port,
1213
0
                     const RtmpClientOptions& options) {
1214
0
    butil::intrusive_ptr<RtmpClientImpl> tmp(new (std::nothrow) RtmpClientImpl);
1215
0
    if (tmp == NULL) {
1216
0
        LOG(FATAL) << "Fail to new RtmpClientImpl";
1217
0
        return -1;
1218
0
    }
1219
0
    if (tmp->Init(server_addr, port, options) != 0) {
1220
0
        return -1;
1221
0
    }
1222
0
    tmp.swap(_impl);
1223
0
    return 0;
1224
0
}
1225
1226
int RtmpClient::Init(const char* naming_service_url, 
1227
                     const char* load_balancer_name,
1228
0
                     const RtmpClientOptions& options) {
1229
0
    butil::intrusive_ptr<RtmpClientImpl> tmp(new (std::nothrow) RtmpClientImpl);
1230
0
    if (tmp == NULL) {
1231
0
        LOG(FATAL) << "Fail to new RtmpClientImpl";
1232
0
        return -1;
1233
0
    }
1234
0
    if (tmp->Init(naming_service_url, load_balancer_name, options) != 0) {
1235
0
        return -1;
1236
0
    }
1237
0
    tmp.swap(_impl);
1238
0
    return 0;
1239
0
}
1240
1241
0
bool RtmpClient::initialized() const { return _impl != NULL; }
1242
1243
RtmpStreamBase::RtmpStreamBase(bool is_client)
1244
    : _is_client(is_client)
1245
    , _paused(false)
1246
    , _stopped(false)
1247
    , _processing_msg(false)
1248
    , _has_data_ever(false)
1249
    , _message_stream_id(0)
1250
    , _chunk_stream_id(0)
1251
    , _create_realtime_us(butil::gettimeofday_us())
1252
0
    , _is_server_accepted(false) {
1253
0
}
1254
1255
0
RtmpStreamBase::~RtmpStreamBase() {
1256
0
}
1257
1258
0
void RtmpStreamBase::Destroy() {
1259
0
    return;
1260
0
}
1261
1262
int RtmpStreamBase::SendMessage(uint32_t timestamp,
1263
                                uint8_t message_type,
1264
0
                                const butil::IOBuf& body) {
1265
0
    if (_rtmpsock == NULL) {
1266
0
        errno = EPERM;
1267
0
        return -1;
1268
0
    }
1269
0
    if (_chunk_stream_id == 0) {
1270
0
        LOG(ERROR) << "SendXXXMessage can't be called before play() is received";
1271
0
        errno = EPERM;
1272
0
        return -1;
1273
0
    }
1274
0
    SocketMessagePtr<policy::RtmpUnsentMessage> msg(new policy::RtmpUnsentMessage);
1275
0
    msg->header.timestamp = timestamp;
1276
0
    msg->header.message_length = body.size();
1277
0
    msg->header.message_type = message_type;
1278
0
    msg->header.stream_id = _message_stream_id;
1279
0
    msg->chunk_stream_id = _chunk_stream_id;
1280
0
    msg->body = body;
1281
0
    return _rtmpsock->Write(msg);
1282
0
}
1283
1284
int RtmpStreamBase::SendControlMessage(
1285
0
    uint8_t message_type, const void* body, size_t size) {
1286
0
    if (_rtmpsock == NULL) {
1287
0
        errno = EPERM;
1288
0
        return -1;
1289
0
    }
1290
0
    SocketMessagePtr<policy::RtmpUnsentMessage> msg(
1291
0
        policy::MakeUnsentControlMessage(message_type, body, size));
1292
0
    return _rtmpsock->Write(msg);
1293
0
}
1294
1295
0
int RtmpStreamBase::SendCuePoint(const RtmpCuePoint& cuepoint) {
1296
0
    butil::IOBuf req_buf;
1297
0
    {
1298
0
        butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
1299
0
        AMFOutputStream ostream(&zc_stream);
1300
0
        WriteAMFString(RTMP_AMF0_SET_DATAFRAME, &ostream);
1301
0
        WriteAMFString(RTMP_AMF0_ON_CUE_POINT, &ostream);
1302
0
        WriteAMFObject(cuepoint.data, &ostream);
1303
0
        if (!ostream.good()) {
1304
0
            LOG(ERROR) << "Fail to serialize cuepoint";
1305
0
            return -1;
1306
0
        }
1307
0
    }
1308
0
    return SendMessage(cuepoint.timestamp, policy::RTMP_MESSAGE_DATA_AMF0, req_buf);
1309
0
}
1310
1311
int RtmpStreamBase::SendMetaData(const RtmpMetaData& metadata,
1312
0
                                 const butil::StringPiece& name) {
1313
0
    butil::IOBuf req_buf;
1314
0
    {
1315
0
        butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
1316
0
        AMFOutputStream ostream(&zc_stream);
1317
0
        WriteAMFString(name, &ostream);
1318
0
        WriteAMFObject(metadata.data, &ostream);
1319
0
        if (!ostream.good()) {
1320
0
            LOG(ERROR) << "Fail to serialize metadata";
1321
0
            return -1;
1322
0
        }
1323
0
    }
1324
0
    return SendMessage(metadata.timestamp, policy::RTMP_MESSAGE_DATA_AMF0, req_buf);
1325
0
}
1326
1327
0
int RtmpStreamBase::SendSharedObjectMessage(const RtmpSharedObjectMessage&) {
1328
0
    CHECK(false) << "Not supported yet";
1329
0
    return -1;
1330
0
}
1331
1332
0
int RtmpStreamBase::SendAudioMessage(const RtmpAudioMessage& msg) {
1333
0
    if (_rtmpsock == NULL) {
1334
0
        errno = EPERM;
1335
0
        return -1;
1336
0
    }
1337
0
    if (_chunk_stream_id == 0) {
1338
0
        LOG(ERROR) << __FUNCTION__ << " can't be called before play() is received";
1339
0
        errno = EPERM;
1340
0
        return -1;
1341
0
    }
1342
0
    if (_paused) {
1343
0
        errno = EPERM;
1344
0
        return -1;
1345
0
    }
1346
0
    SocketMessagePtr<policy::RtmpUnsentMessage> msg2(new policy::RtmpUnsentMessage);
1347
0
    msg2->header.timestamp = msg.timestamp;
1348
0
    msg2->header.message_length = msg.size();
1349
0
    msg2->header.message_type = policy::RTMP_MESSAGE_AUDIO;
1350
0
    msg2->header.stream_id = _message_stream_id;
1351
0
    msg2->chunk_stream_id = _chunk_stream_id;
1352
    // Make audio header.
1353
0
    const char audio_head =
1354
0
        ((msg.codec & 0xF) << 4)
1355
0
        | ((msg.rate & 0x3) << 2)
1356
0
        | ((msg.bits & 0x1) << 1)
1357
0
        | (msg.type & 0x1);
1358
0
    msg2->body.push_back(audio_head);
1359
0
    msg2->body.append(msg.data);
1360
0
    return _rtmpsock->Write(msg2);
1361
0
}
1362
1363
0
int RtmpStreamBase::SendAACMessage(const RtmpAACMessage& msg) {
1364
0
    if (_rtmpsock == NULL) {
1365
0
        errno = EPERM;
1366
0
        return -1;
1367
0
    }
1368
0
    if (_chunk_stream_id == 0) {
1369
0
        LOG(ERROR) << __FUNCTION__ << " can't be called before play() is received";
1370
0
        errno = EPERM;
1371
0
        return -1;
1372
0
    }
1373
0
    if (_paused) {
1374
0
        errno = EPERM;
1375
0
        return -1;
1376
0
    }
1377
0
    SocketMessagePtr<policy::RtmpUnsentMessage> msg2(new policy::RtmpUnsentMessage);
1378
0
    msg2->header.timestamp = msg.timestamp;
1379
0
    msg2->header.message_length = msg.size();
1380
0
    msg2->header.message_type = policy::RTMP_MESSAGE_AUDIO;
1381
0
    msg2->header.stream_id = _message_stream_id;
1382
0
    msg2->chunk_stream_id = _chunk_stream_id;
1383
    // Make audio header.
1384
0
    char aac_head[2];
1385
0
    aac_head[0] = ((FLV_AUDIO_AAC & 0xF) << 4)
1386
0
        | ((msg.rate & 0x3) << 2)
1387
0
        | ((msg.bits & 0x1) << 1)
1388
0
        | (msg.type & 0x1);
1389
0
    aac_head[1] = (FlvAACPacketType)msg.packet_type;
1390
0
    msg2->body.append(aac_head, sizeof(aac_head));
1391
0
    msg2->body.append(msg.data);
1392
0
    return _rtmpsock->Write(msg2);
1393
0
}
1394
1395
0
int RtmpStreamBase::SendUserMessage(void*) {
1396
0
    CHECK(false) << "You should implement your own SendUserMessage";
1397
0
    return 0; 
1398
0
}
1399
1400
0
int RtmpStreamBase::SendVideoMessage(const RtmpVideoMessage& msg) {
1401
0
    if (_rtmpsock == NULL) {
1402
0
        errno = EPERM;
1403
0
        return -1;
1404
0
    }
1405
0
    if (_chunk_stream_id == 0) {
1406
0
        LOG(ERROR) << __FUNCTION__ << " can't be called before play() is received";
1407
0
        errno = EPERM;
1408
0
        return -1;
1409
0
    }
1410
0
    if (!policy::is_video_frame_type_valid(msg.frame_type)) {
1411
0
        LOG(WARNING) << "Invalid frame_type=" << (int)msg.frame_type;
1412
0
    }
1413
0
    if (!policy::is_video_codec_valid(msg.codec)) {
1414
0
        LOG(WARNING) << "Invalid codec=" << (int)msg.codec;
1415
0
    }
1416
0
    if (_paused) {
1417
0
        errno = EPERM;
1418
0
        return -1;
1419
0
    }
1420
0
    SocketMessagePtr<policy::RtmpUnsentMessage> msg2(new policy::RtmpUnsentMessage);
1421
0
    msg2->header.timestamp = msg.timestamp;
1422
0
    msg2->header.message_length = msg.size();
1423
0
    msg2->header.message_type = policy::RTMP_MESSAGE_VIDEO;
1424
0
    msg2->header.stream_id = _message_stream_id;
1425
0
    msg2->chunk_stream_id = _chunk_stream_id;
1426
    // Make video header
1427
0
    const char video_head = ((msg.frame_type & 0xF) << 4) | (msg.codec & 0xF);
1428
0
    msg2->body.push_back(video_head);
1429
0
    msg2->body.append(msg.data);
1430
0
    return _rtmpsock->Write(msg2);
1431
0
}
1432
1433
0
int RtmpStreamBase::SendAVCMessage(const RtmpAVCMessage& msg) {
1434
0
    if (_rtmpsock == NULL) {
1435
0
        errno = EPERM;
1436
0
        return -1;
1437
0
    }
1438
0
    if (_chunk_stream_id == 0) {
1439
0
        LOG(ERROR) << __FUNCTION__ << " can't be called before play() is received";
1440
0
        errno = EPERM;
1441
0
        return -1;
1442
0
    }
1443
0
    if (!policy::is_video_frame_type_valid(msg.frame_type)) {
1444
0
        LOG(WARNING) << "Invalid frame_type=" << (int)msg.frame_type;
1445
0
    }
1446
0
    if (_paused) {
1447
0
        errno = EPERM;
1448
0
        return -1;
1449
0
    }
1450
0
    SocketMessagePtr<policy::RtmpUnsentMessage> msg2(new policy::RtmpUnsentMessage);
1451
0
    msg2->header.timestamp = msg.timestamp;
1452
0
    msg2->header.message_length = msg.size();
1453
0
    msg2->header.message_type = policy::RTMP_MESSAGE_VIDEO;
1454
0
    msg2->header.stream_id = _message_stream_id;
1455
0
    msg2->chunk_stream_id = _chunk_stream_id;
1456
    // Make video header
1457
0
    char avc_head[5];
1458
0
    char* p = avc_head;
1459
0
    *p++ = ((msg.frame_type & 0xF) << 4) | (FLV_VIDEO_AVC & 0xF);
1460
0
    *p++ = (FlvAVCPacketType)msg.packet_type;
1461
0
    policy::WriteBigEndian3Bytes(&p, msg.composition_time);
1462
0
    msg2->body.append(avc_head, sizeof(avc_head));
1463
0
    msg2->body.append(msg.data);
1464
0
    return _rtmpsock->Write(msg2);
1465
0
}
1466
1467
0
int RtmpStreamBase::SendStopMessage(const butil::StringPiece&) {
1468
0
    return -1;
1469
0
}
1470
1471
0
const char* RtmpObjectEncoding2Str(RtmpObjectEncoding e) {
1472
0
    switch (e) {
1473
0
    case RTMP_AMF0: return "AMF0";
1474
0
    case RTMP_AMF3: return "AMF3";
1475
0
    }
1476
0
    return "Unknown RtmpObjectEncoding";
1477
0
}
1478
1479
0
void RtmpStreamBase::SignalError() {
1480
0
    return;
1481
0
}
1482
1483
0
void RtmpStreamBase::OnFirstMessage() {}
1484
1485
0
void RtmpStreamBase::OnUserData(void*) {
1486
0
    LOG(INFO) << remote_side() << '[' << stream_id()
1487
0
              << "] ignored UserData{}";
1488
0
}
1489
1490
0
void RtmpStreamBase::OnCuePoint(RtmpCuePoint* cuepoint) {
1491
0
    LOG(INFO) << remote_side() << '[' << stream_id()
1492
0
              << "] ignored CuePoint{" << cuepoint->data << '}';
1493
0
}
1494
1495
0
void RtmpStreamBase::OnMetaData(RtmpMetaData* metadata, const butil::StringPiece& name) {
1496
0
    LOG(INFO) << remote_side() << '[' << stream_id()
1497
0
              << "] ignored MetaData{" << metadata->data << '}'
1498
0
              << " name{" << name << '}';
1499
0
}
1500
1501
0
void RtmpStreamBase::OnSharedObjectMessage(RtmpSharedObjectMessage*) {
1502
0
    LOG(ERROR) << remote_side() << '[' << stream_id()
1503
0
               << "] ignored SharedObjectMessage{}";
1504
0
}
1505
1506
0
void RtmpStreamBase::OnAudioMessage(RtmpAudioMessage* msg) {
1507
0
    LOG(ERROR) << remote_side() << '[' << stream_id() << "] ignored " << *msg;
1508
0
}
1509
1510
0
void RtmpStreamBase::OnVideoMessage(RtmpVideoMessage* msg) {
1511
0
    LOG(ERROR) << remote_side() << '[' << stream_id() << "] ignored " << *msg;
1512
0
}
1513
1514
0
void RtmpStreamBase::OnStop() {
1515
    // do nothing by default
1516
0
}
1517
1518
0
bool RtmpStreamBase::BeginProcessingMessage(const char* fun_name) {
1519
0
    std::unique_lock<butil::Mutex> mu(_call_mutex);
1520
0
    if (_stopped) {
1521
0
        mu.unlock();
1522
0
        LOG(ERROR) << fun_name << " is called after OnStop()";
1523
0
        return false;
1524
0
    }
1525
0
    if (_processing_msg) {
1526
0
        mu.unlock();
1527
0
        LOG(ERROR) << "Impossible: Another OnXXXMessage is being called!";
1528
0
        return false;
1529
0
    }
1530
0
    _processing_msg = true;
1531
0
    if (!_has_data_ever) {
1532
0
        _has_data_ever = true;
1533
0
        OnFirstMessage();
1534
0
    }
1535
0
    return true;
1536
0
}
1537
1538
0
void RtmpStreamBase::EndProcessingMessage() {
1539
0
    std::unique_lock<butil::Mutex> mu(_call_mutex);
1540
0
    _processing_msg = false;
1541
0
    if (_stopped) {
1542
0
        mu.unlock();
1543
0
        return OnStop();
1544
0
    }
1545
0
}
1546
1547
0
void RtmpStreamBase::CallOnUserData(void* data) {
1548
0
    if (BeginProcessingMessage("OnUserData()")) {
1549
0
        OnUserData(data);
1550
0
        EndProcessingMessage();
1551
0
    }
1552
0
}
1553
1554
0
void RtmpStreamBase::CallOnCuePoint(RtmpCuePoint* obj) {
1555
0
    if (BeginProcessingMessage("OnCuePoint()")) {
1556
0
        OnCuePoint(obj);
1557
0
        EndProcessingMessage();
1558
0
    }
1559
0
}
1560
1561
0
void RtmpStreamBase::CallOnMetaData(RtmpMetaData* obj, const butil::StringPiece& name) {
1562
0
    if (BeginProcessingMessage("OnMetaData()")) {
1563
0
        OnMetaData(obj, name);
1564
0
        EndProcessingMessage();
1565
0
    }
1566
0
}
1567
1568
0
void RtmpStreamBase::CallOnSharedObjectMessage(RtmpSharedObjectMessage* msg) {
1569
0
    if (BeginProcessingMessage("OnSharedObjectMessage()")) {
1570
0
        OnSharedObjectMessage(msg);
1571
0
        EndProcessingMessage();
1572
0
    }
1573
0
}
1574
1575
0
void RtmpStreamBase::CallOnAudioMessage(RtmpAudioMessage* msg) {
1576
0
    if (BeginProcessingMessage("OnAudioMessage()")) {
1577
0
        OnAudioMessage(msg);
1578
0
        EndProcessingMessage();
1579
0
    }
1580
0
}
1581
1582
0
void RtmpStreamBase::CallOnVideoMessage(RtmpVideoMessage* msg) {
1583
0
    if (BeginProcessingMessage("OnVideoMessage()")) {
1584
0
        OnVideoMessage(msg);
1585
0
        EndProcessingMessage();
1586
0
    }
1587
0
}
1588
1589
0
void RtmpStreamBase::CallOnStop() {
1590
0
    {
1591
0
        std::unique_lock<butil::Mutex> mu(_call_mutex);
1592
0
        if (_stopped) {
1593
0
            mu.unlock();
1594
0
            LOG(ERROR) << "OnStop() was called more than once";
1595
0
            return;
1596
0
        }
1597
0
        _stopped = true;
1598
0
        if (_processing_msg) {
1599
            // EndProcessingMessage() will call OnStop();
1600
0
            return;
1601
0
        }
1602
0
    }
1603
0
    OnStop();
1604
0
}
1605
 
1606
butil::EndPoint RtmpStreamBase::remote_side() const
1607
0
{ return _rtmpsock ? _rtmpsock->remote_side() : butil::EndPoint(); }
1608
1609
butil::EndPoint RtmpStreamBase::local_side() const
1610
0
{ return _rtmpsock ? _rtmpsock->local_side() : butil::EndPoint(); }
1611
1612
// ============ RtmpClientStream =============
1613
1614
RtmpClientStream::RtmpClientStream()
1615
    : RtmpStreamBase(true)
1616
    , _onfail_id(INVALID_BTHREAD_ID)
1617
    , _create_stream_rpc_id(INVALID_BTHREAD_ID)
1618
    , _from_socketmap(true)
1619
    , _created_stream_with_play_or_publish(false)
1620
0
    , _state(STATE_UNINITIALIZED) {
1621
0
    get_rtmp_bvars()->client_stream_count << 1;
1622
0
    _self_ref.reset(this);
1623
0
}
1624
1625
0
RtmpClientStream::~RtmpClientStream() {
1626
0
    get_rtmp_bvars()->client_stream_count << -1;
1627
0
}
1628
1629
0
void RtmpClientStream::Destroy() {
1630
0
    bthread_id_t onfail_id = INVALID_BTHREAD_ID;
1631
0
    CallId create_stream_rpc_id = INVALID_BTHREAD_ID;
1632
0
    butil::intrusive_ptr<RtmpClientStream> self_ref;
1633
    
1634
0
    std::unique_lock<butil::Mutex> mu(_state_mutex);
1635
0
    switch (_state) {
1636
0
    case STATE_UNINITIALIZED:
1637
0
        _state = STATE_DESTROYING;
1638
0
        mu.unlock();
1639
0
        OnStopInternal(); 
1640
0
        _self_ref.swap(self_ref);
1641
0
        return;
1642
0
    case STATE_CREATING:
1643
0
        _state = STATE_DESTROYING;
1644
0
        create_stream_rpc_id = _create_stream_rpc_id;
1645
0
        mu.unlock();
1646
0
        _self_ref.swap(self_ref);
1647
0
        StartCancel(create_stream_rpc_id);
1648
0
        return;
1649
0
    case STATE_CREATED:
1650
0
        _state = STATE_DESTROYING;
1651
0
        onfail_id = _onfail_id;
1652
0
        mu.unlock();
1653
0
        _self_ref.swap(self_ref);
1654
0
        bthread_id_error(onfail_id, 0);
1655
0
        return;
1656
0
    case STATE_ERROR:
1657
0
        _state = STATE_DESTROYING;
1658
0
        mu.unlock();
1659
0
        _self_ref.swap(self_ref);
1660
0
        return;
1661
0
    case STATE_DESTROYING:
1662
        // Destroy() was already called.
1663
0
        return;
1664
0
    }
1665
0
}
1666
1667
0
void RtmpClientStream::SignalError() {
1668
0
    bthread_id_t onfail_id = INVALID_BTHREAD_ID;
1669
0
    std::unique_lock<butil::Mutex> mu(_state_mutex);
1670
0
    switch (_state) {
1671
0
    case STATE_UNINITIALIZED:
1672
0
        _state = STATE_ERROR;
1673
0
        mu.unlock();
1674
0
        OnStopInternal(); 
1675
0
        return;
1676
0
    case STATE_CREATING:
1677
0
        _state = STATE_ERROR;
1678
0
        mu.unlock();
1679
0
        return;
1680
0
    case STATE_CREATED:
1681
0
        _state = STATE_ERROR;
1682
0
        onfail_id = _onfail_id;
1683
0
        mu.unlock();
1684
0
        bthread_id_error(onfail_id, 0);
1685
0
        return;
1686
0
    case STATE_ERROR:
1687
0
    case STATE_DESTROYING:
1688
        // SignalError() or Destroy() was already called.
1689
0
        return;
1690
0
    }
1691
0
}
1692
1693
StreamUserData* RtmpClientStream::OnCreatingStream(
1694
0
    SocketUniquePtr* inout, Controller* cntl) {
1695
0
    {
1696
0
        std::unique_lock<butil::Mutex> mu(_state_mutex);
1697
0
        if (_state == STATE_ERROR || _state == STATE_DESTROYING) {
1698
0
            cntl->SetFailed(EINVAL, "Fail to replace socket for stream, _state is error or destroying");
1699
0
            return NULL;
1700
0
        }
1701
0
    }
1702
0
    SocketId esid;
1703
0
    if (cntl->connection_type() == CONNECTION_TYPE_SHORT) {
1704
0
        if (_client_impl->CreateSocket((*inout)->remote_side(), &esid) != 0) {
1705
0
            cntl->SetFailed(EINVAL, "Fail to create RTMP socket");
1706
0
            return NULL;
1707
0
        }
1708
0
    } else {
1709
0
        if (_client_impl->socket_map().Insert(
1710
0
                SocketMapKey((*inout)->remote_side()), &esid) != 0) {
1711
0
            cntl->SetFailed(EINVAL, "Fail to get the RTMP socket");
1712
0
            return NULL;
1713
0
        }
1714
0
    }
1715
0
    SocketUniquePtr tmp_ptr;
1716
0
    if (Socket::Address(esid, &tmp_ptr) != 0) {
1717
0
        cntl->SetFailed(EFAILEDSOCKET, "Fail to address RTMP SocketId=%" PRIu64
1718
0
                        " from SocketMap of RtmpClient=%p",
1719
0
                        esid, _client_impl.get());
1720
0
        return NULL;
1721
0
    }
1722
0
    RPC_VLOG << "Replace Socket For Stream, RTMP socketId=" << esid
1723
0
             << ", main socketId=" << (*inout)->id();
1724
0
    tmp_ptr->ShareStats(inout->get());
1725
0
    inout->reset(tmp_ptr.release());
1726
0
    return this;
1727
0
}
1728
1729
0
int RtmpClientStream::RunOnFailed(bthread_id_t id, void* data, int) {
1730
0
    butil::intrusive_ptr<RtmpClientStream> stream(
1731
0
        static_cast<RtmpClientStream*>(data), false);
1732
0
    CHECK(stream->_rtmpsock);
1733
    // Must happen after NotifyOnFailed which is after all other callsites
1734
    // to OnStopInternal().
1735
0
    stream->OnStopInternal();
1736
0
    bthread_id_unlock_and_destroy(id);
1737
0
    return 0;
1738
0
}
1739
1740
0
void RtmpClientStream::OnFailedToCreateStream() {
1741
0
    {
1742
0
        std::unique_lock<butil::Mutex> mu(_state_mutex);
1743
0
        switch (_state) {
1744
0
        case STATE_CREATING:
1745
0
            _state = STATE_ERROR;
1746
0
            break;
1747
0
        case STATE_UNINITIALIZED:
1748
0
        case STATE_CREATED:
1749
0
            _state = STATE_ERROR;
1750
0
            mu.unlock();
1751
0
            CHECK(false) << "Impossible";
1752
0
            break;
1753
0
        case STATE_ERROR:
1754
0
        case STATE_DESTROYING:
1755
0
            break;
1756
0
        }
1757
0
    }
1758
0
    return OnStopInternal();
1759
0
}
1760
1761
void RtmpClientStream::DestroyStreamUserData(SocketUniquePtr& sending_sock,
1762
                                             Controller* cntl,
1763
                                             int /*error_code*/,
1764
0
                                             bool end_of_rpc) {
1765
0
    if (!end_of_rpc) {
1766
0
        if (sending_sock) {
1767
0
            if (_from_socketmap) {
1768
0
                _client_impl->socket_map().Remove(SocketMapKey(sending_sock->remote_side()),
1769
0
                        sending_sock->id());
1770
0
            } else {
1771
0
                sending_sock->SetFailed();  // not necessary, already failed.
1772
0
            }
1773
0
        }
1774
0
    } else {
1775
        // Always move sending_sock into _rtmpsock at the end of rpc.
1776
        // - If the RPC is successful, moving sending_sock prevents it from
1777
        //   setfailed in Controller after calling this method.
1778
        // - If the RPC is failed, OnStopInternal() can clean up the socket_map
1779
        //   inserted in OnCreatingStream().
1780
0
        _rtmpsock.swap(sending_sock);
1781
0
    }
1782
0
}
1783
1784
1785
0
void RtmpClientStream::DestroyStreamCreator(Controller* cntl) {
1786
0
    if (cntl->Failed()) {
1787
0
        if (_rtmpsock != NULL &&
1788
            // ^ If sending_sock is NULL, the RPC fails before _pack_request
1789
            // which calls AddTransaction, in another word, RemoveTransaction
1790
            // is not needed.
1791
0
            cntl->ErrorCode() != ERTMPCREATESTREAM) {
1792
            // ^ ERTMPCREATESTREAM is triggered by receiving "_error" command,
1793
            // RemoveTransaction should already be called.
1794
0
            CHECK_LT(cntl->log_id(), (uint64_t)std::numeric_limits<uint32_t>::max());
1795
0
            const uint32_t transaction_id = cntl->log_id();
1796
0
            policy::RtmpContext* rtmp_ctx =
1797
0
                static_cast<policy::RtmpContext*>(_rtmpsock->parsing_context());
1798
0
            if (rtmp_ctx == NULL) {
1799
0
                LOG(FATAL) << "RtmpContext must be created";
1800
0
            } else {
1801
0
                policy::RtmpTransactionHandler* handler =
1802
0
                    rtmp_ctx->RemoveTransaction(transaction_id);
1803
0
                if (handler) {
1804
0
                    handler->Cancel();
1805
0
                }
1806
0
            }
1807
0
        }
1808
0
        return OnFailedToCreateStream();
1809
0
    }
1810
1811
0
    int rc = 0;
1812
0
    bthread_id_t onfail_id = INVALID_BTHREAD_ID;
1813
0
    {
1814
0
        std::unique_lock<butil::Mutex> mu(_state_mutex);
1815
0
        switch (_state) {
1816
0
        case STATE_CREATING:
1817
0
            CHECK(_rtmpsock);
1818
0
            rc = bthread_id_create(&onfail_id, this, RunOnFailed);
1819
0
            if (rc) {
1820
0
                cntl->SetFailed(ENOMEM, "Fail to create _onfail_id: %s", berror(rc));
1821
0
                mu.unlock();
1822
0
                return OnFailedToCreateStream();
1823
0
            }
1824
            // Add a ref for RunOnFailed.
1825
0
            butil::intrusive_ptr<RtmpClientStream>(this).detach();
1826
0
            _state = STATE_CREATED;
1827
0
            _onfail_id = onfail_id;
1828
0
            break;
1829
0
        case STATE_UNINITIALIZED:
1830
0
        case STATE_CREATED:
1831
0
            _state = STATE_ERROR;
1832
0
            mu.unlock();
1833
0
            CHECK(false) << "Impossible";
1834
0
            return OnStopInternal();
1835
0
        case STATE_ERROR:
1836
0
        case STATE_DESTROYING:
1837
0
            mu.unlock();
1838
0
            return OnStopInternal();
1839
0
        }
1840
0
    }
1841
0
    if (onfail_id != INVALID_BTHREAD_ID) {
1842
0
        _rtmpsock->NotifyOnFailed(onfail_id);
1843
0
    }
1844
0
}
1845
1846
0
void RtmpClientStream::OnStopInternal() {
1847
0
    if (_rtmpsock == NULL) {
1848
0
        return CallOnStop();
1849
0
    }
1850
1851
0
    if (!_rtmpsock->Failed() && _chunk_stream_id != 0) {
1852
        // SRS requires closeStream which is sent over this stream.
1853
0
        butil::IOBuf req_buf1;
1854
0
        {
1855
0
            butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf1);
1856
0
            AMFOutputStream ostream(&zc_stream);
1857
0
            WriteAMFString(RTMP_AMF0_COMMAND_CLOSE_STREAM, &ostream);
1858
0
            WriteAMFUint32(0, &ostream);
1859
0
            WriteAMFNull(&ostream);
1860
0
            CHECK(ostream.good());
1861
0
        }
1862
0
        SocketMessagePtr<policy::RtmpUnsentMessage> msg1(new policy::RtmpUnsentMessage);
1863
0
        msg1->header.message_length = req_buf1.size();
1864
0
        msg1->header.message_type = policy::RTMP_MESSAGE_COMMAND_AMF0;
1865
0
        msg1->header.stream_id = _message_stream_id;
1866
0
        msg1->chunk_stream_id = _chunk_stream_id;
1867
0
        msg1->body = req_buf1;
1868
    
1869
        // Send deleteStream over the control stream.
1870
0
        butil::IOBuf req_buf2;
1871
0
        {
1872
0
            butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf2);
1873
0
            AMFOutputStream ostream(&zc_stream);
1874
0
            WriteAMFString(RTMP_AMF0_COMMAND_DELETE_STREAM, &ostream);
1875
0
            WriteAMFUint32(0, &ostream);
1876
0
            WriteAMFNull(&ostream);
1877
0
            WriteAMFUint32(_message_stream_id, &ostream);
1878
0
            CHECK(ostream.good());
1879
0
        }
1880
0
        policy::RtmpUnsentMessage* msg2 = policy::MakeUnsentControlMessage(
1881
0
            policy::RTMP_MESSAGE_COMMAND_AMF0, req_buf2);
1882
0
        msg1->next.reset(msg2);
1883
1884
0
        if (policy::WriteWithoutOvercrowded(_rtmpsock.get(), msg1) != 0) {
1885
0
            if (errno != EFAILEDSOCKET) {
1886
0
                PLOG(WARNING) << "Fail to send closeStream/deleteStream to "
1887
0
                              << _rtmpsock->remote_side() << "["
1888
0
                              << _message_stream_id << "]";
1889
                // Close the connection to make sure the server-side knows the
1890
                // closing event, however this may terminate other streams over
1891
                // the connection as well.
1892
0
                _rtmpsock->SetFailed(EFAILEDSOCKET, "Fail to send closeStream/deleteStream");
1893
0
            }
1894
0
        }
1895
0
    }
1896
0
    policy::RtmpContext* ctx =
1897
0
        static_cast<policy::RtmpContext*>(_rtmpsock->parsing_context());
1898
0
    if (ctx != NULL) {
1899
0
        if (!ctx->RemoveMessageStream(this)) {
1900
            // The stream is not registered yet. Is this normal?
1901
0
            LOG(ERROR) << "Fail to remove stream_id=" << _message_stream_id;
1902
0
        }
1903
0
    } else {
1904
0
        LOG(FATAL) << "RtmpContext of " << *_rtmpsock << " is NULL";
1905
0
    }
1906
0
    if (_from_socketmap) {
1907
0
        _client_impl->socket_map().Remove(SocketMapKey(_rtmpsock->remote_side()),
1908
0
                                          _rtmpsock->id());
1909
0
    } else {
1910
0
        _rtmpsock->ReleaseAdditionalReference();
1911
0
    }
1912
0
    CallOnStop();
1913
0
}
1914
1915
RtmpPlayOptions::RtmpPlayOptions()
1916
    : start(-2)
1917
    , duration(-1)
1918
0
    , reset(true) {
1919
0
}
1920
1921
0
int RtmpClientStream::Play(const RtmpPlayOptions& opt) {
1922
0
    if (_rtmpsock == NULL) {
1923
0
        errno = EPERM;
1924
0
        return -1;
1925
0
    }
1926
0
    if (opt.stream_name.empty()) {
1927
0
        LOG(ERROR) << "Empty stream_name";
1928
0
        errno = EINVAL;
1929
0
        return -1;
1930
0
    }
1931
0
    if (_client_impl == NULL) {
1932
0
        LOG(ERROR) << "The client stream is not created yet";
1933
0
        errno = EPERM;
1934
0
        return -1;
1935
0
    }
1936
0
    butil::IOBuf req_buf;
1937
0
    {
1938
0
        butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
1939
0
        AMFOutputStream ostream(&zc_stream);
1940
0
        WriteAMFString(RTMP_AMF0_COMMAND_PLAY, &ostream);
1941
0
        WriteAMFUint32(0, &ostream);
1942
0
        WriteAMFNull(&ostream);
1943
0
        WriteAMFString(opt.stream_name, &ostream);
1944
0
        WriteAMFNumber(opt.start, &ostream);
1945
0
        WriteAMFNumber(opt.duration, &ostream);
1946
0
        WriteAMFBool(opt.reset, &ostream);
1947
0
        CHECK(ostream.good());
1948
0
    }
1949
0
    SocketMessagePtr<policy::RtmpUnsentMessage> msg1(new policy::RtmpUnsentMessage);
1950
0
    msg1->header.message_length = req_buf.size();
1951
0
    msg1->header.message_type = policy::RTMP_MESSAGE_COMMAND_AMF0;
1952
0
    msg1->header.stream_id = _message_stream_id;
1953
0
    msg1->chunk_stream_id = _chunk_stream_id;
1954
0
    msg1->body = req_buf;
1955
1956
0
    if (_client_impl->options().buffer_length_ms > 0) {
1957
0
        char data[10];
1958
0
        char* p = data;
1959
0
        policy::WriteBigEndian2Bytes(
1960
0
            &p, policy::RTMP_USER_CONTROL_EVENT_SET_BUFFER_LENGTH);
1961
0
        policy::WriteBigEndian4Bytes(&p, stream_id());
1962
0
        policy::WriteBigEndian4Bytes(&p, _client_impl->options().buffer_length_ms);
1963
0
        policy::RtmpUnsentMessage* msg2 = policy::MakeUnsentControlMessage(
1964
0
            policy::RTMP_MESSAGE_USER_CONTROL, data, sizeof(data));
1965
0
        msg1->next.reset(msg2);
1966
0
    }
1967
    // FIXME(gejun): Do we need to SetChunkSize for play?
1968
    // if (_client_impl->options().chunk_size > policy::RTMP_INITIAL_CHUNK_SIZE) {
1969
    //     if (SetChunkSize(_client_impl->options().chunk_size) != 0) {
1970
    //         return -1;
1971
    //     }
1972
    // }
1973
0
    return _rtmpsock->Write(msg1);
1974
0
}
1975
1976
0
int RtmpClientStream::Play2(const RtmpPlay2Options& opt) {
1977
0
    butil::IOBuf req_buf;
1978
0
    {
1979
0
        butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
1980
0
        AMFOutputStream ostream(&zc_stream);
1981
0
        WriteAMFString(RTMP_AMF0_COMMAND_PLAY2, &ostream);
1982
0
        WriteAMFUint32(0, &ostream);
1983
0
        WriteAMFNull(&ostream);
1984
0
        WriteAMFObject(opt, &ostream);
1985
0
        if (!ostream.good()) {
1986
0
            LOG(ERROR) << "Fail to serialize play2 request";
1987
0
            errno = EINVAL;
1988
0
            return -1;
1989
0
        }
1990
0
    }
1991
0
    return SendMessage(0, policy::RTMP_MESSAGE_COMMAND_AMF0, req_buf);
1992
0
}
1993
1994
0
const char* RtmpPublishType2Str(RtmpPublishType type) {
1995
0
    switch (type) {
1996
0
    case RTMP_PUBLISH_RECORD: return "record";
1997
0
    case RTMP_PUBLISH_APPEND: return "append";
1998
0
    case RTMP_PUBLISH_LIVE:   return "live";
1999
0
    }
2000
0
    return "Unknown RtmpPublishType";
2001
0
}
2002
2003
0
bool Str2RtmpPublishType(const butil::StringPiece& str, RtmpPublishType* type) {
2004
0
    if (str == "record") {
2005
0
        *type = RTMP_PUBLISH_RECORD;
2006
0
        return true;
2007
0
    } else if (str == "append") {
2008
0
        *type = RTMP_PUBLISH_APPEND;
2009
0
        return true;
2010
0
    } else if (str == "live") {
2011
0
        *type = RTMP_PUBLISH_LIVE;
2012
0
        return true;
2013
0
    }
2014
0
    return false;
2015
0
}
2016
2017
int RtmpClientStream::Publish(const butil::StringPiece& name,
2018
0
                              RtmpPublishType type) {
2019
0
    butil::IOBuf req_buf;
2020
0
    {
2021
0
        butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
2022
0
        AMFOutputStream ostream(&zc_stream);
2023
0
        WriteAMFString(RTMP_AMF0_COMMAND_PUBLISH, &ostream);
2024
0
        WriteAMFUint32(0, &ostream);
2025
0
        WriteAMFNull(&ostream);
2026
0
        WriteAMFString(name, &ostream);
2027
0
        WriteAMFString(RtmpPublishType2Str(type), &ostream);
2028
0
        CHECK(ostream.good());
2029
0
    }
2030
0
    return SendMessage(0, policy::RTMP_MESSAGE_COMMAND_AMF0, req_buf);
2031
0
}
2032
2033
0
int RtmpClientStream::Seek(double offset_ms) {
2034
0
    butil::IOBuf req_buf;
2035
0
    {
2036
0
        butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
2037
0
        AMFOutputStream ostream(&zc_stream);
2038
0
        WriteAMFString(RTMP_AMF0_COMMAND_SEEK, &ostream);
2039
0
        WriteAMFUint32(0, &ostream);
2040
0
        WriteAMFNull(&ostream);
2041
0
        WriteAMFNumber(offset_ms, &ostream);
2042
0
        CHECK(ostream.good());
2043
0
    }
2044
0
    return SendMessage(0, policy::RTMP_MESSAGE_COMMAND_AMF0, req_buf);    
2045
0
}
2046
2047
0
int RtmpClientStream::Pause(bool pause_or_unpause, double offset_ms) {
2048
0
    butil::IOBuf req_buf;
2049
0
    {
2050
0
        butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
2051
0
        AMFOutputStream ostream(&zc_stream);
2052
0
        WriteAMFString(RTMP_AMF0_COMMAND_PAUSE, &ostream);
2053
0
        WriteAMFUint32(0, &ostream);
2054
0
        WriteAMFNull(&ostream);
2055
0
        WriteAMFBool(pause_or_unpause, &ostream);
2056
0
        WriteAMFNumber(offset_ms, &ostream);
2057
0
        CHECK(ostream.good());
2058
0
    }
2059
0
    return SendMessage(0, policy::RTMP_MESSAGE_COMMAND_AMF0, req_buf);
2060
0
}
2061
2062
0
void RtmpClientStream::OnStatus(const RtmpInfo& info) {
2063
0
    if (info.level() == RTMP_INFO_LEVEL_ERROR) {
2064
0
        LOG(WARNING) << remote_side() << '[' << stream_id()
2065
0
                     << "] " << info.code() << ": " << info.description();
2066
0
        return SignalError();
2067
0
    } else if (info.level() == RTMP_INFO_LEVEL_STATUS) {
2068
0
        if ((!_options.play_name.empty() &&
2069
0
             info.code() == RTMP_STATUS_CODE_PLAY_START) ||
2070
0
            (!_options.publish_name.empty() &&
2071
0
             info.code() == RTMP_STATUS_CODE_PUBLISH_START)) {
2072
            // the memory fence makes sure that if _is_server_accepted is true, 
2073
            // publish request must be sent (so that SendXXX functions can
2074
            // be enabled)
2075
0
            _is_server_accepted.store(true, butil::memory_order_release);
2076
0
        }
2077
0
    }
2078
0
}
2079
2080
RtmpClientStreamOptions::RtmpClientStreamOptions()
2081
    : share_connection(true)
2082
    , wait_until_play_or_publish_is_sent(false)
2083
    , create_stream_max_retry(3)
2084
0
    , publish_type(RTMP_PUBLISH_LIVE) {
2085
0
}
2086
2087
class OnClientStreamCreated : public google::protobuf::Closure {
2088
public:
2089
    void Run();  // @Closure
2090
0
    void CancelBeforeCallMethod() { delete this; }
2091
    
2092
public:
2093
    Controller cntl;
2094
    // Hold a reference of stream to prevent it from destructing during an
2095
    // async Create().
2096
    butil::intrusive_ptr<RtmpClientStream> stream;
2097
};
2098
2099
0
void OnClientStreamCreated::Run() {
2100
0
    std::unique_ptr<OnClientStreamCreated> delete_self(this);
2101
0
    if (cntl.Failed()) {
2102
0
        LOG(WARNING) << "Fail to create stream=" << stream->rtmp_url()
2103
0
                     << ": " << cntl.ErrorText();
2104
0
        return;
2105
0
    }
2106
0
    if (stream->_created_stream_with_play_or_publish) {
2107
        // the server accepted the play/publish command packed in createStream
2108
0
        return;
2109
0
    }
2110
0
    const RtmpClientStreamOptions& options = stream->options();
2111
0
    bool do_nothing = true;
2112
0
    if (!options.play_name.empty()) {
2113
0
        do_nothing = false;
2114
0
        RtmpPlayOptions play_opt;
2115
0
        play_opt.stream_name = options.play_name;
2116
0
        if (stream->Play(play_opt) != 0) {
2117
0
            LOG(WARNING) << "Fail to play " << options.play_name;
2118
0
            return stream->SignalError();
2119
0
        }
2120
0
    }
2121
0
    if (!options.publish_name.empty()) {
2122
0
        do_nothing = false;
2123
0
        if (stream->Publish(options.publish_name, options.publish_type) != 0) {
2124
0
            LOG(WARNING) << "Fail to publish " << stream->rtmp_url();
2125
0
            return stream->SignalError();
2126
0
        }
2127
0
    }
2128
0
    if (do_nothing) {
2129
0
        LOG(ERROR) << "play_name and publish_name are both empty";
2130
0
        return stream->SignalError();
2131
0
    }
2132
0
}
2133
2134
void RtmpClientStream::Init(const RtmpClient* client,
2135
0
                            const RtmpClientStreamOptions& options) {
2136
0
    if (client->_impl == NULL) {
2137
0
        LOG(FATAL) << "RtmpClient is not initialized";
2138
0
        return OnStopInternal();
2139
0
    }
2140
0
    {
2141
0
        std::unique_lock<butil::Mutex> mu(_state_mutex);
2142
0
        if (_state == STATE_DESTROYING || _state == STATE_ERROR) {
2143
            // already Destroy()-ed or SignalError()-ed
2144
0
            LOG(WARNING) << "RtmpClientStream=" << this << " was already "
2145
0
                "Destroy()-ed, stop Init()";
2146
0
            return;
2147
0
        }
2148
0
    }
2149
0
    _client_impl = client->_impl;
2150
0
    _options = options;
2151
0
    OnClientStreamCreated* done = new OnClientStreamCreated;
2152
0
    done->stream.reset(this);
2153
0
    done->cntl.set_stream_creator(this);
2154
0
    done->cntl.set_connection_type(_options.share_connection ?
2155
0
                                   CONNECTION_TYPE_SINGLE :
2156
0
                                   CONNECTION_TYPE_SHORT);
2157
0
    _from_socketmap = (done->cntl.connection_type() == CONNECTION_TYPE_SINGLE);
2158
0
    done->cntl.set_max_retry(_options.create_stream_max_retry);
2159
0
    if (_options.hash_code.has_been_set()) {
2160
0
        done->cntl.set_request_code(_options.hash_code);
2161
0
    }
2162
2163
    // Hack: we pass stream as response so that PackRtmpRequest can get
2164
    // the stream from controller.
2165
0
    google::protobuf::Message* res = (google::protobuf::Message*)this;
2166
0
    const CallId call_id = done->cntl.call_id();
2167
0
    {
2168
0
        std::unique_lock<butil::Mutex> mu(_state_mutex);
2169
0
        switch (_state) {
2170
0
        case STATE_UNINITIALIZED:
2171
0
            _state = STATE_CREATING;
2172
0
            _create_stream_rpc_id = call_id;
2173
0
            break;
2174
0
        case STATE_CREATING:
2175
0
        case STATE_CREATED:
2176
0
            mu.unlock();
2177
0
            LOG(ERROR) << "RtmpClientStream::Init() is called by multiple "
2178
0
                "threads simultaneously";
2179
0
            return done->CancelBeforeCallMethod();
2180
0
        case STATE_ERROR:
2181
0
        case STATE_DESTROYING:
2182
0
            mu.unlock();
2183
0
            return done->CancelBeforeCallMethod();
2184
0
        }
2185
0
    }
2186
0
    _client_impl->_chan.CallMethod(NULL, &done->cntl, NULL, res, done);
2187
0
    if (options.wait_until_play_or_publish_is_sent) {
2188
0
        Join(call_id);
2189
0
    }
2190
0
}
2191
2192
0
std::string RtmpClientStream::rtmp_url() const {
2193
0
    if (_client_impl == NULL) {
2194
0
        return std::string();
2195
0
    }
2196
0
    butil::StringPiece tcurl = _client_impl->options().tcUrl;
2197
0
    butil::StringPiece stream_name = _options.stream_name();
2198
0
    std::string result;
2199
0
    result.reserve(tcurl.size() + 1 + stream_name.size());
2200
0
    result.append(tcurl.data(), tcurl.size());
2201
0
    result.push_back('/');
2202
0
    result.append(stream_name.data(), stream_name.size());
2203
0
    return result;
2204
0
}
2205
2206
// ========= RtmpRetryingClientStream ============
2207
2208
RtmpRetryingClientStreamOptions::RtmpRetryingClientStreamOptions()
2209
    : retry_interval_ms(1000)
2210
    , max_retry_duration_ms(-1)
2211
    , fast_retry_count(2)
2212
0
    , quit_when_no_data_ever(true) {
2213
0
}
2214
2215
RtmpRetryingClientStream::RtmpRetryingClientStream()
2216
    : RtmpStreamBase(true)
2217
    , _destroying(false)
2218
    , _called_on_stop(false)
2219
    , _changed_stream(false)
2220
    , _has_timer_ever(false)
2221
    , _is_server_accepted_ever(false)
2222
    , _num_fast_retries(0)
2223
    , _last_creation_time_us(0)
2224
    , _last_retry_start_time_us(0)
2225
    , _create_timer_id(0)
2226
0
    , _sub_stream_creator(NULL) {
2227
0
    get_rtmp_bvars()->retrying_client_stream_count << 1;
2228
0
    _self_ref.reset(this);
2229
0
}
2230
2231
0
RtmpRetryingClientStream::~RtmpRetryingClientStream() {
2232
0
    delete _sub_stream_creator;
2233
0
    _sub_stream_creator = NULL;
2234
0
    get_rtmp_bvars()->retrying_client_stream_count << -1;
2235
0
}
2236
2237
0
void RtmpRetryingClientStream::CallOnStopIfNeeded() {
2238
    // CallOnStop uses locks, we don't need memory fence on _called_on_stop,
2239
    // atomic ops is enough.
2240
0
    if (!_called_on_stop.load(butil::memory_order_relaxed) &&
2241
0
        !_called_on_stop.exchange(true, butil::memory_order_relaxed)) {
2242
0
        CallOnStop();
2243
0
    }
2244
0
}        
2245
2246
0
void RtmpRetryingClientStream::Destroy() {
2247
0
    if (_destroying.exchange(true, butil::memory_order_relaxed)) {
2248
        // Destroy() was already called.
2249
0
        return;
2250
0
    }
2251
2252
    // Make sure _self_ref is released before quiting this function.
2253
    // Notice that _self_ref.reset(NULL) is wrong because it may destructs
2254
    // this object immediately.
2255
0
    butil::intrusive_ptr<RtmpRetryingClientStream> self_ref;
2256
0
    _self_ref.swap(self_ref);
2257
2258
0
    butil::intrusive_ptr<RtmpStreamBase> old_sub_stream;
2259
0
    {
2260
0
        BAIDU_SCOPED_LOCK(_stream_mutex);
2261
        // swap instead of reset(NULL) to make the stream destructed
2262
        // outside _stream_mutex.
2263
0
        _using_sub_stream.swap(old_sub_stream);
2264
0
    }
2265
0
    if (old_sub_stream) {
2266
0
        old_sub_stream->Destroy();
2267
0
    }
2268
    
2269
0
    if (_has_timer_ever) {
2270
0
        if (bthread_timer_del(_create_timer_id) == 0) {
2271
            // The callback is not run yet. Remove the additional ref added
2272
            // before creating the timer.
2273
0
            butil::intrusive_ptr<RtmpRetryingClientStream> deref(this, false);
2274
0
        }
2275
0
    }
2276
0
    return CallOnStopIfNeeded();
2277
0
}
2278
2279
void RtmpRetryingClientStream::Init(
2280
    SubStreamCreator* sub_stream_creator,
2281
0
    const RtmpRetryingClientStreamOptions& options) {
2282
0
    if (sub_stream_creator == NULL) {
2283
0
        LOG(ERROR) << "sub_stream_creator is NULL";
2284
0
        return CallOnStopIfNeeded();
2285
0
    }
2286
0
    _sub_stream_creator = sub_stream_creator;
2287
0
    if (_destroying.load(butil::memory_order_relaxed)) {
2288
0
        LOG(WARNING) << "RtmpRetryingClientStream=" << this << " was already "
2289
0
            "Destroy()-ed, stop Init()";
2290
0
        return;
2291
0
    }
2292
0
    _options = options;
2293
    // retrying stream does not support this option.
2294
0
    _options.wait_until_play_or_publish_is_sent = false;
2295
0
    _last_retry_start_time_us = butil::gettimeofday_us();
2296
0
    Recreate();
2297
0
}
2298
2299
0
void RetryingClientMessageHandler::OnPlayable() {
2300
0
    _parent->OnPlayable();
2301
0
}
2302
2303
0
void RetryingClientMessageHandler::OnUserData(void* msg) {
2304
0
    _parent->CallOnUserData(msg);
2305
0
}
2306
2307
0
void RetryingClientMessageHandler::OnCuePoint(brpc::RtmpCuePoint* cuepoint) {
2308
0
    _parent->CallOnCuePoint(cuepoint);
2309
0
}
2310
2311
0
void RetryingClientMessageHandler::OnMetaData(brpc::RtmpMetaData* metadata, const butil::StringPiece& name) {
2312
0
    _parent->CallOnMetaData(metadata, name);
2313
0
}
2314
2315
0
void RetryingClientMessageHandler::OnAudioMessage(brpc::RtmpAudioMessage* msg) {
2316
0
    _parent->CallOnAudioMessage(msg);
2317
0
}
2318
2319
0
void RetryingClientMessageHandler::OnVideoMessage(brpc::RtmpVideoMessage* msg) {
2320
0
    _parent->CallOnVideoMessage(msg);
2321
0
}
2322
2323
0
void RetryingClientMessageHandler::OnSharedObjectMessage(RtmpSharedObjectMessage* msg) {
2324
0
    _parent->CallOnSharedObjectMessage(msg);
2325
0
}
2326
2327
0
void RetryingClientMessageHandler::OnSubStreamStop(RtmpStreamBase* sub_stream) {
2328
0
    _parent->OnSubStreamStop(sub_stream);
2329
0
}
2330
2331
RetryingClientMessageHandler::RetryingClientMessageHandler(RtmpRetryingClientStream* parent)
2332
0
    : _parent(parent) {}
2333
2334
0
void RtmpRetryingClientStream::Recreate() {
2335
0
    butil::intrusive_ptr<RtmpStreamBase> sub_stream;
2336
0
    _sub_stream_creator->NewSubStream(new RetryingClientMessageHandler(this), &sub_stream);
2337
0
    butil::intrusive_ptr<RtmpStreamBase> old_sub_stream;
2338
0
    bool destroying = false;
2339
0
    {
2340
0
        BAIDU_SCOPED_LOCK(_stream_mutex);
2341
        // Need to check _destroying to avoid setting the new sub_stream to a 
2342
        // destroying retrying stream. 
2343
        // Note: the load of _destroying and the setting of _using_sub_stream 
2344
        // must be in the same lock, otherwise current bthread may be scheduled
2345
        // and Destroy() may be called, making new sub_stream leaked.
2346
0
        destroying = _destroying.load(butil::memory_order_relaxed);
2347
0
        if (!destroying) {
2348
0
            _using_sub_stream.swap(old_sub_stream);
2349
0
            _using_sub_stream = sub_stream;
2350
0
            _changed_stream = true;
2351
0
        }
2352
0
    }
2353
0
    if (old_sub_stream) {
2354
0
        old_sub_stream->Destroy();
2355
0
    }
2356
0
    if (destroying) {
2357
0
        sub_stream->Destroy();
2358
0
        return;
2359
0
    }
2360
0
    _last_creation_time_us = butil::gettimeofday_us();
2361
    // If Init() of sub_stream is called before setting _using_sub_stream,
2362
    // OnStop() may happen before _using_sub_stream is set and the stopped
2363
    // stream is wrongly left in the variable.
2364
     
2365
0
    _sub_stream_creator->LaunchSubStream(sub_stream.get(), &_options);
2366
0
}
2367
2368
0
void RtmpRetryingClientStream::OnRecreateTimer(void* arg) {
2369
    // Hold the referenced stream.
2370
0
    butil::intrusive_ptr<RtmpRetryingClientStream> ptr(
2371
0
        static_cast<RtmpRetryingClientStream*>(arg), false/*not add ref*/);
2372
0
    ptr->Recreate();
2373
0
}
2374
2375
0
void RtmpRetryingClientStream::OnSubStreamStop(RtmpStreamBase* sub_stream) {
2376
    // Make sure the sub_stream is destroyed after this function.
2377
0
    DestroyingPtr<RtmpStreamBase> sub_stream_guard(sub_stream);
2378
    
2379
0
    butil::intrusive_ptr<RtmpStreamBase> removed_sub_stream;
2380
0
    {
2381
0
        BAIDU_SCOPED_LOCK(_stream_mutex);
2382
0
        if (sub_stream == _using_sub_stream) {
2383
0
            _using_sub_stream.swap(removed_sub_stream);
2384
0
        }
2385
0
    }
2386
0
    if (removed_sub_stream == NULL ||
2387
0
        _destroying.load(butil::memory_order_relaxed) ||
2388
0
        _called_on_stop.load(butil::memory_order_relaxed)) {
2389
0
        return;
2390
0
    }
2391
    // Update _is_server_accepted_ever
2392
0
    if (sub_stream->is_server_accepted()) {
2393
0
        _is_server_accepted_ever = true;
2394
0
    }
2395
    
2396
0
    if (_options.max_retry_duration_ms == 0) {
2397
0
        return CallOnStopIfNeeded();
2398
0
    }
2399
    // If the sub_stream has data ever, count this retry as the beginning
2400
    // of RtmpRetryingClientStreamOptions.max_retry_duration_ms.
2401
0
    if ((!_options.play_name.empty() && sub_stream->has_data_ever()) ||
2402
0
        (!_options.publish_name.empty() && sub_stream->is_server_accepted())) {
2403
0
        const int64_t now = butil::gettimeofday_us();
2404
0
        if (now >= _last_retry_start_time_us +
2405
0
            3 * _options.retry_interval_ms * 1000L) {
2406
            // re-enable fast retries when the interval is long enough.
2407
            // `3' is just a randomly-chosen (small) number.
2408
0
            _num_fast_retries = 0;
2409
0
        }
2410
0
        _last_retry_start_time_us = now;
2411
0
    }
2412
    // Check max duration. Notice that this branch cannot be moved forward
2413
    // above branch which may update _last_retry_start_time_us
2414
0
    if (_options.max_retry_duration_ms > 0 &&
2415
0
        butil::gettimeofday_us() >
2416
0
        (_last_retry_start_time_us + _options.max_retry_duration_ms * 1000L)) {
2417
        // exceed the duration, stop retrying.
2418
0
        return CallOnStopIfNeeded();
2419
0
    }
2420
0
    if (_num_fast_retries < _options.fast_retry_count) {
2421
0
        ++_num_fast_retries;
2422
        // Retry immediately for several times. Works for scenarios like:
2423
        // restarting servers, occasional connection lost etc...
2424
0
        return Recreate();
2425
0
    }
2426
0
    if (_options.quit_when_no_data_ever &&
2427
0
        ((!_options.play_name.empty() && !has_data_ever()) ||
2428
0
         (!_options.publish_name.empty() && !_is_server_accepted_ever))) {
2429
        // Stop retrying when created playing streams never have data or
2430
        // publishing streams were never accepted. It's very likely that
2431
        // continuing retrying does not make sense.
2432
0
        return CallOnStopIfNeeded();
2433
0
    }
2434
0
    const int64_t wait_us = _last_creation_time_us +
2435
0
        _options.retry_interval_ms * 1000L - butil::gettimeofday_us();
2436
0
    if (wait_us > 0) {
2437
        // retry is too frequent, schedule the retry.
2438
        // Add a ref for OnRecreateTimer which does deref.
2439
0
        butil::intrusive_ptr<RtmpRetryingClientStream>(this).detach();
2440
0
        if (bthread_timer_add(&_create_timer_id,
2441
0
                              butil::microseconds_from_now(wait_us),
2442
0
                              OnRecreateTimer, this) != 0) {
2443
0
            LOG(ERROR) << "Fail to create timer";
2444
0
            return CallOnStopIfNeeded();
2445
0
        }
2446
0
        _has_timer_ever = true;
2447
0
    } else {
2448
0
        Recreate();
2449
0
    }
2450
0
}
2451
2452
int RtmpRetryingClientStream::AcquireStreamToSend(
2453
0
    butil::intrusive_ptr<RtmpStreamBase>* ptr) {
2454
0
    BAIDU_SCOPED_LOCK(_stream_mutex);
2455
0
    if (!_using_sub_stream) {
2456
0
        errno = EPERM;
2457
0
        return -1;
2458
0
    }
2459
0
    if (!_using_sub_stream->is_server_accepted()) {
2460
        // not published yet.
2461
0
        errno = EPERM;
2462
0
        return -1;
2463
0
    }
2464
0
    if (_changed_stream) {
2465
0
        _changed_stream = false;
2466
0
        errno = ERTMPPUBLISHABLE;
2467
0
        return -1;
2468
0
    }
2469
0
    *ptr = _using_sub_stream;
2470
0
    return 0;
2471
0
}
2472
2473
0
int RtmpRetryingClientStream::SendCuePoint(const RtmpCuePoint& obj) {
2474
0
    butil::intrusive_ptr<RtmpStreamBase> ptr;
2475
0
    if (AcquireStreamToSend(&ptr) != 0) {
2476
0
        return -1;
2477
0
    }
2478
0
    return ptr->SendCuePoint(obj);
2479
0
}
2480
2481
0
int RtmpRetryingClientStream::SendMetaData(const RtmpMetaData& obj, const butil::StringPiece& name) {
2482
0
    butil::intrusive_ptr<RtmpStreamBase> ptr;
2483
0
    if (AcquireStreamToSend(&ptr) != 0) {
2484
0
        return -1;
2485
0
    }
2486
0
    return ptr->SendMetaData(obj, name);
2487
0
}
2488
2489
int RtmpRetryingClientStream::SendSharedObjectMessage(
2490
0
    const RtmpSharedObjectMessage& msg) {
2491
0
    butil::intrusive_ptr<RtmpStreamBase> ptr;
2492
0
    if (AcquireStreamToSend(&ptr) != 0) {
2493
0
        return -1;
2494
0
    }
2495
0
    return ptr->SendSharedObjectMessage(msg);
2496
0
}
2497
2498
0
int RtmpRetryingClientStream::SendAudioMessage(const RtmpAudioMessage& msg) {
2499
0
    butil::intrusive_ptr<RtmpStreamBase> ptr;
2500
0
    if (AcquireStreamToSend(&ptr) != 0) {
2501
0
        return -1;
2502
0
    }
2503
0
    return ptr->SendAudioMessage(msg);
2504
0
}
2505
2506
0
int RtmpRetryingClientStream::SendAACMessage(const RtmpAACMessage& msg) {
2507
0
    butil::intrusive_ptr<RtmpStreamBase> ptr;
2508
0
    if (AcquireStreamToSend(&ptr) != 0) {
2509
0
        return -1;
2510
0
    }
2511
0
    return ptr->SendAACMessage(msg);
2512
0
}
2513
2514
0
int RtmpRetryingClientStream::SendVideoMessage(const RtmpVideoMessage& msg) {
2515
0
    butil::intrusive_ptr<RtmpStreamBase> ptr;
2516
0
    if (AcquireStreamToSend(&ptr) != 0) {
2517
0
        return -1;
2518
0
    }
2519
0
    return ptr->SendVideoMessage(msg);
2520
0
}
2521
2522
0
int RtmpRetryingClientStream::SendAVCMessage(const RtmpAVCMessage& msg) {
2523
0
    butil::intrusive_ptr<RtmpStreamBase> ptr;
2524
0
    if (AcquireStreamToSend(&ptr) != 0) {
2525
0
        return -1;
2526
0
    }
2527
0
    return ptr->SendAVCMessage(msg);
2528
0
}
2529
2530
0
void RtmpRetryingClientStream::StopCurrentStream() {
2531
0
    butil::intrusive_ptr<RtmpStreamBase> sub_stream;
2532
0
    {
2533
0
        BAIDU_SCOPED_LOCK(_stream_mutex);
2534
0
        sub_stream = _using_sub_stream;
2535
0
    }
2536
0
    if (sub_stream) {
2537
0
        sub_stream->SignalError();
2538
0
    }
2539
0
}
2540
2541
0
void RtmpRetryingClientStream::OnPlayable() {}
2542
2543
0
butil::EndPoint RtmpRetryingClientStream::remote_side() const {
2544
0
    {
2545
0
        BAIDU_SCOPED_LOCK(_stream_mutex);
2546
0
        if (_using_sub_stream) {
2547
0
            return _using_sub_stream->remote_side();
2548
0
        }
2549
0
    }
2550
0
    return butil::EndPoint();
2551
0
}
2552
2553
0
butil::EndPoint RtmpRetryingClientStream::local_side() const {
2554
0
    {
2555
0
        BAIDU_SCOPED_LOCK(_stream_mutex);
2556
0
        if (_using_sub_stream) {
2557
0
            return _using_sub_stream->local_side();
2558
0
        }
2559
0
    }
2560
0
    return butil::EndPoint();
2561
0
}
2562
2563
// =========== RtmpService ===============
2564
0
void RtmpService::OnPingResponse(const butil::EndPoint&, uint32_t) {
2565
    // TODO: put into some bvars?
2566
0
}
2567
2568
RtmpServerStream::RtmpServerStream()
2569
    : RtmpStreamBase(false)
2570
    , _client_supports_stream_multiplexing(false)
2571
    , _is_publish(false)
2572
0
    , _onfail_id(INVALID_BTHREAD_ID) {
2573
0
    get_rtmp_bvars()->server_stream_count << 1;
2574
0
}
2575
2576
0
RtmpServerStream::~RtmpServerStream() {
2577
0
    get_rtmp_bvars()->server_stream_count << -1;
2578
0
}
2579
2580
0
void RtmpServerStream::Destroy() {
2581
0
    CHECK(false) << "You're not supposed to call Destroy() for server-side streams";
2582
0
}
2583
2584
void RtmpServerStream::OnPlay(const RtmpPlayOptions& opt,
2585
                              butil::Status* status,
2586
0
                              google::protobuf::Closure* done) {
2587
0
    ClosureGuard done_guard(done);
2588
0
    status->set_error(EPERM, "%s[%u] ignored play{stream_name=%s start=%f"
2589
0
                      " duration=%f reset=%d}",
2590
0
                      butil::endpoint2str(remote_side()).c_str(), stream_id(),
2591
0
                      opt.stream_name.c_str(), opt.start, opt.duration,
2592
0
                      (int)opt.reset);
2593
0
}
2594
2595
0
void RtmpServerStream::OnPlay2(const RtmpPlay2Options& opt) {
2596
0
    LOG(ERROR) << remote_side() << '[' << stream_id()
2597
0
               << "] ignored play2{" << opt.ShortDebugString() << '}';
2598
0
}
2599
2600
void RtmpServerStream::OnPublish(const std::string& name,
2601
                                 RtmpPublishType type,
2602
                                 butil::Status* status,
2603
0
                                 google::protobuf::Closure* done) {
2604
0
    ClosureGuard done_guard(done);
2605
0
    status->set_error(EPERM, "%s[%u] ignored publish{stream_name=%s type=%s}",
2606
0
                      butil::endpoint2str(remote_side()).c_str(), stream_id(),
2607
0
                      name.c_str(), RtmpPublishType2Str(type));
2608
0
}
2609
2610
0
int RtmpServerStream::OnSeek(double offset_ms) {
2611
0
    LOG(ERROR) << remote_side() << '[' << stream_id() << "] ignored seek("
2612
0
               << offset_ms << ")";
2613
0
    return -1;
2614
0
}
2615
2616
0
int RtmpServerStream::OnPause(bool pause, double offset_ms) {
2617
0
    LOG(ERROR) << remote_side() << '[' << stream_id() << "] ignored "
2618
0
               << (pause ? "pause" : "unpause")
2619
0
               << "(offset_ms=" << offset_ms << ")";
2620
0
    return -1;
2621
0
}
2622
2623
0
void RtmpServerStream::OnSetBufferLength(uint32_t /*buffer_length_ms*/) {}
2624
2625
0
int RtmpServerStream::SendStopMessage(const butil::StringPiece& error_desc) {
2626
0
    if (_rtmpsock == NULL) {
2627
0
        errno = EINVAL;
2628
0
        return -1;
2629
0
    }
2630
0
    if (FLAGS_rtmp_server_close_connection_on_error &&
2631
0
        !_client_supports_stream_multiplexing) {
2632
0
        _rtmpsock->SetFailed(EFAILEDSOCKET, "Close connection because %.*s",
2633
0
                             (int)error_desc.size(), error_desc.data());
2634
        // The purpose is to close the connection, no matter what SetFailed()
2635
        // returns, the operation should be done.
2636
0
        LOG_IF(WARNING, FLAGS_log_error_text)
2637
0
            << "Close connection because " << error_desc;
2638
0
        return 0;
2639
0
    }
2640
2641
    // Send StreamNotFound error to make the client close connections.
2642
    // Works for flashplayer and ffplay(not started playing), not work for SRS
2643
    // and ffplay(started playing)
2644
0
    butil::IOBuf req_buf;
2645
0
    RtmpInfo info;
2646
0
    {
2647
0
        butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
2648
0
        AMFOutputStream ostream(&zc_stream);
2649
0
        WriteAMFString(RTMP_AMF0_COMMAND_ON_STATUS, &ostream);
2650
0
        WriteAMFUint32(0, &ostream);
2651
0
        WriteAMFNull(&ostream);
2652
0
        if (_is_publish) {
2653
            // NetStream.Publish.Rejected does not work for ffmpeg, works for OBS.
2654
            // NetStream.Publish.BadName does not work for OBS.
2655
            // NetStream.Play.StreamNotFound is not accurate but works for both
2656
            // ffmpeg and OBS.
2657
0
            info.set_code(RTMP_STATUS_CODE_STREAM_NOT_FOUND);
2658
0
        } else {
2659
0
            info.set_code(RTMP_STATUS_CODE_STREAM_NOT_FOUND);
2660
0
        }
2661
0
        info.set_level(RTMP_INFO_LEVEL_ERROR);
2662
0
        if (!error_desc.empty()) {
2663
0
            info.set_description(error_desc.as_string());
2664
0
        }
2665
0
        WriteAMFObject(info, &ostream);
2666
0
    }
2667
0
    SocketMessagePtr<policy::RtmpUnsentMessage> msg(new policy::RtmpUnsentMessage);
2668
0
    msg->header.message_length = req_buf.size();
2669
0
    msg->header.message_type = policy::RTMP_MESSAGE_COMMAND_AMF0;
2670
0
    msg->header.stream_id = _message_stream_id;
2671
0
    msg->chunk_stream_id = _chunk_stream_id;
2672
0
    msg->body = req_buf;
2673
    
2674
0
    if (policy::WriteWithoutOvercrowded(_rtmpsock.get(), msg) != 0) {
2675
0
        PLOG_IF(WARNING, errno != EFAILEDSOCKET)
2676
0
            << _rtmpsock->remote_side() << '[' << _message_stream_id
2677
0
            << "]: Fail to send " << info.code() << ": " << error_desc;
2678
0
        return -1;
2679
0
    }
2680
0
    LOG_IF(WARNING, FLAGS_log_error_text)
2681
0
        << _rtmpsock->remote_side() << '[' << _message_stream_id << "]: Sent "
2682
0
        << info.code() << ' ' << error_desc;
2683
0
    return 0;
2684
0
}
2685
2686
// Call this method to send StreamDry to the client.
2687
// Returns 0 on success, -1 otherwise.
2688
0
int RtmpServerStream::SendStreamDry() {
2689
0
    char data[6];
2690
0
    char* p = data;
2691
0
    policy::WriteBigEndian2Bytes(&p, policy::RTMP_USER_CONTROL_EVENT_STREAM_DRY);
2692
0
    policy::WriteBigEndian4Bytes(&p, stream_id());
2693
0
    return SendControlMessage(policy::RTMP_MESSAGE_USER_CONTROL, data, sizeof(data));
2694
0
}
2695
2696
0
int RtmpServerStream::RunOnFailed(bthread_id_t id, void* data, int) {
2697
0
    butil::intrusive_ptr<RtmpServerStream> stream(
2698
0
        static_cast<RtmpServerStream*>(data), false);
2699
0
    CHECK(stream->_rtmpsock);
2700
0
    stream->OnStopInternal();
2701
0
    bthread_id_unlock_and_destroy(id);
2702
0
    return 0;
2703
0
}
2704
2705
0
void RtmpServerStream::OnStopInternal() {
2706
0
    if (_rtmpsock == NULL) {
2707
0
        return CallOnStop();
2708
0
    }
2709
0
    policy::RtmpContext* ctx =
2710
0
        static_cast<policy::RtmpContext*>(_rtmpsock->parsing_context());
2711
0
    if (ctx == NULL) {
2712
0
        LOG(FATAL) << _rtmpsock->remote_side() << ": RtmpContext of "
2713
0
                   << *_rtmpsock << " is NULL";
2714
0
        return CallOnStop();
2715
0
    }
2716
0
    if (ctx->RemoveMessageStream(this)) {
2717
0
        return CallOnStop();
2718
0
    }
2719
0
}
2720
2721
541
butil::StringPiece RemoveRtmpPrefix(const butil::StringPiece& url_in) {
2722
541
    if (!url_in.starts_with("rtmp://")) {
2723
508
        return url_in;
2724
508
    }
2725
33
    butil::StringPiece url = url_in;
2726
33
    size_t i = 7;
2727
238
    for (; i < url.size() && url[i] == '/'; ++i);
2728
33
    url.remove_prefix(i);
2729
33
    return url;
2730
541
}
2731
2732
0
butil::StringPiece RemoveProtocolPrefix(const butil::StringPiece& url_in) {
2733
0
    size_t proto_pos = url_in.find("://");
2734
0
    if (proto_pos == butil::StringPiece::npos) {
2735
0
        return url_in;
2736
0
    }
2737
0
    butil::StringPiece url = url_in;
2738
0
    size_t i = proto_pos + 3;
2739
0
    for (; i < url.size() && url[i] == '/'; ++i);
2740
0
    url.remove_prefix(i);
2741
0
    return url;
2742
0
}
2743
2744
void ParseRtmpHostAndPort(const butil::StringPiece& host_and_port,
2745
                          butil::StringPiece* host,
2746
541
                          butil::StringPiece* port) {
2747
541
    size_t colon_pos = host_and_port.find(':');
2748
541
    if (colon_pos == butil::StringPiece::npos) {
2749
419
        if (host) {
2750
419
            *host = host_and_port;
2751
419
        }
2752
419
        if (port) {
2753
419
            *port = "1935";
2754
419
        }
2755
419
    } else {
2756
122
        if (host) {
2757
122
            *host = host_and_port.substr(0, colon_pos);
2758
122
        }
2759
122
        if (port) {
2760
122
            *port = host_and_port.substr(colon_pos + 1);
2761
122
        }
2762
122
    }
2763
541
}
2764
2765
butil::StringPiece RemoveQueryStrings(const butil::StringPiece& stream_name_in,
2766
0
                                     butil::StringPiece* query_strings) {
2767
0
    const size_t qm_pos = stream_name_in.find('?');
2768
0
    if (qm_pos == butil::StringPiece::npos) {
2769
0
        if (query_strings) {
2770
0
            query_strings->clear();
2771
0
        }
2772
0
        return stream_name_in;
2773
0
    } else {
2774
0
        if (query_strings) {
2775
0
            *query_strings = stream_name_in.substr(qm_pos + 1);
2776
0
        }
2777
0
        return stream_name_in.substr(0, qm_pos);
2778
0
    }
2779
0
}
2780
2781
// Split vhost from *app in forms of "APP?vhost=..." and overwrite *host.
2782
static void SplitVHostFromApp(const butil::StringPiece& app_and_vhost,
2783
                              butil::StringPiece* app,
2784
332
                              butil::StringPiece* vhost) {
2785
332
    const size_t q_pos = app_and_vhost.find('?');
2786
332
    if (q_pos == butil::StringPiece::npos) {
2787
138
        if (app) {
2788
138
            *app = app_and_vhost;
2789
138
        }
2790
138
        if (vhost) {
2791
138
            vhost->clear();
2792
138
        }
2793
138
        return;
2794
138
    }
2795
    
2796
194
    if (app) {
2797
194
        *app = app_and_vhost.substr(0, q_pos);
2798
194
    }
2799
194
    if (vhost) {
2800
194
        butil::StringPiece qstr = app_and_vhost.substr(q_pos + 1);
2801
194
        butil::StringSplitter sp(qstr.data(), qstr.data() + qstr.size(), '&');
2802
1.55k
        for (; sp; ++sp) {
2803
1.41k
            butil::StringPiece field(sp.field(), sp.length());
2804
1.41k
            if (field.starts_with("vhost=")) {
2805
53
                *vhost = field.substr(6);
2806
                // vhost cannot have port.
2807
53
                const size_t colon_pos = vhost->find_last_of(':');
2808
53
                if (colon_pos != butil::StringPiece::npos) {
2809
20
                    vhost->remove_suffix(vhost->size() - colon_pos);
2810
20
                }
2811
53
                return;
2812
53
            }
2813
1.41k
        }
2814
141
        vhost->clear();
2815
141
    }
2816
194
}
2817
2818
void ParseRtmpURL(const butil::StringPiece& rtmp_url_in,
2819
                  butil::StringPiece* host,
2820
                  butil::StringPiece* vhost,
2821
                  butil::StringPiece* port,
2822
                  butil::StringPiece* app,
2823
541
                  butil::StringPiece* stream_name) {
2824
541
    if (stream_name) {
2825
541
        stream_name->clear();
2826
541
    }
2827
541
    butil::StringPiece rtmp_url = RemoveRtmpPrefix(rtmp_url_in);
2828
541
    size_t slash1_pos = rtmp_url.find_first_of('/');
2829
541
    if (slash1_pos == butil::StringPiece::npos) {
2830
209
        if (host || port) {
2831
209
            ParseRtmpHostAndPort(rtmp_url, host, port);
2832
209
        }
2833
209
        if (app) {
2834
209
            app->clear();
2835
209
        }
2836
209
        return;
2837
209
    }
2838
332
    if (host || port) {
2839
332
        ParseRtmpHostAndPort(rtmp_url.substr(0, slash1_pos), host, port);
2840
332
    }
2841
    // Remove duplicated slashes.
2842
788
    for (++slash1_pos; slash1_pos < rtmp_url.size() &&
2843
788
             rtmp_url[slash1_pos] == '/'; ++slash1_pos);
2844
332
    rtmp_url.remove_prefix(slash1_pos);
2845
332
    size_t slash2_pos = rtmp_url.find_first_of('/');
2846
332
    if (slash2_pos == butil::StringPiece::npos) {
2847
266
        return SplitVHostFromApp(rtmp_url, app, vhost);
2848
266
    }
2849
66
    SplitVHostFromApp(rtmp_url.substr(0, slash2_pos), app, vhost);
2850
66
    if (stream_name != NULL) {
2851
        // Remove duplicated slashes.
2852
412
        for (++slash2_pos; slash2_pos < rtmp_url.size() &&
2853
412
                 rtmp_url[slash2_pos] == '/'; ++slash2_pos);
2854
66
        rtmp_url.remove_prefix(slash2_pos);
2855
66
        *stream_name = rtmp_url;
2856
66
    }
2857
66
}
2858
2859
std::string MakeRtmpURL(const butil::StringPiece& host,
2860
                        const butil::StringPiece& port,
2861
                        const butil::StringPiece& app,
2862
0
                        const butil::StringPiece& stream_name) {
2863
0
    std::string result;
2864
0
    result.reserve(15 + host.size() + app.size() + stream_name.size());
2865
0
    result.append("rtmp://");
2866
0
    result.append(host.data(), host.size());
2867
0
    if (!port.empty()) {
2868
0
        result.push_back(':');
2869
0
        result.append(port.data(), port.size());
2870
0
    }
2871
0
    if (!app.empty()) {
2872
0
        result.push_back('/');
2873
0
        result.append(app.data(), app.size());
2874
0
    }
2875
0
    if (!stream_name.empty()) {
2876
0
        if (app.empty()) {  // extra / to notify user that app is empty.
2877
0
            result.push_back('/');
2878
0
        }
2879
0
        result.push_back('/');
2880
0
        result.append(stream_name.data(), stream_name.size());
2881
0
    }
2882
0
    return result;
2883
0
}
2884
2885
} // namespace brpc