/src/brpc/src/brpc/policy/rtmp_protocol.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | |
19 | | #ifndef BRPC_POLICY_RTMP_PROTOCOL_H |
20 | | #define BRPC_POLICY_RTMP_PROTOCOL_H |
21 | | |
22 | | #include "butil/containers/flat_map.h" |
23 | | #include "brpc/protocol.h" |
24 | | #include "brpc/rtmp.h" |
25 | | #include "brpc/amf.h" |
26 | | #include "brpc/socket.h" |
27 | | |
28 | | |
29 | | namespace brpc { |
30 | | |
31 | | class Server; |
32 | | class RtmpService; |
33 | | class RtmpStreamBase; |
34 | | |
35 | | namespace policy { |
36 | | |
37 | | const uint32_t RTMP_DEFAULT_CHUNK_SIZE = 60000; // Copy from SRS |
38 | | const uint32_t RTMP_DEFAULT_WINDOW_ACK_SIZE = 2500000; // Copy from SRS |
39 | | const uint32_t RTMP_MAX_CHUNK_STREAM_ID = 65599; |
40 | | const uint32_t RTMP_CHUNK_ARRAY_2ND_SIZE = 256; |
41 | | const uint32_t RTMP_CHUNK_ARRAY_1ST_SIZE = |
42 | | (RTMP_MAX_CHUNK_STREAM_ID + RTMP_CHUNK_ARRAY_2ND_SIZE) |
43 | | / RTMP_CHUNK_ARRAY_2ND_SIZE; |
44 | | const uint32_t RTMP_CONTROL_CHUNK_STREAM_ID = 2; |
45 | | const uint32_t RTMP_CONTROL_MESSAGE_STREAM_ID = 0; |
46 | | |
47 | | enum RtmpMessageType { |
48 | | RTMP_MESSAGE_SET_CHUNK_SIZE = 1, |
49 | | RTMP_MESSAGE_ABORT = 2, |
50 | | RTMP_MESSAGE_ACK = 3, |
51 | | RTMP_MESSAGE_USER_CONTROL = 4, |
52 | | RTMP_MESSAGE_WINDOW_ACK_SIZE = 5, |
53 | | RTMP_MESSAGE_SET_PEER_BANDWIDTH = 6, |
54 | | RTMP_MESSAGE_AUDIO = 8, |
55 | | RTMP_MESSAGE_VIDEO = 9, |
56 | | RTMP_MESSAGE_DATA_AMF3 = 15, |
57 | | RTMP_MESSAGE_SHARED_OBJECT_AMF3 = 16, |
58 | | RTMP_MESSAGE_COMMAND_AMF3 = 17, |
59 | | RTMP_MESSAGE_DATA_AMF0 = 18, |
60 | | RTMP_MESSAGE_SHARED_OBJECT_AMF0 = 19, |
61 | | RTMP_MESSAGE_COMMAND_AMF0 = 20, |
62 | | RTMP_MESSAGE_AGGREGATE = 22, |
63 | | }; |
64 | | |
65 | 0 | inline bool is_video_frame_type_valid(FlvVideoFrameType t) { |
66 | 0 | return (t >= FLV_VIDEO_FRAME_KEYFRAME && t <= FLV_VIDEO_FRAME_INFOFRAME); |
67 | 0 | } |
68 | | |
69 | 0 | inline bool is_video_codec_valid(FlvVideoCodec id) { |
70 | 0 | return (id >= FLV_VIDEO_JPEG && id <= FLV_VIDEO_HEVC); |
71 | 0 | } |
72 | | |
73 | | // Get literal form of the message type. |
74 | | const char* messagetype2str(RtmpMessageType); |
75 | | const char* messagetype2str(uint8_t); |
76 | | |
77 | | // Define string constants as macros rather than consts because macros |
78 | | // are concatenatable. |
79 | 0 | #define RTMP_SIG_FMS_VER "3,5,3,888" |
80 | | #define RTMP_SIG_CLIENT_ID "ASAICiss" |
81 | | |
82 | 0 | #define RTMP_STATUS_CODE_CONNECT_SUCCESS "NetConnection.Connect.Success" |
83 | 0 | #define RTMP_STATUS_CODE_CONNECT_REJECTED "NetConnection.Connect.Rejected" |
84 | 0 | #define RTMP_STATUS_CODE_PLAY_RESET "NetStream.Play.Reset" |
85 | 0 | #define RTMP_STATUS_CODE_PLAY_START "NetStream.Play.Start" |
86 | 0 | #define RTMP_STATUS_CODE_STREAM_NOT_FOUND "NetStream.Play.StreamNotFound" |
87 | 0 | #define RTMP_STATUS_CODE_STREAM_PAUSE "NetStream.Pause.Notify" |
88 | 0 | #define RTMP_STATUS_CODE_STREAM_UNPAUSE "NetStream.Unpause.Notify" |
89 | 0 | #define RTMP_STATUS_CODE_PUBLISH_START "NetStream.Publish.Start" |
90 | 0 | #define RTMP_STATUS_CODE_DATA_START "NetStream.Data.Start" |
91 | | #define RTMP_STATUS_CODE_UNPUBLISH_SUCCESS "NetStream.Unpublish.Success" |
92 | 0 | #define RTMP_STATUS_CODE_STREAM_SEEK "NetStream.Seek.Notify" |
93 | | |
94 | 0 | #define RTMP_AMF0_COMMAND_CONNECT "connect" |
95 | 0 | #define RTMP_AMF0_COMMAND_CREATE_STREAM "createStream" |
96 | 0 | #define RTMP_AMF0_COMMAND_CLOSE_STREAM "closeStream" // SRS has this. |
97 | 0 | #define RTMP_AMF0_COMMAND_DELETE_STREAM "deleteStream" |
98 | 0 | #define RTMP_AMF0_COMMAND_PLAY "play" |
99 | 0 | #define RTMP_AMF0_COMMAND_PLAY2 "play2" |
100 | 0 | #define RTMP_AMF0_COMMAND_SEEK "seek" |
101 | 0 | #define RTMP_AMF0_COMMAND_PAUSE "pause" |
102 | 0 | #define RTMP_AMF0_COMMAND_ON_BW_DONE "onBWDone" |
103 | 0 | #define RTMP_AMF0_COMMAND_ON_STATUS "onStatus" |
104 | 0 | #define RTMP_AMF0_COMMAND_RESULT "_result" |
105 | 0 | #define RTMP_AMF0_COMMAND_ERROR "_error" |
106 | 0 | #define RTMP_AMF0_COMMAND_RELEASE_STREAM "releaseStream" |
107 | 0 | #define RTMP_AMF0_COMMAND_FC_PUBLISH "FCPublish" |
108 | 0 | #define RTMP_AMF0_COMMAND_FC_UNPUBLISH "FCUnpublish" |
109 | 0 | #define RTMP_AMF0_COMMAND_GET_STREAM_LENGTH "getStreamLength" |
110 | 0 | #define RTMP_AMF0_COMMAND_CHECK_BW "_checkbw" |
111 | 0 | #define RTMP_AMF0_COMMAND_PUBLISH "publish" |
112 | 0 | #define RTMP_AMF0_DATA_SAMPLE_ACCESS "|RtmpSampleAccess" |
113 | | #define RTMP_AMF0_COMMAND_CALL "call" |
114 | 0 | #define RTMP_AMF0_SET_DATAFRAME "@setDataFrame" |
115 | 0 | #define RTMP_AMF0_ON_META_DATA "onMetaData" |
116 | 0 | #define RTMP_AMF0_ON_CUE_POINT "onCuePoint" |
117 | 0 | #define RTMP_AMF0_SAMPLE_ACCESS "|RtmpSampleAccess" |
118 | | |
119 | 0 | #define RTMP_INFO_LEVEL_STATUS "status" |
120 | 0 | #define RTMP_INFO_LEVEL_ERROR "error" |
121 | | #define RTMP_INFO_LEVEL_WARNING "warning" |
122 | | |
123 | | enum RtmpUserControlEventType { |
124 | | RTMP_USER_CONTROL_EVENT_STREAM_BEGIN = 0, |
125 | | RTMP_USER_CONTROL_EVENT_STREAM_EOF = 1, |
126 | | RTMP_USER_CONTROL_EVENT_STREAM_DRY = 2, |
127 | | RTMP_USER_CONTROL_EVENT_SET_BUFFER_LENGTH = 3, |
128 | | RTMP_USER_CONTROL_EVENT_STREAM_IS_RECORDED = 4, |
129 | | RTMP_USER_CONTROL_EVENT_PING_REQUEST = 6, |
130 | | RTMP_USER_CONTROL_EVENT_PING_RESPONSE = 7, |
131 | | |
132 | | // Not specified in any official documentation, but is sent by Flash Media |
133 | | // Server 3.5. Reference: http://repo.or.cz/w/rtmpdump.git/blob/8880d1456b282ee79979adbe7b6a6eb8ad371081:/librtmp/rtmp.c#l2787 |
134 | | // After the server has sent a complete buffer, and sends a Buffer Empty |
135 | | // message, it will wait until the play duration of that buffer has passed |
136 | | // before sending a new buffer. The Buffer Ready message will be sent when |
137 | | // the new buffer starts. (There is no BufferReady message for the very |
138 | | // first buffer; presumably the Stream Begin message is sufficient for |
139 | | // that purpose.) |
140 | | RTMP_USER_CONTROL_EVENT_BUFFER_EMPTY = 31, |
141 | | RTMP_USER_CONTROL_EVENT_BUFFER_READY = 32, |
142 | | }; |
143 | | |
144 | | // Header part of a RTMP message. |
145 | | struct RtmpMessageHeader { |
146 | | uint32_t timestamp; |
147 | | uint32_t message_length; |
148 | | uint8_t message_type; |
149 | | uint32_t stream_id; |
150 | | |
151 | | RtmpMessageHeader() |
152 | | : timestamp(0) |
153 | | , message_length(0) |
154 | | , message_type(0) |
155 | 0 | , stream_id(RTMP_CONTROL_MESSAGE_STREAM_ID) { |
156 | 0 | } |
157 | | |
158 | 0 | bool is_valid() const { return message_type != 0; } |
159 | | }; |
160 | | |
161 | | class RtmpContext; |
162 | | |
163 | | // The intermediate header passing to CutMessageIntoFileDescriptor. |
164 | | class RtmpUnsentMessage : public SocketMessage { |
165 | | public: |
166 | | RtmpMessageHeader header; |
167 | | uint32_t chunk_stream_id; |
168 | | // Set RtmpContext::_chunk_size_out to this in AppendAndDestroySelf() |
169 | | // if this field is non-zero. |
170 | | uint32_t new_chunk_size; |
171 | | butil::IOBuf body; |
172 | | // If next is not NULL, next->AppendAndDestroySelf() will be called |
173 | | // recursively. For implementing batched messages. |
174 | | SocketMessagePtr<RtmpUnsentMessage> next; |
175 | | public: |
176 | | RtmpUnsentMessage() |
177 | 0 | : chunk_stream_id(0) , new_chunk_size(0), next(NULL) {} |
178 | | // @SocketMessage |
179 | | butil::Status AppendAndDestroySelf(butil::IOBuf* out, Socket*); |
180 | | }; |
181 | | |
182 | | // Notice that we can't directly pack CreateStream command in PackRtmpRequest, because |
183 | | // we need to pack an AMFObject according to ctx->can_stream_be_created_with_play_or_publish(), |
184 | | // which is in the response of Connect command(sent in RtmpConnect::StartConnect). |
185 | | struct RtmpCreateStreamMessage : public SocketMessage { |
186 | | public: |
187 | | SocketUniquePtr socket; |
188 | | uint32_t transaction_id; |
189 | | RtmpClientStreamOptions options; |
190 | | public: |
191 | 0 | explicit RtmpCreateStreamMessage() {} |
192 | | // @SocketMessage |
193 | | butil::Status AppendAndDestroySelf(butil::IOBuf* out, Socket*); |
194 | | }; |
195 | | |
196 | | enum RtmpChunkType { |
197 | | RTMP_CHUNK_TYPE0 = 0, |
198 | | RTMP_CHUNK_TYPE1 = 1, |
199 | | RTMP_CHUNK_TYPE2 = 2, |
200 | | RTMP_CHUNK_TYPE3 = 3 |
201 | | }; |
202 | | |
203 | | // header part of a chunk. |
204 | | struct RtmpBasicHeader { |
205 | | uint32_t chunk_stream_id; |
206 | | RtmpChunkType fmt; |
207 | | uint8_t header_length; |
208 | | }; |
209 | | |
210 | | // Read big-endian values from buf. |
211 | | uint8_t Read1Byte(const void* buf); |
212 | | uint16_t ReadBigEndian2Bytes(const void* buf); |
213 | | uint32_t ReadBigEndian3Bytes(const void* buf); |
214 | | uint32_t ReadBigEndian4Bytes(const void* buf); |
215 | | // Write values in big-endian into *buf and forward *buf. |
216 | | void Write1Byte(char** buf, uint8_t val); |
217 | | void WriteBigEndian2Bytes(char** buf, uint16_t val); |
218 | | void WriteBigEndian3Bytes(char** buf, uint32_t val); |
219 | | void WriteBigEndian4Bytes(char** buf, uint32_t val); |
220 | | void WriteLittleEndian4Bytes(char** buf, uint32_t val); |
221 | | |
222 | | // Append the control message into `msg_buf' which is writable to Socket. |
223 | | RtmpUnsentMessage* MakeUnsentControlMessage( |
224 | | uint8_t message_type, const void* body, size_t size); |
225 | | RtmpUnsentMessage* MakeUnsentControlMessage( |
226 | | uint8_t message_type, const butil::IOBuf& body); |
227 | | |
228 | | // The callback associated with a transaction_id. |
229 | | // If the transaction is successfully done, Run() will be called, otherwise |
230 | | // Cancel() will be called. |
231 | | class RtmpTransactionHandler { |
232 | | public: |
233 | 0 | virtual ~RtmpTransactionHandler() {} |
234 | | virtual void Run(bool error, const RtmpMessageHeader& mh, |
235 | | AMFInputStream*, Socket* socket) = 0; |
236 | | virtual void Cancel() = 0; |
237 | | }; |
238 | | |
239 | | class RtmpChunkStream; |
240 | | |
241 | | // Associated with a RTMP connection. |
242 | | class RtmpContext : public Destroyable { |
243 | | friend class RtmpChunkStream; |
244 | | friend class RtmpUnsentMessage; |
245 | | public: |
246 | | // States during handshake. |
247 | | enum State { |
248 | | STATE_UNINITIALIZED, |
249 | | STATE_RECEIVED_S0S1, |
250 | | STATE_RECEIVED_S2, |
251 | | STATE_RECEIVED_C0C1, |
252 | | STATE_RECEIVED_C2, |
253 | | }; |
254 | | |
255 | | // Get literal form of the state. |
256 | | static const char* state2str(State); |
257 | | |
258 | | // One of copt/service must be NULL, indicating this context belongs |
259 | | // to a server-side or client-side socket. |
260 | | RtmpContext(const RtmpClientOptions* copt, const Server* server); |
261 | | ~RtmpContext(); |
262 | | |
263 | | // @Destroyable |
264 | | void Destroy(); |
265 | | |
266 | | // Parse `source' from `socket'. |
267 | | // This method is only called from Protocol.Parse thus does not need |
268 | | // to be thread-safe. |
269 | | ParseResult Feed(butil::IOBuf* source, Socket* socket); |
270 | | |
271 | 0 | const RtmpClientOptions* client_options() const { return _client_options; } |
272 | 0 | const Server* server() const { return _server; } |
273 | 0 | RtmpService* service() const { return _service; } |
274 | | |
275 | 0 | bool is_server_side() const { return service() != NULL; } |
276 | 0 | bool is_client_side() const { return service() == NULL; } |
277 | | |
278 | | // XXXMessageStream may be called from multiple threads(currently not), |
279 | | // so they're protected by _stream_mutex |
280 | | |
281 | | // Find the stream by its id and reference the stream with intrusive_ptr. |
282 | | // Returns true on success. |
283 | | bool FindMessageStream(uint32_t stream_id, |
284 | | butil::intrusive_ptr<RtmpStreamBase>* stream); |
285 | | |
286 | | // Called in client-side to map the id to stream. |
287 | | bool AddClientStream(RtmpStreamBase* stream); |
288 | | |
289 | | // Called in server-side to allocate an id and map the id to stream. |
290 | | bool AddServerStream(RtmpStreamBase* stream); |
291 | | |
292 | | // Remove the stream from mapping. |
293 | | // Returns true on success. |
294 | | bool RemoveMessageStream(RtmpStreamBase* stream); |
295 | | |
296 | | // Allocate id for a transaction. |
297 | | // Returns true on success. |
298 | | // This method is called in pack_request(for createStream) where is |
299 | | // accessible by multiple threads. However creating streams is unlikely to |
300 | | // be very frequent, so this method is simply synchronized by _trans_mutex. |
301 | | bool AddTransaction(uint32_t* transaction_id, |
302 | | RtmpTransactionHandler* handler); |
303 | | // Remove the transaction associated with the id. |
304 | | // Return the transaction handler. |
305 | | RtmpTransactionHandler* RemoveTransaction(uint32_t transaction_id); |
306 | | |
307 | | // Get the chunk stream by its id. The stream is created by need. |
308 | | RtmpChunkStream* GetChunkStream(uint32_t cs_id); |
309 | | // Reset the chunk stream associated with the id. |
310 | | void ClearChunkStream(uint32_t cs_id); |
311 | | |
312 | | // Allocate/deallocate id for a chunk stream. |
313 | | void AllocateChunkStreamId(uint32_t* chunk_stream_id); |
314 | | void DeallocateChunkStreamId(uint32_t chunk_stream_id); |
315 | | |
316 | | // Allocate/deallocate id for a message stream. |
317 | | bool AllocateMessageStreamId(uint32_t* message_stream_id); |
318 | | void DeallocateMessageStreamId(uint32_t message_stream_id); |
319 | | |
320 | | // Set the callback to be called in OnConnected(). This method should |
321 | | // be called before initiating the RTMP handshake (sending C0 and C1) |
322 | 0 | void SetConnectCallback(void (*app_connect_done)(int, void*), void* data) { |
323 | 0 | _on_connect = app_connect_done; |
324 | 0 | _on_connect_arg = data; |
325 | 0 | } |
326 | | // Called when the RTMP connection is established. |
327 | | void OnConnected(int error_code); |
328 | 0 | bool unconnected() const { return _on_connect != NULL; } |
329 | | |
330 | 0 | void only_check_simple_s0s1() { _only_check_simple_s0s1 = true; } |
331 | | bool can_stream_be_created_with_play_or_publish() const |
332 | 0 | { return _create_stream_with_play_or_publish; } |
333 | | |
334 | | // Call this fn to change _state. |
335 | | void SetState(const butil::EndPoint& remote_side, State new_state); |
336 | | |
337 | | void set_create_stream_with_play_or_publish(bool create_stream_with_play_or_publish) |
338 | 0 | { _create_stream_with_play_or_publish = create_stream_with_play_or_publish; } |
339 | | |
340 | | void set_simplified_rtmp(bool simplified_rtmp) |
341 | 0 | { _simplified_rtmp = simplified_rtmp; } |
342 | | |
343 | | int SendConnectRequest(const butil::EndPoint& remote_side, int fd, bool simplified_rtmp); |
344 | | |
345 | | private: |
346 | | ParseResult WaitForC0C1orSimpleRtmp(butil::IOBuf* source, Socket* socket); |
347 | | ParseResult WaitForC2(butil::IOBuf* source, Socket* socket); |
348 | | ParseResult WaitForS0S1(butil::IOBuf* source, Socket* socket); |
349 | | ParseResult WaitForS2(butil::IOBuf* source, Socket* socket); |
350 | | ParseResult OnChunks(butil::IOBuf* source, Socket* socket); |
351 | | |
352 | | // Count received bytes and send ack back if needed. |
353 | | void AddReceivedBytes(Socket* socket, uint32_t size); |
354 | | |
355 | | private: |
356 | | State _state; |
357 | | void* _s1_digest; |
358 | | // Outbound chunksize (inbound chunksize of peer), modifiable by self. |
359 | | uint32_t _chunk_size_out; |
360 | | // Inbound chunksize(outbound chunksize of peer), modifiale by peer. |
361 | | uint32_t _chunk_size_in; |
362 | | uint32_t _window_ack_size; |
363 | | uint32_t _nonack_bytes; |
364 | | uint64_t _received_bytes; |
365 | | uint32_t _cs_id_allocator; |
366 | | std::vector<uint32_t> _free_cs_ids; |
367 | | uint32_t _ms_id_allocator; |
368 | | std::vector<uint32_t> _free_ms_ids; |
369 | | // Client-side options. |
370 | | const RtmpClientOptions* _client_options; |
371 | | // Callbacks to be called in OnConnected(). |
372 | | void (*_on_connect)(int, void*); |
373 | | void* _on_connect_arg; |
374 | | bool _only_check_simple_s0s1; |
375 | | bool _create_stream_with_play_or_publish; |
376 | | |
377 | | // Server and service. |
378 | | const Server* _server; |
379 | | RtmpService* _service; |
380 | | |
381 | | // Mapping message_stream_id to message streams. |
382 | | butil::Mutex _stream_mutex; |
383 | | struct MessageStreamInfo { |
384 | | butil::intrusive_ptr<RtmpStreamBase> stream; |
385 | | }; |
386 | | butil::FlatMap<uint32_t, MessageStreamInfo> _mstream_map; |
387 | | |
388 | | // Mapping transaction id to handlers. |
389 | | butil::Mutex _trans_mutex; |
390 | | uint32_t _trans_id_allocator; |
391 | | butil::FlatMap<uint32_t, RtmpTransactionHandler*> _trans_map; |
392 | | |
393 | | RtmpConnectRequest _connect_req; |
394 | | |
395 | | // Map chunk_stream_id to chunk streams. |
396 | | // The array is 2-level to reduce memory for most connections. |
397 | | struct SubChunkArray { |
398 | | butil::atomic<RtmpChunkStream*> ptrs[RTMP_CHUNK_ARRAY_2ND_SIZE]; |
399 | | SubChunkArray(); |
400 | | ~SubChunkArray(); |
401 | | }; |
402 | | butil::atomic<SubChunkArray*> _cstream_ctx[RTMP_CHUNK_ARRAY_1ST_SIZE]; |
403 | | |
404 | | bool _simplified_rtmp; |
405 | | }; |
406 | | |
407 | | class RtmpChunkStream { |
408 | | public: |
409 | | typedef bool (RtmpChunkStream::*MessageHandler)( |
410 | | const RtmpMessageHeader& mh, butil::IOBuf* msg_body, Socket* socket); |
411 | | |
412 | | typedef bool (RtmpChunkStream::*CommandHandler)( |
413 | | const RtmpMessageHeader& mh, AMFInputStream*, Socket* socket); |
414 | | |
415 | | public: |
416 | | RtmpChunkStream(RtmpContext* conn_ctx, uint32_t cs_id); |
417 | | |
418 | | ParseResult Feed(const RtmpBasicHeader& bh, |
419 | | butil::IOBuf* source, Socket* socket); |
420 | | |
421 | 0 | RtmpContext* connection_context() const { return _conn_ctx; } |
422 | | |
423 | 0 | uint32_t chunk_stream_id() const { return _cs_id; } |
424 | | |
425 | | int SerializeMessage(butil::IOBuf* buf, const RtmpMessageHeader& mh, |
426 | | butil::IOBuf* body); |
427 | | |
428 | | bool OnMessage( |
429 | | const RtmpBasicHeader& bh, const RtmpMessageHeader& mh, |
430 | | butil::IOBuf* msg_body, Socket* socket); |
431 | | |
432 | | bool OnSetChunkSize(const RtmpMessageHeader& mh, |
433 | | butil::IOBuf* msg_body, Socket* socket); |
434 | | bool OnAbortMessage(const RtmpMessageHeader& mh, |
435 | | butil::IOBuf* msg_body, Socket* socket); |
436 | | bool OnAck(const RtmpMessageHeader& mh, |
437 | | butil::IOBuf* msg_body, Socket* socket); |
438 | | bool OnUserControlMessage(const RtmpMessageHeader& mh, |
439 | | butil::IOBuf* msg_body, Socket* socket); |
440 | | bool OnStreamBegin(const RtmpMessageHeader&, |
441 | | const butil::StringPiece& event_data, Socket* socket); |
442 | | bool OnStreamEOF(const RtmpMessageHeader&, |
443 | | const butil::StringPiece& event_data, Socket* socket); |
444 | | bool OnStreamDry(const RtmpMessageHeader&, |
445 | | const butil::StringPiece& event_data, Socket* socket); |
446 | | bool OnSetBufferLength(const RtmpMessageHeader&, |
447 | | const butil::StringPiece& event_data, Socket* socket); |
448 | | bool OnStreamIsRecorded(const RtmpMessageHeader&, |
449 | | const butil::StringPiece& event_data, Socket* socket); |
450 | | bool OnPingRequest(const RtmpMessageHeader&, |
451 | | const butil::StringPiece& event_data, Socket* socket); |
452 | | bool OnPingResponse(const RtmpMessageHeader&, |
453 | | const butil::StringPiece& event_data, Socket* socket); |
454 | | bool OnBufferEmpty(const RtmpMessageHeader&, |
455 | | const butil::StringPiece& event_data, Socket* socket); |
456 | | bool OnBufferReady(const RtmpMessageHeader&, |
457 | | const butil::StringPiece& event_data, Socket* socket); |
458 | | |
459 | | bool OnWindowAckSize(const RtmpMessageHeader& mh, |
460 | | butil::IOBuf* msg_body, Socket* socket); |
461 | | bool OnSetPeerBandwidth(const RtmpMessageHeader& mh, |
462 | | butil::IOBuf* msg_body, Socket* socket); |
463 | | |
464 | | bool OnAudioMessage(const RtmpMessageHeader& mh, |
465 | | butil::IOBuf* msg_body, Socket* socket); |
466 | | bool OnVideoMessage(const RtmpMessageHeader& mh, |
467 | | butil::IOBuf* msg_body, Socket* socket); |
468 | | bool OnDataMessageAMF0(const RtmpMessageHeader& mh, |
469 | | butil::IOBuf* msg_body, Socket* socket); |
470 | | bool OnDataMessageAMF3(const RtmpMessageHeader& mh, |
471 | | butil::IOBuf* msg_body, Socket* socket); |
472 | | bool OnSharedObjectMessageAMF0(const RtmpMessageHeader& mh, |
473 | | butil::IOBuf* msg_body, Socket* socket); |
474 | | bool OnSharedObjectMessageAMF3(const RtmpMessageHeader& mh, |
475 | | butil::IOBuf* msg_body, Socket* socket); |
476 | | bool OnCommandMessageAMF0(const RtmpMessageHeader& mh, |
477 | | butil::IOBuf* msg_body, Socket* socket); |
478 | | bool OnCommandMessageAMF3(const RtmpMessageHeader& mh, |
479 | | butil::IOBuf* msg_body, Socket* socket); |
480 | | bool OnAggregateMessage(const RtmpMessageHeader& mh, |
481 | | butil::IOBuf* msg_body, Socket* socket); |
482 | | |
483 | | bool OnStatus(const RtmpMessageHeader& mh, AMFInputStream* istream, |
484 | | Socket* socket); |
485 | | bool OnConnect(const RtmpMessageHeader& mh, AMFInputStream* istream, |
486 | | Socket* socket); |
487 | | bool OnBWDone(const RtmpMessageHeader& mh, AMFInputStream* istream, |
488 | | Socket* socket); |
489 | | bool OnResult(const RtmpMessageHeader& mh, AMFInputStream* istream, |
490 | | Socket* socket); |
491 | | bool OnError(const RtmpMessageHeader& mh, AMFInputStream* istream, |
492 | | Socket* socket); |
493 | | bool OnPlay(const RtmpMessageHeader& mh, AMFInputStream* istream, |
494 | | Socket* socket); |
495 | | bool OnPlay2(const RtmpMessageHeader& mh, AMFInputStream* istream, |
496 | | Socket* socket); |
497 | | bool OnCreateStream(const RtmpMessageHeader& mh, AMFInputStream* istream, |
498 | | Socket* socket); |
499 | | bool OnDeleteStream(const RtmpMessageHeader& mh, AMFInputStream* istream, |
500 | | Socket* socket); |
501 | | bool OnCloseStream(const RtmpMessageHeader& mh, AMFInputStream* istream, |
502 | | Socket* socket); |
503 | | bool OnPublish(const RtmpMessageHeader& mh, AMFInputStream* istream, |
504 | | Socket* socket); |
505 | | bool OnReleaseStream(const RtmpMessageHeader& mh, AMFInputStream* istream, |
506 | | Socket* socket); |
507 | | bool OnFCPublish(const RtmpMessageHeader& mh, AMFInputStream* istream, |
508 | | Socket* socket); |
509 | | bool OnFCUnpublish(const RtmpMessageHeader& mh, AMFInputStream* istream, |
510 | | Socket* socket); |
511 | | bool OnGetStreamLength(const RtmpMessageHeader& mh, AMFInputStream* istream, |
512 | | Socket* socket); |
513 | | bool OnCheckBW(const RtmpMessageHeader& mh, AMFInputStream* istream, |
514 | | Socket* socket); |
515 | | bool OnSeek(const RtmpMessageHeader& mh, AMFInputStream* istream, |
516 | | Socket* socket); |
517 | | bool OnPause(const RtmpMessageHeader& mh, AMFInputStream* istream, |
518 | | Socket* socket); |
519 | | |
520 | | private: |
521 | | struct ReadParams { |
522 | | ReadParams(); |
523 | | bool last_has_extended_ts; |
524 | | bool first_chunk_of_message; |
525 | | uint32_t last_timestamp_delta; |
526 | | uint32_t left_message_length; |
527 | | RtmpMessageHeader last_msg_header; |
528 | | butil::IOBuf msg_body; |
529 | | }; |
530 | | struct WriteParams { |
531 | | WriteParams(); |
532 | | bool last_has_extended_ts; |
533 | | uint32_t last_timestamp_delta; |
534 | | RtmpMessageHeader last_msg_header; |
535 | | }; |
536 | | |
537 | | RtmpContext* _conn_ctx; |
538 | | uint32_t _cs_id; |
539 | | ReadParams _r; |
540 | | WriteParams _w; |
541 | | }; |
542 | | |
543 | | // Parse binary format of rmtp. |
544 | | ParseResult ParseRtmpMessage(butil::IOBuf* source, Socket *socket, bool read_eof, |
545 | | const void *arg); |
546 | | |
547 | | // no-op placeholder, never be called. |
548 | | void ProcessRtmpMessage(InputMessageBase* msg); |
549 | | |
550 | | // Pack createStream message |
551 | | void PackRtmpRequest(butil::IOBuf* buf, |
552 | | SocketMessage**, |
553 | | uint64_t correlation_id, |
554 | | const google::protobuf::MethodDescriptor* method, |
555 | | Controller* controller, |
556 | | const butil::IOBuf& request, |
557 | | const Authenticator* auth); |
558 | | |
559 | | // Serialize createStream message |
560 | | void SerializeRtmpRequest(butil::IOBuf* buf, |
561 | | Controller* cntl, |
562 | | const google::protobuf::Message* request); |
563 | | |
564 | | // ============== inline impl. ================= |
565 | | // TODO(gejun): impl. do not work for big-endian machines. |
566 | 0 | inline uint8_t Read1Byte(const void* void_buf) { |
567 | 0 | return *(const char*)void_buf; |
568 | 0 | } |
569 | 0 | inline uint16_t ReadBigEndian2Bytes(const void* void_buf) { |
570 | 0 | uint16_t ret = 0; |
571 | 0 | char* p = (char*)&ret; |
572 | 0 | const char* buf = (const char*)void_buf; |
573 | 0 | p[1] = buf[0]; |
574 | 0 | p[0] = buf[1]; |
575 | 0 | return ret; |
576 | 0 | } |
577 | 0 | inline uint32_t ReadBigEndian3Bytes(const void* void_buf) { |
578 | 0 | uint32_t ret = 0; |
579 | 0 | char* p = (char*)&ret; |
580 | 0 | const char* buf = (const char*)void_buf; |
581 | 0 | p[3] = 0; |
582 | 0 | p[2] = buf[0]; |
583 | 0 | p[1] = buf[1]; |
584 | 0 | p[0] = buf[2]; |
585 | 0 | return ret; |
586 | 0 | } |
587 | 0 | inline uint32_t ReadBigEndian4Bytes(const void* void_buf) { |
588 | 0 | uint32_t ret = 0; |
589 | 0 | char* p = (char*)&ret; |
590 | 0 | const char* buf = (const char*)void_buf; |
591 | 0 | p[3] = buf[0]; |
592 | 0 | p[2] = buf[1]; |
593 | 0 | p[1] = buf[2]; |
594 | 0 | p[0] = buf[3]; |
595 | 0 | return ret; |
596 | 0 | } |
597 | 0 | inline void Write1Byte(char** buf, uint8_t val) { |
598 | 0 | char* out = *buf; |
599 | 0 | *out= val; |
600 | 0 | *buf = out + 1; |
601 | 0 | } |
602 | 0 | inline void WriteBigEndian2Bytes(char** buf, uint16_t val) { |
603 | 0 | const char* p = (const char*)&val; |
604 | 0 | char* out = *buf; |
605 | 0 | out[0] = p[1]; |
606 | 0 | out[1] = p[0]; |
607 | 0 | *buf = out + 2; |
608 | 0 | } |
609 | 0 | inline void WriteBigEndian3Bytes(char** buf, uint32_t val) { |
610 | 0 | const char* p = (const char*)&val; |
611 | 0 | CHECK_EQ(p[3], 0); |
612 | 0 | char* out = *buf; |
613 | 0 | out[0] = p[2]; |
614 | 0 | out[1] = p[1]; |
615 | 0 | out[2] = p[0]; |
616 | 0 | *buf = out + 3; |
617 | 0 | } |
618 | 0 | inline void WriteBigEndian4Bytes(char** buf, uint32_t val) { |
619 | 0 | const char* p = (const char*)&val; |
620 | 0 | char* out = *buf; |
621 | 0 | out[0] = p[3]; |
622 | 0 | out[1] = p[2]; |
623 | 0 | out[2] = p[1]; |
624 | 0 | out[3] = p[0]; |
625 | 0 | *buf = out + 4; |
626 | 0 | } |
627 | 0 | inline void WriteLittleEndian4Bytes(char** buf, uint32_t val) { |
628 | 0 | const char* p = (const char*)&val; |
629 | 0 | char* out = *buf; |
630 | 0 | out[0] = p[0]; |
631 | 0 | out[1] = p[1]; |
632 | 0 | out[2] = p[2]; |
633 | 0 | out[3] = p[3]; |
634 | 0 | *buf = out + 4; |
635 | 0 | } |
636 | | |
637 | | } // namespace policy |
638 | | } // namespace brpc |
639 | | |
640 | | |
641 | | #endif // BRPC_POLICY_RTMP_PROTOCOL_H |