Coverage Report

Created: 2024-09-11 06:42

/src/brpc/src/brpc/policy/rtmp_protocol.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
19
#ifndef BRPC_POLICY_RTMP_PROTOCOL_H
20
#define BRPC_POLICY_RTMP_PROTOCOL_H
21
22
#include "butil/containers/flat_map.h"
23
#include "brpc/protocol.h"
24
#include "brpc/rtmp.h"
25
#include "brpc/amf.h"
26
#include "brpc/socket.h"
27
28
29
namespace brpc {
30
31
class Server;
32
class RtmpService;
33
class RtmpStreamBase;
34
35
namespace policy {
36
37
const uint32_t RTMP_DEFAULT_CHUNK_SIZE = 60000; // Copy from SRS
38
const uint32_t RTMP_DEFAULT_WINDOW_ACK_SIZE = 2500000; // Copy from SRS
39
const uint32_t RTMP_MAX_CHUNK_STREAM_ID = 65599;
40
const uint32_t RTMP_CHUNK_ARRAY_2ND_SIZE = 256;
41
const uint32_t RTMP_CHUNK_ARRAY_1ST_SIZE =
42
    (RTMP_MAX_CHUNK_STREAM_ID + RTMP_CHUNK_ARRAY_2ND_SIZE)
43
    / RTMP_CHUNK_ARRAY_2ND_SIZE;
44
const uint32_t RTMP_CONTROL_CHUNK_STREAM_ID = 2;
45
const uint32_t RTMP_CONTROL_MESSAGE_STREAM_ID = 0;
46
47
enum RtmpMessageType {
48
    RTMP_MESSAGE_SET_CHUNK_SIZE = 1,
49
    RTMP_MESSAGE_ABORT = 2,
50
    RTMP_MESSAGE_ACK = 3,
51
    RTMP_MESSAGE_USER_CONTROL = 4,
52
    RTMP_MESSAGE_WINDOW_ACK_SIZE = 5,
53
    RTMP_MESSAGE_SET_PEER_BANDWIDTH = 6,
54
    RTMP_MESSAGE_AUDIO = 8,
55
    RTMP_MESSAGE_VIDEO = 9,
56
    RTMP_MESSAGE_DATA_AMF3 = 15,
57
    RTMP_MESSAGE_SHARED_OBJECT_AMF3 = 16,
58
    RTMP_MESSAGE_COMMAND_AMF3 = 17,
59
    RTMP_MESSAGE_DATA_AMF0 = 18,
60
    RTMP_MESSAGE_SHARED_OBJECT_AMF0 = 19,
61
    RTMP_MESSAGE_COMMAND_AMF0 = 20,
62
    RTMP_MESSAGE_AGGREGATE = 22,
63
};
64
65
0
inline bool is_video_frame_type_valid(FlvVideoFrameType t) {
66
0
    return (t >= FLV_VIDEO_FRAME_KEYFRAME && t <= FLV_VIDEO_FRAME_INFOFRAME);
67
0
}
68
69
0
inline bool is_video_codec_valid(FlvVideoCodec id) {
70
0
    return (id >= FLV_VIDEO_JPEG && id <= FLV_VIDEO_HEVC);
71
0
}
72
73
// Get literal form of the message type.
74
const char* messagetype2str(RtmpMessageType);
75
const char* messagetype2str(uint8_t);
76
77
// Define string constants as macros rather than consts because macros
78
// are concatenatable.
79
0
#define RTMP_SIG_FMS_VER                     "3,5,3,888"
80
#define RTMP_SIG_CLIENT_ID                   "ASAICiss"
81
82
0
#define RTMP_STATUS_CODE_CONNECT_SUCCESS     "NetConnection.Connect.Success"
83
0
#define RTMP_STATUS_CODE_CONNECT_REJECTED    "NetConnection.Connect.Rejected"
84
0
#define RTMP_STATUS_CODE_PLAY_RESET          "NetStream.Play.Reset"
85
0
#define RTMP_STATUS_CODE_PLAY_START          "NetStream.Play.Start"
86
0
#define RTMP_STATUS_CODE_STREAM_NOT_FOUND    "NetStream.Play.StreamNotFound"
87
0
#define RTMP_STATUS_CODE_STREAM_PAUSE        "NetStream.Pause.Notify"
88
0
#define RTMP_STATUS_CODE_STREAM_UNPAUSE      "NetStream.Unpause.Notify"
89
0
#define RTMP_STATUS_CODE_PUBLISH_START       "NetStream.Publish.Start"
90
0
#define RTMP_STATUS_CODE_DATA_START          "NetStream.Data.Start"
91
#define RTMP_STATUS_CODE_UNPUBLISH_SUCCESS   "NetStream.Unpublish.Success"
92
0
#define RTMP_STATUS_CODE_STREAM_SEEK         "NetStream.Seek.Notify"
93
94
0
#define RTMP_AMF0_COMMAND_CONNECT            "connect"
95
0
#define RTMP_AMF0_COMMAND_CREATE_STREAM      "createStream"
96
0
#define RTMP_AMF0_COMMAND_CLOSE_STREAM       "closeStream" // SRS has this.
97
0
#define RTMP_AMF0_COMMAND_DELETE_STREAM      "deleteStream"
98
0
#define RTMP_AMF0_COMMAND_PLAY               "play"
99
0
#define RTMP_AMF0_COMMAND_PLAY2              "play2"
100
0
#define RTMP_AMF0_COMMAND_SEEK               "seek"
101
0
#define RTMP_AMF0_COMMAND_PAUSE              "pause"
102
0
#define RTMP_AMF0_COMMAND_ON_BW_DONE         "onBWDone"
103
0
#define RTMP_AMF0_COMMAND_ON_STATUS          "onStatus"
104
0
#define RTMP_AMF0_COMMAND_RESULT             "_result"
105
0
#define RTMP_AMF0_COMMAND_ERROR              "_error"
106
0
#define RTMP_AMF0_COMMAND_RELEASE_STREAM     "releaseStream"
107
0
#define RTMP_AMF0_COMMAND_FC_PUBLISH         "FCPublish"
108
0
#define RTMP_AMF0_COMMAND_FC_UNPUBLISH       "FCUnpublish"
109
0
#define RTMP_AMF0_COMMAND_GET_STREAM_LENGTH  "getStreamLength"
110
0
#define RTMP_AMF0_COMMAND_CHECK_BW           "_checkbw"
111
0
#define RTMP_AMF0_COMMAND_PUBLISH            "publish"
112
0
#define RTMP_AMF0_DATA_SAMPLE_ACCESS         "|RtmpSampleAccess"
113
#define RTMP_AMF0_COMMAND_CALL               "call"
114
0
#define RTMP_AMF0_SET_DATAFRAME              "@setDataFrame"
115
0
#define RTMP_AMF0_ON_META_DATA               "onMetaData"
116
0
#define RTMP_AMF0_ON_CUE_POINT               "onCuePoint"
117
0
#define RTMP_AMF0_SAMPLE_ACCESS              "|RtmpSampleAccess"
118
119
0
#define RTMP_INFO_LEVEL_STATUS               "status"
120
0
#define RTMP_INFO_LEVEL_ERROR                "error"
121
#define RTMP_INFO_LEVEL_WARNING              "warning"
122
123
enum RtmpUserControlEventType {
124
    RTMP_USER_CONTROL_EVENT_STREAM_BEGIN = 0,
125
    RTMP_USER_CONTROL_EVENT_STREAM_EOF = 1,
126
    RTMP_USER_CONTROL_EVENT_STREAM_DRY = 2,
127
    RTMP_USER_CONTROL_EVENT_SET_BUFFER_LENGTH = 3,
128
    RTMP_USER_CONTROL_EVENT_STREAM_IS_RECORDED = 4,
129
    RTMP_USER_CONTROL_EVENT_PING_REQUEST = 6,
130
    RTMP_USER_CONTROL_EVENT_PING_RESPONSE = 7,
131
132
    // Not specified in any official documentation, but is sent by Flash Media
133
    // Server 3.5. Reference: http://repo.or.cz/w/rtmpdump.git/blob/8880d1456b282ee79979adbe7b6a6eb8ad371081:/librtmp/rtmp.c#l2787
134
    // After the server has sent a complete buffer, and sends a Buffer Empty
135
    // message, it will wait until the play duration of that buffer has passed
136
    // before sending a new buffer. The Buffer Ready message will be sent when
137
    // the new buffer starts. (There is no BufferReady message for the very
138
    // first buffer; presumably the Stream Begin message is sufficient for
139
    // that purpose.)
140
    RTMP_USER_CONTROL_EVENT_BUFFER_EMPTY = 31,
141
    RTMP_USER_CONTROL_EVENT_BUFFER_READY = 32,
142
};
143
144
// Header part of a RTMP message.
145
struct RtmpMessageHeader {
146
    uint32_t timestamp;
147
    uint32_t message_length;
148
    uint8_t  message_type;
149
    uint32_t stream_id;
150
151
    RtmpMessageHeader()
152
        : timestamp(0)
153
        , message_length(0)
154
        , message_type(0)
155
0
        , stream_id(RTMP_CONTROL_MESSAGE_STREAM_ID) {
156
0
    }
157
158
0
    bool is_valid() const { return message_type != 0; }
159
};
160
161
class RtmpContext;
162
163
// The intermediate header passing to CutMessageIntoFileDescriptor.
164
class RtmpUnsentMessage : public SocketMessage {
165
public:
166
    RtmpMessageHeader header;
167
    uint32_t chunk_stream_id;
168
    // Set RtmpContext::_chunk_size_out to this in AppendAndDestroySelf()
169
    // if this field is non-zero.
170
    uint32_t new_chunk_size;
171
    butil::IOBuf body;
172
    // If next is not NULL, next->AppendAndDestroySelf() will be called
173
    // recursively. For implementing batched messages.
174
    SocketMessagePtr<RtmpUnsentMessage> next;
175
public:
176
    RtmpUnsentMessage()
177
0
        : chunk_stream_id(0) , new_chunk_size(0), next(NULL) {}
178
    // @SocketMessage
179
    butil::Status AppendAndDestroySelf(butil::IOBuf* out, Socket*);
180
};
181
182
// Notice that we can't directly pack CreateStream command in PackRtmpRequest, because 
183
// we need to pack an AMFObject according to ctx->can_stream_be_created_with_play_or_publish(),
184
// which is in the response of Connect command(sent in RtmpConnect::StartConnect).
185
struct RtmpCreateStreamMessage : public SocketMessage {
186
public:
187
    SocketUniquePtr socket;
188
    uint32_t transaction_id;
189
    RtmpClientStreamOptions options;
190
public:
191
0
    explicit RtmpCreateStreamMessage() {}
192
    // @SocketMessage
193
    butil::Status AppendAndDestroySelf(butil::IOBuf* out, Socket*);
194
};
195
196
enum RtmpChunkType {
197
    RTMP_CHUNK_TYPE0 = 0,
198
    RTMP_CHUNK_TYPE1 = 1,
199
    RTMP_CHUNK_TYPE2 = 2,
200
    RTMP_CHUNK_TYPE3 = 3
201
};
202
203
// header part of a chunk.
204
struct RtmpBasicHeader {
205
    uint32_t chunk_stream_id;
206
    RtmpChunkType fmt;
207
    uint8_t header_length;
208
};
209
210
// Read big-endian values from buf.
211
uint8_t Read1Byte(const void* buf);
212
uint16_t ReadBigEndian2Bytes(const void* buf);
213
uint32_t ReadBigEndian3Bytes(const void* buf);
214
uint32_t ReadBigEndian4Bytes(const void* buf);
215
// Write values in big-endian into *buf and forward *buf.
216
void Write1Byte(char** buf, uint8_t val);
217
void WriteBigEndian2Bytes(char** buf, uint16_t val);
218
void WriteBigEndian3Bytes(char** buf, uint32_t val);
219
void WriteBigEndian4Bytes(char** buf, uint32_t val);
220
void WriteLittleEndian4Bytes(char** buf, uint32_t val);
221
222
// Append the control message into `msg_buf' which is writable to Socket. 
223
RtmpUnsentMessage* MakeUnsentControlMessage(
224
    uint8_t message_type, const void* body, size_t size);
225
RtmpUnsentMessage* MakeUnsentControlMessage(
226
    uint8_t message_type, const butil::IOBuf& body);
227
228
// The callback associated with a transaction_id.
229
// If the transaction is successfully done, Run() will be called, otherwise
230
// Cancel() will be called.
231
class RtmpTransactionHandler {
232
public:
233
0
    virtual ~RtmpTransactionHandler() {}
234
    virtual void Run(bool error, const RtmpMessageHeader& mh,
235
                     AMFInputStream*, Socket* socket) = 0;
236
    virtual void Cancel() = 0;
237
};
238
239
class RtmpChunkStream;
240
241
// Associated with a RTMP connection.
242
class RtmpContext : public Destroyable {
243
friend class RtmpChunkStream;
244
friend class RtmpUnsentMessage;
245
public:
246
    // States during handshake.
247
    enum State {
248
        STATE_UNINITIALIZED,
249
        STATE_RECEIVED_S0S1,
250
        STATE_RECEIVED_S2,
251
        STATE_RECEIVED_C0C1,
252
        STATE_RECEIVED_C2,
253
    };
254
255
    // Get literal form of the state.
256
    static const char* state2str(State);
257
258
    // One of copt/service must be NULL, indicating this context belongs
259
    // to a server-side or client-side socket.
260
    RtmpContext(const RtmpClientOptions* copt, const Server* server);
261
    ~RtmpContext();
262
263
    // @Destroyable
264
    void Destroy();
265
266
    // Parse `source' from `socket'.
267
    // This method is only called from Protocol.Parse thus does not need
268
    // to be thread-safe.
269
    ParseResult Feed(butil::IOBuf* source, Socket* socket);
270
271
0
    const RtmpClientOptions* client_options() const { return _client_options; }
272
0
    const Server* server() const { return _server; }
273
0
    RtmpService* service() const { return _service; }
274
275
0
    bool is_server_side() const { return service() != NULL; }
276
0
    bool is_client_side() const { return service() == NULL; }
277
278
    // XXXMessageStream may be called from multiple threads(currently not),
279
    // so they're protected by _stream_mutex
280
    
281
    // Find the stream by its id and reference the stream with intrusive_ptr.
282
    // Returns true on success.
283
    bool FindMessageStream(uint32_t stream_id,
284
                           butil::intrusive_ptr<RtmpStreamBase>* stream);
285
286
    // Called in client-side to map the id to stream.
287
    bool AddClientStream(RtmpStreamBase* stream);
288
289
    // Called in server-side to allocate an id and map the id to stream.
290
    bool AddServerStream(RtmpStreamBase* stream);
291
292
    // Remove the stream from mapping.
293
    // Returns true on success.
294
    bool RemoveMessageStream(RtmpStreamBase* stream);
295
296
    // Allocate id for a transaction.
297
    // Returns true on success.
298
    // This method is called in pack_request(for createStream) where is
299
    // accessible by multiple threads. However creating streams is unlikely to
300
    // be very frequent, so this method is simply synchronized by _trans_mutex.
301
    bool AddTransaction(uint32_t* transaction_id,
302
                        RtmpTransactionHandler* handler);
303
    // Remove the transaction associated with the id.
304
    // Return the transaction handler.
305
    RtmpTransactionHandler* RemoveTransaction(uint32_t transaction_id);
306
307
    // Get the chunk stream by its id. The stream is created by need.
308
    RtmpChunkStream* GetChunkStream(uint32_t cs_id);
309
    // Reset the chunk stream associated with the id.
310
    void ClearChunkStream(uint32_t cs_id);
311
312
    // Allocate/deallocate id for a chunk stream.
313
    void AllocateChunkStreamId(uint32_t* chunk_stream_id);
314
    void DeallocateChunkStreamId(uint32_t chunk_stream_id);
315
316
    // Allocate/deallocate id for a message stream.
317
    bool AllocateMessageStreamId(uint32_t* message_stream_id);
318
    void DeallocateMessageStreamId(uint32_t message_stream_id);
319
320
    // Set the callback to be called in OnConnected(). This method should
321
    // be called before initiating the RTMP handshake (sending C0 and C1)
322
0
    void SetConnectCallback(void (*app_connect_done)(int, void*), void* data) {
323
0
        _on_connect = app_connect_done;
324
0
        _on_connect_arg = data;
325
0
    }
326
    // Called when the RTMP connection is established.
327
    void OnConnected(int error_code);
328
0
    bool unconnected() const { return _on_connect != NULL; }
329
330
0
    void only_check_simple_s0s1() { _only_check_simple_s0s1 = true; }
331
    bool can_stream_be_created_with_play_or_publish() const
332
0
    { return _create_stream_with_play_or_publish; }
333
334
    // Call this fn to change _state.
335
    void SetState(const butil::EndPoint& remote_side, State new_state);
336
337
    void set_create_stream_with_play_or_publish(bool create_stream_with_play_or_publish)
338
0
    { _create_stream_with_play_or_publish = create_stream_with_play_or_publish; }
339
340
    void set_simplified_rtmp(bool simplified_rtmp)
341
0
    { _simplified_rtmp = simplified_rtmp; }
342
343
    int SendConnectRequest(const butil::EndPoint& remote_side, int fd, bool simplified_rtmp);
344
345
private:
346
    ParseResult WaitForC0C1orSimpleRtmp(butil::IOBuf* source, Socket* socket);
347
    ParseResult WaitForC2(butil::IOBuf* source, Socket* socket);
348
    ParseResult WaitForS0S1(butil::IOBuf* source, Socket* socket);
349
    ParseResult WaitForS2(butil::IOBuf* source, Socket* socket);
350
    ParseResult OnChunks(butil::IOBuf* source, Socket* socket);
351
352
    // Count received bytes and send ack back if needed.
353
    void AddReceivedBytes(Socket* socket, uint32_t size);
354
355
private:
356
    State _state;
357
    void* _s1_digest;
358
    // Outbound chunksize (inbound chunksize of peer), modifiable by self.
359
    uint32_t _chunk_size_out;
360
    // Inbound chunksize(outbound chunksize of peer), modifiale by peer.
361
    uint32_t _chunk_size_in;
362
    uint32_t _window_ack_size;
363
    uint32_t _nonack_bytes;
364
    uint64_t _received_bytes;
365
    uint32_t _cs_id_allocator;
366
    std::vector<uint32_t> _free_cs_ids;
367
    uint32_t _ms_id_allocator;
368
    std::vector<uint32_t> _free_ms_ids;
369
    // Client-side options.
370
    const RtmpClientOptions* _client_options;
371
    // Callbacks to be called in OnConnected().
372
    void (*_on_connect)(int, void*);
373
    void* _on_connect_arg;
374
    bool _only_check_simple_s0s1;
375
    bool _create_stream_with_play_or_publish;
376
    
377
    // Server and service.
378
    const Server* _server;
379
    RtmpService* _service;
380
    
381
    // Mapping message_stream_id to message streams.
382
    butil::Mutex _stream_mutex;
383
    struct MessageStreamInfo {
384
        butil::intrusive_ptr<RtmpStreamBase> stream;
385
    };
386
    butil::FlatMap<uint32_t, MessageStreamInfo> _mstream_map;
387
388
    // Mapping transaction id to handlers.
389
    butil::Mutex _trans_mutex;
390
    uint32_t _trans_id_allocator;
391
    butil::FlatMap<uint32_t, RtmpTransactionHandler*> _trans_map;
392
393
    RtmpConnectRequest _connect_req;
394
395
    // Map chunk_stream_id to chunk streams.
396
    // The array is 2-level to reduce memory for most connections.
397
    struct SubChunkArray {
398
        butil::atomic<RtmpChunkStream*> ptrs[RTMP_CHUNK_ARRAY_2ND_SIZE];
399
        SubChunkArray();
400
        ~SubChunkArray();
401
    };
402
    butil::atomic<SubChunkArray*> _cstream_ctx[RTMP_CHUNK_ARRAY_1ST_SIZE];
403
404
    bool _simplified_rtmp;
405
};
406
407
class RtmpChunkStream {
408
public:
409
    typedef bool (RtmpChunkStream::*MessageHandler)(
410
        const RtmpMessageHeader& mh, butil::IOBuf* msg_body, Socket* socket);
411
412
    typedef bool (RtmpChunkStream::*CommandHandler)(
413
        const RtmpMessageHeader& mh, AMFInputStream*, Socket* socket);
414
415
public:
416
    RtmpChunkStream(RtmpContext* conn_ctx, uint32_t cs_id);
417
    
418
    ParseResult Feed(const RtmpBasicHeader& bh,
419
                     butil::IOBuf* source, Socket* socket);
420
421
0
    RtmpContext* connection_context() const { return _conn_ctx; }
422
423
0
    uint32_t chunk_stream_id() const { return _cs_id; }
424
425
    int SerializeMessage(butil::IOBuf* buf, const RtmpMessageHeader& mh,
426
                         butil::IOBuf* body);
427
    
428
    bool OnMessage(
429
        const RtmpBasicHeader& bh, const RtmpMessageHeader& mh,
430
        butil::IOBuf* msg_body, Socket* socket);
431
432
    bool OnSetChunkSize(const RtmpMessageHeader& mh,
433
                        butil::IOBuf* msg_body, Socket* socket);
434
    bool OnAbortMessage(const RtmpMessageHeader& mh,
435
                        butil::IOBuf* msg_body, Socket* socket);
436
    bool OnAck(const RtmpMessageHeader& mh,
437
               butil::IOBuf* msg_body, Socket* socket);
438
    bool OnUserControlMessage(const RtmpMessageHeader& mh,
439
                              butil::IOBuf* msg_body, Socket* socket);
440
    bool OnStreamBegin(const RtmpMessageHeader&,
441
                       const butil::StringPiece& event_data, Socket* socket);
442
    bool OnStreamEOF(const RtmpMessageHeader&,
443
                     const butil::StringPiece& event_data, Socket* socket);
444
    bool OnStreamDry(const RtmpMessageHeader&,
445
                     const butil::StringPiece& event_data, Socket* socket);
446
    bool OnSetBufferLength(const RtmpMessageHeader&,
447
                           const butil::StringPiece& event_data, Socket* socket);
448
    bool OnStreamIsRecorded(const RtmpMessageHeader&,
449
                            const butil::StringPiece& event_data, Socket* socket);
450
    bool OnPingRequest(const RtmpMessageHeader&,
451
                       const butil::StringPiece& event_data, Socket* socket);
452
    bool OnPingResponse(const RtmpMessageHeader&,
453
                        const butil::StringPiece& event_data, Socket* socket);
454
    bool OnBufferEmpty(const RtmpMessageHeader&,
455
                       const butil::StringPiece& event_data, Socket* socket);
456
    bool OnBufferReady(const RtmpMessageHeader&,
457
                       const butil::StringPiece& event_data, Socket* socket);
458
    
459
    bool OnWindowAckSize(const RtmpMessageHeader& mh,
460
                         butil::IOBuf* msg_body, Socket* socket);
461
    bool OnSetPeerBandwidth(const RtmpMessageHeader& mh,
462
                            butil::IOBuf* msg_body, Socket* socket);
463
    
464
    bool OnAudioMessage(const RtmpMessageHeader& mh,
465
                        butil::IOBuf* msg_body, Socket* socket);
466
    bool OnVideoMessage(const RtmpMessageHeader& mh,
467
                        butil::IOBuf* msg_body, Socket* socket);
468
    bool OnDataMessageAMF0(const RtmpMessageHeader& mh,
469
                           butil::IOBuf* msg_body, Socket* socket);
470
    bool OnDataMessageAMF3(const RtmpMessageHeader& mh,
471
                           butil::IOBuf* msg_body, Socket* socket);
472
    bool OnSharedObjectMessageAMF0(const RtmpMessageHeader& mh,
473
                                   butil::IOBuf* msg_body, Socket* socket);
474
    bool OnSharedObjectMessageAMF3(const RtmpMessageHeader& mh,
475
                                   butil::IOBuf* msg_body, Socket* socket);
476
    bool OnCommandMessageAMF0(const RtmpMessageHeader& mh,
477
                              butil::IOBuf* msg_body, Socket* socket);
478
    bool OnCommandMessageAMF3(const RtmpMessageHeader& mh,
479
                              butil::IOBuf* msg_body, Socket* socket);
480
    bool OnAggregateMessage(const RtmpMessageHeader& mh,
481
                            butil::IOBuf* msg_body, Socket* socket);
482
483
    bool OnStatus(const RtmpMessageHeader& mh, AMFInputStream* istream,
484
                  Socket* socket);
485
    bool OnConnect(const RtmpMessageHeader& mh, AMFInputStream* istream,
486
                   Socket* socket);
487
    bool OnBWDone(const RtmpMessageHeader& mh, AMFInputStream* istream,
488
                  Socket* socket);
489
    bool OnResult(const RtmpMessageHeader& mh, AMFInputStream* istream,
490
                   Socket* socket);
491
    bool OnError(const RtmpMessageHeader& mh, AMFInputStream* istream,
492
                   Socket* socket);
493
    bool OnPlay(const RtmpMessageHeader& mh, AMFInputStream* istream,
494
                Socket* socket);
495
    bool OnPlay2(const RtmpMessageHeader& mh, AMFInputStream* istream,
496
                 Socket* socket);
497
    bool OnCreateStream(const RtmpMessageHeader& mh, AMFInputStream* istream,
498
                        Socket* socket);
499
    bool OnDeleteStream(const RtmpMessageHeader& mh, AMFInputStream* istream,
500
                        Socket* socket);
501
    bool OnCloseStream(const RtmpMessageHeader& mh, AMFInputStream* istream,
502
                        Socket* socket);
503
    bool OnPublish(const RtmpMessageHeader& mh, AMFInputStream* istream,
504
                   Socket* socket);
505
    bool OnReleaseStream(const RtmpMessageHeader& mh, AMFInputStream* istream,
506
                         Socket* socket);
507
    bool OnFCPublish(const RtmpMessageHeader& mh, AMFInputStream* istream,
508
                     Socket* socket); 
509
    bool OnFCUnpublish(const RtmpMessageHeader& mh, AMFInputStream* istream,
510
                       Socket* socket);
511
    bool OnGetStreamLength(const RtmpMessageHeader& mh, AMFInputStream* istream,
512
                           Socket* socket);
513
    bool OnCheckBW(const RtmpMessageHeader& mh, AMFInputStream* istream,
514
                   Socket* socket);
515
    bool OnSeek(const RtmpMessageHeader& mh, AMFInputStream* istream,
516
                Socket* socket);
517
    bool OnPause(const RtmpMessageHeader& mh, AMFInputStream* istream,
518
                 Socket* socket);
519
    
520
private:
521
    struct ReadParams {
522
        ReadParams();
523
        bool last_has_extended_ts;
524
        bool first_chunk_of_message;
525
        uint32_t last_timestamp_delta;
526
        uint32_t left_message_length;
527
        RtmpMessageHeader last_msg_header;
528
        butil::IOBuf msg_body;
529
    };
530
    struct WriteParams {
531
        WriteParams();
532
        bool last_has_extended_ts;
533
        uint32_t last_timestamp_delta;
534
        RtmpMessageHeader last_msg_header;
535
    };
536
537
    RtmpContext* _conn_ctx;
538
    uint32_t _cs_id;
539
    ReadParams _r;
540
    WriteParams _w;
541
};
542
543
// Parse binary format of rmtp.
544
ParseResult ParseRtmpMessage(butil::IOBuf* source, Socket *socket, bool read_eof,
545
                            const void *arg);
546
547
// no-op placeholder, never be called.
548
void ProcessRtmpMessage(InputMessageBase* msg);
549
550
// Pack createStream message
551
void PackRtmpRequest(butil::IOBuf* buf,
552
                     SocketMessage**,
553
                     uint64_t correlation_id,
554
                     const google::protobuf::MethodDescriptor* method,
555
                     Controller* controller,
556
                     const butil::IOBuf& request,
557
                     const Authenticator* auth);
558
559
// Serialize createStream message
560
void SerializeRtmpRequest(butil::IOBuf* buf,
561
                          Controller* cntl,
562
                          const google::protobuf::Message* request);
563
564
// ============== inline impl. =================
565
// TODO(gejun): impl. do not work for big-endian machines.
566
0
inline uint8_t Read1Byte(const void* void_buf) {
567
0
    return *(const char*)void_buf;
568
0
}
569
0
inline uint16_t ReadBigEndian2Bytes(const void* void_buf) {
570
0
    uint16_t ret = 0;
571
0
    char* p = (char*)&ret;
572
0
    const char* buf = (const char*)void_buf;
573
0
    p[1] = buf[0];
574
0
    p[0] = buf[1];
575
0
    return ret;
576
0
}
577
0
inline uint32_t ReadBigEndian3Bytes(const void* void_buf) {
578
0
    uint32_t ret = 0;
579
0
    char* p = (char*)&ret;
580
0
    const char* buf = (const char*)void_buf;
581
0
    p[3] = 0;
582
0
    p[2] = buf[0];
583
0
    p[1] = buf[1];
584
0
    p[0] = buf[2];
585
0
    return ret;
586
0
}
587
0
inline uint32_t ReadBigEndian4Bytes(const void* void_buf) {
588
0
    uint32_t ret = 0;
589
0
    char* p = (char*)&ret;
590
0
    const char* buf = (const char*)void_buf;
591
0
    p[3] = buf[0];
592
0
    p[2] = buf[1];
593
0
    p[1] = buf[2];
594
0
    p[0] = buf[3];
595
0
    return ret;
596
0
}
597
0
inline void Write1Byte(char** buf, uint8_t val) {
598
0
    char* out = *buf;
599
0
    *out= val;
600
0
    *buf = out + 1;
601
0
}
602
0
inline void WriteBigEndian2Bytes(char** buf, uint16_t val) {
603
0
    const char* p = (const char*)&val;
604
0
    char* out = *buf;
605
0
    out[0] = p[1];
606
0
    out[1] = p[0];
607
0
    *buf = out + 2;
608
0
}
609
0
inline void WriteBigEndian3Bytes(char** buf, uint32_t val) {
610
0
    const char* p = (const char*)&val;
611
0
    CHECK_EQ(p[3], 0);
612
0
    char* out = *buf;
613
0
    out[0] = p[2];
614
0
    out[1] = p[1];
615
0
    out[2] = p[0];
616
0
    *buf = out + 3;
617
0
}
618
0
inline void WriteBigEndian4Bytes(char** buf, uint32_t val) {
619
0
    const char* p = (const char*)&val;
620
0
    char* out = *buf;
621
0
    out[0] = p[3];
622
0
    out[1] = p[2];
623
0
    out[2] = p[1];
624
0
    out[3] = p[0];
625
0
    *buf = out + 4;
626
0
}
627
0
inline void WriteLittleEndian4Bytes(char** buf, uint32_t val) {
628
0
    const char* p = (const char*)&val;
629
0
    char* out = *buf;
630
0
    out[0] = p[0];
631
0
    out[1] = p[1];
632
0
    out[2] = p[2];
633
0
    out[3] = p[3];
634
0
    *buf = out + 4;
635
0
}
636
637
}  // namespace policy
638
} // namespace brpc
639
640
641
#endif  // BRPC_POLICY_RTMP_PROTOCOL_H