/src/suricata7/rust/src/mqtt/parser.rs
Line | Count | Source (jump to first uncovered line) |
1 | | /* Copyright (C) 2020-2022 Open Information Security Foundation |
2 | | * |
3 | | * You can copy, redistribute or modify this Program under the terms of |
4 | | * the GNU General Public License version 2 as published by the Free |
5 | | * Software Foundation. |
6 | | * |
7 | | * This program is distributed in the hope that it will be useful, |
8 | | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
9 | | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
10 | | * GNU General Public License for more details. |
11 | | * |
12 | | * You should have received a copy of the GNU General Public License |
13 | | * version 2 along with this program; if not, write to the Free Software |
14 | | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
15 | | * 02110-1301, USA. |
16 | | */ |
17 | | |
18 | | // written by Sascha Steinbiss <sascha@steinbiss.name> |
19 | | |
20 | | use crate::common::nom7::bits; |
21 | | use crate::mqtt::mqtt_message::*; |
22 | | use crate::mqtt::mqtt_property::*; |
23 | | use nom7::bits::streaming::take as take_bits; |
24 | | use nom7::bytes::complete::take; |
25 | | use nom7::bytes::streaming::take_while_m_n; |
26 | | use nom7::combinator::{complete, cond, verify}; |
27 | | use nom7::multi::{length_data, many0, many1}; |
28 | | use nom7::number::streaming::*; |
29 | | use nom7::sequence::tuple; |
30 | | use nom7::{Err, IResult, Needed}; |
31 | | use num_traits::FromPrimitive; |
32 | | |
33 | | #[derive(Copy, Clone, Debug)] |
34 | | pub struct FixedHeader { |
35 | | pub message_type: MQTTTypeCode, |
36 | | pub dup_flag: bool, |
37 | | pub qos_level: u8, |
38 | | pub retain: bool, |
39 | | pub remaining_length: u32, |
40 | | } |
41 | | |
42 | | // PARSING HELPERS |
43 | | |
/// Returns `true` when the most significant bit of `b` is set, i.e. when the
/// byte announces that another variable-length-integer byte follows.
#[inline]
fn is_continuation_bit_set(b: u8) -> bool {
    // For a u8, "top bit set" is the same as "value is at least 0x80".
    b >= 0x80
}
48 | | |
/// Combines the bytes of an MQTT variable-length integer into a `u32`.
///
/// `continued` holds the leading bytes (least significant group first) and
/// `last` is the terminating byte. Only the low seven bits of every byte
/// contribute; each subsequent byte is weighted by another factor of 128.
#[inline]
fn convert_varint(continued: Vec<u8>, last: u8) -> u32 {
    let (partial, scale) = continued
        .iter()
        .fold((0u32, 1u32), |(sum, weight), byte| {
            (sum + u32::from(byte & 127) * weight, weight * 128u32)
        });
    partial + u32::from(last & 127) * scale
}
60 | | |
61 | | // DATA TYPES |
62 | | |
63 | | #[inline] |
64 | 1.27M | pub fn parse_mqtt_string(i: &[u8]) -> IResult<&[u8], String> { |
65 | 1.27M | let (i, content) = length_data(be_u16)(i)?; |
66 | 984k | Ok((i, String::from_utf8_lossy(content).to_string())) |
67 | 1.27M | } |
68 | | |
69 | | #[inline] |
70 | 36.1M | pub fn parse_mqtt_variable_integer(i: &[u8]) -> IResult<&[u8], u32> { |
71 | 36.1M | let (i, continued_part) = take_while_m_n(0, 3, is_continuation_bit_set)(i)?; |
72 | 36.0M | let (i, non_continued_part) = verify(be_u8, |&val| !is_continuation_bit_set(val))(i)?; |
73 | 36.0M | Ok(( |
74 | 36.0M | i, |
75 | 36.0M | convert_varint(continued_part.to_vec(), non_continued_part), |
76 | 36.0M | )) |
77 | 36.1M | } |
78 | | |
79 | | #[inline] |
80 | 147k | pub fn parse_mqtt_binary_data(i: &[u8]) -> IResult<&[u8], Vec<u8>> { |
81 | 147k | let (i, data) = length_data(be_u16)(i)?; |
82 | 146k | Ok((i, data.to_vec())) |
83 | 147k | } |
84 | | |
85 | | #[inline] |
86 | 935 | pub fn parse_mqtt_string_pair(i: &[u8]) -> IResult<&[u8], (String, String)> { |
87 | 935 | let (i, name) = parse_mqtt_string(i)?; |
88 | 922 | let (i, value) = parse_mqtt_string(i)?; |
89 | 905 | Ok((i, (name, value))) |
90 | 935 | } |
91 | | |
92 | | // MESSAGE COMPONENTS |
93 | | |
94 | | #[inline] |
95 | 4.78M | fn parse_property(i: &[u8]) -> IResult<&[u8], MQTTProperty> { |
96 | 4.78M | let (i, identifier) = parse_mqtt_variable_integer(i)?; |
97 | 4.78M | let (i, value) = parse_qualified_property(i, identifier)?; |
98 | 4.78M | Ok((i, value)) |
99 | 4.78M | } |
100 | | |
101 | | #[inline] |
102 | 1.64M | fn parse_properties(input: &[u8], precond: bool) -> IResult<&[u8], Option<Vec<MQTTProperty>>> { |
103 | 1.64M | // do not try to parse anything when precondition is not met |
104 | 1.64M | if !precond { |
105 | 1.57M | return Ok((input, None)); |
106 | 71.5k | } |
107 | 71.5k | // parse properties length |
108 | 71.5k | match parse_mqtt_variable_integer(input) { |
109 | 71.4k | Ok((rem, proplen)) => { |
110 | 71.4k | if proplen == 0 { |
111 | | // no properties |
112 | 26.7k | return Ok((rem, None)); |
113 | 44.7k | } |
114 | 44.7k | // parse properties |
115 | 44.7k | let mut props = Vec::<MQTTProperty>::new(); |
116 | 44.7k | let (rem, mut newrem) = take(proplen as usize)(rem)?; |
117 | 4.82M | while !newrem.is_empty() { |
118 | 4.78M | match parse_property(newrem) { |
119 | 4.78M | Ok((rem2, val)) => { |
120 | 4.78M | props.push(val); |
121 | 4.78M | newrem = rem2; |
122 | 4.78M | } |
123 | 679 | Err(e) => return Err(e), |
124 | | } |
125 | | } |
126 | 43.9k | return Ok((rem, Some(props))); |
127 | | } |
128 | 38 | Err(e) => return Err(e), |
129 | | } |
130 | 1.64M | } |
131 | | |
132 | | #[inline] |
133 | 31.2M | fn parse_fixed_header_flags(i: &[u8]) -> IResult<&[u8], (u8, u8, u8, u8)> { |
134 | 31.2M | bits(tuple(( |
135 | 31.2M | take_bits(4u8), |
136 | 31.2M | take_bits(1u8), |
137 | 31.2M | take_bits(2u8), |
138 | 31.2M | take_bits(1u8), |
139 | 31.2M | )))(i) |
140 | 31.2M | } |
141 | | |
142 | | #[inline] |
143 | 31.2M | fn parse_message_type(code: u8) -> MQTTTypeCode { |
144 | 31.2M | match code { |
145 | 31.2M | 0..=15 => { |
146 | 31.2M | if let Some(t) = FromPrimitive::from_u8(code) { |
147 | 31.2M | return t; |
148 | | } else { |
149 | 0 | return MQTTTypeCode::UNASSIGNED; |
150 | | } |
151 | | } |
152 | | _ => { |
153 | | // unreachable state in parser: we only pass values parsed from take_bits!(4u8) |
154 | 0 | debug_validate_fail!("can't have message codes >15 from 4 bits"); |
155 | 0 | MQTTTypeCode::UNASSIGNED |
156 | | } |
157 | | } |
158 | 31.2M | } |
159 | | |
160 | | #[inline] |
161 | 31.2M | pub fn parse_fixed_header(i: &[u8]) -> IResult<&[u8], FixedHeader> { |
162 | 31.2M | let (i, flags) = parse_fixed_header_flags(i)?; |
163 | 31.2M | let (i, remaining_length) = parse_mqtt_variable_integer(i)?; |
164 | 31.2M | Ok(( |
165 | 31.2M | i, |
166 | 31.2M | FixedHeader { |
167 | 31.2M | message_type: parse_message_type(flags.0), |
168 | 31.2M | dup_flag: flags.1 != 0, |
169 | 31.2M | qos_level: flags.2, |
170 | 31.2M | retain: flags.3 != 0, |
171 | 31.2M | remaining_length, |
172 | 31.2M | }, |
173 | 31.2M | )) |
174 | 31.2M | } |
175 | | |
176 | | #[inline] |
177 | | #[allow(clippy::type_complexity)] |
178 | 41.4k | fn parse_connect_variable_flags(i: &[u8]) -> IResult<&[u8], (u8, u8, u8, u8, u8, u8, u8)> { |
179 | 41.4k | bits(tuple(( |
180 | 41.4k | take_bits(1u8), |
181 | 41.4k | take_bits(1u8), |
182 | 41.4k | take_bits(1u8), |
183 | 41.4k | take_bits(2u8), |
184 | 41.4k | take_bits(1u8), |
185 | 41.4k | take_bits(1u8), |
186 | 41.4k | take_bits(1u8), |
187 | 41.4k | )))(i) |
188 | 41.4k | } |
189 | | |
190 | | #[inline] |
191 | 41.5k | fn parse_connect(i: &[u8]) -> IResult<&[u8], MQTTConnectData> { |
192 | 41.5k | let (i, protocol_string) = parse_mqtt_string(i)?; |
193 | 41.4k | let (i, protocol_version) = be_u8(i)?; |
194 | 41.4k | let (i, flags) = parse_connect_variable_flags(i)?; |
195 | 41.4k | let (i, keepalive) = be_u16(i)?; |
196 | 41.4k | let (i, properties) = parse_properties(i, protocol_version == 5)?; |
197 | 41.4k | let (i, client_id) = parse_mqtt_string(i)?; |
198 | 41.3k | let (i, will_properties) = parse_properties(i, protocol_version == 5 && flags.4 != 0)?; |
199 | 41.3k | let (i, will_topic) = cond(flags.4 != 0, parse_mqtt_string)(i)?; |
200 | 41.3k | let (i, will_message) = cond(flags.4 != 0, parse_mqtt_binary_data)(i)?; |
201 | 41.2k | let (i, username) = cond(flags.0 != 0, parse_mqtt_string)(i)?; |
202 | 41.2k | let (i, password) = cond(flags.1 != 0, parse_mqtt_binary_data)(i)?; |
203 | 41.2k | Ok(( |
204 | 41.2k | i, |
205 | 41.2k | MQTTConnectData { |
206 | 41.2k | protocol_string, |
207 | 41.2k | protocol_version, |
208 | 41.2k | username_flag: flags.0 != 0, |
209 | 41.2k | password_flag: flags.1 != 0, |
210 | 41.2k | will_retain: flags.2 != 0, |
211 | 41.2k | will_qos: flags.3, |
212 | 41.2k | will_flag: flags.4 != 0, |
213 | 41.2k | clean_session: flags.5 != 0, |
214 | 41.2k | keepalive, |
215 | 41.2k | client_id, |
216 | 41.2k | will_topic, |
217 | 41.2k | will_message, |
218 | 41.2k | username, |
219 | 41.2k | password, |
220 | 41.2k | properties, |
221 | 41.2k | will_properties, |
222 | 41.2k | }, |
223 | 41.2k | )) |
224 | 41.5k | } |
225 | | |
226 | | #[inline] |
227 | 314k | fn parse_connack(protocol_version: u8) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTConnackData> { |
228 | 314k | move |i: &[u8]| { |
229 | 314k | let (i, topic_name_compression_response) = be_u8(i)?; |
230 | 313k | let (i, return_code) = be_u8(i)?; |
231 | 313k | let (i, properties) = parse_properties(i, protocol_version == 5)?; |
232 | 313k | Ok(( |
233 | 313k | i, |
234 | 313k | MQTTConnackData { |
235 | 313k | session_present: (topic_name_compression_response & 1) != 0, |
236 | 313k | return_code, |
237 | 313k | properties, |
238 | 313k | }, |
239 | 313k | )) |
240 | 314k | } |
241 | 314k | } |
242 | | |
243 | | #[inline] |
244 | 112k | fn parse_publish( |
245 | 112k | protocol_version: u8, |
246 | 112k | has_id: bool, |
247 | 112k | ) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTPublishData> { |
248 | 112k | move |i: &[u8]| { |
249 | 112k | let (i, topic) = parse_mqtt_string(i)?; |
250 | 112k | let (i, message_id) = cond(has_id, be_u16)(i)?; |
251 | 112k | let (message, properties) = parse_properties(i, protocol_version == 5)?; |
252 | 112k | Ok(( |
253 | 112k | i, |
254 | 112k | MQTTPublishData { |
255 | 112k | topic, |
256 | 112k | message_id, |
257 | 112k | message: message.to_vec(), |
258 | 112k | properties, |
259 | 112k | }, |
260 | 112k | )) |
261 | 112k | } |
262 | 112k | } |
263 | | |
264 | | #[inline] |
265 | 1.99M | fn parse_msgidonly(protocol_version: u8) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTMessageIdOnly> { |
266 | 1.99M | move |input: &[u8]| { |
267 | 1.99M | if protocol_version < 5 { |
268 | | // before v5 we don't even have to care about reason codes |
269 | | // and properties, lucky us |
270 | 1.96M | return parse_msgidonly_v3(input); |
271 | 22.9k | } |
272 | 22.9k | let remaining_len = input.len(); |
273 | 22.9k | match be_u16(input) { |
274 | 22.8k | Ok((rem, message_id)) => { |
275 | 22.8k | if remaining_len == 2 { |
276 | | // from the spec: " The Reason Code and Property Length can be |
277 | | // omitted if the Reason Code is 0x00 (Success) and there are |
278 | | // no Properties. In this case the message has a Remaining |
279 | | // Length of 2." |
280 | 12.2k | return Ok(( |
281 | 12.2k | rem, |
282 | 12.2k | MQTTMessageIdOnly { |
283 | 12.2k | message_id, |
284 | 12.2k | reason_code: Some(0), |
285 | 12.2k | properties: None, |
286 | 12.2k | }, |
287 | 12.2k | )); |
288 | 10.6k | } |
289 | 10.6k | match be_u8(rem) { |
290 | 10.6k | Ok((rem, reason_code)) => { |
291 | 10.6k | // We are checking for 3 because in that case we have a |
292 | 10.6k | // header plus reason code, but no properties. |
293 | 10.6k | if remaining_len == 3 { |
294 | | // no properties |
295 | 929 | return Ok(( |
296 | 929 | rem, |
297 | 929 | MQTTMessageIdOnly { |
298 | 929 | message_id, |
299 | 929 | reason_code: Some(reason_code), |
300 | 929 | properties: None, |
301 | 929 | }, |
302 | 929 | )); |
303 | 9.68k | } |
304 | 9.68k | match parse_properties(rem, true) { |
305 | 9.57k | Ok((rem, properties)) => { |
306 | 9.57k | return Ok(( |
307 | 9.57k | rem, |
308 | 9.57k | MQTTMessageIdOnly { |
309 | 9.57k | message_id, |
310 | 9.57k | reason_code: Some(reason_code), |
311 | 9.57k | properties, |
312 | 9.57k | }, |
313 | 9.57k | )); |
314 | | } |
315 | 102 | Err(e) => return Err(e), |
316 | | } |
317 | | } |
318 | 0 | Err(e) => return Err(e), |
319 | | } |
320 | | } |
321 | 36 | Err(e) => return Err(e), |
322 | | } |
323 | 1.99M | } |
324 | 1.99M | } |
325 | | |
326 | | #[inline] |
327 | 1.96M | fn parse_msgidonly_v3(i: &[u8]) -> IResult<&[u8], MQTTMessageIdOnly> { |
328 | 1.96M | let (i, message_id) = be_u16(i)?; |
329 | 1.96M | Ok(( |
330 | 1.96M | i, |
331 | 1.96M | MQTTMessageIdOnly { |
332 | 1.96M | message_id, |
333 | 1.96M | reason_code: None, |
334 | 1.96M | properties: None, |
335 | 1.96M | }, |
336 | 1.96M | )) |
337 | 1.96M | } |
338 | | |
339 | | #[inline] |
340 | 388k | fn parse_subscribe_topic(i: &[u8]) -> IResult<&[u8], MQTTSubscribeTopicData> { |
341 | 388k | let (i, topic_name) = parse_mqtt_string(i)?; |
342 | 343k | let (i, qos) = be_u8(i)?; |
343 | 302k | Ok((i, MQTTSubscribeTopicData { topic_name, qos })) |
344 | 388k | } |
345 | | |
346 | | #[inline] |
347 | 85.9k | fn parse_subscribe(protocol_version: u8) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTSubscribeData> { |
348 | 85.9k | move |i: &[u8]| { |
349 | 85.9k | let (i, message_id) = be_u16(i)?; |
350 | 85.9k | let (i, properties) = parse_properties(i, protocol_version == 5)?; |
351 | 85.9k | let (i, topics) = many1(complete(parse_subscribe_topic))(i)?; |
352 | 85.8k | Ok(( |
353 | 85.8k | i, |
354 | 85.8k | MQTTSubscribeData { |
355 | 85.8k | message_id, |
356 | 85.8k | topics, |
357 | 85.8k | properties, |
358 | 85.8k | }, |
359 | 85.8k | )) |
360 | 85.9k | } |
361 | 85.9k | } |
362 | | |
363 | | #[inline] |
364 | 332k | fn parse_suback(protocol_version: u8) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTSubackData> { |
365 | 332k | move |i: &[u8]| { |
366 | 332k | let (i, message_id) = be_u16(i)?; |
367 | 332k | let (qoss, properties) = parse_properties(i, protocol_version == 5)?; |
368 | 332k | Ok(( |
369 | 332k | i, |
370 | 332k | MQTTSubackData { |
371 | 332k | message_id, |
372 | 332k | qoss: qoss.to_vec(), |
373 | 332k | properties, |
374 | 332k | }, |
375 | 332k | )) |
376 | 332k | } |
377 | 332k | } |
378 | | |
379 | | #[inline] |
380 | 249k | fn parse_unsubscribe( |
381 | 249k | protocol_version: u8, |
382 | 249k | ) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTUnsubscribeData> { |
383 | 249k | move |i: &[u8]| { |
384 | 249k | let (i, message_id) = be_u16(i)?; |
385 | 249k | let (i, properties) = parse_properties(i, protocol_version == 5)?; |
386 | 249k | let (i, topics) = many0(complete(parse_mqtt_string))(i)?; |
387 | 249k | Ok(( |
388 | 249k | i, |
389 | 249k | MQTTUnsubscribeData { |
390 | 249k | message_id, |
391 | 249k | topics, |
392 | 249k | properties, |
393 | 249k | }, |
394 | 249k | )) |
395 | 249k | } |
396 | 249k | } |
397 | | |
398 | | #[inline] |
399 | 420k | fn parse_unsuback(protocol_version: u8) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTUnsubackData> { |
400 | 420k | move |i: &[u8]| { |
401 | 420k | let (i, message_id) = be_u16(i)?; |
402 | 420k | let (i, properties) = parse_properties(i, protocol_version == 5)?; |
403 | 420k | let (i, reason_codes) = many0(complete(be_u8))(i)?; |
404 | 420k | Ok(( |
405 | 420k | i, |
406 | 420k | MQTTUnsubackData { |
407 | 420k | message_id, |
408 | 420k | properties, |
409 | 420k | reason_codes: Some(reason_codes), |
410 | 420k | }, |
411 | 420k | )) |
412 | 420k | } |
413 | 420k | } |
414 | | |
415 | | #[inline] |
416 | 1.11M | fn parse_disconnect( |
417 | 1.11M | remaining_len: usize, |
418 | 1.11M | protocol_version: u8, |
419 | 1.11M | ) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTDisconnectData> { |
420 | 1.11M | move |input: &[u8]| { |
421 | 1.11M | if protocol_version < 5 { |
422 | 940k | return Ok(( |
423 | 940k | input, |
424 | 940k | MQTTDisconnectData { |
425 | 940k | reason_code: None, |
426 | 940k | properties: None, |
427 | 940k | }, |
428 | 940k | )); |
429 | 171k | } |
430 | 171k | if remaining_len == 0 { |
431 | | // The Reason Code and Property Length can be omitted if the Reason |
432 | | // Code is 0x00 (Normal disconnection) and there are no Properties. |
433 | | // In this case the DISCONNECT has a Remaining Length of 0. |
434 | 100k | return Ok(( |
435 | 100k | input, |
436 | 100k | MQTTDisconnectData { |
437 | 100k | reason_code: Some(0), |
438 | 100k | properties: None, |
439 | 100k | }, |
440 | 100k | )); |
441 | 70.1k | } |
442 | 70.1k | match be_u8(input) { |
443 | 70.1k | Ok((rem, reason_code)) => { |
444 | 70.1k | // We are checking for 1 because in that case we have a |
445 | 70.1k | // header plus reason code, but no properties. |
446 | 70.1k | if remaining_len == 1 { |
447 | | // no properties |
448 | 69.2k | return Ok(( |
449 | 69.2k | rem, |
450 | 69.2k | MQTTDisconnectData { |
451 | 69.2k | reason_code: Some(0), |
452 | 69.2k | properties: None, |
453 | 69.2k | }, |
454 | 69.2k | )); |
455 | 962 | } |
456 | 962 | match parse_properties(rem, true) { |
457 | 949 | Ok((rem, properties)) => { |
458 | 949 | return Ok(( |
459 | 949 | rem, |
460 | 949 | MQTTDisconnectData { |
461 | 949 | reason_code: Some(reason_code), |
462 | 949 | properties, |
463 | 949 | }, |
464 | 949 | )); |
465 | | } |
466 | 13 | Err(e) => return Err(e), |
467 | | } |
468 | | } |
469 | 0 | Err(e) => return Err(e), |
470 | | } |
471 | 1.11M | } |
472 | 1.11M | } |
473 | | |
474 | | #[inline] |
475 | 35.9k | fn parse_auth(i: &[u8]) -> IResult<&[u8], MQTTAuthData> { |
476 | 35.9k | let (i, reason_code) = be_u8(i)?; |
477 | 35.9k | let (i, properties) = parse_properties(i, true)?; |
478 | 35.2k | Ok(( |
479 | 35.2k | i, |
480 | 35.2k | MQTTAuthData { |
481 | 35.2k | reason_code, |
482 | 35.2k | properties, |
483 | 35.2k | }, |
484 | 35.2k | )) |
485 | 35.9k | } |
486 | | |
487 | | #[inline] |
488 | 31.0M | fn parse_remaining_message<'a>( |
489 | 31.0M | full: &'a [u8], |
490 | 31.0M | len: usize, |
491 | 31.0M | skiplen: usize, |
492 | 31.0M | header: FixedHeader, |
493 | 31.0M | message_type: MQTTTypeCode, |
494 | 31.0M | protocol_version: u8, |
495 | 31.0M | ) -> impl Fn(&'a [u8]) -> IResult<&'a [u8], MQTTMessage> { |
496 | 31.0M | move |input: &'a [u8]| { |
497 | 31.0M | match message_type { |
498 | 41.5k | MQTTTypeCode::CONNECT => match parse_connect(input) { |
499 | 41.2k | Ok((_rem, conn)) => { |
500 | 41.2k | let msg = MQTTMessage { |
501 | 41.2k | header, |
502 | 41.2k | op: MQTTOperation::CONNECT(conn), |
503 | 41.2k | }; |
504 | 41.2k | Ok((&full[skiplen + len..], msg)) |
505 | | } |
506 | 366 | Err(e) => Err(e), |
507 | | }, |
508 | 314k | MQTTTypeCode::CONNACK => match parse_connack(protocol_version)(input) { |
509 | 313k | Ok((_rem, connack)) => { |
510 | 313k | let msg = MQTTMessage { |
511 | 313k | header, |
512 | 313k | op: MQTTOperation::CONNACK(connack), |
513 | 313k | }; |
514 | 313k | Ok((&full[skiplen + len..], msg)) |
515 | | } |
516 | 92 | Err(e) => Err(e), |
517 | | }, |
518 | | MQTTTypeCode::PUBLISH => { |
519 | 112k | match parse_publish(protocol_version, header.qos_level > 0)(input) { |
520 | 112k | Ok((_rem, publish)) => { |
521 | 112k | let msg = MQTTMessage { |
522 | 112k | header, |
523 | 112k | op: MQTTOperation::PUBLISH(publish), |
524 | 112k | }; |
525 | 112k | Ok((&full[skiplen + len..], msg)) |
526 | | } |
527 | 179 | Err(e) => Err(e), |
528 | | } |
529 | | } |
530 | | MQTTTypeCode::PUBACK |
531 | | | MQTTTypeCode::PUBREC |
532 | | | MQTTTypeCode::PUBREL |
533 | 1.99M | | MQTTTypeCode::PUBCOMP => match parse_msgidonly(protocol_version)(input) { |
534 | 1.99M | Ok((_rem, msgidonly)) => { |
535 | 1.99M | let msg = MQTTMessage { |
536 | 1.99M | header, |
537 | 1.99M | op: match message_type { |
538 | 120k | MQTTTypeCode::PUBACK => MQTTOperation::PUBACK(msgidonly), |
539 | 1.50M | MQTTTypeCode::PUBREC => MQTTOperation::PUBREC(msgidonly), |
540 | 201k | MQTTTypeCode::PUBREL => MQTTOperation::PUBREL(msgidonly), |
541 | 162k | MQTTTypeCode::PUBCOMP => MQTTOperation::PUBCOMP(msgidonly), |
542 | 0 | _ => MQTTOperation::UNASSIGNED, |
543 | | }, |
544 | | }; |
545 | 1.99M | Ok((&full[skiplen + len..], msg)) |
546 | | } |
547 | 269 | Err(e) => Err(e), |
548 | | }, |
549 | 85.9k | MQTTTypeCode::SUBSCRIBE => match parse_subscribe(protocol_version)(input) { |
550 | 85.8k | Ok((_rem, subs)) => { |
551 | 85.8k | let msg = MQTTMessage { |
552 | 85.8k | header, |
553 | 85.8k | op: MQTTOperation::SUBSCRIBE(subs), |
554 | 85.8k | }; |
555 | 85.8k | Ok((&full[skiplen + len..], msg)) |
556 | | } |
557 | 129 | Err(e) => Err(e), |
558 | | }, |
559 | 332k | MQTTTypeCode::SUBACK => match parse_suback(protocol_version)(input) { |
560 | 332k | Ok((_rem, suback)) => { |
561 | 332k | let msg = MQTTMessage { |
562 | 332k | header, |
563 | 332k | op: MQTTOperation::SUBACK(suback), |
564 | 332k | }; |
565 | 332k | Ok((&full[skiplen + len..], msg)) |
566 | | } |
567 | 25 | Err(e) => Err(e), |
568 | | }, |
569 | 249k | MQTTTypeCode::UNSUBSCRIBE => match parse_unsubscribe(protocol_version)(input) { |
570 | 249k | Ok((_rem, unsub)) => { |
571 | 249k | let msg = MQTTMessage { |
572 | 249k | header, |
573 | 249k | op: MQTTOperation::UNSUBSCRIBE(unsub), |
574 | 249k | }; |
575 | 249k | Ok((&full[skiplen + len..], msg)) |
576 | | } |
577 | 25 | Err(e) => Err(e), |
578 | | }, |
579 | 420k | MQTTTypeCode::UNSUBACK => match parse_unsuback(protocol_version)(input) { |
580 | 420k | Ok((_rem, unsuback)) => { |
581 | 420k | let msg = MQTTMessage { |
582 | 420k | header, |
583 | 420k | op: MQTTOperation::UNSUBACK(unsuback), |
584 | 420k | }; |
585 | 420k | Ok((&full[skiplen + len..], msg)) |
586 | | } |
587 | 23 | Err(e) => Err(e), |
588 | | }, |
589 | | MQTTTypeCode::PINGREQ | MQTTTypeCode::PINGRESP => { |
590 | 1.14M | let msg = MQTTMessage { |
591 | 1.14M | header, |
592 | 1.14M | op: match message_type { |
593 | 1.09M | MQTTTypeCode::PINGREQ => MQTTOperation::PINGREQ, |
594 | 53.1k | MQTTTypeCode::PINGRESP => MQTTOperation::PINGRESP, |
595 | 0 | _ => MQTTOperation::UNASSIGNED, |
596 | | }, |
597 | | }; |
598 | 1.14M | Ok((&full[skiplen + len..], msg)) |
599 | | } |
600 | 1.11M | MQTTTypeCode::DISCONNECT => match parse_disconnect(len, protocol_version)(input) { |
601 | 1.11M | Ok((_rem, disco)) => { |
602 | 1.11M | let msg = MQTTMessage { |
603 | 1.11M | header, |
604 | 1.11M | op: MQTTOperation::DISCONNECT(disco), |
605 | 1.11M | }; |
606 | 1.11M | Ok((&full[skiplen + len..], msg)) |
607 | | } |
608 | 13 | Err(e) => Err(e), |
609 | | }, |
610 | 35.9k | MQTTTypeCode::AUTH => match parse_auth(input) { |
611 | 35.2k | Ok((_rem, auth)) => { |
612 | 35.2k | let msg = MQTTMessage { |
613 | 35.2k | header, |
614 | 35.2k | op: MQTTOperation::AUTH(auth), |
615 | 35.2k | }; |
616 | 35.2k | Ok((&full[skiplen + len..], msg)) |
617 | | } |
618 | 680 | Err(e) => Err(e), |
619 | | }, |
620 | | // Unassigned message type code. Unlikely to happen with |
621 | | // regular traffic, might be an indication for broken or |
622 | | // crafted MQTT traffic. |
623 | | _ => { |
624 | 25.1M | let msg = MQTTMessage { |
625 | 25.1M | header, |
626 | 25.1M | op: MQTTOperation::UNASSIGNED, |
627 | 25.1M | }; |
628 | 25.1M | return Ok((&full[skiplen + len..], msg)); |
629 | | } |
630 | | } |
631 | 31.0M | } |
632 | 31.0M | } |
633 | | |
634 | 31.2M | pub fn parse_message( |
635 | 31.2M | input: &[u8], |
636 | 31.2M | protocol_version: u8, |
637 | 31.2M | max_msg_size: usize, |
638 | 31.2M | ) -> IResult<&[u8], MQTTMessage> { |
639 | 31.2M | // Parse the fixed header first. This is identical across versions and can |
640 | 31.2M | // be between 2 and 5 bytes long. |
641 | 31.2M | match parse_fixed_header(input) { |
642 | 31.1M | Ok((fullrem, header)) => { |
643 | 31.1M | let len = header.remaining_length as usize; |
644 | 31.1M | // This is the length of the fixed header that we need to skip |
645 | 31.1M | // before returning the remainder. It is the sum of the length |
646 | 31.1M | // of the flag byte (1) and the length of the message length |
647 | 31.1M | // varint. |
648 | 31.1M | let skiplen = input.len() - fullrem.len(); |
649 | 31.1M | let message_type = header.message_type; |
650 | 31.1M | |
651 | 31.1M | // If the remaining length (message length) exceeds the specified |
652 | 31.1M | // limit, we return a special truncation message type, containing |
653 | 31.1M | // no parsed metadata but just the skipped length and the message |
654 | 31.1M | // type. |
655 | 31.1M | if len > max_msg_size { |
656 | 599 | let msg = MQTTMessage { |
657 | 599 | header, |
658 | 599 | op: MQTTOperation::TRUNCATED(MQTTTruncatedData { |
659 | 599 | original_message_type: message_type, |
660 | 599 | skipped_length: len + skiplen, |
661 | 599 | }), |
662 | 599 | }; |
663 | 599 | // In this case we return the full input buffer, since this is |
664 | 599 | // what the skipped_length value also refers to: header _and_ |
665 | 599 | // remaining length. |
666 | 599 | return Ok((input, msg)); |
667 | 31.1M | } |
668 | 31.1M | |
669 | 31.1M | // We have not exceeded the maximum length limit, but still do not |
670 | 31.1M | // have enough data in the input buffer to handle the full |
671 | 31.1M | // message. Signal this by returning an Incomplete IResult value. |
672 | 31.1M | if fullrem.len() < len { |
673 | 157k | return Err(Err::Incomplete(Needed::new(len - fullrem.len()))); |
674 | 31.0M | } |
675 | 31.0M | |
676 | 31.0M | // Parse the contents of the buffer into a single message. |
677 | 31.0M | // We reslice the remainder into the portion that we are interested |
678 | 31.0M | // in, according to the length we just parsed. This helps with the |
679 | 31.0M | // complete() parsers, where we would otherwise need to keep track |
680 | 31.0M | // of the already parsed length. |
681 | 31.0M | let rem = &fullrem[..len]; |
682 | 31.0M | |
683 | 31.0M | // Parse remaining message in buffer. We use complete() to ensure |
684 | 31.0M | // we do not request additional content in case of incomplete |
685 | 31.0M | // parsing, but raise an error instead as we should have all the |
686 | 31.0M | // data asked for in the header. |
687 | 31.0M | return complete(parse_remaining_message( |
688 | 31.0M | input, |
689 | 31.0M | len, |
690 | 31.0M | skiplen, |
691 | 31.0M | header, |
692 | 31.0M | message_type, |
693 | 31.0M | protocol_version, |
694 | 31.0M | ))(rem); |
695 | | } |
696 | 21.4k | Err(err) => { |
697 | 21.4k | return Err(err); |
698 | | } |
699 | | } |
700 | 31.2M | } |
701 | | |
702 | | #[cfg(test)] |
703 | | mod tests { |
704 | | use super::*; |
705 | | use nom7::error::ErrorKind; |
706 | | |
707 | | fn test_mqtt_parse_variable_fail(buf0: &[u8]) { |
708 | | let r0 = parse_mqtt_variable_integer(buf0); |
709 | | match r0 { |
710 | | Ok((_, _)) => { |
711 | | panic!("Result should not have been ok."); |
712 | | } |
713 | | Err(Err::Error(err)) => { |
714 | | assert_eq!(err.code, ErrorKind::Verify); |
715 | | } |
716 | | _ => { |
717 | | panic!("Result should be an error."); |
718 | | } |
719 | | } |
720 | | } |
721 | | |
722 | | fn test_mqtt_parse_variable_check(buf0: &[u8], expected: u32) { |
723 | | let r0 = parse_mqtt_variable_integer(buf0); |
724 | | match r0 { |
725 | | Ok((_, val)) => { |
726 | | assert_eq!(val, expected); |
727 | | } |
728 | | Err(_) => { |
729 | | panic!("Result should have been ok."); |
730 | | } |
731 | | } |
732 | | } |
733 | | |
/// Four bytes that all carry the continuation bit must be rejected.
#[test]
fn test_mqtt_parse_variable_integer_largest_input() {
    let all_continued = [0xFF, 0xFF, 0xFF, 0xFF];
    test_mqtt_parse_variable_fail(&all_continued);
}
738 | | |
/// A fourth byte with only the continuation bit set must be rejected.
#[test]
fn test_mqtt_parse_variable_integer_boundary() {
    let continued_terminator = [0xFF, 0xFF, 0xFF, 0x80];
    test_mqtt_parse_variable_fail(&continued_terminator);
}
743 | | |
/// The largest four-byte encoding decodes to 268435455.
#[test]
fn test_mqtt_parse_variable_integer_largest_valid() {
    let largest = [0xFF, 0xFF, 0xFF, 0x7F];
    test_mqtt_parse_variable_check(&largest, 268435455);
}
748 | | |
/// A single zero byte decodes to 0.
#[test]
fn test_mqtt_parse_variable_integer_smallest_valid() {
    let smallest = [0x0];
    test_mqtt_parse_variable_check(&smallest, 0);
}
753 | | |
/// A PUBLISH fixed header with a three-byte remaining length decodes to the
/// expected fields and leaves the 17 trailing bytes untouched.
#[test]
fn test_parse_fixed_header() {
    let buf = [
        0x30, /* Header Flags: 0x30, Message Type: Publish Message, QoS Level: At most once delivery (Fire and Forget) */
        0xb7, 0x97, 0x02, /* Msg Len: 35767 */
        0x00, 0xff, 0x00, 0xff, 0x00, 0xff, 0x10, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x01, 0xa0,
    ];

    let (remainder, message) = match parse_fixed_header(&buf) {
        Ok(parsed) => parsed,
        Err(Err::Incomplete(_)) => {
            panic!("Result should not have been incomplete.");
        }
        Err(Err::Error(err)) | Err(Err::Failure(err)) => {
            panic!("Result should not be an error: {:?}.", err);
        }
    };

    assert_eq!(message.message_type, MQTTTypeCode::PUBLISH);
    assert!(!message.dup_flag);
    assert_eq!(message.qos_level, 0);
    assert!(!message.retain);
    assert_eq!(message.remaining_length, 35767);
    assert_eq!(remainder.len(), 17);
}
781 | | |
/// A three-byte property block holding one RECEIVE_MAXIMUM property is
/// decoded and the 17 trailing bytes are left untouched.
#[test]
fn test_parse_properties() {
    let buf = [
        0x03, 0x21, 0x00, 0x14, /* Properties */
        0x00, 0xff, 0x00, 0xff, 0x00, 0xff, 0x10, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x01, 0xa0,
    ];

    let (remainder, message) = match parse_properties(&buf, true) {
        Ok(parsed) => parsed,
        Err(Err::Incomplete(_)) => {
            panic!("Result should not have been incomplete.");
        }
        Err(Err::Error(err)) | Err(Err::Failure(err)) => {
            panic!("Result should not be an error: {:?}.", err);
        }
    };

    let res = message.unwrap();
    assert_eq!(res[0], MQTTProperty::RECEIVE_MAXIMUM(20));
    assert_eq!(remainder.len(), 17);
}
/// A version 5 CONNECT body with user name and password is fully consumed
/// and its flags and keep-alive value are decoded as expected.
#[test]
fn test_parse_connect() {
    let buf = [
        0x00, 0x04, /* Protocol Name Length: 4 */
        0x4d, 0x51, 0x54, 0x54, /* Protocol Name: MQTT */
        0x05, /* Version: MQTT v5.0 (5) */
        0xc2, /* Connect Flags: 0xc2, User Name Flag, Password Flag, QoS Level: At most once delivery (Fire and Forget), Clean Session Flag */
        0x00, 0x3c, /* Keep Alive: 60 */
        0x03, 0x21, 0x00, 0x14, /* Properties */
        0x00, 0x00, /* Client ID Length: 0 */
        0x00, 0x04, /* User Name Length: 4 */
        0x75, 0x73, 0x65, 0x72, /* User Name: user */
        0x00, 0x04, /* Password Length: 4 */
        0x71, 0x61, 0x71, 0x73, /* Password bytes: "qaqs" */
    ];

    let (remainder, message) = match parse_connect(&buf) {
        Ok(parsed) => parsed,
        Err(Err::Incomplete(_)) => {
            panic!("Result should not have been incomplete.");
        }
        Err(Err::Error(err)) | Err(Err::Failure(err)) => {
            panic!("Result should not be an error: {:?}.", err);
        }
    };

    assert_eq!(message.protocol_string, "MQTT");
    assert_eq!(message.protocol_version, 5);
    assert!(message.username_flag);
    assert!(message.password_flag);
    assert!(!message.will_retain);
    assert_eq!(message.will_qos, 0);
    assert!(!message.will_flag);
    assert!(message.clean_session);
    assert_eq!(message.keepalive, 60);
    assert_eq!(remainder.len(), 0);
}
843 | | |
/// A version 5 CONNACK body with two properties is fully consumed and both
/// properties, the return code and the session flag are decoded.
#[test]
fn test_parse_connack() {
    let buf = [
        0x00, /* Acknowledge Flags: 0x00 (0000 000. = Reserved: Not set )(.... ...0 = Session Present: Not set) */
        0x00, /* Reason Code: Success (0) */
        0x2f, /* Total Length: 47 */
        0x22, /* ID: Topic Alias Maximum (0x22) */
        0x00, 0x0a, /* Value: 10 */
        0x12, /* ID: Assigned Client Identifier (0x12) */
        0x00, 0x29, /* Length: 41 */
        0x61, 0x75, 0x74, 0x6f, 0x2d, 0x31, 0x42, 0x34, 0x33, 0x45, 0x38, 0x30, 0x30, 0x2d,
        0x30, 0x38, 0x45, 0x33, 0x2d, 0x33, 0x42, 0x41, 0x31, 0x2d, 0x32, 0x45, 0x39, 0x37,
        0x2d, 0x45, 0x39, 0x41, 0x30, 0x42, 0x34, 0x30, 0x36, 0x34, 0x42, 0x46,
        0x35, /* 41 byte Value: auto-1B43E800-08E3-3BA1-2E97-E9A0B4064BF5 */
    ];
    let client_identifier = "auto-1B43E800-08E3-3BA1-2E97-E9A0B4064BF5";

    let parser = parse_connack(5);
    let (remainder, message) = match parser(&buf) {
        Ok(parsed) => parsed,
        Err(Err::Incomplete(_)) => {
            panic!("Result should not have been incomplete.");
        }
        Err(Err::Error(err)) | Err(Err::Failure(err)) => {
            panic!("Result should not be an error: {:?}.", err);
        }
    };

    let props = message.properties.unwrap();
    assert_eq!(props[0], MQTTProperty::TOPIC_ALIAS_MAXIMUM(10));
    assert_eq!(
        props[1],
        MQTTProperty::ASSIGNED_CLIENT_IDENTIFIER(client_identifier.to_string())
    );
    assert_eq!(message.return_code, 0);
    assert!(!message.session_present);
    assert_eq!(remainder.len(), 0);
}
884 | | |
// Parses a PUBLISH body with the version 5 parser, message identifier enabled,
// and checks the decoded topic and identifier.
#[test]
fn test_parse_publish() {
    let packet = [
        0x00, 0x06, /* Topic Length: 6 */
        0x74, 0x6f, 0x70, 0x69, 0x63, 0x58, /* Topic: topicX */
        0x00, 0x01, /* Message Identifier: 1 */
        0x00, /* Properties (length byte is 0) */
        /* 12 trailing bytes */
        0x00, 0x61, 0x75, 0x74, 0x6f, 0x2d, 0x42, 0x34, 0x33, 0x45, 0x38, 0x30,
    ];

    let parser = parse_publish(5, true);
    let (remainder, message) = match parser(&packet) {
        Ok(parsed) => parsed,
        Err(Err::Incomplete(_)) => panic!("Result should not have been incomplete."),
        Err(Err::Error(err)) | Err(Err::Failure(err)) => {
            panic!("Result should not be an error: {:?}.", err)
        }
    };

    let message_id = message.message_id.unwrap();
    assert_eq!(message.topic, "topicX");
    assert_eq!(message_id, 1);
    // NOTE(review): 13 = the properties byte plus the 12 trailing bytes, so the
    // returned remainder apparently starts right after the message identifier.
    // Confirm against parse_publish that this is intended.
    assert_eq!(remainder.len(), 13);
}
913 | | |
// With protocol version 3 the test expects only the two-byte message
// identifier to be consumed; everything after it must be left over.
#[test]
fn test_parse_msgidonly_v3() {
    let packet = [
        0x00, 0x01, /* Message Identifier: 1 */
        /* 18 trailing bytes, expected to be left untouched */
        0x74, 0x6f, 0x70, 0x69, 0x63, 0x58, 0x00, 0x61, 0x75, 0x74, 0x6f, 0x2d, 0x42, 0x34,
        0x33, 0x45, 0x38, 0x30,
    ];

    let parser = parse_msgidonly(3);
    let (remainder, message) = match parser(&packet) {
        Ok(parsed) => parsed,
        Err(Err::Incomplete(_)) => panic!("Result should not have been incomplete."),
        Err(Err::Error(err)) | Err(Err::Failure(err)) => {
            panic!("Result should not be an error: {:?}.", err)
        }
    };

    assert_eq!(message.message_id, 1);
    assert_eq!(remainder.len(), 18);
}
938 | | |
// With protocol version 5 the test expects a reason code to be decoded in
// addition to the message identifier, leaving the 12 trailing bytes over.
#[test]
fn test_parse_msgidonly_v5() {
    let packet = [
        0x00, 0x01, /* Message Identifier: 1 */
        0x00, /* Reason Code: 0 */
        0x00, /* Properties (length byte is 0) */
        /* 12 trailing bytes, expected to be left untouched */
        0x00, 0x61, 0x75, 0x74, 0x6f, 0x2d, 0x42, 0x34, 0x33, 0x45, 0x38, 0x30,
    ];

    let parser = parse_msgidonly(5);
    let (remainder, message) = match parser(&packet) {
        Ok(parsed) => parsed,
        Err(Err::Incomplete(_)) => panic!("Result should not have been incomplete."),
        Err(Err::Error(err)) | Err(Err::Failure(err)) => {
            panic!("Result should not be an error: {:?}.", err)
        }
    };

    let reason_code = message.reason_code.unwrap();
    assert_eq!(message.message_id, 1);
    assert_eq!(reason_code, 0);
    assert_eq!(remainder.len(), 12);
}
966 | | |
// Parses a SUBSCRIBE body holding two topic filters with the version 5 parser
// and checks both topic names, the first QoS and the message identifier.
#[test]
fn test_parse_subscribe() {
    let packet = [
        0x00, 0x01, /* Message Identifier: 1 */
        0x00, /* Properties (length byte is 0) */
        0x00, 0x06, /* Topic Length: 6 */
        0x74, 0x6f, 0x70, 0x69, 0x63, 0x58, /* Topic: topicX */
        0x00, /* Subscription Options: 0x00, Retain Handling: Send msgs at subscription time, QoS: At most once delivery (Fire and Forget) */
        0x00, 0x06, /* Topic Length: 6 */
        0x74, 0x6f, 0x70, 0x69, 0x63, 0x59, /* Topic: topicY */
        0x00, /* Subscription Options: 0x00, Retain Handling: Send msgs at subscription time, QoS: At most once delivery (Fire and Forget) */
        /* 12 trailing bytes, expected to be left over */
        0x00, 0x61, 0x75, 0x74, 0x6f, 0x2d, 0x42, 0x34, 0x33, 0x45, 0x38, 0x30,
    ];

    let parser = parse_subscribe(5);
    let (remainder, message) = match parser(&packet) {
        Ok(parsed) => parsed,
        Err(Err::Incomplete(_)) => panic!("Result should not have been incomplete."),
        Err(Err::Error(err)) | Err(Err::Failure(err)) => {
            panic!("Result should not be an error: {:?}.", err)
        }
    };

    assert_eq!(message.topics[0].topic_name, "topicX");
    assert_eq!(message.topics[1].topic_name, "topicY");
    assert_eq!(message.topics[0].qos, 0);
    assert_eq!(message.message_id, 1);
    assert_eq!(remainder.len(), 12);
}
// Parses a minimal SUBACK body with the version 5 parser and checks the
// message identifier and the first entry of `qoss`.
#[test]
fn test_parse_suback() {
    let packet = [
        0x00, 0x01, /* Message Identifier: 1 */
        0x00, /* Properties (length byte is 0) */
        0x00, 0x00, /* Two payload bytes; the test reads the first as qoss[0] */
    ];

    let parser = parse_suback(5);
    let (remainder, message) = match parser(&packet) {
        Ok(parsed) => parsed,
        Err(Err::Incomplete(_)) => panic!("Result should not have been incomplete."),
        Err(Err::Error(err)) | Err(Err::Failure(err)) => {
            panic!("Result should not be an error: {:?}.", err)
        }
    };

    assert_eq!(message.qoss[0], 0);
    assert_eq!(message.message_id, 1);
    // NOTE(review): 3 of the 5 input bytes are expected back, so the returned
    // remainder apparently starts right after the message identifier even
    // though later bytes end up in `qoss`. Confirm against parse_suback.
    assert_eq!(remainder.len(), 3);
}
// Parses an UNSUBSCRIBE body holding one topic with the version 5 parser and
// checks the topic, the message identifier and the single leftover byte.
#[test]
fn test_parse_unsubscribe() {
    let packet = [
        0x00, 0x01, /* Message Identifier: 1 */
        0x00, /* Properties (length byte is 0) */
        0x00, 0x06, /* Topic Length: 6 */
        0x74, 0x6f, 0x70, 0x69, 0x63, 0x58, /* Topic: topicX */
        0x00, /* One extra byte; the test expects it to remain unparsed */
    ];

    let parser = parse_unsubscribe(5);
    let (remainder, message) = match parser(&packet) {
        Ok(parsed) => parsed,
        Err(Err::Incomplete(_)) => panic!("Result should not have been incomplete."),
        Err(Err::Error(err)) | Err(Err::Failure(err)) => {
            panic!("Result should not be an error: {:?}.", err)
        }
    };

    assert_eq!(message.topics[0], "topicX");
    assert_eq!(message.message_id, 1);
    assert_eq!(remainder.len(), 1);
}
1052 | | |
// Parses an UNSUBACK body with the version 5 parser and checks the message
// identifier, the single reason code and that nothing is left over.
#[test]
fn test_parse_unsuback() {
    let packet = [
        0x00, 0x01, /* Message Identifier: 1 */
        0x00, /* Properties (length byte is 0) */
        0x00, /* Reason Code: 0 */
    ];

    let parser = parse_unsuback(5);
    let (remainder, message) = match parser(&packet) {
        Ok(parsed) => parsed,
        Err(Err::Incomplete(_)) => panic!("Result should not have been incomplete."),
        Err(Err::Error(err)) | Err(Err::Failure(err)) => {
            panic!("Result should not be an error: {:?}.", err)
        }
    };

    let reason_codes = message.reason_codes.unwrap();
    assert_eq!(reason_codes[0], 0);
    assert_eq!(message.message_id, 1);
    assert_eq!(remainder.len(), 0);
}
1079 | | |
// Calls the DISCONNECT parser built with arguments (0, 5) on a two-byte
// buffer. The test expects reason code 0 and both input bytes left over.
// NOTE(review): the first argument is presumably the remaining length and the
// second the protocol version -- confirm against parse_disconnect.
#[test]
fn test_parse_disconnect() {
    let packet = [
        0xe0, /* first input byte; expected to remain unconsumed */
        0x00, /* second input byte; expected to remain unconsumed */
    ];

    let parser = parse_disconnect(0, 5);
    let (remainder, message) = match parser(&packet) {
        Ok(parsed) => parsed,
        Err(Err::Incomplete(_)) => panic!("Result should not have been incomplete."),
        Err(Err::Error(err)) | Err(Err::Failure(err)) => {
            panic!("Result should not be an error: {:?}.", err)
        }
    };

    let reason_code = message.reason_code.unwrap();
    assert_eq!(reason_code, 0);
    assert_eq!(remainder.len(), 2);
}
1104 | | |
// Runs the top-level message parser on a 49-byte CONNECT packet with
// arguments (5, 40) and checks the decoded fixed header fields.
// NOTE(review): the arguments are presumably the protocol version and a
// maximum message size -- confirm against parse_message.
#[test]
fn test_parse_message() {
    let packet = [
        0x10, /* First header byte; expected to decode as CONNECT with DUP, QoS and RETAIN clear */
        0x2f, 0x00, 0x04, 0x4d, 0x51, 0x54, 0x54, 0x05,
        0xc2, /* Connect Flags: 0xc2, User Name Flag, Password Flag, QoS Level: At most once delivery (Fire and Forget), Clean Session Flag */
        0x00, 0x3c, 0x03, 0x21, 0x00, 0x14, /* Properties */
        0x00, 0x13, 0x6d, 0x79, 0x76, 0x6f, 0x69, 0x63, 0x65, 0x69, 0x73, 0x6d, 0x79, 0x70,
        0x61, 0x73, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x00, 0x04, 0x75, 0x73, 0x65, 0x72, 0x00,
        0x04, 0x70, 0x61, 0x73, 0x73,
    ];

    let (remainder, message) = match parse_message(&packet, 5, 40) {
        Ok(parsed) => parsed,
        Err(Err::Incomplete(_)) => panic!("Result should not have been incomplete."),
        Err(Err::Error(err)) | Err(Err::Failure(err)) => {
            panic!("Result should not be an error: {:?}.", err)
        }
    };

    let header = message.header;
    assert_eq!(header.message_type, MQTTTypeCode::CONNECT);
    assert!(!header.dup_flag);
    assert_eq!(header.qos_level, 0);
    assert!(!header.retain);
    // The test expects the complete 49-byte input to be handed back.
    assert_eq!(remainder.len(), 49);
}
1135 | | } |