/src/brpc/src/brpc/rtmp.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | |
19 | | #include <gflags/gflags.h> |
20 | | #include <google/protobuf/io/zero_copy_stream_impl_lite.h> // StringOutputStream |
21 | | #include "bthread/bthread.h" // bthread_id_xx |
22 | | #include "bthread/unstable.h" // bthread_timer_del |
23 | | #include "brpc/log.h" |
24 | | #include "brpc/callback.h" // Closure |
25 | | #include "brpc/channel.h" // Channel |
26 | | #include "brpc/socket_map.h" // SocketMap |
27 | | #include "brpc/socket.h" // Socket |
28 | | #include "brpc/policy/rtmp_protocol.h" // policy::* |
29 | | #include "brpc/rtmp.h" |
30 | | #include "brpc/details/rtmp_utils.h" |
31 | | |
32 | | |
33 | | namespace brpc { |
34 | | |
35 | | DEFINE_bool(rtmp_server_close_connection_on_error, true, |
36 | | "Close the client connection on play/publish errors, clients setting" |
37 | | " RtmpConnectRequest.stream_multiplexing to true are not affected" |
38 | | " by this flag"); |
39 | | |
40 | | struct RtmpBvars { |
41 | | bvar::Adder<int> client_count; |
42 | | bvar::Adder<int> client_stream_count; |
43 | | bvar::Adder<int> retrying_client_stream_count; |
44 | | bvar::Adder<int> server_stream_count; |
45 | | |
46 | | RtmpBvars() |
47 | | : client_count("rtmp_client_count") |
48 | | , client_stream_count("rtmp_client_stream_count") |
49 | | , retrying_client_stream_count("rtmp_retrying_client_stream_count") |
50 | 0 | , server_stream_count("rtmp_server_stream_count") { |
51 | 0 | } |
52 | | }; |
53 | 0 | inline RtmpBvars* get_rtmp_bvars() { |
54 | 0 | return butil::get_leaky_singleton<RtmpBvars>(); |
55 | 0 | } |
56 | | |
57 | | namespace policy { |
58 | | int SendC0C1(int fd, bool* is_simple_handshake); |
59 | | int WriteWithoutOvercrowded(Socket*, SocketMessagePtr<>& msg); |
60 | | } |
61 | | |
62 | | FlvWriter::FlvWriter(butil::IOBuf* buf) |
63 | 0 | : _write_header(false), _buf(buf), _options() { |
64 | 0 | } |
65 | | |
66 | | FlvWriter::FlvWriter(butil::IOBuf* buf, const FlvWriterOptions& options) |
67 | 0 | : _write_header(false), _buf(buf), _options(options) { |
68 | 0 | } |
69 | | |
70 | 0 | butil::Status FlvWriter::Write(const RtmpVideoMessage& msg) { |
71 | 0 | char buf[32]; |
72 | 0 | char* p = buf; |
73 | 0 | if (!_write_header) { |
74 | 0 | _write_header = true; |
75 | 0 | const char flags_bit = static_cast<char>(_options.flv_content_type); |
76 | 0 | const char header[9] = { 'F', 'L', 'V', 0x01, flags_bit, 0, 0, 0, 0x09 }; |
77 | 0 | memcpy(p, header, sizeof(header)); |
78 | 0 | p += sizeof(header); |
79 | 0 | policy::WriteBigEndian4Bytes(&p, 0); // PreviousTagSize0 |
80 | 0 | } |
81 | | // FLV tag |
82 | 0 | *p++ = FLV_TAG_VIDEO; |
83 | 0 | policy::WriteBigEndian3Bytes(&p, msg.size()); |
84 | 0 | policy::WriteBigEndian3Bytes(&p, (msg.timestamp & 0xFFFFFF)); |
85 | 0 | *p++ = (msg.timestamp >> 24) & 0xFF; |
86 | 0 | policy::WriteBigEndian3Bytes(&p, 0); // StreamID |
87 | | // header of VIDEODATA |
88 | 0 | *p++ = ((msg.frame_type & 0xF) << 4) | (msg.codec & 0xF); |
89 | 0 | _buf->append(buf, p - buf); |
90 | 0 | _buf->append(msg.data); |
91 | | // PreviousTagSize |
92 | 0 | p = buf; |
93 | 0 | policy::WriteBigEndian4Bytes(&p, 11 + msg.size()); |
94 | 0 | _buf->append(buf, p - buf); |
95 | 0 | return butil::Status::OK(); |
96 | 0 | } |
97 | | |
98 | 0 | butil::Status FlvWriter::Write(const RtmpAudioMessage& msg) { |
99 | 0 | char buf[32]; |
100 | 0 | char* p = buf; |
101 | 0 | if (!_write_header) { |
102 | 0 | _write_header = true; |
103 | 0 | const char flags_bit = static_cast<char>(_options.flv_content_type); |
104 | 0 | const char header[9] = { 'F', 'L', 'V', 0x01, flags_bit, 0, 0, 0, 0x09 }; |
105 | 0 | memcpy(p, header, sizeof(header)); |
106 | 0 | p += sizeof(header); |
107 | 0 | policy::WriteBigEndian4Bytes(&p, 0); // PreviousTagSize0 |
108 | 0 | } |
109 | | // FLV tag |
110 | 0 | *p++ = FLV_TAG_AUDIO; |
111 | 0 | policy::WriteBigEndian3Bytes(&p, msg.size()); |
112 | 0 | policy::WriteBigEndian3Bytes(&p, (msg.timestamp & 0xFFFFFF)); |
113 | 0 | *p++ = (msg.timestamp >> 24) & 0xFF; |
114 | 0 | policy::WriteBigEndian3Bytes(&p, 0); // StreamID |
115 | | // header of AUDIODATA |
116 | 0 | *p++ = ((msg.codec & 0xF) << 4) |
117 | 0 | | ((msg.rate & 0x3) << 2) |
118 | 0 | | ((msg.bits & 0x1) << 1) |
119 | 0 | | (msg.type & 0x1); |
120 | 0 | _buf->append(buf, p - buf); |
121 | 0 | _buf->append(msg.data); |
122 | | // PreviousTagSize |
123 | 0 | p = buf; |
124 | 0 | policy::WriteBigEndian4Bytes(&p, 11 + msg.size()); |
125 | 0 | _buf->append(buf, p - buf); |
126 | 0 | return butil::Status::OK(); |
127 | 0 | } |
128 | | |
129 | 0 | butil::Status FlvWriter::WriteScriptData(const butil::IOBuf& req_buf, uint32_t timestamp) { |
130 | 0 | char buf[32]; |
131 | 0 | char* p = buf; |
132 | 0 | if (!_write_header) { |
133 | 0 | _write_header = true; |
134 | 0 | const char flags_bit = static_cast<char>(_options.flv_content_type); |
135 | 0 | const char header[9] = { 'F', 'L', 'V', 0x01, flags_bit, 0, 0, 0, 0x09 }; |
136 | 0 | memcpy(p, header, sizeof(header)); |
137 | 0 | p += sizeof(header); |
138 | 0 | policy::WriteBigEndian4Bytes(&p, 0); // PreviousTagSize0 |
139 | 0 | } |
140 | | // FLV tag |
141 | 0 | *p++ = FLV_TAG_SCRIPT_DATA; |
142 | 0 | policy::WriteBigEndian3Bytes(&p, req_buf.size()); |
143 | 0 | policy::WriteBigEndian3Bytes(&p, (timestamp & 0xFFFFFF)); |
144 | 0 | *p++ = (timestamp >> 24) & 0xFF; |
145 | 0 | policy::WriteBigEndian3Bytes(&p, 0); // StreamID |
146 | 0 | _buf->append(buf, p - buf); |
147 | 0 | _buf->append(req_buf); |
148 | | // PreviousTagSize |
149 | 0 | p = buf; |
150 | 0 | policy::WriteBigEndian4Bytes(&p, 11 + req_buf.size()); |
151 | 0 | _buf->append(buf, p - buf); |
152 | 0 | return butil::Status::OK(); |
153 | 0 | } |
154 | | |
155 | 0 | butil::Status FlvWriter::Write(const RtmpCuePoint& cuepoint) { |
156 | 0 | butil::IOBuf req_buf; |
157 | 0 | { |
158 | 0 | butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf); |
159 | 0 | AMFOutputStream ostream(&zc_stream); |
160 | 0 | WriteAMFString(RTMP_AMF0_SET_DATAFRAME, &ostream); |
161 | 0 | WriteAMFString(RTMP_AMF0_ON_CUE_POINT, &ostream); |
162 | 0 | WriteAMFObject(cuepoint.data, &ostream); |
163 | 0 | if (!ostream.good()) { |
164 | 0 | return butil::Status(EINVAL, "Fail to serialize cuepoint"); |
165 | 0 | } |
166 | 0 | } |
167 | 0 | return WriteScriptData(req_buf, cuepoint.timestamp); |
168 | 0 | } |
169 | | |
170 | 0 | butil::Status FlvWriter::Write(const RtmpMetaData& metadata) { |
171 | 0 | butil::IOBuf req_buf; |
172 | 0 | { |
173 | 0 | butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf); |
174 | 0 | AMFOutputStream ostream(&zc_stream); |
175 | 0 | WriteAMFString(RTMP_AMF0_ON_META_DATA, &ostream); |
176 | 0 | WriteAMFObject(metadata.data, &ostream); |
177 | 0 | if (!ostream.good()) { |
178 | 0 | return butil::Status(EINVAL, "Fail to serialize metadata"); |
179 | 0 | } |
180 | 0 | } |
181 | 0 | return WriteScriptData(req_buf, metadata.timestamp); |
182 | 0 | } |
183 | | |
184 | | FlvReader::FlvReader(butil::IOBuf* buf) |
185 | 0 | : _read_header(false), _buf(buf) { |
186 | 0 | } |
187 | | |
188 | 0 | butil::Status FlvReader::ReadHeader() { |
189 | 0 | if (!_read_header) { |
190 | | // 9 is the size of FlvHeader, which is usually composed of |
191 | | // { 'F', 'L', 'V', 0x01, 0x05, 0, 0, 0, 0x09 }. |
192 | 0 | char header_buf[9 + 4/* PreviousTagSize0 */]; |
193 | 0 | const char* p = (const char*)_buf->fetch(header_buf, sizeof(header_buf)); |
194 | 0 | if (p == NULL) { |
195 | 0 | return butil::Status(EAGAIN, "Fail to read, not enough data"); |
196 | 0 | } |
197 | 0 | const char flv_header_signature[3] = { 'F', 'L', 'V' }; |
198 | 0 | if (memcmp(p, flv_header_signature, sizeof(flv_header_signature)) != 0) { |
199 | 0 | LOG(FATAL) << "Fail to parse FLV header"; |
200 | 0 | return butil::Status(EINVAL, "Fail to parse FLV header"); |
201 | 0 | } |
202 | 0 | _buf->pop_front(sizeof(header_buf)); |
203 | 0 | _read_header = true; |
204 | 0 | } |
205 | 0 | return butil::Status::OK(); |
206 | 0 | } |
207 | | |
208 | 0 | butil::Status FlvReader::PeekMessageType(FlvTagType* type_out) { |
209 | 0 | butil::Status st = ReadHeader(); |
210 | 0 | if (!st.ok()) { |
211 | 0 | return st; |
212 | 0 | } |
213 | 0 | const char* p = (const char*)_buf->fetch1(); |
214 | 0 | if (p == NULL) { |
215 | 0 | return butil::Status(EAGAIN, "Fail to read, not enough data"); |
216 | 0 | } |
217 | 0 | FlvTagType type = (FlvTagType)*p; |
218 | 0 | if (type != FLV_TAG_AUDIO && type != FLV_TAG_VIDEO && |
219 | 0 | type != FLV_TAG_SCRIPT_DATA) { |
220 | 0 | return butil::Status(EINVAL, "Fail to parse FLV tag"); |
221 | 0 | } |
222 | 0 | if (type_out) { |
223 | 0 | *type_out = type; |
224 | 0 | } |
225 | 0 | return butil::Status::OK(); |
226 | 0 | } |
227 | | |
228 | 0 | butil::Status FlvReader::Read(RtmpVideoMessage* msg) { |
229 | 0 | char tags[11]; |
230 | 0 | const unsigned char* p = (const unsigned char*)_buf->fetch(tags, sizeof(tags)); |
231 | 0 | if (p == NULL) { |
232 | 0 | return butil::Status(EAGAIN, "Fail to read, not enough data"); |
233 | 0 | } |
234 | 0 | if (*p != FLV_TAG_VIDEO) { |
235 | 0 | return butil::Status(EINVAL, "Fail to parse RtmpVideoMessage"); |
236 | 0 | } |
237 | 0 | uint32_t msg_size = policy::ReadBigEndian3Bytes(p + 1); |
238 | 0 | uint32_t timestamp = policy::ReadBigEndian3Bytes(p + 4); |
239 | 0 | timestamp |= (*(p + 7) << 24); |
240 | 0 | if (_buf->length() < 11 + msg_size + 4/*PreviousTagSize*/) { |
241 | 0 | return butil::Status(EAGAIN, "Fail to read, not enough data"); |
242 | 0 | } |
243 | 0 | _buf->pop_front(11); |
244 | 0 | char first_byte = 0; |
245 | 0 | CHECK(_buf->cut1(&first_byte)); |
246 | 0 | msg->timestamp = timestamp; |
247 | 0 | msg->frame_type = (FlvVideoFrameType)((first_byte >> 4) & 0xF); |
248 | 0 | msg->codec = (FlvVideoCodec)(first_byte & 0xF); |
249 | | // TODO(zhujiashun): check the validation of frame_type and codec |
250 | 0 | _buf->cutn(&msg->data, msg_size - 1); |
251 | 0 | _buf->pop_front(4/* PreviousTagSize0 */); |
252 | |
|
253 | 0 | return butil::Status::OK(); |
254 | 0 | } |
255 | | |
256 | 0 | butil::Status FlvReader::Read(RtmpAudioMessage* msg) { |
257 | 0 | char tags[11]; |
258 | 0 | const unsigned char* p = (const unsigned char*)_buf->fetch(tags, sizeof(tags)); |
259 | 0 | if (p == NULL) { |
260 | 0 | return butil::Status(EAGAIN, "Fail to read, not enough data"); |
261 | 0 | } |
262 | 0 | if (*p != FLV_TAG_AUDIO) { |
263 | 0 | return butil::Status(EINVAL, "Fail to parse RtmpAudioMessage"); |
264 | 0 | } |
265 | 0 | uint32_t msg_size = policy::ReadBigEndian3Bytes(p + 1); |
266 | 0 | uint32_t timestamp = policy::ReadBigEndian3Bytes(p + 4); |
267 | 0 | timestamp |= (*(p + 7) << 24); |
268 | 0 | if (_buf->length() < 11 + msg_size + 4/*PreviousTagSize*/) { |
269 | 0 | return butil::Status(EAGAIN, "Fail to read, not enough data"); |
270 | 0 | } |
271 | 0 | _buf->pop_front(11); |
272 | 0 | char first_byte = 0; |
273 | 0 | CHECK(_buf->cut1(&first_byte)); |
274 | 0 | msg->timestamp = timestamp; |
275 | 0 | msg->codec = (FlvAudioCodec)((first_byte >> 4) & 0xF); |
276 | 0 | msg->rate = (FlvSoundRate)((first_byte >> 2) & 0x3); |
277 | 0 | msg->bits = (FlvSoundBits)((first_byte >> 1) & 0x1); |
278 | 0 | msg->type = (FlvSoundType)(first_byte & 0x1); |
279 | 0 | _buf->cutn(&msg->data, msg_size - 1); |
280 | 0 | _buf->pop_front(4/* PreviousTagSize0 */); |
281 | |
|
282 | 0 | return butil::Status::OK(); |
283 | 0 | } |
284 | | |
285 | 0 | butil::Status FlvReader::Read(RtmpMetaData* msg, std::string* name) { |
286 | 0 | char tags[11]; |
287 | 0 | const unsigned char* p = (const unsigned char*)_buf->fetch(tags, sizeof(tags)); |
288 | 0 | if (p == NULL) { |
289 | 0 | return butil::Status(EAGAIN, "Fail to read, not enough data"); |
290 | 0 | } |
291 | 0 | if (*p != FLV_TAG_SCRIPT_DATA) { |
292 | 0 | return butil::Status(EINVAL, "Fail to parse RtmpScriptMessage"); |
293 | 0 | } |
294 | 0 | uint32_t msg_size = policy::ReadBigEndian3Bytes(p + 1); |
295 | 0 | uint32_t timestamp = policy::ReadBigEndian3Bytes(p + 4); |
296 | 0 | timestamp |= (*(p + 7) << 24); |
297 | 0 | if (_buf->length() < 11 + msg_size + 4/*PreviousTagSize*/) { |
298 | 0 | return butil::Status(EAGAIN, "Fail to read, not enough data"); |
299 | 0 | } |
300 | 0 | _buf->pop_front(11); |
301 | 0 | butil::IOBuf req_buf; |
302 | 0 | _buf->cutn(&req_buf, msg_size); |
303 | 0 | _buf->pop_front(4/* PreviousTagSize0 */); |
304 | 0 | { |
305 | 0 | butil::IOBufAsZeroCopyInputStream zc_stream(req_buf); |
306 | 0 | AMFInputStream istream(&zc_stream); |
307 | 0 | if (!ReadAMFString(name, &istream)) { |
308 | 0 | return butil::Status(EINVAL, "Fail to read AMF string"); |
309 | 0 | } |
310 | 0 | if (!ReadAMFObject(&msg->data, &istream)) { |
311 | 0 | return butil::Status(EINVAL, "Fail to read AMF object"); |
312 | 0 | } |
313 | 0 | } |
314 | 0 | msg->timestamp = timestamp; |
315 | 0 | return butil::Status::OK(); |
316 | 0 | } |
317 | | |
318 | 0 | const char* FlvVideoFrameType2Str(FlvVideoFrameType t) { |
319 | 0 | switch (t) { |
320 | 0 | case FLV_VIDEO_FRAME_KEYFRAME: return "keyframe"; |
321 | 0 | case FLV_VIDEO_FRAME_INTERFRAME: return "interframe"; |
322 | 0 | case FLV_VIDEO_FRAME_DISPOSABLE_INTERFRAME: return "disposable interframe"; |
323 | 0 | case FLV_VIDEO_FRAME_GENERATED_KEYFRAME: return "generated keyframe"; |
324 | 0 | case FLV_VIDEO_FRAME_INFOFRAME: return "info/command frame"; |
325 | 0 | } |
326 | 0 | return "Unknown FlvVideoFrameType"; |
327 | 0 | } |
328 | | |
329 | 0 | const char* FlvVideoCodec2Str(FlvVideoCodec id) { |
330 | 0 | switch (id) { |
331 | 0 | case FLV_VIDEO_JPEG: return "JPEG"; |
332 | 0 | case FLV_VIDEO_SORENSON_H263: return "Sorenson H.263"; |
333 | 0 | case FLV_VIDEO_SCREEN_VIDEO: return "Screen video"; |
334 | 0 | case FLV_VIDEO_ON2_VP6: return "On2 VP6"; |
335 | 0 | case FLV_VIDEO_ON2_VP6_WITH_ALPHA_CHANNEL: |
336 | 0 | return "On2 VP6 with alpha channel"; |
337 | 0 | case FLV_VIDEO_SCREEN_VIDEO_V2: return "Screen video version 2"; |
338 | 0 | case FLV_VIDEO_AVC: return "AVC"; |
339 | 0 | case FLV_VIDEO_HEVC: return "H.265"; |
340 | 0 | } |
341 | 0 | return "Unknown FlvVideoCodec"; |
342 | 0 | } |
343 | | |
344 | 0 | const char* FlvAudioCodec2Str(FlvAudioCodec codec) { |
345 | 0 | switch (codec) { |
346 | 0 | case FLV_AUDIO_LINEAR_PCM_PLATFORM_ENDIAN: |
347 | 0 | return "Linear PCM, platform endian"; |
348 | 0 | case FLV_AUDIO_ADPCM: return "ADPCM"; |
349 | 0 | case FLV_AUDIO_MP3: return "MP3"; |
350 | 0 | case FLV_AUDIO_LINEAR_PCM_LITTLE_ENDIAN: |
351 | 0 | return "Linear PCM, little endian"; |
352 | 0 | case FLV_AUDIO_NELLYMOSER_16KHZ_MONO: |
353 | 0 | return "Nellymoser 16-kHz mono"; |
354 | 0 | case FLV_AUDIO_NELLYMOSER_8KHZ_MONO: |
355 | 0 | return "Nellymoser 8-kHz mono"; |
356 | 0 | case FLV_AUDIO_NELLYMOSER: |
357 | 0 | return "Nellymoser"; |
358 | 0 | case FLV_AUDIO_G711_ALAW_LOGARITHMIC_PCM: |
359 | 0 | return "G.711 A-law logarithmic PCM"; |
360 | 0 | case FLV_AUDIO_G711_MULAW_LOGARITHMIC_PCM: |
361 | 0 | return "G.711 mu-law logarithmic PCM"; |
362 | 0 | case FLV_AUDIO_RESERVED: |
363 | 0 | return "reserved"; |
364 | 0 | case FLV_AUDIO_AAC: return "AAC"; |
365 | 0 | case FLV_AUDIO_SPEEX: return "Speex"; |
366 | 0 | case FLV_AUDIO_MP3_8KHZ: return "MP3 8-Khz"; |
367 | 0 | case FLV_AUDIO_DEVICE_SPECIFIC_SOUND: |
368 | 0 | return "Device-specific sound"; |
369 | 0 | } |
370 | 0 | return "Unknown FlvAudioCodec"; |
371 | 0 | } |
372 | | |
373 | 0 | const char* FlvSoundRate2Str(FlvSoundRate rate) { |
374 | 0 | switch (rate) { |
375 | 0 | case FLV_SOUND_RATE_5512HZ: return "5512"; |
376 | 0 | case FLV_SOUND_RATE_11025HZ: return "11025"; |
377 | 0 | case FLV_SOUND_RATE_22050HZ: return "22050"; |
378 | 0 | case FLV_SOUND_RATE_44100HZ: return "44100"; |
379 | 0 | } |
380 | 0 | return "Unknown FlvSoundRate"; |
381 | 0 | } |
382 | | |
383 | 0 | const char* FlvSoundBits2Str(FlvSoundBits size) { |
384 | 0 | switch (size) { |
385 | 0 | case FLV_SOUND_8BIT: return "8"; |
386 | 0 | case FLV_SOUND_16BIT: return "16"; |
387 | 0 | } |
388 | 0 | return "Unknown FlvSoundBits"; |
389 | 0 | } |
390 | | |
391 | 0 | const char* FlvSoundType2Str(FlvSoundType t) { |
392 | 0 | switch (t) { |
393 | 0 | case FLV_SOUND_MONO: return "mono"; |
394 | 0 | case FLV_SOUND_STEREO: return "stereo"; |
395 | 0 | } |
396 | 0 | return "Unknown FlvSoundType"; |
397 | 0 | } |
398 | | |
399 | 0 | std::ostream& operator<<(std::ostream& os, const RtmpAudioMessage& msg) { |
400 | 0 | return os << "AudioMessage{timestamp=" << msg.timestamp |
401 | 0 | << " codec=" << FlvAudioCodec2Str(msg.codec) |
402 | 0 | << " rate=" << FlvSoundRate2Str(msg.rate) |
403 | 0 | << " bits=" << FlvSoundBits2Str(msg.bits) |
404 | 0 | << " type=" << FlvSoundType2Str(msg.type) |
405 | 0 | << " data=" << butil::ToPrintable(msg.data) << '}'; |
406 | 0 | } |
407 | | |
408 | 0 | std::ostream& operator<<(std::ostream& os, const RtmpVideoMessage& msg) { |
409 | 0 | return os << "VideoMessage{timestamp=" << msg.timestamp |
410 | 0 | << " type=" << FlvVideoFrameType2Str(msg.frame_type) |
411 | 0 | << " codec=" << FlvVideoCodec2Str(msg.codec) |
412 | 0 | << " data=" << butil::ToPrintable(msg.data) << '}'; |
413 | 0 | } |
414 | | |
415 | 0 | butil::Status RtmpAACMessage::Create(const RtmpAudioMessage& msg) { |
416 | 0 | if (msg.codec != FLV_AUDIO_AAC) { |
417 | 0 | return butil::Status(EINVAL, "codec=%s is not AAC", |
418 | 0 | FlvAudioCodec2Str(msg.codec)); |
419 | 0 | } |
420 | 0 | const uint8_t* p = (const uint8_t*)msg.data.fetch1(); |
421 | 0 | if (p == NULL) { |
422 | 0 | return butil::Status(EINVAL, "Not enough data in AudioMessage"); |
423 | 0 | } |
424 | 0 | if (*p > FLV_AAC_PACKET_RAW) { |
425 | 0 | return butil::Status(EINVAL, "Invalid AAC packet_type=%d", (int)*p); |
426 | 0 | } |
427 | 0 | this->timestamp = msg.timestamp; |
428 | 0 | this->rate = msg.rate; |
429 | 0 | this->bits = msg.bits; |
430 | 0 | this->type = msg.type; |
431 | 0 | this->packet_type = (FlvAACPacketType)*p; |
432 | 0 | msg.data.append_to(&data, msg.data.size() - 1, 1); |
433 | 0 | return butil::Status::OK(); |
434 | 0 | } |
435 | | |
436 | | AudioSpecificConfig::AudioSpecificConfig() |
437 | | : aac_object(AAC_OBJECT_UNKNOWN) |
438 | | , aac_sample_rate(0) |
439 | 0 | , aac_channels(0) { |
440 | 0 | } |
441 | | |
442 | 0 | butil::Status AudioSpecificConfig::Create(const butil::IOBuf& buf) { |
443 | 0 | if (buf.size() < 2u) { |
444 | 0 | return butil::Status(EINVAL, "data_size=%" PRIu64 " is too short", |
445 | 0 | (uint64_t)buf.size()); |
446 | 0 | } |
447 | 0 | char tmpbuf[2]; |
448 | 0 | buf.copy_to(tmpbuf, arraysize(tmpbuf)); |
449 | 0 | return Create(tmpbuf, arraysize(tmpbuf)); |
450 | 0 | } |
451 | | |
452 | 0 | butil::Status AudioSpecificConfig::Create(const void* data, size_t len) { |
453 | 0 | if (len < 2u) { |
454 | 0 | return butil::Status(EINVAL, "data_size=%" PRIu64 " is too short", (uint64_t)len); |
455 | 0 | } |
456 | 0 | uint8_t profile_ObjectType = ((const char*)data)[0]; |
457 | 0 | uint8_t samplingFrequencyIndex = ((const char*)data)[1]; |
458 | 0 | aac_channels = (samplingFrequencyIndex >> 3) & 0x0f; |
459 | 0 | aac_sample_rate = ((profile_ObjectType << 1) & 0x0e) | ((samplingFrequencyIndex >> 7) & 0x01); |
460 | 0 | aac_object = (AACObjectType)((profile_ObjectType >> 3) & 0x1f); |
461 | 0 | if (aac_object == AAC_OBJECT_UNKNOWN) { |
462 | 0 | return butil::Status(EINVAL, "Invalid object type"); |
463 | 0 | } |
464 | 0 | return butil::Status::OK(); |
465 | 0 | } |
466 | | |
467 | 0 | bool RtmpAudioMessage::IsAACSequenceHeader() const { |
468 | 0 | if (codec != FLV_AUDIO_AAC) { |
469 | 0 | return false; |
470 | 0 | } |
471 | 0 | const uint8_t* p = (const uint8_t*)data.fetch1(); |
472 | 0 | if (p == NULL) { |
473 | 0 | return false; |
474 | 0 | } |
475 | 0 | return *p == FLV_AAC_PACKET_SEQUENCE_HEADER; |
476 | 0 | } |
477 | | |
478 | 0 | butil::Status RtmpAVCMessage::Create(const RtmpVideoMessage& msg) { |
479 | 0 | if (msg.codec != FLV_VIDEO_AVC) { |
480 | 0 | return butil::Status(EINVAL, "codec=%s is not AVC", |
481 | 0 | FlvVideoCodec2Str(msg.codec)); |
482 | 0 | } |
483 | 0 | uint8_t buf[4]; |
484 | 0 | const uint8_t* p = (const uint8_t*)msg.data.fetch(buf, sizeof(buf)); |
485 | 0 | if (p == NULL) { |
486 | 0 | return butil::Status(EINVAL, "Not enough data in VideoMessage"); |
487 | 0 | } |
488 | 0 | if (*p > FLV_AVC_PACKET_END_OF_SEQUENCE) { |
489 | 0 | return butil::Status(EINVAL, "Invalid AVC packet_type=%d", (int)*p); |
490 | 0 | } |
491 | 0 | this->timestamp = msg.timestamp; |
492 | 0 | this->frame_type = msg.frame_type; |
493 | 0 | this->packet_type = (FlvAVCPacketType)*p; |
494 | 0 | this->composition_time = policy::ReadBigEndian3Bytes(p + 1); |
495 | 0 | msg.data.append_to(&data, msg.data.size() - 4, 4); |
496 | 0 | return butil::Status::OK(); |
497 | 0 | } |
498 | | |
499 | 0 | bool RtmpVideoMessage::IsAVCSequenceHeader() const { |
500 | 0 | if (codec != FLV_VIDEO_AVC || frame_type != FLV_VIDEO_FRAME_KEYFRAME) { |
501 | 0 | return false; |
502 | 0 | } |
503 | 0 | const uint8_t* p = (const uint8_t*)data.fetch1(); |
504 | 0 | if (p == NULL) { |
505 | 0 | return false; |
506 | 0 | } |
507 | 0 | return *p == FLV_AVC_PACKET_SEQUENCE_HEADER; |
508 | 0 | } |
509 | | |
510 | 0 | bool RtmpVideoMessage::IsHEVCSequenceHeader() const { |
511 | 0 | if (codec != FLV_VIDEO_HEVC || frame_type != FLV_VIDEO_FRAME_KEYFRAME) { |
512 | 0 | return false; |
513 | 0 | } |
514 | 0 | const uint8_t* p = (const uint8_t*)data.fetch1(); |
515 | 0 | if (p == NULL) { |
516 | 0 | return false; |
517 | 0 | } |
518 | 0 | return *p == FLV_AVC_PACKET_SEQUENCE_HEADER; |
519 | 0 | } |
520 | | |
521 | 0 | const char* AVCProfile2Str(AVCProfile p) { |
522 | 0 | switch (p) { |
523 | 0 | case AVC_PROFILE_BASELINE: return "Baseline"; |
524 | 0 | case AVC_PROFILE_CONSTRAINED_BASELINE: return "ConstrainedBaseline"; |
525 | 0 | case AVC_PROFILE_MAIN: return "Main"; |
526 | 0 | case AVC_PROFILE_EXTENDED: return "Extended"; |
527 | 0 | case AVC_PROFILE_HIGH: return "High"; |
528 | 0 | case AVC_PROFILE_HIGH10: return "High10"; |
529 | 0 | case AVC_PROFILE_HIGH10_INTRA: return "High10Intra"; |
530 | 0 | case AVC_PROFILE_HIGH422: return "High422"; |
531 | 0 | case AVC_PROFILE_HIGH422_INTRA: return "High422Intra"; |
532 | 0 | case AVC_PROFILE_HIGH444: return "High444"; |
533 | 0 | case AVC_PROFILE_HIGH444_PREDICTIVE: return "High444Predictive"; |
534 | 0 | case AVC_PROFILE_HIGH444_INTRA: return "High444Intra"; |
535 | 0 | } |
536 | 0 | return "Unknown"; |
537 | 0 | } |
538 | | |
539 | | AVCDecoderConfigurationRecord::AVCDecoderConfigurationRecord() |
540 | | : width(0) |
541 | | , height(0) |
542 | | , avc_profile((AVCProfile)0) |
543 | | , avc_level((AVCLevel)0) |
544 | 0 | , length_size_minus1(-1) { |
545 | 0 | } |
546 | | |
547 | | std::ostream& operator<<(std::ostream& os, |
548 | 0 | const AVCDecoderConfigurationRecord& r) { |
549 | 0 | os << "{profile=" << AVCProfile2Str(r.avc_profile) |
550 | 0 | << " level=" << (int)r.avc_level |
551 | 0 | << " length_size_minus1=" << (int)r.length_size_minus1 |
552 | 0 | << " width=" << r.width |
553 | 0 | << " height=" << r.height |
554 | 0 | << " sps=["; |
555 | 0 | for (size_t i = 0; i < r.sps_list.size(); ++i) { |
556 | 0 | if (i) { |
557 | 0 | os << ' '; |
558 | 0 | } |
559 | 0 | os << r.sps_list[i].size(); |
560 | 0 | } |
561 | 0 | os << "] pps=["; |
562 | 0 | for (size_t i = 0; i < r.pps_list.size(); ++i) { |
563 | 0 | if (i) { |
564 | 0 | os << ' '; |
565 | 0 | } |
566 | 0 | os << r.pps_list[i].size(); |
567 | 0 | } |
568 | 0 | os << "]}"; |
569 | 0 | return os; |
570 | 0 | } |
571 | | |
572 | 0 | butil::Status AVCDecoderConfigurationRecord::Create(const butil::IOBuf& buf) { |
573 | | // the buf should be short generally, copy it out to continuous memory |
574 | | // to simplify parsing. |
575 | 0 | DEFINE_SMALL_ARRAY(char, cont_buf, buf.size(), 64); |
576 | 0 | buf.copy_to(cont_buf, buf.size()); |
577 | 0 | return Create(cont_buf, buf.size()); |
578 | 0 | } |
579 | | |
580 | 0 | butil::Status AVCDecoderConfigurationRecord::Create(const void* data, size_t len) { |
581 | 0 | butil::StringPiece buf((const char*)data, len); |
582 | 0 | if (buf.size() < 6) { |
583 | 0 | return butil::Status(EINVAL, "Length=%lu is not long enough", |
584 | 0 | (unsigned long)buf.size()); |
585 | 0 | } |
586 | | // skip configurationVersion at buf[0] |
587 | 0 | avc_profile = (AVCProfile)buf[1]; |
588 | | // skip profile_compatibility at buf[2] |
589 | 0 | avc_level = (AVCLevel)buf[3]; |
590 | | |
591 | | // 5.3.4.2.1 Syntax, H.264-AVC-ISO_IEC_14496-15.pdf, page 16 |
592 | | // 5.2.4.1 AVC decoder configuration record |
593 | | // 5.2.4.1.2 Semantics |
594 | | // The value of this field shall be one of 0, 1, or 3 corresponding to a |
595 | | // length encoded with 1, 2, or 4 bytes, respectively. |
596 | 0 | length_size_minus1 = buf[4] & 0x03; |
597 | 0 | if (length_size_minus1 == 2) { |
598 | 0 | return butil::Status(EINVAL, "lengthSizeMinusOne should never be 2"); |
599 | 0 | } |
600 | | |
601 | | // Parsing SPS |
602 | 0 | const int num_sps = (int)(buf[5] & 0x1f); |
603 | 0 | buf.remove_prefix(6); |
604 | 0 | sps_list.clear(); |
605 | 0 | sps_list.reserve(num_sps); |
606 | 0 | for (int i = 0; i < num_sps; ++i) { |
607 | 0 | if (buf.size() < 2) { |
608 | 0 | return butil::Status(EINVAL, "Not enough data to decode SPS-length"); |
609 | 0 | } |
610 | 0 | const uint16_t sps_length = policy::ReadBigEndian2Bytes(buf.data()); |
611 | 0 | if (buf.size() < 2u + sps_length) { |
612 | 0 | return butil::Status(EINVAL, "Not enough data to decode SPS"); |
613 | 0 | } |
614 | 0 | if (sps_length > 0) { |
615 | 0 | butil::Status st = ParseSPS(buf.data() + 2, sps_length); |
616 | 0 | if (!st.ok()) { |
617 | 0 | return st; |
618 | 0 | } |
619 | 0 | sps_list.push_back(buf.substr(2, sps_length).as_string()); |
620 | 0 | } |
621 | 0 | buf.remove_prefix(2 + sps_length); |
622 | 0 | } |
623 | | // Parsing PPS |
624 | 0 | pps_list.clear(); |
625 | 0 | if (buf.empty()) { |
626 | 0 | return butil::Status(EINVAL, "Not enough data to decode PPS"); |
627 | 0 | } |
628 | 0 | const int num_pps = (int)buf[0]; |
629 | 0 | buf.remove_prefix(1); |
630 | 0 | for (int i = 0; i < num_pps; ++i) { |
631 | 0 | if (buf.size() < 2) { |
632 | 0 | return butil::Status(EINVAL, "Not enough data to decode PPS-length"); |
633 | 0 | } |
634 | 0 | const uint16_t pps_length = policy::ReadBigEndian2Bytes(buf.data()); |
635 | 0 | if (buf.size() < 2u + pps_length) { |
636 | 0 | return butil::Status(EINVAL, "Not enough data to decode PPS"); |
637 | 0 | } |
638 | 0 | if (pps_length > 0) { |
639 | 0 | pps_list.push_back(buf.substr(2, pps_length).as_string()); |
640 | 0 | } |
641 | 0 | buf.remove_prefix(2 + pps_length); |
642 | 0 | } |
643 | 0 | return butil::Status::OK(); |
644 | 0 | } |
645 | | |
646 | | butil::Status AVCDecoderConfigurationRecord::ParseSPS( |
647 | 0 | const butil::StringPiece& buf, size_t sps_length) { |
648 | | // for NALU, 7.3.1 NAL unit syntax |
649 | | // H.264-AVC-ISO_IEC_14496-10-2012.pdf, page 61. |
650 | 0 | if (buf.empty()) { |
651 | 0 | return butil::Status(EINVAL, "SPS is empty"); |
652 | 0 | } |
653 | 0 | const int8_t nutv = buf[0]; |
654 | 0 | const int8_t forbidden_zero_bit = (nutv >> 7) & 0x01; |
655 | 0 | if (forbidden_zero_bit) { |
656 | 0 | return butil::Status(EINVAL, "forbidden_zero_bit shall equal 0"); |
657 | 0 | } |
658 | | // nal_ref_idc not equal to 0 specifies that the content of the NAL unit |
659 | | // contains: |
660 | | // a sequence parameter set |
661 | | // or a picture parameter set |
662 | | // or a slice of a reference picture |
663 | | // or a slice data partition of a reference picture. |
664 | 0 | int8_t nal_ref_idc = (nutv >> 5) & 0x03; |
665 | 0 | if (!nal_ref_idc) { |
666 | 0 | return butil::Status(EINVAL, "nal_ref_idc is 0"); |
667 | 0 | } |
668 | | // 7.4.1 NAL unit semantics |
669 | | // H.264-AVC-ISO_IEC_14496-10-2012.pdf, page 61. |
670 | | // nal_unit_type specifies the type of RBSP data structure contained in |
671 | | // the NAL unit as specified in Table 7-1. |
672 | 0 | const AVCNaluType nal_unit_type = (AVCNaluType)(nutv & 0x1f); |
673 | 0 | if (nal_unit_type != AVC_NALU_SPS) { |
674 | 0 | return butil::Status(EINVAL, "nal_unit_type is not %d", (int)AVC_NALU_SPS); |
675 | 0 | } |
676 | | // Extract the rbsp from sps. |
677 | 0 | DEFINE_SMALL_ARRAY(char, rbsp, sps_length - 1, 64); |
678 | 0 | buf.copy(rbsp, sps_length - 1, 1); |
679 | 0 | size_t rbsp_len = 0; |
680 | 0 | for (size_t i = 1; i < sps_length; ++i) { |
681 | | // XX 00 00 03 XX, the 03 byte should be dropped. |
682 | 0 | if (!(i >= 3 && buf[i - 2] == 0 && buf[i - 1] == 0 && buf[i] == 3)) { |
683 | 0 | rbsp[rbsp_len++] = buf[i]; |
684 | 0 | } |
685 | 0 | } |
686 | | // for SPS, 7.3.2.1.1 Sequence parameter set data syntax |
687 | | // H.264-AVC-ISO_IEC_14496-10-2012.pdf, page 62. |
688 | 0 | if (rbsp_len < 3) { |
689 | 0 | return butil::Status(EINVAL, "rbsp must be at least 3 bytes"); |
690 | 0 | } |
691 | | // Decode rbsp. |
692 | 0 | const char* p = rbsp; |
693 | 0 | uint8_t profile_idc = *p++; |
694 | 0 | if (!profile_idc) { |
695 | 0 | return butil::Status(EINVAL, "profile_idc is 0"); |
696 | 0 | } |
697 | 0 | int8_t flags = *p++; |
698 | 0 | if (flags & 0x03) { |
699 | 0 | return butil::Status(EINVAL, "Invalid flags=%d", (int)flags); |
700 | 0 | } |
701 | 0 | uint8_t level_idc = *p++; |
702 | 0 | if (!level_idc) { |
703 | 0 | return butil::Status(EINVAL, "level_idc is 0"); |
704 | 0 | } |
705 | 0 | BitStream bs(p, rbsp + rbsp_len - p); |
706 | 0 | int32_t seq_parameter_set_id = -1; |
707 | 0 | if (avc_nalu_read_uev(&bs, &seq_parameter_set_id) != 0) { |
708 | 0 | return butil::Status(EINVAL, "Fail to read seq_parameter_set_id"); |
709 | 0 | } |
710 | 0 | if (seq_parameter_set_id < 0) { |
711 | 0 | return butil::Status(EINVAL, "Invalid seq_parameter_set_id=%d", |
712 | 0 | (int)seq_parameter_set_id); |
713 | 0 | } |
714 | 0 | int32_t chroma_format_idc = -1; |
715 | 0 | if (profile_idc == 100 || profile_idc == 110 || profile_idc == 122 || |
716 | 0 | profile_idc == 244 || profile_idc == 44 || profile_idc == 83 || |
717 | 0 | profile_idc == 86 || profile_idc == 118 || profile_idc == 128) { |
718 | 0 | if (avc_nalu_read_uev(&bs, &chroma_format_idc) != 0) { |
719 | 0 | return butil::Status(EINVAL, "Fail to read chroma_format_idc"); |
720 | 0 | } |
721 | 0 | if (chroma_format_idc == 3) { |
722 | 0 | int8_t separate_colour_plane_flag = -1; |
723 | 0 | if (avc_nalu_read_bit(&bs, &separate_colour_plane_flag) != 0) { |
724 | 0 | return butil::Status(EINVAL, "Fail to read separate_colour_plane_flag"); |
725 | 0 | } |
726 | 0 | } |
727 | 0 | int32_t bit_depth_luma_minus8 = -1; |
728 | 0 | if (avc_nalu_read_uev(&bs, &bit_depth_luma_minus8) != 0) { |
729 | 0 | return butil::Status(EINVAL, "Fail to read bit_depth_luma_minus8"); |
730 | 0 | } |
731 | 0 | int32_t bit_depth_chroma_minus8 = -1; |
732 | 0 | if (avc_nalu_read_uev(&bs, &bit_depth_chroma_minus8) != 0) { |
733 | 0 | return butil::Status(EINVAL, "Fail to read bit_depth_chroma_minus8"); |
734 | 0 | } |
735 | 0 | int8_t qpprime_y_zero_transform_bypass_flag = -1; |
736 | 0 | if (avc_nalu_read_bit(&bs, &qpprime_y_zero_transform_bypass_flag) != 0) { |
737 | 0 | return butil::Status(EINVAL, "Fail to read qpprime_y_zero_transform_bypass_flag"); |
738 | 0 | } |
739 | 0 | int8_t seq_scaling_matrix_present_flag = -1; |
740 | 0 | if (avc_nalu_read_bit(&bs, &seq_scaling_matrix_present_flag) != 0) { |
741 | 0 | return butil::Status(EINVAL, "Fail to read seq_scaling_matrix_present_flag"); |
742 | 0 | } |
743 | 0 | if (seq_scaling_matrix_present_flag) { |
744 | 0 | int nb_scmpfs = (chroma_format_idc != 3 ? 8 : 12); |
745 | 0 | for (int i = 0; i < nb_scmpfs; i++) { |
746 | 0 | int8_t seq_scaling_matrix_present_flag_i = -1; |
747 | 0 | if (avc_nalu_read_bit(&bs, &seq_scaling_matrix_present_flag_i)) { |
748 | 0 | return butil::Status(EINVAL, "Fail to read seq_scaling_" |
749 | 0 | "matrix_present_flag[%d]", i); |
750 | 0 | } |
751 | 0 | if (seq_scaling_matrix_present_flag_i) { |
752 | 0 | return butil::Status(EINVAL, "Invalid seq_scaling_matrix_" |
753 | 0 | "present_flag[%d]=%d nb_scmpfs=%d", |
754 | 0 | i, (int)seq_scaling_matrix_present_flag_i, |
755 | 0 | nb_scmpfs); |
756 | 0 | } |
757 | 0 | } |
758 | 0 | } |
759 | 0 | } |
760 | 0 | int32_t log2_max_frame_num_minus4 = -1; |
761 | 0 | if (avc_nalu_read_uev(&bs, &log2_max_frame_num_minus4) != 0) { |
762 | 0 | return butil::Status(EINVAL, "Fail to read log2_max_frame_num_minus4"); |
763 | 0 | } |
764 | 0 | int32_t pic_order_cnt_type = -1; |
765 | 0 | if (avc_nalu_read_uev(&bs, &pic_order_cnt_type) != 0) { |
766 | 0 | return butil::Status(EINVAL, "Fail to read pic_order_cnt_type"); |
767 | 0 | } |
768 | 0 | if (pic_order_cnt_type == 0) { |
769 | 0 | int32_t log2_max_pic_order_cnt_lsb_minus4 = -1; |
770 | 0 | if (avc_nalu_read_uev(&bs, &log2_max_pic_order_cnt_lsb_minus4) != 0) { |
771 | 0 | return butil::Status(EINVAL, "Fail to read log2_max_pic_order_cnt_lsb_minus4"); |
772 | 0 | } |
773 | 0 | } else if (pic_order_cnt_type == 1) { |
774 | 0 | int8_t delta_pic_order_always_zero_flag = -1; |
775 | 0 | if (avc_nalu_read_bit(&bs, &delta_pic_order_always_zero_flag) != 0) { |
776 | 0 | return butil::Status(EINVAL, "Fail to read delta_pic_order_always_zero_flag"); |
777 | 0 | } |
778 | 0 | int32_t offset_for_non_ref_pic = -1; |
779 | 0 | if (avc_nalu_read_uev(&bs, &offset_for_non_ref_pic) != 0) { |
780 | 0 | return butil::Status(EINVAL, "Fail to read offset_for_non_ref_pic"); |
781 | 0 | } |
782 | 0 | int32_t offset_for_top_to_bottom_field = -1; |
783 | 0 | if (avc_nalu_read_uev(&bs, &offset_for_top_to_bottom_field) != 0) { |
784 | 0 | return butil::Status(EINVAL, "Fail to read offset_for_top_to_bottom_field"); |
785 | 0 | } |
786 | 0 | int32_t num_ref_frames_in_pic_order_cnt_cycle = -1; |
787 | 0 | if (avc_nalu_read_uev(&bs, &num_ref_frames_in_pic_order_cnt_cycle) != 0) { |
788 | 0 | return butil::Status(EINVAL, "Fail to read num_ref_frames_in_pic_order_cnt_cycle"); |
789 | 0 | } |
790 | 0 | if (num_ref_frames_in_pic_order_cnt_cycle) { |
791 | 0 | return butil::Status(EINVAL, "Invalid num_ref_frames_in_pic_order_cnt_cycle=%d", |
792 | 0 | num_ref_frames_in_pic_order_cnt_cycle); |
793 | 0 | } |
794 | 0 | } |
795 | 0 | int32_t max_num_ref_frames = -1; |
796 | 0 | if (avc_nalu_read_uev(&bs, &max_num_ref_frames) != 0) { |
797 | 0 | return butil::Status(EINVAL, "Fail to read max_num_ref_frames"); |
798 | 0 | } |
799 | 0 | int8_t gaps_in_frame_num_value_allowed_flag = -1; |
800 | 0 | if (avc_nalu_read_bit(&bs, &gaps_in_frame_num_value_allowed_flag) != 0) { |
801 | 0 | return butil::Status(EINVAL, "Fail to read gaps_in_frame_num_value_allowed_flag"); |
802 | 0 | } |
803 | 0 | int32_t pic_width_in_mbs_minus1 = -1; |
804 | 0 | if (avc_nalu_read_uev(&bs, &pic_width_in_mbs_minus1) != 0) { |
805 | 0 | return butil::Status(EINVAL, "Fail to read pic_width_in_mbs_minus1"); |
806 | 0 | } |
807 | 0 | int32_t pic_height_in_map_units_minus1 = -1; |
808 | 0 | if (avc_nalu_read_uev(&bs, &pic_height_in_map_units_minus1) != 0) { |
809 | 0 | return butil::Status(EINVAL, "Fail to read pic_height_in_map_units_minus1"); |
810 | 0 | } |
811 | 0 | width = (int)(pic_width_in_mbs_minus1 + 1) * 16; |
812 | 0 | height = (int)(pic_height_in_map_units_minus1 + 1) * 16; |
813 | 0 | return butil::Status::OK(); |
814 | 0 | } |
815 | | |
816 | | static bool find_avc_annexb_nalu_start_code(const butil::IOBuf& buf, |
817 | 0 | size_t* start_code_length) { |
818 | 0 | size_t consecutive_zero_count = 0; |
819 | 0 | for (butil::IOBufBytesIterator it(buf); it != NULL; ++it) { |
820 | 0 | char c = *it; |
821 | 0 | if (c == 0) { |
822 | 0 | ++consecutive_zero_count; |
823 | 0 | } else if (c == 1) { |
824 | 0 | if (consecutive_zero_count >= 2) { |
825 | 0 | if (start_code_length) { |
826 | 0 | *start_code_length = consecutive_zero_count + 1; |
827 | 0 | } |
828 | 0 | return true; |
829 | 0 | } |
830 | 0 | return false; |
831 | 0 | } else { |
832 | 0 | return false; |
833 | 0 | } |
834 | 0 | } |
835 | 0 | return false; |
836 | 0 | } |
837 | | |
838 | | static void find_avc_annexb_nalu_stop_code(const butil::IOBuf& buf, |
839 | | size_t* nalu_length_out, |
840 | 0 | size_t* stop_code_length) { |
841 | 0 | size_t nalu_length = 0; |
842 | 0 | size_t consecutive_zero_count = 0; |
843 | 0 | for (butil::IOBufBytesIterator it(buf); it != NULL; ++it) { |
844 | 0 | unsigned char c = (unsigned char)*it; |
845 | 0 | if (c > 1) { // most frequent |
846 | 0 | ++nalu_length; |
847 | 0 | consecutive_zero_count = 0; |
848 | 0 | continue; |
849 | 0 | } |
850 | 0 | if (c == 0) { |
851 | 0 | ++consecutive_zero_count; |
852 | 0 | } else { // c == 1 |
853 | 0 | if (consecutive_zero_count >= 2) { |
854 | 0 | if (nalu_length_out) { |
855 | 0 | *nalu_length_out = nalu_length; |
856 | 0 | } |
857 | 0 | if (stop_code_length) { |
858 | 0 | *stop_code_length = consecutive_zero_count + 1; |
859 | 0 | } |
860 | 0 | return; |
861 | 0 | } |
862 | 0 | ++nalu_length; |
863 | 0 | consecutive_zero_count = 0; |
864 | 0 | } |
865 | 0 | } |
866 | 0 | if (nalu_length_out) { |
867 | 0 | *nalu_length_out = nalu_length + consecutive_zero_count; |
868 | 0 | } |
869 | 0 | if (stop_code_length) { |
870 | 0 | *stop_code_length = 0; |
871 | 0 | } |
872 | 0 | } |
873 | | |
874 | | AVCNaluIterator::AVCNaluIterator(butil::IOBuf* data, uint32_t length_size_minus1, |
875 | | AVCNaluFormat* format) |
876 | | : _data(data) |
877 | | , _format(format) |
878 | | , _length_size_minus1(length_size_minus1) |
879 | 0 | , _nalu_type(AVC_NALU_EMPTY) { |
880 | 0 | if (_data) { |
881 | 0 | ++*this; |
882 | 0 | } |
883 | 0 | } |
884 | | |
885 | 0 | AVCNaluIterator::~AVCNaluIterator() { |
886 | 0 | } |
887 | | |
888 | 0 | void AVCNaluIterator::operator++() { |
889 | 0 | if (*_format == AVC_NALU_FORMAT_ANNEXB) { |
890 | 0 | if (!next_as_annexb()) { |
891 | 0 | return set_end(); |
892 | 0 | } |
893 | 0 | } else if (*_format == AVC_NALU_FORMAT_IBMF) { |
894 | 0 | if (!next_as_ibmf()) { |
895 | 0 | return set_end(); |
896 | 0 | } |
897 | 0 | } else { |
898 | 0 | size_t start_code_length = 0; |
899 | 0 | if (find_avc_annexb_nalu_start_code(*_data, &start_code_length) && |
900 | 0 | _data->size() > start_code_length) { |
901 | 0 | if (start_code_length > 0) { |
902 | 0 | _data->pop_front(start_code_length); |
903 | 0 | } |
904 | 0 | *_format = AVC_NALU_FORMAT_ANNEXB; |
905 | 0 | if (!next_as_annexb()) { |
906 | 0 | return set_end(); |
907 | 0 | } |
908 | 0 | } else if (next_as_ibmf()) { |
909 | 0 | *_format = AVC_NALU_FORMAT_IBMF; |
910 | 0 | } else { |
911 | 0 | set_end(); |
912 | 0 | } |
913 | 0 | } |
914 | 0 | } |
915 | | |
916 | 0 | bool AVCNaluIterator::next_as_annexb() { |
917 | 0 | if (_data->empty()) { |
918 | 0 | return false; |
919 | 0 | } |
920 | 0 | size_t nalu_length = 0; |
921 | 0 | size_t stop_code_length = 0; |
922 | 0 | find_avc_annexb_nalu_stop_code(*_data, &nalu_length, &stop_code_length); |
923 | 0 | _cur_nalu.clear(); |
924 | 0 | _nalu_type = AVC_NALU_EMPTY; |
925 | 0 | if (nalu_length) { |
926 | 0 | _data->cutn(&_cur_nalu, nalu_length); |
927 | 0 | const uint8_t byte0 = *(const uint8_t*)_cur_nalu.fetch1(); |
928 | 0 | _nalu_type = (AVCNaluType)(byte0 & 0x1f); |
929 | 0 | } |
930 | 0 | if (stop_code_length) { |
931 | 0 | _data->pop_front(stop_code_length); |
932 | 0 | } |
933 | 0 | return true; |
934 | 0 | } |
935 | | |
936 | 0 | bool AVCNaluIterator::next_as_ibmf() { |
937 | | // The value of this field shall be one of 0, 1, or 3 corresponding to a |
938 | | // length encoded with 1, 2, or 4 bytes, respectively. |
939 | 0 | CHECK_NE(_length_size_minus1, 2u); |
940 | |
|
941 | 0 | if (_data->empty()) { |
942 | 0 | return false; |
943 | 0 | } |
944 | 0 | if (_data->size() < _length_size_minus1 + 1) { |
945 | 0 | LOG(ERROR) << "Not enough data to decode length of NALU"; |
946 | 0 | return false; |
947 | 0 | } |
948 | 0 | int32_t nalu_length = 0; |
949 | 0 | char buf[4]; |
950 | 0 | if (_length_size_minus1 == 3) { |
951 | 0 | _data->copy_to(buf, 4); |
952 | 0 | nalu_length = policy::ReadBigEndian4Bytes(buf); |
953 | 0 | } else if (_length_size_minus1 == 1) { |
954 | 0 | _data->copy_to(buf, 2); |
955 | 0 | nalu_length = policy::ReadBigEndian2Bytes(buf); |
956 | 0 | } else { |
957 | 0 | _data->copy_to(buf, 1); |
958 | 0 | nalu_length = *buf; |
959 | 0 | } |
960 | | // maybe stream is invalid format. |
961 | | // see: https://github.com/ossrs/srs/issues/183 |
962 | 0 | if (nalu_length < 0) { |
963 | 0 | LOG(ERROR) << "Invalid nalu_length=" << nalu_length; |
964 | 0 | return false; |
965 | 0 | } |
966 | 0 | if (_data->size() < _length_size_minus1 + 1 + nalu_length) { |
967 | 0 | LOG(ERROR) << "Not enough data to decode NALU"; |
968 | 0 | return false; |
969 | 0 | } |
970 | 0 | _data->pop_front(_length_size_minus1 + 1); |
971 | 0 | _cur_nalu.clear(); |
972 | 0 | _nalu_type = AVC_NALU_EMPTY; |
973 | 0 | if (nalu_length) { |
974 | 0 | _data->cutn(&_cur_nalu, nalu_length); |
975 | 0 | const uint8_t byte0 = *(const uint8_t*)_cur_nalu.fetch1(); |
976 | 0 | _nalu_type = (AVCNaluType)(byte0 & 0x1f); |
977 | 0 | } |
978 | 0 | return true; |
979 | 0 | } |
980 | | |
981 | | RtmpClientOptions::RtmpClientOptions() |
982 | | : fpad(false) |
983 | | , audioCodecs((RtmpAudioCodec)3575) // Copy from SRS |
984 | | , videoCodecs((RtmpVideoCodec)252) // Copy from SRS |
985 | | , videoFunction(RTMP_VIDEO_FUNCTION_CLIENT_SEEK) |
986 | | , timeout_ms(1000) |
987 | | , connect_timeout_ms(500) |
988 | | , buffer_length_ms(1000) |
989 | | , chunk_size(policy::RTMP_DEFAULT_CHUNK_SIZE) |
990 | | , window_ack_size(policy::RTMP_DEFAULT_WINDOW_ACK_SIZE) |
991 | 0 | , simplified_rtmp(false) { |
992 | 0 | } |
993 | | |
994 | | // Shared by RtmpClient and RtmpClientStream(s) |
995 | | class RtmpClientImpl : public SharedObject { |
996 | | friend class RtmpClientStream; |
997 | | public: |
998 | 0 | RtmpClientImpl() { |
999 | 0 | get_rtmp_bvars()->client_count << 1; |
1000 | 0 | } |
1001 | 0 | ~RtmpClientImpl() { |
1002 | 0 | get_rtmp_bvars()->client_count << -1; |
1003 | 0 | RPC_VLOG << "Destroying RtmpClientImpl=" << this; |
1004 | 0 | } |
1005 | | |
1006 | | // Specify the servers to connect. |
1007 | | int Init(butil::EndPoint server_addr_and_port, |
1008 | | const RtmpClientOptions& options); |
1009 | | int Init(const char* server_addr_and_port, |
1010 | | const RtmpClientOptions& options); |
1011 | | int Init(const char* server_addr, int port, |
1012 | | const RtmpClientOptions& options); |
1013 | | int Init(const char* naming_service_url, |
1014 | | const char* load_balancer_name, |
1015 | | const RtmpClientOptions& options); |
1016 | | |
1017 | 0 | const RtmpClientOptions& options() const { return _connect_options; } |
1018 | 0 | SocketMap& socket_map() { return _socket_map; } |
1019 | | |
1020 | | int CreateSocket(const butil::EndPoint& pt, SocketId* id); |
1021 | | |
1022 | | private: |
1023 | | DISALLOW_COPY_AND_ASSIGN(RtmpClientImpl); |
1024 | | int CommonInit(const RtmpClientOptions& options); |
1025 | | |
1026 | | Channel _chan; |
1027 | | RtmpClientOptions _connect_options; |
1028 | | SocketMap _socket_map; |
1029 | | }; |
1030 | | |
1031 | | class RtmpConnect : public AppConnect { |
1032 | | public: |
1033 | | // @AppConnect |
1034 | | void StartConnect(const Socket* s, void (*done)(int, void*), void* data) override; |
1035 | | void StopConnect(Socket* s) override; |
1036 | | }; |
1037 | | |
1038 | | void RtmpConnect::StartConnect( |
1039 | 0 | const Socket* s, void (*done)(int, void*), void* data) { |
1040 | 0 | RPC_VLOG << "Establish rtmp-level connection on " << *s; |
1041 | 0 | policy::RtmpContext* ctx = |
1042 | 0 | static_cast<policy::RtmpContext*>(s->parsing_context()); |
1043 | 0 | if (ctx == NULL) { |
1044 | 0 | LOG(FATAL) << "RtmpContext of " << *s << " is NULL"; |
1045 | 0 | return done(EINVAL, data); |
1046 | 0 | } |
1047 | | |
1048 | 0 | const RtmpClientOptions* _client_options = ctx->client_options(); |
1049 | 0 | if (_client_options && _client_options->simplified_rtmp) { |
1050 | 0 | ctx->set_simplified_rtmp(true); |
1051 | 0 | if (ctx->SendConnectRequest(s->remote_side(), s->fd(), true) != 0) { |
1052 | 0 | LOG(ERROR) << s->remote_side() << ": Fail to send simple connect"; |
1053 | 0 | return done(EINVAL, data); |
1054 | 0 | } |
1055 | 0 | ctx->SetState(s->remote_side(), policy::RtmpContext::STATE_RECEIVED_S2); |
1056 | 0 | ctx->set_create_stream_with_play_or_publish(true); |
1057 | 0 | return done(0, data); |
1058 | 0 | } |
1059 | | |
1060 | | // Save to callback to call when RTMP connect is done. |
1061 | 0 | ctx->SetConnectCallback(done, data); |
1062 | | |
1063 | | // Initiate the rtmp handshake. |
1064 | 0 | bool is_simple_handshake = false; |
1065 | 0 | if (policy::SendC0C1(s->fd(), &is_simple_handshake) != 0) { |
1066 | 0 | LOG(ERROR) << s->remote_side() << ": Fail to send C0 C1"; |
1067 | 0 | return done(EINVAL, data); |
1068 | 0 | } |
1069 | 0 | if (is_simple_handshake) { |
1070 | 0 | ctx->only_check_simple_s0s1(); |
1071 | 0 | } |
1072 | 0 | } |
1073 | | |
1074 | 0 | void RtmpConnect::StopConnect(Socket* s) { |
1075 | 0 | policy::RtmpContext* ctx = |
1076 | 0 | static_cast<policy::RtmpContext*>(s->parsing_context()); |
1077 | 0 | if (ctx == NULL) { |
1078 | 0 | LOG(FATAL) << "RtmpContext of " << *s << " is NULL"; |
1079 | 0 | } else { |
1080 | 0 | ctx->OnConnected(EFAILEDSOCKET); |
1081 | 0 | } |
1082 | 0 | } |
1083 | | |
1084 | | class RtmpSocketCreator : public SocketCreator { |
1085 | | public: |
1086 | | RtmpSocketCreator(const RtmpClientOptions& connect_options) |
1087 | 0 | : _connect_options(connect_options) { |
1088 | 0 | } |
1089 | | |
1090 | 0 | int CreateSocket(const SocketOptions& opt, SocketId* id) override { |
1091 | 0 | SocketOptions sock_opt = opt; |
1092 | 0 | sock_opt.app_connect = std::make_shared<RtmpConnect>(); |
1093 | 0 | sock_opt.initial_parsing_context = new policy::RtmpContext(&_connect_options, NULL); |
1094 | 0 | return get_client_side_messenger()->Create(sock_opt, id); |
1095 | 0 | } |
1096 | | |
1097 | | private: |
1098 | | RtmpClientOptions _connect_options; |
1099 | | }; |
1100 | | |
1101 | 0 | int RtmpClientImpl::CreateSocket(const butil::EndPoint& pt, SocketId* id) { |
1102 | 0 | SocketOptions sock_opt; |
1103 | 0 | sock_opt.remote_side = pt; |
1104 | 0 | sock_opt.app_connect = std::make_shared<RtmpConnect>(); |
1105 | 0 | sock_opt.initial_parsing_context = new policy::RtmpContext(&_connect_options, NULL); |
1106 | 0 | return get_client_side_messenger()->Create(sock_opt, id); |
1107 | 0 | } |
1108 | | |
1109 | 0 | int RtmpClientImpl::CommonInit(const RtmpClientOptions& options) { |
1110 | 0 | _connect_options = options; |
1111 | 0 | SocketMapOptions sm_options; |
1112 | 0 | sm_options.socket_creator = new RtmpSocketCreator(_connect_options); |
1113 | 0 | if (_socket_map.Init(sm_options) != 0) { |
1114 | 0 | LOG(ERROR) << "Fail to init _socket_map"; |
1115 | 0 | return -1; |
1116 | 0 | } |
1117 | 0 | return 0; |
1118 | 0 | } |
1119 | | |
1120 | | int RtmpClientImpl::Init(butil::EndPoint server_addr_and_port, |
1121 | 0 | const RtmpClientOptions& options) { |
1122 | 0 | if (CommonInit(options) != 0) { |
1123 | 0 | return -1; |
1124 | 0 | } |
1125 | 0 | ChannelOptions copts; |
1126 | 0 | copts.connect_timeout_ms = options.connect_timeout_ms; |
1127 | 0 | copts.timeout_ms = options.timeout_ms; |
1128 | 0 | copts.protocol = PROTOCOL_RTMP; |
1129 | 0 | return _chan.Init(server_addr_and_port, &copts); |
1130 | 0 | } |
1131 | | int RtmpClientImpl::Init(const char* server_addr_and_port, |
1132 | 0 | const RtmpClientOptions& options) { |
1133 | 0 | if (CommonInit(options) != 0) { |
1134 | 0 | return -1; |
1135 | 0 | } |
1136 | 0 | ChannelOptions copts; |
1137 | 0 | copts.connect_timeout_ms = options.connect_timeout_ms; |
1138 | 0 | copts.timeout_ms = options.timeout_ms; |
1139 | 0 | copts.protocol = PROTOCOL_RTMP; |
1140 | 0 | return _chan.Init(server_addr_and_port, &copts); |
1141 | 0 | } |
1142 | | int RtmpClientImpl::Init(const char* server_addr, int port, |
1143 | 0 | const RtmpClientOptions& options) { |
1144 | 0 | if (CommonInit(options) != 0) { |
1145 | 0 | return -1; |
1146 | 0 | } |
1147 | 0 | ChannelOptions copts; |
1148 | 0 | copts.connect_timeout_ms = options.connect_timeout_ms; |
1149 | 0 | copts.timeout_ms = options.timeout_ms; |
1150 | 0 | copts.protocol = PROTOCOL_RTMP; |
1151 | 0 | return _chan.Init(server_addr, port, &copts); |
1152 | 0 | } |
1153 | | int RtmpClientImpl::Init(const char* naming_service_url, |
1154 | | const char* load_balancer_name, |
1155 | 0 | const RtmpClientOptions& options) { |
1156 | 0 | if (CommonInit(options) != 0) { |
1157 | 0 | return -1; |
1158 | 0 | } |
1159 | 0 | ChannelOptions copts; |
1160 | 0 | copts.connect_timeout_ms = options.connect_timeout_ms; |
1161 | 0 | copts.timeout_ms = options.timeout_ms; |
1162 | 0 | copts.protocol = PROTOCOL_RTMP; |
1163 | 0 | return _chan.Init(naming_service_url, load_balancer_name, &copts); |
1164 | 0 | } |
1165 | | |
1166 | 0 | RtmpClient::RtmpClient() {} |
1167 | 0 | RtmpClient::~RtmpClient() {} |
1168 | 0 | RtmpClient::RtmpClient(const RtmpClient& rhs) : _impl(rhs._impl) {} |
1169 | | |
1170 | 0 | RtmpClient& RtmpClient::operator=(const RtmpClient& rhs) { |
1171 | 0 | _impl = rhs._impl; |
1172 | 0 | return *this; |
1173 | 0 | } |
1174 | | |
1175 | 0 | const RtmpClientOptions& RtmpClient::options() const { |
1176 | 0 | if (_impl) { |
1177 | 0 | return _impl->options(); |
1178 | 0 | } else { |
1179 | 0 | static RtmpClientOptions dft_opt; |
1180 | 0 | return dft_opt; |
1181 | 0 | } |
1182 | 0 | } |
1183 | | |
1184 | | int RtmpClient::Init(butil::EndPoint server_addr_and_port, |
1185 | 0 | const RtmpClientOptions& options) { |
1186 | 0 | butil::intrusive_ptr<RtmpClientImpl> tmp(new (std::nothrow) RtmpClientImpl); |
1187 | 0 | if (tmp == NULL) { |
1188 | 0 | LOG(FATAL) << "Fail to new RtmpClientImpl"; |
1189 | 0 | return -1; |
1190 | 0 | } |
1191 | 0 | if (tmp->Init(server_addr_and_port, options) != 0) { |
1192 | 0 | return -1; |
1193 | 0 | } |
1194 | 0 | tmp.swap(_impl); |
1195 | 0 | return 0; |
1196 | 0 | } |
1197 | | |
1198 | | int RtmpClient::Init(const char* server_addr_and_port, |
1199 | 0 | const RtmpClientOptions& options) { |
1200 | 0 | butil::intrusive_ptr<RtmpClientImpl> tmp(new (std::nothrow) RtmpClientImpl); |
1201 | 0 | if (tmp == NULL) { |
1202 | 0 | LOG(FATAL) << "Fail to new RtmpClientImpl"; |
1203 | 0 | return -1; |
1204 | 0 | } |
1205 | 0 | if (tmp->Init(server_addr_and_port, options) != 0) { |
1206 | 0 | return -1; |
1207 | 0 | } |
1208 | 0 | tmp.swap(_impl); |
1209 | 0 | return 0; |
1210 | 0 | } |
1211 | | |
1212 | | int RtmpClient::Init(const char* server_addr, int port, |
1213 | 0 | const RtmpClientOptions& options) { |
1214 | 0 | butil::intrusive_ptr<RtmpClientImpl> tmp(new (std::nothrow) RtmpClientImpl); |
1215 | 0 | if (tmp == NULL) { |
1216 | 0 | LOG(FATAL) << "Fail to new RtmpClientImpl"; |
1217 | 0 | return -1; |
1218 | 0 | } |
1219 | 0 | if (tmp->Init(server_addr, port, options) != 0) { |
1220 | 0 | return -1; |
1221 | 0 | } |
1222 | 0 | tmp.swap(_impl); |
1223 | 0 | return 0; |
1224 | 0 | } |
1225 | | |
1226 | | int RtmpClient::Init(const char* naming_service_url, |
1227 | | const char* load_balancer_name, |
1228 | 0 | const RtmpClientOptions& options) { |
1229 | 0 | butil::intrusive_ptr<RtmpClientImpl> tmp(new (std::nothrow) RtmpClientImpl); |
1230 | 0 | if (tmp == NULL) { |
1231 | 0 | LOG(FATAL) << "Fail to new RtmpClientImpl"; |
1232 | 0 | return -1; |
1233 | 0 | } |
1234 | 0 | if (tmp->Init(naming_service_url, load_balancer_name, options) != 0) { |
1235 | 0 | return -1; |
1236 | 0 | } |
1237 | 0 | tmp.swap(_impl); |
1238 | 0 | return 0; |
1239 | 0 | } |
1240 | | |
1241 | 0 | bool RtmpClient::initialized() const { return _impl != NULL; } |
1242 | | |
1243 | | RtmpStreamBase::RtmpStreamBase(bool is_client) |
1244 | | : _is_client(is_client) |
1245 | | , _paused(false) |
1246 | | , _stopped(false) |
1247 | | , _processing_msg(false) |
1248 | | , _has_data_ever(false) |
1249 | | , _message_stream_id(0) |
1250 | | , _chunk_stream_id(0) |
1251 | | , _create_realtime_us(butil::gettimeofday_us()) |
1252 | 0 | , _is_server_accepted(false) { |
1253 | 0 | } |
1254 | | |
1255 | 0 | RtmpStreamBase::~RtmpStreamBase() { |
1256 | 0 | } |
1257 | | |
1258 | 0 | void RtmpStreamBase::Destroy() { |
1259 | 0 | return; |
1260 | 0 | } |
1261 | | |
1262 | | int RtmpStreamBase::SendMessage(uint32_t timestamp, |
1263 | | uint8_t message_type, |
1264 | 0 | const butil::IOBuf& body) { |
1265 | 0 | if (_rtmpsock == NULL) { |
1266 | 0 | errno = EPERM; |
1267 | 0 | return -1; |
1268 | 0 | } |
1269 | 0 | if (_chunk_stream_id == 0) { |
1270 | 0 | LOG(ERROR) << "SendXXXMessage can't be called before play() is received"; |
1271 | 0 | errno = EPERM; |
1272 | 0 | return -1; |
1273 | 0 | } |
1274 | 0 | SocketMessagePtr<policy::RtmpUnsentMessage> msg(new policy::RtmpUnsentMessage); |
1275 | 0 | msg->header.timestamp = timestamp; |
1276 | 0 | msg->header.message_length = body.size(); |
1277 | 0 | msg->header.message_type = message_type; |
1278 | 0 | msg->header.stream_id = _message_stream_id; |
1279 | 0 | msg->chunk_stream_id = _chunk_stream_id; |
1280 | 0 | msg->body = body; |
1281 | 0 | return _rtmpsock->Write(msg); |
1282 | 0 | } |
1283 | | |
1284 | | int RtmpStreamBase::SendControlMessage( |
1285 | 0 | uint8_t message_type, const void* body, size_t size) { |
1286 | 0 | if (_rtmpsock == NULL) { |
1287 | 0 | errno = EPERM; |
1288 | 0 | return -1; |
1289 | 0 | } |
1290 | 0 | SocketMessagePtr<policy::RtmpUnsentMessage> msg( |
1291 | 0 | policy::MakeUnsentControlMessage(message_type, body, size)); |
1292 | 0 | return _rtmpsock->Write(msg); |
1293 | 0 | } |
1294 | | |
1295 | 0 | int RtmpStreamBase::SendCuePoint(const RtmpCuePoint& cuepoint) { |
1296 | 0 | butil::IOBuf req_buf; |
1297 | 0 | { |
1298 | 0 | butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf); |
1299 | 0 | AMFOutputStream ostream(&zc_stream); |
1300 | 0 | WriteAMFString(RTMP_AMF0_SET_DATAFRAME, &ostream); |
1301 | 0 | WriteAMFString(RTMP_AMF0_ON_CUE_POINT, &ostream); |
1302 | 0 | WriteAMFObject(cuepoint.data, &ostream); |
1303 | 0 | if (!ostream.good()) { |
1304 | 0 | LOG(ERROR) << "Fail to serialize cuepoint"; |
1305 | 0 | return -1; |
1306 | 0 | } |
1307 | 0 | } |
1308 | 0 | return SendMessage(cuepoint.timestamp, policy::RTMP_MESSAGE_DATA_AMF0, req_buf); |
1309 | 0 | } |
1310 | | |
1311 | | int RtmpStreamBase::SendMetaData(const RtmpMetaData& metadata, |
1312 | 0 | const butil::StringPiece& name) { |
1313 | 0 | butil::IOBuf req_buf; |
1314 | 0 | { |
1315 | 0 | butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf); |
1316 | 0 | AMFOutputStream ostream(&zc_stream); |
1317 | 0 | WriteAMFString(name, &ostream); |
1318 | 0 | WriteAMFObject(metadata.data, &ostream); |
1319 | 0 | if (!ostream.good()) { |
1320 | 0 | LOG(ERROR) << "Fail to serialize metadata"; |
1321 | 0 | return -1; |
1322 | 0 | } |
1323 | 0 | } |
1324 | 0 | return SendMessage(metadata.timestamp, policy::RTMP_MESSAGE_DATA_AMF0, req_buf); |
1325 | 0 | } |
1326 | | |
1327 | 0 | int RtmpStreamBase::SendSharedObjectMessage(const RtmpSharedObjectMessage&) { |
1328 | 0 | CHECK(false) << "Not supported yet"; |
1329 | 0 | return -1; |
1330 | 0 | } |
1331 | | |
1332 | 0 | int RtmpStreamBase::SendAudioMessage(const RtmpAudioMessage& msg) { |
1333 | 0 | if (_rtmpsock == NULL) { |
1334 | 0 | errno = EPERM; |
1335 | 0 | return -1; |
1336 | 0 | } |
1337 | 0 | if (_chunk_stream_id == 0) { |
1338 | 0 | LOG(ERROR) << __FUNCTION__ << " can't be called before play() is received"; |
1339 | 0 | errno = EPERM; |
1340 | 0 | return -1; |
1341 | 0 | } |
1342 | 0 | if (_paused) { |
1343 | 0 | errno = EPERM; |
1344 | 0 | return -1; |
1345 | 0 | } |
1346 | 0 | SocketMessagePtr<policy::RtmpUnsentMessage> msg2(new policy::RtmpUnsentMessage); |
1347 | 0 | msg2->header.timestamp = msg.timestamp; |
1348 | 0 | msg2->header.message_length = msg.size(); |
1349 | 0 | msg2->header.message_type = policy::RTMP_MESSAGE_AUDIO; |
1350 | 0 | msg2->header.stream_id = _message_stream_id; |
1351 | 0 | msg2->chunk_stream_id = _chunk_stream_id; |
1352 | | // Make audio header. |
1353 | 0 | const char audio_head = |
1354 | 0 | ((msg.codec & 0xF) << 4) |
1355 | 0 | | ((msg.rate & 0x3) << 2) |
1356 | 0 | | ((msg.bits & 0x1) << 1) |
1357 | 0 | | (msg.type & 0x1); |
1358 | 0 | msg2->body.push_back(audio_head); |
1359 | 0 | msg2->body.append(msg.data); |
1360 | 0 | return _rtmpsock->Write(msg2); |
1361 | 0 | } |
1362 | | |
1363 | 0 | int RtmpStreamBase::SendAACMessage(const RtmpAACMessage& msg) { |
1364 | 0 | if (_rtmpsock == NULL) { |
1365 | 0 | errno = EPERM; |
1366 | 0 | return -1; |
1367 | 0 | } |
1368 | 0 | if (_chunk_stream_id == 0) { |
1369 | 0 | LOG(ERROR) << __FUNCTION__ << " can't be called before play() is received"; |
1370 | 0 | errno = EPERM; |
1371 | 0 | return -1; |
1372 | 0 | } |
1373 | 0 | if (_paused) { |
1374 | 0 | errno = EPERM; |
1375 | 0 | return -1; |
1376 | 0 | } |
1377 | 0 | SocketMessagePtr<policy::RtmpUnsentMessage> msg2(new policy::RtmpUnsentMessage); |
1378 | 0 | msg2->header.timestamp = msg.timestamp; |
1379 | 0 | msg2->header.message_length = msg.size(); |
1380 | 0 | msg2->header.message_type = policy::RTMP_MESSAGE_AUDIO; |
1381 | 0 | msg2->header.stream_id = _message_stream_id; |
1382 | 0 | msg2->chunk_stream_id = _chunk_stream_id; |
1383 | | // Make audio header. |
1384 | 0 | char aac_head[2]; |
1385 | 0 | aac_head[0] = ((FLV_AUDIO_AAC & 0xF) << 4) |
1386 | 0 | | ((msg.rate & 0x3) << 2) |
1387 | 0 | | ((msg.bits & 0x1) << 1) |
1388 | 0 | | (msg.type & 0x1); |
1389 | 0 | aac_head[1] = (FlvAACPacketType)msg.packet_type; |
1390 | 0 | msg2->body.append(aac_head, sizeof(aac_head)); |
1391 | 0 | msg2->body.append(msg.data); |
1392 | 0 | return _rtmpsock->Write(msg2); |
1393 | 0 | } |
1394 | | |
1395 | 0 | int RtmpStreamBase::SendUserMessage(void*) { |
1396 | 0 | CHECK(false) << "You should implement your own SendUserMessage"; |
1397 | 0 | return 0; |
1398 | 0 | } |
1399 | | |
1400 | 0 | int RtmpStreamBase::SendVideoMessage(const RtmpVideoMessage& msg) { |
1401 | 0 | if (_rtmpsock == NULL) { |
1402 | 0 | errno = EPERM; |
1403 | 0 | return -1; |
1404 | 0 | } |
1405 | 0 | if (_chunk_stream_id == 0) { |
1406 | 0 | LOG(ERROR) << __FUNCTION__ << " can't be called before play() is received"; |
1407 | 0 | errno = EPERM; |
1408 | 0 | return -1; |
1409 | 0 | } |
1410 | 0 | if (!policy::is_video_frame_type_valid(msg.frame_type)) { |
1411 | 0 | LOG(WARNING) << "Invalid frame_type=" << (int)msg.frame_type; |
1412 | 0 | } |
1413 | 0 | if (!policy::is_video_codec_valid(msg.codec)) { |
1414 | 0 | LOG(WARNING) << "Invalid codec=" << (int)msg.codec; |
1415 | 0 | } |
1416 | 0 | if (_paused) { |
1417 | 0 | errno = EPERM; |
1418 | 0 | return -1; |
1419 | 0 | } |
1420 | 0 | SocketMessagePtr<policy::RtmpUnsentMessage> msg2(new policy::RtmpUnsentMessage); |
1421 | 0 | msg2->header.timestamp = msg.timestamp; |
1422 | 0 | msg2->header.message_length = msg.size(); |
1423 | 0 | msg2->header.message_type = policy::RTMP_MESSAGE_VIDEO; |
1424 | 0 | msg2->header.stream_id = _message_stream_id; |
1425 | 0 | msg2->chunk_stream_id = _chunk_stream_id; |
1426 | | // Make video header |
1427 | 0 | const char video_head = ((msg.frame_type & 0xF) << 4) | (msg.codec & 0xF); |
1428 | 0 | msg2->body.push_back(video_head); |
1429 | 0 | msg2->body.append(msg.data); |
1430 | 0 | return _rtmpsock->Write(msg2); |
1431 | 0 | } |
1432 | | |
1433 | 0 | int RtmpStreamBase::SendAVCMessage(const RtmpAVCMessage& msg) { |
1434 | 0 | if (_rtmpsock == NULL) { |
1435 | 0 | errno = EPERM; |
1436 | 0 | return -1; |
1437 | 0 | } |
1438 | 0 | if (_chunk_stream_id == 0) { |
1439 | 0 | LOG(ERROR) << __FUNCTION__ << " can't be called before play() is received"; |
1440 | 0 | errno = EPERM; |
1441 | 0 | return -1; |
1442 | 0 | } |
1443 | 0 | if (!policy::is_video_frame_type_valid(msg.frame_type)) { |
1444 | 0 | LOG(WARNING) << "Invalid frame_type=" << (int)msg.frame_type; |
1445 | 0 | } |
1446 | 0 | if (_paused) { |
1447 | 0 | errno = EPERM; |
1448 | 0 | return -1; |
1449 | 0 | } |
1450 | 0 | SocketMessagePtr<policy::RtmpUnsentMessage> msg2(new policy::RtmpUnsentMessage); |
1451 | 0 | msg2->header.timestamp = msg.timestamp; |
1452 | 0 | msg2->header.message_length = msg.size(); |
1453 | 0 | msg2->header.message_type = policy::RTMP_MESSAGE_VIDEO; |
1454 | 0 | msg2->header.stream_id = _message_stream_id; |
1455 | 0 | msg2->chunk_stream_id = _chunk_stream_id; |
1456 | | // Make video header |
1457 | 0 | char avc_head[5]; |
1458 | 0 | char* p = avc_head; |
1459 | 0 | *p++ = ((msg.frame_type & 0xF) << 4) | (FLV_VIDEO_AVC & 0xF); |
1460 | 0 | *p++ = (FlvAVCPacketType)msg.packet_type; |
1461 | 0 | policy::WriteBigEndian3Bytes(&p, msg.composition_time); |
1462 | 0 | msg2->body.append(avc_head, sizeof(avc_head)); |
1463 | 0 | msg2->body.append(msg.data); |
1464 | 0 | return _rtmpsock->Write(msg2); |
1465 | 0 | } |
1466 | | |
1467 | 0 | int RtmpStreamBase::SendStopMessage(const butil::StringPiece&) { |
1468 | 0 | return -1; |
1469 | 0 | } |
1470 | | |
1471 | 0 | const char* RtmpObjectEncoding2Str(RtmpObjectEncoding e) { |
1472 | 0 | switch (e) { |
1473 | 0 | case RTMP_AMF0: return "AMF0"; |
1474 | 0 | case RTMP_AMF3: return "AMF3"; |
1475 | 0 | } |
1476 | 0 | return "Unknown RtmpObjectEncoding"; |
1477 | 0 | } |
1478 | | |
1479 | 0 | void RtmpStreamBase::SignalError() { |
1480 | 0 | return; |
1481 | 0 | } |
1482 | | |
1483 | 0 | void RtmpStreamBase::OnFirstMessage() {} |
1484 | | |
1485 | 0 | void RtmpStreamBase::OnUserData(void*) { |
1486 | 0 | LOG(INFO) << remote_side() << '[' << stream_id() |
1487 | 0 | << "] ignored UserData{}"; |
1488 | 0 | } |
1489 | | |
1490 | 0 | void RtmpStreamBase::OnCuePoint(RtmpCuePoint* cuepoint) { |
1491 | 0 | LOG(INFO) << remote_side() << '[' << stream_id() |
1492 | 0 | << "] ignored CuePoint{" << cuepoint->data << '}'; |
1493 | 0 | } |
1494 | | |
1495 | 0 | void RtmpStreamBase::OnMetaData(RtmpMetaData* metadata, const butil::StringPiece& name) { |
1496 | 0 | LOG(INFO) << remote_side() << '[' << stream_id() |
1497 | 0 | << "] ignored MetaData{" << metadata->data << '}' |
1498 | 0 | << " name{" << name << '}'; |
1499 | 0 | } |
1500 | | |
1501 | 0 | void RtmpStreamBase::OnSharedObjectMessage(RtmpSharedObjectMessage*) { |
1502 | 0 | LOG(ERROR) << remote_side() << '[' << stream_id() |
1503 | 0 | << "] ignored SharedObjectMessage{}"; |
1504 | 0 | } |
1505 | | |
1506 | 0 | void RtmpStreamBase::OnAudioMessage(RtmpAudioMessage* msg) { |
1507 | 0 | LOG(ERROR) << remote_side() << '[' << stream_id() << "] ignored " << *msg; |
1508 | 0 | } |
1509 | | |
1510 | 0 | void RtmpStreamBase::OnVideoMessage(RtmpVideoMessage* msg) { |
1511 | 0 | LOG(ERROR) << remote_side() << '[' << stream_id() << "] ignored " << *msg; |
1512 | 0 | } |
1513 | | |
1514 | 0 | void RtmpStreamBase::OnStop() { |
1515 | | // do nothing by default |
1516 | 0 | } |
1517 | | |
1518 | 0 | bool RtmpStreamBase::BeginProcessingMessage(const char* fun_name) { |
1519 | 0 | std::unique_lock<butil::Mutex> mu(_call_mutex); |
1520 | 0 | if (_stopped) { |
1521 | 0 | mu.unlock(); |
1522 | 0 | LOG(ERROR) << fun_name << " is called after OnStop()"; |
1523 | 0 | return false; |
1524 | 0 | } |
1525 | 0 | if (_processing_msg) { |
1526 | 0 | mu.unlock(); |
1527 | 0 | LOG(ERROR) << "Impossible: Another OnXXXMessage is being called!"; |
1528 | 0 | return false; |
1529 | 0 | } |
1530 | 0 | _processing_msg = true; |
1531 | 0 | if (!_has_data_ever) { |
1532 | 0 | _has_data_ever = true; |
1533 | 0 | OnFirstMessage(); |
1534 | 0 | } |
1535 | 0 | return true; |
1536 | 0 | } |
1537 | | |
1538 | 0 | void RtmpStreamBase::EndProcessingMessage() { |
1539 | 0 | std::unique_lock<butil::Mutex> mu(_call_mutex); |
1540 | 0 | _processing_msg = false; |
1541 | 0 | if (_stopped) { |
1542 | 0 | mu.unlock(); |
1543 | 0 | return OnStop(); |
1544 | 0 | } |
1545 | 0 | } |
1546 | | |
1547 | 0 | void RtmpStreamBase::CallOnUserData(void* data) { |
1548 | 0 | if (BeginProcessingMessage("OnUserData()")) { |
1549 | 0 | OnUserData(data); |
1550 | 0 | EndProcessingMessage(); |
1551 | 0 | } |
1552 | 0 | } |
1553 | | |
1554 | 0 | void RtmpStreamBase::CallOnCuePoint(RtmpCuePoint* obj) { |
1555 | 0 | if (BeginProcessingMessage("OnCuePoint()")) { |
1556 | 0 | OnCuePoint(obj); |
1557 | 0 | EndProcessingMessage(); |
1558 | 0 | } |
1559 | 0 | } |
1560 | | |
1561 | 0 | void RtmpStreamBase::CallOnMetaData(RtmpMetaData* obj, const butil::StringPiece& name) { |
1562 | 0 | if (BeginProcessingMessage("OnMetaData()")) { |
1563 | 0 | OnMetaData(obj, name); |
1564 | 0 | EndProcessingMessage(); |
1565 | 0 | } |
1566 | 0 | } |
1567 | | |
1568 | 0 | void RtmpStreamBase::CallOnSharedObjectMessage(RtmpSharedObjectMessage* msg) { |
1569 | 0 | if (BeginProcessingMessage("OnSharedObjectMessage()")) { |
1570 | 0 | OnSharedObjectMessage(msg); |
1571 | 0 | EndProcessingMessage(); |
1572 | 0 | } |
1573 | 0 | } |
1574 | | |
1575 | 0 | void RtmpStreamBase::CallOnAudioMessage(RtmpAudioMessage* msg) { |
1576 | 0 | if (BeginProcessingMessage("OnAudioMessage()")) { |
1577 | 0 | OnAudioMessage(msg); |
1578 | 0 | EndProcessingMessage(); |
1579 | 0 | } |
1580 | 0 | } |
1581 | | |
1582 | 0 | void RtmpStreamBase::CallOnVideoMessage(RtmpVideoMessage* msg) { |
1583 | 0 | if (BeginProcessingMessage("OnVideoMessage()")) { |
1584 | 0 | OnVideoMessage(msg); |
1585 | 0 | EndProcessingMessage(); |
1586 | 0 | } |
1587 | 0 | } |
1588 | | |
1589 | 0 | void RtmpStreamBase::CallOnStop() { |
1590 | 0 | { |
1591 | 0 | std::unique_lock<butil::Mutex> mu(_call_mutex); |
1592 | 0 | if (_stopped) { |
1593 | 0 | mu.unlock(); |
1594 | 0 | LOG(ERROR) << "OnStop() was called more than once"; |
1595 | 0 | return; |
1596 | 0 | } |
1597 | 0 | _stopped = true; |
1598 | 0 | if (_processing_msg) { |
1599 | | // EndProcessingMessage() will call OnStop(); |
1600 | 0 | return; |
1601 | 0 | } |
1602 | 0 | } |
1603 | 0 | OnStop(); |
1604 | 0 | } |
1605 | | |
1606 | | butil::EndPoint RtmpStreamBase::remote_side() const |
1607 | 0 | { return _rtmpsock ? _rtmpsock->remote_side() : butil::EndPoint(); } |
1608 | | |
1609 | | butil::EndPoint RtmpStreamBase::local_side() const |
1610 | 0 | { return _rtmpsock ? _rtmpsock->local_side() : butil::EndPoint(); } |
1611 | | |
1612 | | // ============ RtmpClientStream ============= |
1613 | | |
1614 | | RtmpClientStream::RtmpClientStream() |
1615 | | : RtmpStreamBase(true) |
1616 | | , _onfail_id(INVALID_BTHREAD_ID) |
1617 | | , _create_stream_rpc_id(INVALID_BTHREAD_ID) |
1618 | | , _from_socketmap(true) |
1619 | | , _created_stream_with_play_or_publish(false) |
1620 | 0 | , _state(STATE_UNINITIALIZED) { |
1621 | 0 | get_rtmp_bvars()->client_stream_count << 1; |
1622 | 0 | _self_ref.reset(this); |
1623 | 0 | } |
1624 | | |
1625 | 0 | RtmpClientStream::~RtmpClientStream() { |
1626 | 0 | get_rtmp_bvars()->client_stream_count << -1; |
1627 | 0 | } |
1628 | | |
1629 | 0 | void RtmpClientStream::Destroy() { |
1630 | 0 | bthread_id_t onfail_id = INVALID_BTHREAD_ID; |
1631 | 0 | CallId create_stream_rpc_id = INVALID_BTHREAD_ID; |
1632 | 0 | butil::intrusive_ptr<RtmpClientStream> self_ref; |
1633 | | |
1634 | 0 | std::unique_lock<butil::Mutex> mu(_state_mutex); |
1635 | 0 | switch (_state) { |
1636 | 0 | case STATE_UNINITIALIZED: |
1637 | 0 | _state = STATE_DESTROYING; |
1638 | 0 | mu.unlock(); |
1639 | 0 | OnStopInternal(); |
1640 | 0 | _self_ref.swap(self_ref); |
1641 | 0 | return; |
1642 | 0 | case STATE_CREATING: |
1643 | 0 | _state = STATE_DESTROYING; |
1644 | 0 | create_stream_rpc_id = _create_stream_rpc_id; |
1645 | 0 | mu.unlock(); |
1646 | 0 | _self_ref.swap(self_ref); |
1647 | 0 | StartCancel(create_stream_rpc_id); |
1648 | 0 | return; |
1649 | 0 | case STATE_CREATED: |
1650 | 0 | _state = STATE_DESTROYING; |
1651 | 0 | onfail_id = _onfail_id; |
1652 | 0 | mu.unlock(); |
1653 | 0 | _self_ref.swap(self_ref); |
1654 | 0 | bthread_id_error(onfail_id, 0); |
1655 | 0 | return; |
1656 | 0 | case STATE_ERROR: |
1657 | 0 | _state = STATE_DESTROYING; |
1658 | 0 | mu.unlock(); |
1659 | 0 | _self_ref.swap(self_ref); |
1660 | 0 | return; |
1661 | 0 | case STATE_DESTROYING: |
1662 | | // Destroy() was already called. |
1663 | 0 | return; |
1664 | 0 | } |
1665 | 0 | } |
1666 | | |
1667 | 0 | void RtmpClientStream::SignalError() { |
1668 | 0 | bthread_id_t onfail_id = INVALID_BTHREAD_ID; |
1669 | 0 | std::unique_lock<butil::Mutex> mu(_state_mutex); |
1670 | 0 | switch (_state) { |
1671 | 0 | case STATE_UNINITIALIZED: |
1672 | 0 | _state = STATE_ERROR; |
1673 | 0 | mu.unlock(); |
1674 | 0 | OnStopInternal(); |
1675 | 0 | return; |
1676 | 0 | case STATE_CREATING: |
1677 | 0 | _state = STATE_ERROR; |
1678 | 0 | mu.unlock(); |
1679 | 0 | return; |
1680 | 0 | case STATE_CREATED: |
1681 | 0 | _state = STATE_ERROR; |
1682 | 0 | onfail_id = _onfail_id; |
1683 | 0 | mu.unlock(); |
1684 | 0 | bthread_id_error(onfail_id, 0); |
1685 | 0 | return; |
1686 | 0 | case STATE_ERROR: |
1687 | 0 | case STATE_DESTROYING: |
1688 | | // SignalError() or Destroy() was already called. |
1689 | 0 | return; |
1690 | 0 | } |
1691 | 0 | } |
1692 | | |
1693 | | StreamUserData* RtmpClientStream::OnCreatingStream( |
1694 | 0 | SocketUniquePtr* inout, Controller* cntl) { |
1695 | 0 | { |
1696 | 0 | std::unique_lock<butil::Mutex> mu(_state_mutex); |
1697 | 0 | if (_state == STATE_ERROR || _state == STATE_DESTROYING) { |
1698 | 0 | cntl->SetFailed(EINVAL, "Fail to replace socket for stream, _state is error or destroying"); |
1699 | 0 | return NULL; |
1700 | 0 | } |
1701 | 0 | } |
1702 | 0 | SocketId esid; |
1703 | 0 | if (cntl->connection_type() == CONNECTION_TYPE_SHORT) { |
1704 | 0 | if (_client_impl->CreateSocket((*inout)->remote_side(), &esid) != 0) { |
1705 | 0 | cntl->SetFailed(EINVAL, "Fail to create RTMP socket"); |
1706 | 0 | return NULL; |
1707 | 0 | } |
1708 | 0 | } else { |
1709 | 0 | if (_client_impl->socket_map().Insert( |
1710 | 0 | SocketMapKey((*inout)->remote_side()), &esid) != 0) { |
1711 | 0 | cntl->SetFailed(EINVAL, "Fail to get the RTMP socket"); |
1712 | 0 | return NULL; |
1713 | 0 | } |
1714 | 0 | } |
1715 | 0 | SocketUniquePtr tmp_ptr; |
1716 | 0 | if (Socket::Address(esid, &tmp_ptr) != 0) { |
1717 | 0 | cntl->SetFailed(EFAILEDSOCKET, "Fail to address RTMP SocketId=%" PRIu64 |
1718 | 0 | " from SocketMap of RtmpClient=%p", |
1719 | 0 | esid, _client_impl.get()); |
1720 | 0 | return NULL; |
1721 | 0 | } |
1722 | 0 | RPC_VLOG << "Replace Socket For Stream, RTMP socketId=" << esid |
1723 | 0 | << ", main socketId=" << (*inout)->id(); |
1724 | 0 | tmp_ptr->ShareStats(inout->get()); |
1725 | 0 | inout->reset(tmp_ptr.release()); |
1726 | 0 | return this; |
1727 | 0 | } |
1728 | | |
1729 | 0 | int RtmpClientStream::RunOnFailed(bthread_id_t id, void* data, int) { |
1730 | 0 | butil::intrusive_ptr<RtmpClientStream> stream( |
1731 | 0 | static_cast<RtmpClientStream*>(data), false); |
1732 | 0 | CHECK(stream->_rtmpsock); |
1733 | | // Must happen after NotifyOnFailed which is after all other callsites |
1734 | | // to OnStopInternal(). |
1735 | 0 | stream->OnStopInternal(); |
1736 | 0 | bthread_id_unlock_and_destroy(id); |
1737 | 0 | return 0; |
1738 | 0 | } |
1739 | | |
1740 | 0 | void RtmpClientStream::OnFailedToCreateStream() { |
1741 | 0 | { |
1742 | 0 | std::unique_lock<butil::Mutex> mu(_state_mutex); |
1743 | 0 | switch (_state) { |
1744 | 0 | case STATE_CREATING: |
1745 | 0 | _state = STATE_ERROR; |
1746 | 0 | break; |
1747 | 0 | case STATE_UNINITIALIZED: |
1748 | 0 | case STATE_CREATED: |
1749 | 0 | _state = STATE_ERROR; |
1750 | 0 | mu.unlock(); |
1751 | 0 | CHECK(false) << "Impossible"; |
1752 | 0 | break; |
1753 | 0 | case STATE_ERROR: |
1754 | 0 | case STATE_DESTROYING: |
1755 | 0 | break; |
1756 | 0 | } |
1757 | 0 | } |
1758 | 0 | return OnStopInternal(); |
1759 | 0 | } |
1760 | | |
1761 | | void RtmpClientStream::DestroyStreamUserData(SocketUniquePtr& sending_sock, |
1762 | | Controller* cntl, |
1763 | | int /*error_code*/, |
1764 | 0 | bool end_of_rpc) { |
1765 | 0 | if (!end_of_rpc) { |
1766 | 0 | if (sending_sock) { |
1767 | 0 | if (_from_socketmap) { |
1768 | 0 | _client_impl->socket_map().Remove(SocketMapKey(sending_sock->remote_side()), |
1769 | 0 | sending_sock->id()); |
1770 | 0 | } else { |
1771 | 0 | sending_sock->SetFailed(); // not necessary, already failed. |
1772 | 0 | } |
1773 | 0 | } |
1774 | 0 | } else { |
1775 | | // Always move sending_sock into _rtmpsock at the end of rpc. |
1776 | | // - If the RPC is successful, moving sending_sock prevents it from |
1777 | | // setfailed in Controller after calling this method. |
1778 | | // - If the RPC is failed, OnStopInternal() can clean up the socket_map |
1779 | | // inserted in OnCreatingStream(). |
1780 | 0 | _rtmpsock.swap(sending_sock); |
1781 | 0 | } |
1782 | 0 | } |
1783 | | |
1784 | | |
1785 | 0 | void RtmpClientStream::DestroyStreamCreator(Controller* cntl) { |
1786 | 0 | if (cntl->Failed()) { |
1787 | 0 | if (_rtmpsock != NULL && |
1788 | | // ^ If sending_sock is NULL, the RPC fails before _pack_request |
1789 | | // which calls AddTransaction, in another word, RemoveTransaction |
1790 | | // is not needed. |
1791 | 0 | cntl->ErrorCode() != ERTMPCREATESTREAM) { |
1792 | | // ^ ERTMPCREATESTREAM is triggered by receiving "_error" command, |
1793 | | // RemoveTransaction should already be called. |
1794 | 0 | CHECK_LT(cntl->log_id(), (uint64_t)std::numeric_limits<uint32_t>::max()); |
1795 | 0 | const uint32_t transaction_id = cntl->log_id(); |
1796 | 0 | policy::RtmpContext* rtmp_ctx = |
1797 | 0 | static_cast<policy::RtmpContext*>(_rtmpsock->parsing_context()); |
1798 | 0 | if (rtmp_ctx == NULL) { |
1799 | 0 | LOG(FATAL) << "RtmpContext must be created"; |
1800 | 0 | } else { |
1801 | 0 | policy::RtmpTransactionHandler* handler = |
1802 | 0 | rtmp_ctx->RemoveTransaction(transaction_id); |
1803 | 0 | if (handler) { |
1804 | 0 | handler->Cancel(); |
1805 | 0 | } |
1806 | 0 | } |
1807 | 0 | } |
1808 | 0 | return OnFailedToCreateStream(); |
1809 | 0 | } |
1810 | | |
1811 | 0 | int rc = 0; |
1812 | 0 | bthread_id_t onfail_id = INVALID_BTHREAD_ID; |
1813 | 0 | { |
1814 | 0 | std::unique_lock<butil::Mutex> mu(_state_mutex); |
1815 | 0 | switch (_state) { |
1816 | 0 | case STATE_CREATING: |
1817 | 0 | CHECK(_rtmpsock); |
1818 | 0 | rc = bthread_id_create(&onfail_id, this, RunOnFailed); |
1819 | 0 | if (rc) { |
1820 | 0 | cntl->SetFailed(ENOMEM, "Fail to create _onfail_id: %s", berror(rc)); |
1821 | 0 | mu.unlock(); |
1822 | 0 | return OnFailedToCreateStream(); |
1823 | 0 | } |
1824 | | // Add a ref for RunOnFailed. |
1825 | 0 | butil::intrusive_ptr<RtmpClientStream>(this).detach(); |
1826 | 0 | _state = STATE_CREATED; |
1827 | 0 | _onfail_id = onfail_id; |
1828 | 0 | break; |
1829 | 0 | case STATE_UNINITIALIZED: |
1830 | 0 | case STATE_CREATED: |
1831 | 0 | _state = STATE_ERROR; |
1832 | 0 | mu.unlock(); |
1833 | 0 | CHECK(false) << "Impossible"; |
1834 | 0 | return OnStopInternal(); |
1835 | 0 | case STATE_ERROR: |
1836 | 0 | case STATE_DESTROYING: |
1837 | 0 | mu.unlock(); |
1838 | 0 | return OnStopInternal(); |
1839 | 0 | } |
1840 | 0 | } |
1841 | 0 | if (onfail_id != INVALID_BTHREAD_ID) { |
1842 | 0 | _rtmpsock->NotifyOnFailed(onfail_id); |
1843 | 0 | } |
1844 | 0 | } |
1845 | | |
1846 | 0 | void RtmpClientStream::OnStopInternal() { |
1847 | 0 | if (_rtmpsock == NULL) { |
1848 | 0 | return CallOnStop(); |
1849 | 0 | } |
1850 | | |
1851 | 0 | if (!_rtmpsock->Failed() && _chunk_stream_id != 0) { |
1852 | | // SRS requires closeStream which is sent over this stream. |
1853 | 0 | butil::IOBuf req_buf1; |
1854 | 0 | { |
1855 | 0 | butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf1); |
1856 | 0 | AMFOutputStream ostream(&zc_stream); |
1857 | 0 | WriteAMFString(RTMP_AMF0_COMMAND_CLOSE_STREAM, &ostream); |
1858 | 0 | WriteAMFUint32(0, &ostream); |
1859 | 0 | WriteAMFNull(&ostream); |
1860 | 0 | CHECK(ostream.good()); |
1861 | 0 | } |
1862 | 0 | SocketMessagePtr<policy::RtmpUnsentMessage> msg1(new policy::RtmpUnsentMessage); |
1863 | 0 | msg1->header.message_length = req_buf1.size(); |
1864 | 0 | msg1->header.message_type = policy::RTMP_MESSAGE_COMMAND_AMF0; |
1865 | 0 | msg1->header.stream_id = _message_stream_id; |
1866 | 0 | msg1->chunk_stream_id = _chunk_stream_id; |
1867 | 0 | msg1->body = req_buf1; |
1868 | | |
1869 | | // Send deleteStream over the control stream. |
1870 | 0 | butil::IOBuf req_buf2; |
1871 | 0 | { |
1872 | 0 | butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf2); |
1873 | 0 | AMFOutputStream ostream(&zc_stream); |
1874 | 0 | WriteAMFString(RTMP_AMF0_COMMAND_DELETE_STREAM, &ostream); |
1875 | 0 | WriteAMFUint32(0, &ostream); |
1876 | 0 | WriteAMFNull(&ostream); |
1877 | 0 | WriteAMFUint32(_message_stream_id, &ostream); |
1878 | 0 | CHECK(ostream.good()); |
1879 | 0 | } |
1880 | 0 | policy::RtmpUnsentMessage* msg2 = policy::MakeUnsentControlMessage( |
1881 | 0 | policy::RTMP_MESSAGE_COMMAND_AMF0, req_buf2); |
1882 | 0 | msg1->next.reset(msg2); |
1883 | |
|
1884 | 0 | if (policy::WriteWithoutOvercrowded(_rtmpsock.get(), msg1) != 0) { |
1885 | 0 | if (errno != EFAILEDSOCKET) { |
1886 | 0 | PLOG(WARNING) << "Fail to send closeStream/deleteStream to " |
1887 | 0 | << _rtmpsock->remote_side() << "[" |
1888 | 0 | << _message_stream_id << "]"; |
1889 | | // Close the connection to make sure the server-side knows the |
1890 | | // closing event, however this may terminate other streams over |
1891 | | // the connection as well. |
1892 | 0 | _rtmpsock->SetFailed(EFAILEDSOCKET, "Fail to send closeStream/deleteStream"); |
1893 | 0 | } |
1894 | 0 | } |
1895 | 0 | } |
1896 | 0 | policy::RtmpContext* ctx = |
1897 | 0 | static_cast<policy::RtmpContext*>(_rtmpsock->parsing_context()); |
1898 | 0 | if (ctx != NULL) { |
1899 | 0 | if (!ctx->RemoveMessageStream(this)) { |
1900 | | // The stream is not registered yet. Is this normal? |
1901 | 0 | LOG(ERROR) << "Fail to remove stream_id=" << _message_stream_id; |
1902 | 0 | } |
1903 | 0 | } else { |
1904 | 0 | LOG(FATAL) << "RtmpContext of " << *_rtmpsock << " is NULL"; |
1905 | 0 | } |
1906 | 0 | if (_from_socketmap) { |
1907 | 0 | _client_impl->socket_map().Remove(SocketMapKey(_rtmpsock->remote_side()), |
1908 | 0 | _rtmpsock->id()); |
1909 | 0 | } else { |
1910 | 0 | _rtmpsock->ReleaseAdditionalReference(); |
1911 | 0 | } |
1912 | 0 | CallOnStop(); |
1913 | 0 | } |
1914 | | |
1915 | | RtmpPlayOptions::RtmpPlayOptions() |
1916 | | : start(-2) |
1917 | | , duration(-1) |
1918 | 0 | , reset(true) { |
1919 | 0 | } |
1920 | | |
1921 | 0 | int RtmpClientStream::Play(const RtmpPlayOptions& opt) { |
1922 | 0 | if (_rtmpsock == NULL) { |
1923 | 0 | errno = EPERM; |
1924 | 0 | return -1; |
1925 | 0 | } |
1926 | 0 | if (opt.stream_name.empty()) { |
1927 | 0 | LOG(ERROR) << "Empty stream_name"; |
1928 | 0 | errno = EINVAL; |
1929 | 0 | return -1; |
1930 | 0 | } |
1931 | 0 | if (_client_impl == NULL) { |
1932 | 0 | LOG(ERROR) << "The client stream is not created yet"; |
1933 | 0 | errno = EPERM; |
1934 | 0 | return -1; |
1935 | 0 | } |
1936 | 0 | butil::IOBuf req_buf; |
1937 | 0 | { |
1938 | 0 | butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf); |
1939 | 0 | AMFOutputStream ostream(&zc_stream); |
1940 | 0 | WriteAMFString(RTMP_AMF0_COMMAND_PLAY, &ostream); |
1941 | 0 | WriteAMFUint32(0, &ostream); |
1942 | 0 | WriteAMFNull(&ostream); |
1943 | 0 | WriteAMFString(opt.stream_name, &ostream); |
1944 | 0 | WriteAMFNumber(opt.start, &ostream); |
1945 | 0 | WriteAMFNumber(opt.duration, &ostream); |
1946 | 0 | WriteAMFBool(opt.reset, &ostream); |
1947 | 0 | CHECK(ostream.good()); |
1948 | 0 | } |
1949 | 0 | SocketMessagePtr<policy::RtmpUnsentMessage> msg1(new policy::RtmpUnsentMessage); |
1950 | 0 | msg1->header.message_length = req_buf.size(); |
1951 | 0 | msg1->header.message_type = policy::RTMP_MESSAGE_COMMAND_AMF0; |
1952 | 0 | msg1->header.stream_id = _message_stream_id; |
1953 | 0 | msg1->chunk_stream_id = _chunk_stream_id; |
1954 | 0 | msg1->body = req_buf; |
1955 | |
|
1956 | 0 | if (_client_impl->options().buffer_length_ms > 0) { |
1957 | 0 | char data[10]; |
1958 | 0 | char* p = data; |
1959 | 0 | policy::WriteBigEndian2Bytes( |
1960 | 0 | &p, policy::RTMP_USER_CONTROL_EVENT_SET_BUFFER_LENGTH); |
1961 | 0 | policy::WriteBigEndian4Bytes(&p, stream_id()); |
1962 | 0 | policy::WriteBigEndian4Bytes(&p, _client_impl->options().buffer_length_ms); |
1963 | 0 | policy::RtmpUnsentMessage* msg2 = policy::MakeUnsentControlMessage( |
1964 | 0 | policy::RTMP_MESSAGE_USER_CONTROL, data, sizeof(data)); |
1965 | 0 | msg1->next.reset(msg2); |
1966 | 0 | } |
1967 | | // FIXME(gejun): Do we need to SetChunkSize for play? |
1968 | | // if (_client_impl->options().chunk_size > policy::RTMP_INITIAL_CHUNK_SIZE) { |
1969 | | // if (SetChunkSize(_client_impl->options().chunk_size) != 0) { |
1970 | | // return -1; |
1971 | | // } |
1972 | | // } |
1973 | 0 | return _rtmpsock->Write(msg1); |
1974 | 0 | } |
1975 | | |
1976 | 0 | int RtmpClientStream::Play2(const RtmpPlay2Options& opt) { |
1977 | 0 | butil::IOBuf req_buf; |
1978 | 0 | { |
1979 | 0 | butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf); |
1980 | 0 | AMFOutputStream ostream(&zc_stream); |
1981 | 0 | WriteAMFString(RTMP_AMF0_COMMAND_PLAY2, &ostream); |
1982 | 0 | WriteAMFUint32(0, &ostream); |
1983 | 0 | WriteAMFNull(&ostream); |
1984 | 0 | WriteAMFObject(opt, &ostream); |
1985 | 0 | if (!ostream.good()) { |
1986 | 0 | LOG(ERROR) << "Fail to serialize play2 request"; |
1987 | 0 | errno = EINVAL; |
1988 | 0 | return -1; |
1989 | 0 | } |
1990 | 0 | } |
1991 | 0 | return SendMessage(0, policy::RTMP_MESSAGE_COMMAND_AMF0, req_buf); |
1992 | 0 | } |
1993 | | |
1994 | 0 | const char* RtmpPublishType2Str(RtmpPublishType type) { |
1995 | 0 | switch (type) { |
1996 | 0 | case RTMP_PUBLISH_RECORD: return "record"; |
1997 | 0 | case RTMP_PUBLISH_APPEND: return "append"; |
1998 | 0 | case RTMP_PUBLISH_LIVE: return "live"; |
1999 | 0 | } |
2000 | 0 | return "Unknown RtmpPublishType"; |
2001 | 0 | } |
2002 | | |
2003 | 0 | bool Str2RtmpPublishType(const butil::StringPiece& str, RtmpPublishType* type) { |
2004 | 0 | if (str == "record") { |
2005 | 0 | *type = RTMP_PUBLISH_RECORD; |
2006 | 0 | return true; |
2007 | 0 | } else if (str == "append") { |
2008 | 0 | *type = RTMP_PUBLISH_APPEND; |
2009 | 0 | return true; |
2010 | 0 | } else if (str == "live") { |
2011 | 0 | *type = RTMP_PUBLISH_LIVE; |
2012 | 0 | return true; |
2013 | 0 | } |
2014 | 0 | return false; |
2015 | 0 | } |
2016 | | |
2017 | | int RtmpClientStream::Publish(const butil::StringPiece& name, |
2018 | 0 | RtmpPublishType type) { |
2019 | 0 | butil::IOBuf req_buf; |
2020 | 0 | { |
2021 | 0 | butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf); |
2022 | 0 | AMFOutputStream ostream(&zc_stream); |
2023 | 0 | WriteAMFString(RTMP_AMF0_COMMAND_PUBLISH, &ostream); |
2024 | 0 | WriteAMFUint32(0, &ostream); |
2025 | 0 | WriteAMFNull(&ostream); |
2026 | 0 | WriteAMFString(name, &ostream); |
2027 | 0 | WriteAMFString(RtmpPublishType2Str(type), &ostream); |
2028 | 0 | CHECK(ostream.good()); |
2029 | 0 | } |
2030 | 0 | return SendMessage(0, policy::RTMP_MESSAGE_COMMAND_AMF0, req_buf); |
2031 | 0 | } |
2032 | | |
2033 | 0 | int RtmpClientStream::Seek(double offset_ms) { |
2034 | 0 | butil::IOBuf req_buf; |
2035 | 0 | { |
2036 | 0 | butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf); |
2037 | 0 | AMFOutputStream ostream(&zc_stream); |
2038 | 0 | WriteAMFString(RTMP_AMF0_COMMAND_SEEK, &ostream); |
2039 | 0 | WriteAMFUint32(0, &ostream); |
2040 | 0 | WriteAMFNull(&ostream); |
2041 | 0 | WriteAMFNumber(offset_ms, &ostream); |
2042 | 0 | CHECK(ostream.good()); |
2043 | 0 | } |
2044 | 0 | return SendMessage(0, policy::RTMP_MESSAGE_COMMAND_AMF0, req_buf); |
2045 | 0 | } |
2046 | | |
2047 | 0 | int RtmpClientStream::Pause(bool pause_or_unpause, double offset_ms) { |
2048 | 0 | butil::IOBuf req_buf; |
2049 | 0 | { |
2050 | 0 | butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf); |
2051 | 0 | AMFOutputStream ostream(&zc_stream); |
2052 | 0 | WriteAMFString(RTMP_AMF0_COMMAND_PAUSE, &ostream); |
2053 | 0 | WriteAMFUint32(0, &ostream); |
2054 | 0 | WriteAMFNull(&ostream); |
2055 | 0 | WriteAMFBool(pause_or_unpause, &ostream); |
2056 | 0 | WriteAMFNumber(offset_ms, &ostream); |
2057 | 0 | CHECK(ostream.good()); |
2058 | 0 | } |
2059 | 0 | return SendMessage(0, policy::RTMP_MESSAGE_COMMAND_AMF0, req_buf); |
2060 | 0 | } |
2061 | | |
2062 | 0 | void RtmpClientStream::OnStatus(const RtmpInfo& info) { |
2063 | 0 | if (info.level() == RTMP_INFO_LEVEL_ERROR) { |
2064 | 0 | LOG(WARNING) << remote_side() << '[' << stream_id() |
2065 | 0 | << "] " << info.code() << ": " << info.description(); |
2066 | 0 | return SignalError(); |
2067 | 0 | } else if (info.level() == RTMP_INFO_LEVEL_STATUS) { |
2068 | 0 | if ((!_options.play_name.empty() && |
2069 | 0 | info.code() == RTMP_STATUS_CODE_PLAY_START) || |
2070 | 0 | (!_options.publish_name.empty() && |
2071 | 0 | info.code() == RTMP_STATUS_CODE_PUBLISH_START)) { |
2072 | | // the memory fence makes sure that if _is_server_accepted is true, |
2073 | | // publish request must be sent (so that SendXXX functions can |
2074 | | // be enabled) |
2075 | 0 | _is_server_accepted.store(true, butil::memory_order_release); |
2076 | 0 | } |
2077 | 0 | } |
2078 | 0 | } |
2079 | | |
2080 | | RtmpClientStreamOptions::RtmpClientStreamOptions() |
2081 | | : share_connection(true) |
2082 | | , wait_until_play_or_publish_is_sent(false) |
2083 | | , create_stream_max_retry(3) |
2084 | 0 | , publish_type(RTMP_PUBLISH_LIVE) { |
2085 | 0 | } |
2086 | | |
2087 | | class OnClientStreamCreated : public google::protobuf::Closure { |
2088 | | public: |
2089 | | void Run(); // @Closure |
2090 | 0 | void CancelBeforeCallMethod() { delete this; } |
2091 | | |
2092 | | public: |
2093 | | Controller cntl; |
2094 | | // Hold a reference of stream to prevent it from destructing during an |
2095 | | // async Create(). |
2096 | | butil::intrusive_ptr<RtmpClientStream> stream; |
2097 | | }; |
2098 | | |
2099 | 0 | void OnClientStreamCreated::Run() { |
2100 | 0 | std::unique_ptr<OnClientStreamCreated> delete_self(this); |
2101 | 0 | if (cntl.Failed()) { |
2102 | 0 | LOG(WARNING) << "Fail to create stream=" << stream->rtmp_url() |
2103 | 0 | << ": " << cntl.ErrorText(); |
2104 | 0 | return; |
2105 | 0 | } |
2106 | 0 | if (stream->_created_stream_with_play_or_publish) { |
2107 | | // the server accepted the play/publish command packed in createStream |
2108 | 0 | return; |
2109 | 0 | } |
2110 | 0 | const RtmpClientStreamOptions& options = stream->options(); |
2111 | 0 | bool do_nothing = true; |
2112 | 0 | if (!options.play_name.empty()) { |
2113 | 0 | do_nothing = false; |
2114 | 0 | RtmpPlayOptions play_opt; |
2115 | 0 | play_opt.stream_name = options.play_name; |
2116 | 0 | if (stream->Play(play_opt) != 0) { |
2117 | 0 | LOG(WARNING) << "Fail to play " << options.play_name; |
2118 | 0 | return stream->SignalError(); |
2119 | 0 | } |
2120 | 0 | } |
2121 | 0 | if (!options.publish_name.empty()) { |
2122 | 0 | do_nothing = false; |
2123 | 0 | if (stream->Publish(options.publish_name, options.publish_type) != 0) { |
2124 | 0 | LOG(WARNING) << "Fail to publish " << stream->rtmp_url(); |
2125 | 0 | return stream->SignalError(); |
2126 | 0 | } |
2127 | 0 | } |
2128 | 0 | if (do_nothing) { |
2129 | 0 | LOG(ERROR) << "play_name and publish_name are both empty"; |
2130 | 0 | return stream->SignalError(); |
2131 | 0 | } |
2132 | 0 | } |
2133 | | |
2134 | | void RtmpClientStream::Init(const RtmpClient* client, |
2135 | 0 | const RtmpClientStreamOptions& options) { |
2136 | 0 | if (client->_impl == NULL) { |
2137 | 0 | LOG(FATAL) << "RtmpClient is not initialized"; |
2138 | 0 | return OnStopInternal(); |
2139 | 0 | } |
2140 | 0 | { |
2141 | 0 | std::unique_lock<butil::Mutex> mu(_state_mutex); |
2142 | 0 | if (_state == STATE_DESTROYING || _state == STATE_ERROR) { |
2143 | | // already Destroy()-ed or SignalError()-ed |
2144 | 0 | LOG(WARNING) << "RtmpClientStream=" << this << " was already " |
2145 | 0 | "Destroy()-ed, stop Init()"; |
2146 | 0 | return; |
2147 | 0 | } |
2148 | 0 | } |
2149 | 0 | _client_impl = client->_impl; |
2150 | 0 | _options = options; |
2151 | 0 | OnClientStreamCreated* done = new OnClientStreamCreated; |
2152 | 0 | done->stream.reset(this); |
2153 | 0 | done->cntl.set_stream_creator(this); |
2154 | 0 | done->cntl.set_connection_type(_options.share_connection ? |
2155 | 0 | CONNECTION_TYPE_SINGLE : |
2156 | 0 | CONNECTION_TYPE_SHORT); |
2157 | 0 | _from_socketmap = (done->cntl.connection_type() == CONNECTION_TYPE_SINGLE); |
2158 | 0 | done->cntl.set_max_retry(_options.create_stream_max_retry); |
2159 | 0 | if (_options.hash_code.has_been_set()) { |
2160 | 0 | done->cntl.set_request_code(_options.hash_code); |
2161 | 0 | } |
2162 | | |
2163 | | // Hack: we pass stream as response so that PackRtmpRequest can get |
2164 | | // the stream from controller. |
2165 | 0 | google::protobuf::Message* res = (google::protobuf::Message*)this; |
2166 | 0 | const CallId call_id = done->cntl.call_id(); |
2167 | 0 | { |
2168 | 0 | std::unique_lock<butil::Mutex> mu(_state_mutex); |
2169 | 0 | switch (_state) { |
2170 | 0 | case STATE_UNINITIALIZED: |
2171 | 0 | _state = STATE_CREATING; |
2172 | 0 | _create_stream_rpc_id = call_id; |
2173 | 0 | break; |
2174 | 0 | case STATE_CREATING: |
2175 | 0 | case STATE_CREATED: |
2176 | 0 | mu.unlock(); |
2177 | 0 | LOG(ERROR) << "RtmpClientStream::Init() is called by multiple " |
2178 | 0 | "threads simultaneously"; |
2179 | 0 | return done->CancelBeforeCallMethod(); |
2180 | 0 | case STATE_ERROR: |
2181 | 0 | case STATE_DESTROYING: |
2182 | 0 | mu.unlock(); |
2183 | 0 | return done->CancelBeforeCallMethod(); |
2184 | 0 | } |
2185 | 0 | } |
2186 | 0 | _client_impl->_chan.CallMethod(NULL, &done->cntl, NULL, res, done); |
2187 | 0 | if (options.wait_until_play_or_publish_is_sent) { |
2188 | 0 | Join(call_id); |
2189 | 0 | } |
2190 | 0 | } |
2191 | | |
2192 | 0 | std::string RtmpClientStream::rtmp_url() const { |
2193 | 0 | if (_client_impl == NULL) { |
2194 | 0 | return std::string(); |
2195 | 0 | } |
2196 | 0 | butil::StringPiece tcurl = _client_impl->options().tcUrl; |
2197 | 0 | butil::StringPiece stream_name = _options.stream_name(); |
2198 | 0 | std::string result; |
2199 | 0 | result.reserve(tcurl.size() + 1 + stream_name.size()); |
2200 | 0 | result.append(tcurl.data(), tcurl.size()); |
2201 | 0 | result.push_back('/'); |
2202 | 0 | result.append(stream_name.data(), stream_name.size()); |
2203 | 0 | return result; |
2204 | 0 | } |
2205 | | |
2206 | | // ========= RtmpRetryingClientStream ============ |
2207 | | |
2208 | | RtmpRetryingClientStreamOptions::RtmpRetryingClientStreamOptions() |
2209 | | : retry_interval_ms(1000) |
2210 | | , max_retry_duration_ms(-1) |
2211 | | , fast_retry_count(2) |
2212 | 0 | , quit_when_no_data_ever(true) { |
2213 | 0 | } |
2214 | | |
2215 | | RtmpRetryingClientStream::RtmpRetryingClientStream() |
2216 | | : RtmpStreamBase(true) |
2217 | | , _destroying(false) |
2218 | | , _called_on_stop(false) |
2219 | | , _changed_stream(false) |
2220 | | , _has_timer_ever(false) |
2221 | | , _is_server_accepted_ever(false) |
2222 | | , _num_fast_retries(0) |
2223 | | , _last_creation_time_us(0) |
2224 | | , _last_retry_start_time_us(0) |
2225 | | , _create_timer_id(0) |
2226 | 0 | , _sub_stream_creator(NULL) { |
2227 | 0 | get_rtmp_bvars()->retrying_client_stream_count << 1; |
2228 | 0 | _self_ref.reset(this); |
2229 | 0 | } |
2230 | | |
2231 | 0 | RtmpRetryingClientStream::~RtmpRetryingClientStream() { |
2232 | 0 | delete _sub_stream_creator; |
2233 | 0 | _sub_stream_creator = NULL; |
2234 | 0 | get_rtmp_bvars()->retrying_client_stream_count << -1; |
2235 | 0 | } |
2236 | | |
2237 | 0 | void RtmpRetryingClientStream::CallOnStopIfNeeded() { |
2238 | | // CallOnStop uses locks, we don't need memory fence on _called_on_stop, |
2239 | | // atomic ops is enough. |
2240 | 0 | if (!_called_on_stop.load(butil::memory_order_relaxed) && |
2241 | 0 | !_called_on_stop.exchange(true, butil::memory_order_relaxed)) { |
2242 | 0 | CallOnStop(); |
2243 | 0 | } |
2244 | 0 | } |
2245 | | |
2246 | 0 | void RtmpRetryingClientStream::Destroy() { |
2247 | 0 | if (_destroying.exchange(true, butil::memory_order_relaxed)) { |
2248 | | // Destroy() was already called. |
2249 | 0 | return; |
2250 | 0 | } |
2251 | | |
2252 | | // Make sure _self_ref is released before quiting this function. |
2253 | | // Notice that _self_ref.reset(NULL) is wrong because it may destructs |
2254 | | // this object immediately. |
2255 | 0 | butil::intrusive_ptr<RtmpRetryingClientStream> self_ref; |
2256 | 0 | _self_ref.swap(self_ref); |
2257 | |
|
2258 | 0 | butil::intrusive_ptr<RtmpStreamBase> old_sub_stream; |
2259 | 0 | { |
2260 | 0 | BAIDU_SCOPED_LOCK(_stream_mutex); |
2261 | | // swap instead of reset(NULL) to make the stream destructed |
2262 | | // outside _stream_mutex. |
2263 | 0 | _using_sub_stream.swap(old_sub_stream); |
2264 | 0 | } |
2265 | 0 | if (old_sub_stream) { |
2266 | 0 | old_sub_stream->Destroy(); |
2267 | 0 | } |
2268 | | |
2269 | 0 | if (_has_timer_ever) { |
2270 | 0 | if (bthread_timer_del(_create_timer_id) == 0) { |
2271 | | // The callback is not run yet. Remove the additional ref added |
2272 | | // before creating the timer. |
2273 | 0 | butil::intrusive_ptr<RtmpRetryingClientStream> deref(this, false); |
2274 | 0 | } |
2275 | 0 | } |
2276 | 0 | return CallOnStopIfNeeded(); |
2277 | 0 | } |
2278 | | |
2279 | | void RtmpRetryingClientStream::Init( |
2280 | | SubStreamCreator* sub_stream_creator, |
2281 | 0 | const RtmpRetryingClientStreamOptions& options) { |
2282 | 0 | if (sub_stream_creator == NULL) { |
2283 | 0 | LOG(ERROR) << "sub_stream_creator is NULL"; |
2284 | 0 | return CallOnStopIfNeeded(); |
2285 | 0 | } |
2286 | 0 | _sub_stream_creator = sub_stream_creator; |
2287 | 0 | if (_destroying.load(butil::memory_order_relaxed)) { |
2288 | 0 | LOG(WARNING) << "RtmpRetryingClientStream=" << this << " was already " |
2289 | 0 | "Destroy()-ed, stop Init()"; |
2290 | 0 | return; |
2291 | 0 | } |
2292 | 0 | _options = options; |
2293 | | // retrying stream does not support this option. |
2294 | 0 | _options.wait_until_play_or_publish_is_sent = false; |
2295 | 0 | _last_retry_start_time_us = butil::gettimeofday_us(); |
2296 | 0 | Recreate(); |
2297 | 0 | } |
2298 | | |
2299 | 0 | void RetryingClientMessageHandler::OnPlayable() { |
2300 | 0 | _parent->OnPlayable(); |
2301 | 0 | } |
2302 | | |
2303 | 0 | void RetryingClientMessageHandler::OnUserData(void* msg) { |
2304 | 0 | _parent->CallOnUserData(msg); |
2305 | 0 | } |
2306 | | |
2307 | 0 | void RetryingClientMessageHandler::OnCuePoint(brpc::RtmpCuePoint* cuepoint) { |
2308 | 0 | _parent->CallOnCuePoint(cuepoint); |
2309 | 0 | } |
2310 | | |
2311 | 0 | void RetryingClientMessageHandler::OnMetaData(brpc::RtmpMetaData* metadata, const butil::StringPiece& name) { |
2312 | 0 | _parent->CallOnMetaData(metadata, name); |
2313 | 0 | } |
2314 | | |
2315 | 0 | void RetryingClientMessageHandler::OnAudioMessage(brpc::RtmpAudioMessage* msg) { |
2316 | 0 | _parent->CallOnAudioMessage(msg); |
2317 | 0 | } |
2318 | | |
2319 | 0 | void RetryingClientMessageHandler::OnVideoMessage(brpc::RtmpVideoMessage* msg) { |
2320 | 0 | _parent->CallOnVideoMessage(msg); |
2321 | 0 | } |
2322 | | |
2323 | 0 | void RetryingClientMessageHandler::OnSharedObjectMessage(RtmpSharedObjectMessage* msg) { |
2324 | 0 | _parent->CallOnSharedObjectMessage(msg); |
2325 | 0 | } |
2326 | | |
2327 | 0 | void RetryingClientMessageHandler::OnSubStreamStop(RtmpStreamBase* sub_stream) { |
2328 | 0 | _parent->OnSubStreamStop(sub_stream); |
2329 | 0 | } |
2330 | | |
2331 | | RetryingClientMessageHandler::RetryingClientMessageHandler(RtmpRetryingClientStream* parent) |
2332 | 0 | : _parent(parent) {} |
2333 | | |
2334 | 0 | void RtmpRetryingClientStream::Recreate() { |
2335 | 0 | butil::intrusive_ptr<RtmpStreamBase> sub_stream; |
2336 | 0 | _sub_stream_creator->NewSubStream(new RetryingClientMessageHandler(this), &sub_stream); |
2337 | 0 | butil::intrusive_ptr<RtmpStreamBase> old_sub_stream; |
2338 | 0 | bool destroying = false; |
2339 | 0 | { |
2340 | 0 | BAIDU_SCOPED_LOCK(_stream_mutex); |
2341 | | // Need to check _destroying to avoid setting the new sub_stream to a |
2342 | | // destroying retrying stream. |
2343 | | // Note: the load of _destroying and the setting of _using_sub_stream |
2344 | | // must be in the same lock, otherwise current bthread may be scheduled |
2345 | | // and Destroy() may be called, making new sub_stream leaked. |
2346 | 0 | destroying = _destroying.load(butil::memory_order_relaxed); |
2347 | 0 | if (!destroying) { |
2348 | 0 | _using_sub_stream.swap(old_sub_stream); |
2349 | 0 | _using_sub_stream = sub_stream; |
2350 | 0 | _changed_stream = true; |
2351 | 0 | } |
2352 | 0 | } |
2353 | 0 | if (old_sub_stream) { |
2354 | 0 | old_sub_stream->Destroy(); |
2355 | 0 | } |
2356 | 0 | if (destroying) { |
2357 | 0 | sub_stream->Destroy(); |
2358 | 0 | return; |
2359 | 0 | } |
2360 | 0 | _last_creation_time_us = butil::gettimeofday_us(); |
2361 | | // If Init() of sub_stream is called before setting _using_sub_stream, |
2362 | | // OnStop() may happen before _using_sub_stream is set and the stopped |
2363 | | // stream is wrongly left in the variable. |
2364 | | |
2365 | 0 | _sub_stream_creator->LaunchSubStream(sub_stream.get(), &_options); |
2366 | 0 | } |
2367 | | |
2368 | 0 | void RtmpRetryingClientStream::OnRecreateTimer(void* arg) { |
2369 | | // Hold the referenced stream. |
2370 | 0 | butil::intrusive_ptr<RtmpRetryingClientStream> ptr( |
2371 | 0 | static_cast<RtmpRetryingClientStream*>(arg), false/*not add ref*/); |
2372 | 0 | ptr->Recreate(); |
2373 | 0 | } |
2374 | | |
2375 | 0 | void RtmpRetryingClientStream::OnSubStreamStop(RtmpStreamBase* sub_stream) { |
2376 | | // Make sure the sub_stream is destroyed after this function. |
2377 | 0 | DestroyingPtr<RtmpStreamBase> sub_stream_guard(sub_stream); |
2378 | | |
2379 | 0 | butil::intrusive_ptr<RtmpStreamBase> removed_sub_stream; |
2380 | 0 | { |
2381 | 0 | BAIDU_SCOPED_LOCK(_stream_mutex); |
2382 | 0 | if (sub_stream == _using_sub_stream) { |
2383 | 0 | _using_sub_stream.swap(removed_sub_stream); |
2384 | 0 | } |
2385 | 0 | } |
2386 | 0 | if (removed_sub_stream == NULL || |
2387 | 0 | _destroying.load(butil::memory_order_relaxed) || |
2388 | 0 | _called_on_stop.load(butil::memory_order_relaxed)) { |
2389 | 0 | return; |
2390 | 0 | } |
2391 | | // Update _is_server_accepted_ever |
2392 | 0 | if (sub_stream->is_server_accepted()) { |
2393 | 0 | _is_server_accepted_ever = true; |
2394 | 0 | } |
2395 | | |
2396 | 0 | if (_options.max_retry_duration_ms == 0) { |
2397 | 0 | return CallOnStopIfNeeded(); |
2398 | 0 | } |
2399 | | // If the sub_stream has data ever, count this retry as the beginning |
2400 | | // of RtmpRetryingClientStreamOptions.max_retry_duration_ms. |
2401 | 0 | if ((!_options.play_name.empty() && sub_stream->has_data_ever()) || |
2402 | 0 | (!_options.publish_name.empty() && sub_stream->is_server_accepted())) { |
2403 | 0 | const int64_t now = butil::gettimeofday_us(); |
2404 | 0 | if (now >= _last_retry_start_time_us + |
2405 | 0 | 3 * _options.retry_interval_ms * 1000L) { |
2406 | | // re-enable fast retries when the interval is long enough. |
2407 | | // `3' is just a randomly-chosen (small) number. |
2408 | 0 | _num_fast_retries = 0; |
2409 | 0 | } |
2410 | 0 | _last_retry_start_time_us = now; |
2411 | 0 | } |
2412 | | // Check max duration. Notice that this branch cannot be moved forward |
2413 | | // above branch which may update _last_retry_start_time_us |
2414 | 0 | if (_options.max_retry_duration_ms > 0 && |
2415 | 0 | butil::gettimeofday_us() > |
2416 | 0 | (_last_retry_start_time_us + _options.max_retry_duration_ms * 1000L)) { |
2417 | | // exceed the duration, stop retrying. |
2418 | 0 | return CallOnStopIfNeeded(); |
2419 | 0 | } |
2420 | 0 | if (_num_fast_retries < _options.fast_retry_count) { |
2421 | 0 | ++_num_fast_retries; |
2422 | | // Retry immediately for several times. Works for scenarios like: |
2423 | | // restarting servers, occasional connection lost etc... |
2424 | 0 | return Recreate(); |
2425 | 0 | } |
2426 | 0 | if (_options.quit_when_no_data_ever && |
2427 | 0 | ((!_options.play_name.empty() && !has_data_ever()) || |
2428 | 0 | (!_options.publish_name.empty() && !_is_server_accepted_ever))) { |
2429 | | // Stop retrying when created playing streams never have data or |
2430 | | // publishing streams were never accepted. It's very likely that |
2431 | | // continuing retrying does not make sense. |
2432 | 0 | return CallOnStopIfNeeded(); |
2433 | 0 | } |
2434 | 0 | const int64_t wait_us = _last_creation_time_us + |
2435 | 0 | _options.retry_interval_ms * 1000L - butil::gettimeofday_us(); |
2436 | 0 | if (wait_us > 0) { |
2437 | | // retry is too frequent, schedule the retry. |
2438 | | // Add a ref for OnRecreateTimer which does deref. |
2439 | 0 | butil::intrusive_ptr<RtmpRetryingClientStream>(this).detach(); |
2440 | 0 | if (bthread_timer_add(&_create_timer_id, |
2441 | 0 | butil::microseconds_from_now(wait_us), |
2442 | 0 | OnRecreateTimer, this) != 0) { |
2443 | 0 | LOG(ERROR) << "Fail to create timer"; |
2444 | 0 | return CallOnStopIfNeeded(); |
2445 | 0 | } |
2446 | 0 | _has_timer_ever = true; |
2447 | 0 | } else { |
2448 | 0 | Recreate(); |
2449 | 0 | } |
2450 | 0 | } |
2451 | | |
2452 | | int RtmpRetryingClientStream::AcquireStreamToSend( |
2453 | 0 | butil::intrusive_ptr<RtmpStreamBase>* ptr) { |
2454 | 0 | BAIDU_SCOPED_LOCK(_stream_mutex); |
2455 | 0 | if (!_using_sub_stream) { |
2456 | 0 | errno = EPERM; |
2457 | 0 | return -1; |
2458 | 0 | } |
2459 | 0 | if (!_using_sub_stream->is_server_accepted()) { |
2460 | | // not published yet. |
2461 | 0 | errno = EPERM; |
2462 | 0 | return -1; |
2463 | 0 | } |
2464 | 0 | if (_changed_stream) { |
2465 | 0 | _changed_stream = false; |
2466 | 0 | errno = ERTMPPUBLISHABLE; |
2467 | 0 | return -1; |
2468 | 0 | } |
2469 | 0 | *ptr = _using_sub_stream; |
2470 | 0 | return 0; |
2471 | 0 | } |
2472 | | |
2473 | 0 | int RtmpRetryingClientStream::SendCuePoint(const RtmpCuePoint& obj) { |
2474 | 0 | butil::intrusive_ptr<RtmpStreamBase> ptr; |
2475 | 0 | if (AcquireStreamToSend(&ptr) != 0) { |
2476 | 0 | return -1; |
2477 | 0 | } |
2478 | 0 | return ptr->SendCuePoint(obj); |
2479 | 0 | } |
2480 | | |
2481 | 0 | int RtmpRetryingClientStream::SendMetaData(const RtmpMetaData& obj, const butil::StringPiece& name) { |
2482 | 0 | butil::intrusive_ptr<RtmpStreamBase> ptr; |
2483 | 0 | if (AcquireStreamToSend(&ptr) != 0) { |
2484 | 0 | return -1; |
2485 | 0 | } |
2486 | 0 | return ptr->SendMetaData(obj, name); |
2487 | 0 | } |
2488 | | |
2489 | | int RtmpRetryingClientStream::SendSharedObjectMessage( |
2490 | 0 | const RtmpSharedObjectMessage& msg) { |
2491 | 0 | butil::intrusive_ptr<RtmpStreamBase> ptr; |
2492 | 0 | if (AcquireStreamToSend(&ptr) != 0) { |
2493 | 0 | return -1; |
2494 | 0 | } |
2495 | 0 | return ptr->SendSharedObjectMessage(msg); |
2496 | 0 | } |
2497 | | |
2498 | 0 | int RtmpRetryingClientStream::SendAudioMessage(const RtmpAudioMessage& msg) { |
2499 | 0 | butil::intrusive_ptr<RtmpStreamBase> ptr; |
2500 | 0 | if (AcquireStreamToSend(&ptr) != 0) { |
2501 | 0 | return -1; |
2502 | 0 | } |
2503 | 0 | return ptr->SendAudioMessage(msg); |
2504 | 0 | } |
2505 | | |
2506 | 0 | int RtmpRetryingClientStream::SendAACMessage(const RtmpAACMessage& msg) { |
2507 | 0 | butil::intrusive_ptr<RtmpStreamBase> ptr; |
2508 | 0 | if (AcquireStreamToSend(&ptr) != 0) { |
2509 | 0 | return -1; |
2510 | 0 | } |
2511 | 0 | return ptr->SendAACMessage(msg); |
2512 | 0 | } |
2513 | | |
2514 | 0 | int RtmpRetryingClientStream::SendVideoMessage(const RtmpVideoMessage& msg) { |
2515 | 0 | butil::intrusive_ptr<RtmpStreamBase> ptr; |
2516 | 0 | if (AcquireStreamToSend(&ptr) != 0) { |
2517 | 0 | return -1; |
2518 | 0 | } |
2519 | 0 | return ptr->SendVideoMessage(msg); |
2520 | 0 | } |
2521 | | |
2522 | 0 | int RtmpRetryingClientStream::SendAVCMessage(const RtmpAVCMessage& msg) { |
2523 | 0 | butil::intrusive_ptr<RtmpStreamBase> ptr; |
2524 | 0 | if (AcquireStreamToSend(&ptr) != 0) { |
2525 | 0 | return -1; |
2526 | 0 | } |
2527 | 0 | return ptr->SendAVCMessage(msg); |
2528 | 0 | } |
2529 | | |
2530 | 0 | void RtmpRetryingClientStream::StopCurrentStream() { |
2531 | 0 | butil::intrusive_ptr<RtmpStreamBase> sub_stream; |
2532 | 0 | { |
2533 | 0 | BAIDU_SCOPED_LOCK(_stream_mutex); |
2534 | 0 | sub_stream = _using_sub_stream; |
2535 | 0 | } |
2536 | 0 | if (sub_stream) { |
2537 | 0 | sub_stream->SignalError(); |
2538 | 0 | } |
2539 | 0 | } |
2540 | | |
2541 | 0 | void RtmpRetryingClientStream::OnPlayable() {} |
2542 | | |
2543 | 0 | butil::EndPoint RtmpRetryingClientStream::remote_side() const { |
2544 | 0 | { |
2545 | 0 | BAIDU_SCOPED_LOCK(_stream_mutex); |
2546 | 0 | if (_using_sub_stream) { |
2547 | 0 | return _using_sub_stream->remote_side(); |
2548 | 0 | } |
2549 | 0 | } |
2550 | 0 | return butil::EndPoint(); |
2551 | 0 | } |
2552 | | |
2553 | 0 | butil::EndPoint RtmpRetryingClientStream::local_side() const { |
2554 | 0 | { |
2555 | 0 | BAIDU_SCOPED_LOCK(_stream_mutex); |
2556 | 0 | if (_using_sub_stream) { |
2557 | 0 | return _using_sub_stream->local_side(); |
2558 | 0 | } |
2559 | 0 | } |
2560 | 0 | return butil::EndPoint(); |
2561 | 0 | } |
2562 | | |
2563 | | // =========== RtmpService =============== |
2564 | 0 | void RtmpService::OnPingResponse(const butil::EndPoint&, uint32_t) { |
2565 | | // TODO: put into some bvars? |
2566 | 0 | } |
2567 | | |
2568 | | RtmpServerStream::RtmpServerStream() |
2569 | | : RtmpStreamBase(false) |
2570 | | , _client_supports_stream_multiplexing(false) |
2571 | | , _is_publish(false) |
2572 | 0 | , _onfail_id(INVALID_BTHREAD_ID) { |
2573 | 0 | get_rtmp_bvars()->server_stream_count << 1; |
2574 | 0 | } |
2575 | | |
2576 | 0 | RtmpServerStream::~RtmpServerStream() { |
2577 | 0 | get_rtmp_bvars()->server_stream_count << -1; |
2578 | 0 | } |
2579 | | |
2580 | 0 | void RtmpServerStream::Destroy() { |
2581 | 0 | CHECK(false) << "You're not supposed to call Destroy() for server-side streams"; |
2582 | 0 | } |
2583 | | |
2584 | | void RtmpServerStream::OnPlay(const RtmpPlayOptions& opt, |
2585 | | butil::Status* status, |
2586 | 0 | google::protobuf::Closure* done) { |
2587 | 0 | ClosureGuard done_guard(done); |
2588 | 0 | status->set_error(EPERM, "%s[%u] ignored play{stream_name=%s start=%f" |
2589 | 0 | " duration=%f reset=%d}", |
2590 | 0 | butil::endpoint2str(remote_side()).c_str(), stream_id(), |
2591 | 0 | opt.stream_name.c_str(), opt.start, opt.duration, |
2592 | 0 | (int)opt.reset); |
2593 | 0 | } |
2594 | | |
2595 | 0 | void RtmpServerStream::OnPlay2(const RtmpPlay2Options& opt) { |
2596 | 0 | LOG(ERROR) << remote_side() << '[' << stream_id() |
2597 | 0 | << "] ignored play2{" << opt.ShortDebugString() << '}'; |
2598 | 0 | } |
2599 | | |
2600 | | void RtmpServerStream::OnPublish(const std::string& name, |
2601 | | RtmpPublishType type, |
2602 | | butil::Status* status, |
2603 | 0 | google::protobuf::Closure* done) { |
2604 | 0 | ClosureGuard done_guard(done); |
2605 | 0 | status->set_error(EPERM, "%s[%u] ignored publish{stream_name=%s type=%s}", |
2606 | 0 | butil::endpoint2str(remote_side()).c_str(), stream_id(), |
2607 | 0 | name.c_str(), RtmpPublishType2Str(type)); |
2608 | 0 | } |
2609 | | |
2610 | 0 | int RtmpServerStream::OnSeek(double offset_ms) { |
2611 | 0 | LOG(ERROR) << remote_side() << '[' << stream_id() << "] ignored seek(" |
2612 | 0 | << offset_ms << ")"; |
2613 | 0 | return -1; |
2614 | 0 | } |
2615 | | |
2616 | 0 | int RtmpServerStream::OnPause(bool pause, double offset_ms) { |
2617 | 0 | LOG(ERROR) << remote_side() << '[' << stream_id() << "] ignored " |
2618 | 0 | << (pause ? "pause" : "unpause") |
2619 | 0 | << "(offset_ms=" << offset_ms << ")"; |
2620 | 0 | return -1; |
2621 | 0 | } |
2622 | | |
2623 | 0 | void RtmpServerStream::OnSetBufferLength(uint32_t /*buffer_length_ms*/) {} |
2624 | | |
2625 | 0 | int RtmpServerStream::SendStopMessage(const butil::StringPiece& error_desc) { |
2626 | 0 | if (_rtmpsock == NULL) { |
2627 | 0 | errno = EINVAL; |
2628 | 0 | return -1; |
2629 | 0 | } |
2630 | 0 | if (FLAGS_rtmp_server_close_connection_on_error && |
2631 | 0 | !_client_supports_stream_multiplexing) { |
2632 | 0 | _rtmpsock->SetFailed(EFAILEDSOCKET, "Close connection because %.*s", |
2633 | 0 | (int)error_desc.size(), error_desc.data()); |
2634 | | // The purpose is to close the connection, no matter what SetFailed() |
2635 | | // returns, the operation should be done. |
2636 | 0 | LOG_IF(WARNING, FLAGS_log_error_text) |
2637 | 0 | << "Close connection because " << error_desc; |
2638 | 0 | return 0; |
2639 | 0 | } |
2640 | | |
2641 | | // Send StreamNotFound error to make the client close connections. |
2642 | | // Works for flashplayer and ffplay(not started playing), not work for SRS |
2643 | | // and ffplay(started playing) |
2644 | 0 | butil::IOBuf req_buf; |
2645 | 0 | RtmpInfo info; |
2646 | 0 | { |
2647 | 0 | butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf); |
2648 | 0 | AMFOutputStream ostream(&zc_stream); |
2649 | 0 | WriteAMFString(RTMP_AMF0_COMMAND_ON_STATUS, &ostream); |
2650 | 0 | WriteAMFUint32(0, &ostream); |
2651 | 0 | WriteAMFNull(&ostream); |
2652 | 0 | if (_is_publish) { |
2653 | | // NetStream.Publish.Rejected does not work for ffmpeg, works for OBS. |
2654 | | // NetStream.Publish.BadName does not work for OBS. |
2655 | | // NetStream.Play.StreamNotFound is not accurate but works for both |
2656 | | // ffmpeg and OBS. |
2657 | 0 | info.set_code(RTMP_STATUS_CODE_STREAM_NOT_FOUND); |
2658 | 0 | } else { |
2659 | 0 | info.set_code(RTMP_STATUS_CODE_STREAM_NOT_FOUND); |
2660 | 0 | } |
2661 | 0 | info.set_level(RTMP_INFO_LEVEL_ERROR); |
2662 | 0 | if (!error_desc.empty()) { |
2663 | 0 | info.set_description(error_desc.as_string()); |
2664 | 0 | } |
2665 | 0 | WriteAMFObject(info, &ostream); |
2666 | 0 | } |
2667 | 0 | SocketMessagePtr<policy::RtmpUnsentMessage> msg(new policy::RtmpUnsentMessage); |
2668 | 0 | msg->header.message_length = req_buf.size(); |
2669 | 0 | msg->header.message_type = policy::RTMP_MESSAGE_COMMAND_AMF0; |
2670 | 0 | msg->header.stream_id = _message_stream_id; |
2671 | 0 | msg->chunk_stream_id = _chunk_stream_id; |
2672 | 0 | msg->body = req_buf; |
2673 | | |
2674 | 0 | if (policy::WriteWithoutOvercrowded(_rtmpsock.get(), msg) != 0) { |
2675 | 0 | PLOG_IF(WARNING, errno != EFAILEDSOCKET) |
2676 | 0 | << _rtmpsock->remote_side() << '[' << _message_stream_id |
2677 | 0 | << "]: Fail to send " << info.code() << ": " << error_desc; |
2678 | 0 | return -1; |
2679 | 0 | } |
2680 | 0 | LOG_IF(WARNING, FLAGS_log_error_text) |
2681 | 0 | << _rtmpsock->remote_side() << '[' << _message_stream_id << "]: Sent " |
2682 | 0 | << info.code() << ' ' << error_desc; |
2683 | 0 | return 0; |
2684 | 0 | } |
2685 | | |
2686 | | // Call this method to send StreamDry to the client. |
2687 | | // Returns 0 on success, -1 otherwise. |
2688 | 0 | int RtmpServerStream::SendStreamDry() { |
2689 | 0 | char data[6]; |
2690 | 0 | char* p = data; |
2691 | 0 | policy::WriteBigEndian2Bytes(&p, policy::RTMP_USER_CONTROL_EVENT_STREAM_DRY); |
2692 | 0 | policy::WriteBigEndian4Bytes(&p, stream_id()); |
2693 | 0 | return SendControlMessage(policy::RTMP_MESSAGE_USER_CONTROL, data, sizeof(data)); |
2694 | 0 | } |
2695 | | |
2696 | 0 | int RtmpServerStream::RunOnFailed(bthread_id_t id, void* data, int) { |
2697 | 0 | butil::intrusive_ptr<RtmpServerStream> stream( |
2698 | 0 | static_cast<RtmpServerStream*>(data), false); |
2699 | 0 | CHECK(stream->_rtmpsock); |
2700 | 0 | stream->OnStopInternal(); |
2701 | 0 | bthread_id_unlock_and_destroy(id); |
2702 | 0 | return 0; |
2703 | 0 | } |
2704 | | |
2705 | 0 | void RtmpServerStream::OnStopInternal() { |
2706 | 0 | if (_rtmpsock == NULL) { |
2707 | 0 | return CallOnStop(); |
2708 | 0 | } |
2709 | 0 | policy::RtmpContext* ctx = |
2710 | 0 | static_cast<policy::RtmpContext*>(_rtmpsock->parsing_context()); |
2711 | 0 | if (ctx == NULL) { |
2712 | 0 | LOG(FATAL) << _rtmpsock->remote_side() << ": RtmpContext of " |
2713 | 0 | << *_rtmpsock << " is NULL"; |
2714 | 0 | return CallOnStop(); |
2715 | 0 | } |
2716 | 0 | if (ctx->RemoveMessageStream(this)) { |
2717 | 0 | return CallOnStop(); |
2718 | 0 | } |
2719 | 0 | } |
2720 | | |
2721 | 541 | butil::StringPiece RemoveRtmpPrefix(const butil::StringPiece& url_in) { |
2722 | 541 | if (!url_in.starts_with("rtmp://")) { |
2723 | 508 | return url_in; |
2724 | 508 | } |
2725 | 33 | butil::StringPiece url = url_in; |
2726 | 33 | size_t i = 7; |
2727 | 238 | for (; i < url.size() && url[i] == '/'; ++i); |
2728 | 33 | url.remove_prefix(i); |
2729 | 33 | return url; |
2730 | 541 | } |
2731 | | |
2732 | 0 | butil::StringPiece RemoveProtocolPrefix(const butil::StringPiece& url_in) { |
2733 | 0 | size_t proto_pos = url_in.find("://"); |
2734 | 0 | if (proto_pos == butil::StringPiece::npos) { |
2735 | 0 | return url_in; |
2736 | 0 | } |
2737 | 0 | butil::StringPiece url = url_in; |
2738 | 0 | size_t i = proto_pos + 3; |
2739 | 0 | for (; i < url.size() && url[i] == '/'; ++i); |
2740 | 0 | url.remove_prefix(i); |
2741 | 0 | return url; |
2742 | 0 | } |
2743 | | |
2744 | | void ParseRtmpHostAndPort(const butil::StringPiece& host_and_port, |
2745 | | butil::StringPiece* host, |
2746 | 541 | butil::StringPiece* port) { |
2747 | 541 | size_t colon_pos = host_and_port.find(':'); |
2748 | 541 | if (colon_pos == butil::StringPiece::npos) { |
2749 | 419 | if (host) { |
2750 | 419 | *host = host_and_port; |
2751 | 419 | } |
2752 | 419 | if (port) { |
2753 | 419 | *port = "1935"; |
2754 | 419 | } |
2755 | 419 | } else { |
2756 | 122 | if (host) { |
2757 | 122 | *host = host_and_port.substr(0, colon_pos); |
2758 | 122 | } |
2759 | 122 | if (port) { |
2760 | 122 | *port = host_and_port.substr(colon_pos + 1); |
2761 | 122 | } |
2762 | 122 | } |
2763 | 541 | } |
2764 | | |
2765 | | butil::StringPiece RemoveQueryStrings(const butil::StringPiece& stream_name_in, |
2766 | 0 | butil::StringPiece* query_strings) { |
2767 | 0 | const size_t qm_pos = stream_name_in.find('?'); |
2768 | 0 | if (qm_pos == butil::StringPiece::npos) { |
2769 | 0 | if (query_strings) { |
2770 | 0 | query_strings->clear(); |
2771 | 0 | } |
2772 | 0 | return stream_name_in; |
2773 | 0 | } else { |
2774 | 0 | if (query_strings) { |
2775 | 0 | *query_strings = stream_name_in.substr(qm_pos + 1); |
2776 | 0 | } |
2777 | 0 | return stream_name_in.substr(0, qm_pos); |
2778 | 0 | } |
2779 | 0 | } |
2780 | | |
2781 | | // Split vhost from *app in forms of "APP?vhost=..." and overwrite *host. |
2782 | | static void SplitVHostFromApp(const butil::StringPiece& app_and_vhost, |
2783 | | butil::StringPiece* app, |
2784 | 332 | butil::StringPiece* vhost) { |
2785 | 332 | const size_t q_pos = app_and_vhost.find('?'); |
2786 | 332 | if (q_pos == butil::StringPiece::npos) { |
2787 | 138 | if (app) { |
2788 | 138 | *app = app_and_vhost; |
2789 | 138 | } |
2790 | 138 | if (vhost) { |
2791 | 138 | vhost->clear(); |
2792 | 138 | } |
2793 | 138 | return; |
2794 | 138 | } |
2795 | | |
2796 | 194 | if (app) { |
2797 | 194 | *app = app_and_vhost.substr(0, q_pos); |
2798 | 194 | } |
2799 | 194 | if (vhost) { |
2800 | 194 | butil::StringPiece qstr = app_and_vhost.substr(q_pos + 1); |
2801 | 194 | butil::StringSplitter sp(qstr.data(), qstr.data() + qstr.size(), '&'); |
2802 | 1.55k | for (; sp; ++sp) { |
2803 | 1.41k | butil::StringPiece field(sp.field(), sp.length()); |
2804 | 1.41k | if (field.starts_with("vhost=")) { |
2805 | 53 | *vhost = field.substr(6); |
2806 | | // vhost cannot have port. |
2807 | 53 | const size_t colon_pos = vhost->find_last_of(':'); |
2808 | 53 | if (colon_pos != butil::StringPiece::npos) { |
2809 | 20 | vhost->remove_suffix(vhost->size() - colon_pos); |
2810 | 20 | } |
2811 | 53 | return; |
2812 | 53 | } |
2813 | 1.41k | } |
2814 | 141 | vhost->clear(); |
2815 | 141 | } |
2816 | 194 | } |
2817 | | |
2818 | | void ParseRtmpURL(const butil::StringPiece& rtmp_url_in, |
2819 | | butil::StringPiece* host, |
2820 | | butil::StringPiece* vhost, |
2821 | | butil::StringPiece* port, |
2822 | | butil::StringPiece* app, |
2823 | 541 | butil::StringPiece* stream_name) { |
2824 | 541 | if (stream_name) { |
2825 | 541 | stream_name->clear(); |
2826 | 541 | } |
2827 | 541 | butil::StringPiece rtmp_url = RemoveRtmpPrefix(rtmp_url_in); |
2828 | 541 | size_t slash1_pos = rtmp_url.find_first_of('/'); |
2829 | 541 | if (slash1_pos == butil::StringPiece::npos) { |
2830 | 209 | if (host || port) { |
2831 | 209 | ParseRtmpHostAndPort(rtmp_url, host, port); |
2832 | 209 | } |
2833 | 209 | if (app) { |
2834 | 209 | app->clear(); |
2835 | 209 | } |
2836 | 209 | return; |
2837 | 209 | } |
2838 | 332 | if (host || port) { |
2839 | 332 | ParseRtmpHostAndPort(rtmp_url.substr(0, slash1_pos), host, port); |
2840 | 332 | } |
2841 | | // Remove duplicated slashes. |
2842 | 788 | for (++slash1_pos; slash1_pos < rtmp_url.size() && |
2843 | 788 | rtmp_url[slash1_pos] == '/'; ++slash1_pos); |
2844 | 332 | rtmp_url.remove_prefix(slash1_pos); |
2845 | 332 | size_t slash2_pos = rtmp_url.find_first_of('/'); |
2846 | 332 | if (slash2_pos == butil::StringPiece::npos) { |
2847 | 266 | return SplitVHostFromApp(rtmp_url, app, vhost); |
2848 | 266 | } |
2849 | 66 | SplitVHostFromApp(rtmp_url.substr(0, slash2_pos), app, vhost); |
2850 | 66 | if (stream_name != NULL) { |
2851 | | // Remove duplicated slashes. |
2852 | 412 | for (++slash2_pos; slash2_pos < rtmp_url.size() && |
2853 | 412 | rtmp_url[slash2_pos] == '/'; ++slash2_pos); |
2854 | 66 | rtmp_url.remove_prefix(slash2_pos); |
2855 | 66 | *stream_name = rtmp_url; |
2856 | 66 | } |
2857 | 66 | } |
2858 | | |
2859 | | std::string MakeRtmpURL(const butil::StringPiece& host, |
2860 | | const butil::StringPiece& port, |
2861 | | const butil::StringPiece& app, |
2862 | 0 | const butil::StringPiece& stream_name) { |
2863 | 0 | std::string result; |
2864 | 0 | result.reserve(15 + host.size() + app.size() + stream_name.size()); |
2865 | 0 | result.append("rtmp://"); |
2866 | 0 | result.append(host.data(), host.size()); |
2867 | 0 | if (!port.empty()) { |
2868 | 0 | result.push_back(':'); |
2869 | 0 | result.append(port.data(), port.size()); |
2870 | 0 | } |
2871 | 0 | if (!app.empty()) { |
2872 | 0 | result.push_back('/'); |
2873 | 0 | result.append(app.data(), app.size()); |
2874 | 0 | } |
2875 | 0 | if (!stream_name.empty()) { |
2876 | 0 | if (app.empty()) { // extra / to notify user that app is empty. |
2877 | 0 | result.push_back('/'); |
2878 | 0 | } |
2879 | 0 | result.push_back('/'); |
2880 | 0 | result.append(stream_name.data(), stream_name.size()); |
2881 | 0 | } |
2882 | 0 | return result; |
2883 | 0 | } |
2884 | | |
2885 | | } // namespace brpc |