Coverage Report

Created: 2025-07-23 07:29

/src/suricata7/rust/src/mqtt/parser.rs
Line
Count
Source (jump to first uncovered line)
1
/* Copyright (C) 2020-2022 Open Information Security Foundation
2
 *
3
 * You can copy, redistribute or modify this Program under the terms of
4
 * the GNU General Public License version 2 as published by the Free
5
 * Software Foundation.
6
 *
7
 * This program is distributed in the hope that it will be useful,
8
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 * GNU General Public License for more details.
11
 *
12
 * You should have received a copy of the GNU General Public License
13
 * version 2 along with this program; if not, write to the Free Software
14
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15
 * 02110-1301, USA.
16
 */
17
18
// written by Sascha Steinbiss <sascha@steinbiss.name>
19
20
use crate::common::nom7::bits;
21
use crate::mqtt::mqtt_message::*;
22
use crate::mqtt::mqtt_property::*;
23
use nom7::bits::streaming::take as take_bits;
24
use nom7::bytes::complete::take;
25
use nom7::bytes::streaming::take_while_m_n;
26
use nom7::combinator::{complete, cond, verify};
27
use nom7::multi::{length_data, many0, many1};
28
use nom7::number::streaming::*;
29
use nom7::sequence::tuple;
30
use nom7::{Err, IResult, Needed};
31
use num_traits::FromPrimitive;
32
33
/// Decoded form of the fixed header that starts every MQTT message, as
/// produced by `parse_fixed_header`.
#[derive(Copy, Clone, Debug)]
34
pub struct FixedHeader {
35
    /// Message type decoded from the 4-bit type code.
    pub message_type: MQTTTypeCode,
36
    /// `true` when the 1-bit DUP field is non-zero.
    pub dup_flag: bool,
37
    /// Raw value of the 2-bit QoS field.
    pub qos_level: u8,
38
    /// `true` when the 1-bit RETAIN field is non-zero.
    pub retain: bool,
39
    /// Value of the variable-length "remaining length" integer that follows
    /// the flag byte.
    pub remaining_length: u32,
40
}
41
42
// PARSING HELPERS
43
44
/// Returns `true` when the most significant bit of `b` is set. The
/// variable-length integer parser uses that bit as its continuation marker.
#[inline]
fn is_continuation_bit_set(b: u8) -> bool {
    b & 0x80 == 0x80
}
48
49
/// Combines the bytes of a variable-length integer into a `u32`.
///
/// `continued` holds the leading bytes (least significant group first) and
/// `last` is the terminating byte. Only the low seven bits of every byte
/// contribute; byte number `k` is scaled by `128^k`.
#[inline]
fn convert_varint(continued: Vec<u8>, last: u8) -> u32 {
    // Accumulate (running value, scale for the next byte) over the leading bytes.
    let (partial, scale) = continued
        .iter()
        .fold((0u32, 1u32), |(acc, scale), byte| {
            (acc + u32::from(byte & 0x7f) * scale, scale * 128u32)
        });
    partial + u32::from(last & 0x7f) * scale
}
60
61
// DATA TYPES
62
63
#[inline]
64
1.27M
pub fn parse_mqtt_string(i: &[u8]) -> IResult<&[u8], String> {
65
1.27M
    let (i, content) = length_data(be_u16)(i)?;
66
984k
    Ok((i, String::from_utf8_lossy(content).to_string()))
67
1.27M
}
68
69
#[inline]
70
36.1M
pub fn parse_mqtt_variable_integer(i: &[u8]) -> IResult<&[u8], u32> {
71
36.1M
    let (i, continued_part) = take_while_m_n(0, 3, is_continuation_bit_set)(i)?;
72
36.0M
    let (i, non_continued_part) = verify(be_u8, |&val| !is_continuation_bit_set(val))(i)?;
73
36.0M
    Ok((
74
36.0M
        i,
75
36.0M
        convert_varint(continued_part.to_vec(), non_continued_part),
76
36.0M
    ))
77
36.1M
}
78
79
#[inline]
80
147k
pub fn parse_mqtt_binary_data(i: &[u8]) -> IResult<&[u8], Vec<u8>> {
81
147k
    let (i, data) = length_data(be_u16)(i)?;
82
146k
    Ok((i, data.to_vec()))
83
147k
}
84
85
#[inline]
86
935
pub fn parse_mqtt_string_pair(i: &[u8]) -> IResult<&[u8], (String, String)> {
87
935
    let (i, name) = parse_mqtt_string(i)?;
88
922
    let (i, value) = parse_mqtt_string(i)?;
89
905
    Ok((i, (name, value)))
90
935
}
91
92
// MESSAGE COMPONENTS
93
94
#[inline]
95
4.78M
fn parse_property(i: &[u8]) -> IResult<&[u8], MQTTProperty> {
96
4.78M
    let (i, identifier) = parse_mqtt_variable_integer(i)?;
97
4.78M
    let (i, value) = parse_qualified_property(i, identifier)?;
98
4.78M
    Ok((i, value))
99
4.78M
}
100
101
#[inline]
102
1.64M
fn parse_properties(input: &[u8], precond: bool) -> IResult<&[u8], Option<Vec<MQTTProperty>>> {
103
1.64M
    // do not try to parse anything when precondition is not met
104
1.64M
    if !precond {
105
1.57M
        return Ok((input, None));
106
71.5k
    }
107
71.5k
    // parse properties length
108
71.5k
    match parse_mqtt_variable_integer(input) {
109
71.4k
        Ok((rem, proplen)) => {
110
71.4k
            if proplen == 0 {
111
                // no properties
112
26.7k
                return Ok((rem, None));
113
44.7k
            }
114
44.7k
            // parse properties
115
44.7k
            let mut props = Vec::<MQTTProperty>::new();
116
44.7k
            let (rem, mut newrem) = take(proplen as usize)(rem)?;
117
4.82M
            while !newrem.is_empty() {
118
4.78M
                match parse_property(newrem) {
119
4.78M
                    Ok((rem2, val)) => {
120
4.78M
                        props.push(val);
121
4.78M
                        newrem = rem2;
122
4.78M
                    }
123
679
                    Err(e) => return Err(e),
124
                }
125
            }
126
43.9k
            return Ok((rem, Some(props)));
127
        }
128
38
        Err(e) => return Err(e),
129
    }
130
1.64M
}
131
132
#[inline]
133
31.2M
fn parse_fixed_header_flags(i: &[u8]) -> IResult<&[u8], (u8, u8, u8, u8)> {
134
31.2M
    bits(tuple((
135
31.2M
        take_bits(4u8),
136
31.2M
        take_bits(1u8),
137
31.2M
        take_bits(2u8),
138
31.2M
        take_bits(1u8),
139
31.2M
    )))(i)
140
31.2M
}
141
142
#[inline]
143
31.2M
fn parse_message_type(code: u8) -> MQTTTypeCode {
144
31.2M
    match code {
145
31.2M
        0..=15 => {
146
31.2M
            if let Some(t) = FromPrimitive::from_u8(code) {
147
31.2M
                return t;
148
            } else {
149
0
                return MQTTTypeCode::UNASSIGNED;
150
            }
151
        }
152
        _ => {
153
            // unreachable state in parser: we only pass values parsed from take_bits!(4u8)
154
0
            debug_validate_fail!("can't have message codes >15 from 4 bits");
155
0
            MQTTTypeCode::UNASSIGNED
156
        }
157
    }
158
31.2M
}
159
160
#[inline]
161
31.2M
pub fn parse_fixed_header(i: &[u8]) -> IResult<&[u8], FixedHeader> {
162
31.2M
    let (i, flags) = parse_fixed_header_flags(i)?;
163
31.2M
    let (i, remaining_length) = parse_mqtt_variable_integer(i)?;
164
31.2M
    Ok((
165
31.2M
        i,
166
31.2M
        FixedHeader {
167
31.2M
            message_type: parse_message_type(flags.0),
168
31.2M
            dup_flag: flags.1 != 0,
169
31.2M
            qos_level: flags.2,
170
31.2M
            retain: flags.3 != 0,
171
31.2M
            remaining_length,
172
31.2M
        },
173
31.2M
    ))
174
31.2M
}
175
176
#[inline]
177
#[allow(clippy::type_complexity)]
178
41.4k
fn parse_connect_variable_flags(i: &[u8]) -> IResult<&[u8], (u8, u8, u8, u8, u8, u8, u8)> {
179
41.4k
    bits(tuple((
180
41.4k
        take_bits(1u8),
181
41.4k
        take_bits(1u8),
182
41.4k
        take_bits(1u8),
183
41.4k
        take_bits(2u8),
184
41.4k
        take_bits(1u8),
185
41.4k
        take_bits(1u8),
186
41.4k
        take_bits(1u8),
187
41.4k
    )))(i)
188
41.4k
}
189
190
#[inline]
191
41.5k
fn parse_connect(i: &[u8]) -> IResult<&[u8], MQTTConnectData> {
192
41.5k
    let (i, protocol_string) = parse_mqtt_string(i)?;
193
41.4k
    let (i, protocol_version) = be_u8(i)?;
194
41.4k
    let (i, flags) = parse_connect_variable_flags(i)?;
195
41.4k
    let (i, keepalive) = be_u16(i)?;
196
41.4k
    let (i, properties) = parse_properties(i, protocol_version == 5)?;
197
41.4k
    let (i, client_id) = parse_mqtt_string(i)?;
198
41.3k
    let (i, will_properties) = parse_properties(i, protocol_version == 5 && flags.4 != 0)?;
199
41.3k
    let (i, will_topic) = cond(flags.4 != 0, parse_mqtt_string)(i)?;
200
41.3k
    let (i, will_message) = cond(flags.4 != 0, parse_mqtt_binary_data)(i)?;
201
41.2k
    let (i, username) = cond(flags.0 != 0, parse_mqtt_string)(i)?;
202
41.2k
    let (i, password) = cond(flags.1 != 0, parse_mqtt_binary_data)(i)?;
203
41.2k
    Ok((
204
41.2k
        i,
205
41.2k
        MQTTConnectData {
206
41.2k
            protocol_string,
207
41.2k
            protocol_version,
208
41.2k
            username_flag: flags.0 != 0,
209
41.2k
            password_flag: flags.1 != 0,
210
41.2k
            will_retain: flags.2 != 0,
211
41.2k
            will_qos: flags.3,
212
41.2k
            will_flag: flags.4 != 0,
213
41.2k
            clean_session: flags.5 != 0,
214
41.2k
            keepalive,
215
41.2k
            client_id,
216
41.2k
            will_topic,
217
41.2k
            will_message,
218
41.2k
            username,
219
41.2k
            password,
220
41.2k
            properties,
221
41.2k
            will_properties,
222
41.2k
        },
223
41.2k
    ))
224
41.5k
}
225
226
#[inline]
227
314k
fn parse_connack(protocol_version: u8) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTConnackData> {
228
314k
    move |i: &[u8]| {
229
314k
        let (i, topic_name_compression_response) = be_u8(i)?;
230
313k
        let (i, return_code) = be_u8(i)?;
231
313k
        let (i, properties) = parse_properties(i, protocol_version == 5)?;
232
313k
        Ok((
233
313k
            i,
234
313k
            MQTTConnackData {
235
313k
                session_present: (topic_name_compression_response & 1) != 0,
236
313k
                return_code,
237
313k
                properties,
238
313k
            },
239
313k
        ))
240
314k
    }
241
314k
}
242
243
#[inline]
244
112k
fn parse_publish(
245
112k
    protocol_version: u8,
246
112k
    has_id: bool,
247
112k
) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTPublishData> {
248
112k
    move |i: &[u8]| {
249
112k
        let (i, topic) = parse_mqtt_string(i)?;
250
112k
        let (i, message_id) = cond(has_id, be_u16)(i)?;
251
112k
        let (message, properties) = parse_properties(i, protocol_version == 5)?;
252
112k
        Ok((
253
112k
            i,
254
112k
            MQTTPublishData {
255
112k
                topic,
256
112k
                message_id,
257
112k
                message: message.to_vec(),
258
112k
                properties,
259
112k
            },
260
112k
        ))
261
112k
    }
262
112k
}
263
264
#[inline]
265
1.99M
fn parse_msgidonly(protocol_version: u8) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTMessageIdOnly> {
266
1.99M
    move |input: &[u8]| {
267
1.99M
        if protocol_version < 5 {
268
            // before v5 we don't even have to care about reason codes
269
            // and properties, lucky us
270
1.96M
            return parse_msgidonly_v3(input);
271
22.9k
        }
272
22.9k
        let remaining_len = input.len();
273
22.9k
        match be_u16(input) {
274
22.8k
            Ok((rem, message_id)) => {
275
22.8k
                if remaining_len == 2 {
276
                    // from the spec: " The Reason Code and Property Length can be
277
                    // omitted if the Reason Code is 0x00 (Success) and there are
278
                    // no Properties. In this case the message has a Remaining
279
                    // Length of 2."
280
12.2k
                    return Ok((
281
12.2k
                        rem,
282
12.2k
                        MQTTMessageIdOnly {
283
12.2k
                            message_id,
284
12.2k
                            reason_code: Some(0),
285
12.2k
                            properties: None,
286
12.2k
                        },
287
12.2k
                    ));
288
10.6k
                }
289
10.6k
                match be_u8(rem) {
290
10.6k
                    Ok((rem, reason_code)) => {
291
10.6k
                        // We are checking for 3 because in that case we have a
292
10.6k
                        // header plus reason code, but no properties.
293
10.6k
                        if remaining_len == 3 {
294
                            // no properties
295
929
                            return Ok((
296
929
                                rem,
297
929
                                MQTTMessageIdOnly {
298
929
                                    message_id,
299
929
                                    reason_code: Some(reason_code),
300
929
                                    properties: None,
301
929
                                },
302
929
                            ));
303
9.68k
                        }
304
9.68k
                        match parse_properties(rem, true) {
305
9.57k
                            Ok((rem, properties)) => {
306
9.57k
                                return Ok((
307
9.57k
                                    rem,
308
9.57k
                                    MQTTMessageIdOnly {
309
9.57k
                                        message_id,
310
9.57k
                                        reason_code: Some(reason_code),
311
9.57k
                                        properties,
312
9.57k
                                    },
313
9.57k
                                ));
314
                            }
315
102
                            Err(e) => return Err(e),
316
                        }
317
                    }
318
0
                    Err(e) => return Err(e),
319
                }
320
            }
321
36
            Err(e) => return Err(e),
322
        }
323
1.99M
    }
324
1.99M
}
325
326
#[inline]
327
1.96M
fn parse_msgidonly_v3(i: &[u8]) -> IResult<&[u8], MQTTMessageIdOnly> {
328
1.96M
    let (i, message_id) = be_u16(i)?;
329
1.96M
    Ok((
330
1.96M
        i,
331
1.96M
        MQTTMessageIdOnly {
332
1.96M
            message_id,
333
1.96M
            reason_code: None,
334
1.96M
            properties: None,
335
1.96M
        },
336
1.96M
    ))
337
1.96M
}
338
339
#[inline]
340
388k
fn parse_subscribe_topic(i: &[u8]) -> IResult<&[u8], MQTTSubscribeTopicData> {
341
388k
    let (i, topic_name) = parse_mqtt_string(i)?;
342
343k
    let (i, qos) = be_u8(i)?;
343
302k
    Ok((i, MQTTSubscribeTopicData { topic_name, qos }))
344
388k
}
345
346
#[inline]
347
85.9k
fn parse_subscribe(protocol_version: u8) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTSubscribeData> {
348
85.9k
    move |i: &[u8]| {
349
85.9k
        let (i, message_id) = be_u16(i)?;
350
85.9k
        let (i, properties) = parse_properties(i, protocol_version == 5)?;
351
85.9k
        let (i, topics) = many1(complete(parse_subscribe_topic))(i)?;
352
85.8k
        Ok((
353
85.8k
            i,
354
85.8k
            MQTTSubscribeData {
355
85.8k
                message_id,
356
85.8k
                topics,
357
85.8k
                properties,
358
85.8k
            },
359
85.8k
        ))
360
85.9k
    }
361
85.9k
}
362
363
#[inline]
364
332k
fn parse_suback(protocol_version: u8) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTSubackData> {
365
332k
    move |i: &[u8]| {
366
332k
        let (i, message_id) = be_u16(i)?;
367
332k
        let (qoss, properties) = parse_properties(i, protocol_version == 5)?;
368
332k
        Ok((
369
332k
            i,
370
332k
            MQTTSubackData {
371
332k
                message_id,
372
332k
                qoss: qoss.to_vec(),
373
332k
                properties,
374
332k
            },
375
332k
        ))
376
332k
    }
377
332k
}
378
379
#[inline]
380
249k
fn parse_unsubscribe(
381
249k
    protocol_version: u8,
382
249k
) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTUnsubscribeData> {
383
249k
    move |i: &[u8]| {
384
249k
        let (i, message_id) = be_u16(i)?;
385
249k
        let (i, properties) = parse_properties(i, protocol_version == 5)?;
386
249k
        let (i, topics) = many0(complete(parse_mqtt_string))(i)?;
387
249k
        Ok((
388
249k
            i,
389
249k
            MQTTUnsubscribeData {
390
249k
                message_id,
391
249k
                topics,
392
249k
                properties,
393
249k
            },
394
249k
        ))
395
249k
    }
396
249k
}
397
398
#[inline]
399
420k
fn parse_unsuback(protocol_version: u8) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTUnsubackData> {
400
420k
    move |i: &[u8]| {
401
420k
        let (i, message_id) = be_u16(i)?;
402
420k
        let (i, properties) = parse_properties(i, protocol_version == 5)?;
403
420k
        let (i, reason_codes) = many0(complete(be_u8))(i)?;
404
420k
        Ok((
405
420k
            i,
406
420k
            MQTTUnsubackData {
407
420k
                message_id,
408
420k
                properties,
409
420k
                reason_codes: Some(reason_codes),
410
420k
            },
411
420k
        ))
412
420k
    }
413
420k
}
414
415
#[inline]
416
1.11M
fn parse_disconnect(
417
1.11M
    remaining_len: usize,
418
1.11M
    protocol_version: u8,
419
1.11M
) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTDisconnectData> {
420
1.11M
    move |input: &[u8]| {
421
1.11M
        if protocol_version < 5 {
422
940k
            return Ok((
423
940k
                input,
424
940k
                MQTTDisconnectData {
425
940k
                    reason_code: None,
426
940k
                    properties: None,
427
940k
                },
428
940k
            ));
429
171k
        }
430
171k
        if remaining_len == 0 {
431
            // The Reason Code and Property Length can be omitted if the Reason
432
            // Code is 0x00 (Normal disconnection) and there are no Properties.
433
            // In this case the DISCONNECT has a Remaining Length of 0.
434
100k
            return Ok((
435
100k
                input,
436
100k
                MQTTDisconnectData {
437
100k
                    reason_code: Some(0),
438
100k
                    properties: None,
439
100k
                },
440
100k
            ));
441
70.1k
        }
442
70.1k
        match be_u8(input) {
443
70.1k
            Ok((rem, reason_code)) => {
444
70.1k
                // We are checking for 1 because in that case we have a
445
70.1k
                // header plus reason code, but no properties.
446
70.1k
                if remaining_len == 1 {
447
                    // no properties
448
69.2k
                    return Ok((
449
69.2k
                        rem,
450
69.2k
                        MQTTDisconnectData {
451
69.2k
                            reason_code: Some(0),
452
69.2k
                            properties: None,
453
69.2k
                        },
454
69.2k
                    ));
455
962
                }
456
962
                match parse_properties(rem, true) {
457
949
                    Ok((rem, properties)) => {
458
949
                        return Ok((
459
949
                            rem,
460
949
                            MQTTDisconnectData {
461
949
                                reason_code: Some(reason_code),
462
949
                                properties,
463
949
                            },
464
949
                        ));
465
                    }
466
13
                    Err(e) => return Err(e),
467
                }
468
            }
469
0
            Err(e) => return Err(e),
470
        }
471
1.11M
    }
472
1.11M
}
473
474
#[inline]
475
35.9k
fn parse_auth(i: &[u8]) -> IResult<&[u8], MQTTAuthData> {
476
35.9k
    let (i, reason_code) = be_u8(i)?;
477
35.9k
    let (i, properties) = parse_properties(i, true)?;
478
35.2k
    Ok((
479
35.2k
        i,
480
35.2k
        MQTTAuthData {
481
35.2k
            reason_code,
482
35.2k
            properties,
483
35.2k
        },
484
35.2k
    ))
485
35.9k
}
486
487
#[inline]
488
31.0M
fn parse_remaining_message<'a>(
489
31.0M
    full: &'a [u8],
490
31.0M
    len: usize,
491
31.0M
    skiplen: usize,
492
31.0M
    header: FixedHeader,
493
31.0M
    message_type: MQTTTypeCode,
494
31.0M
    protocol_version: u8,
495
31.0M
) -> impl Fn(&'a [u8]) -> IResult<&'a [u8], MQTTMessage> {
496
31.0M
    move |input: &'a [u8]| {
497
31.0M
        match message_type {
498
41.5k
            MQTTTypeCode::CONNECT => match parse_connect(input) {
499
41.2k
                Ok((_rem, conn)) => {
500
41.2k
                    let msg = MQTTMessage {
501
41.2k
                        header,
502
41.2k
                        op: MQTTOperation::CONNECT(conn),
503
41.2k
                    };
504
41.2k
                    Ok((&full[skiplen + len..], msg))
505
                }
506
366
                Err(e) => Err(e),
507
            },
508
314k
            MQTTTypeCode::CONNACK => match parse_connack(protocol_version)(input) {
509
313k
                Ok((_rem, connack)) => {
510
313k
                    let msg = MQTTMessage {
511
313k
                        header,
512
313k
                        op: MQTTOperation::CONNACK(connack),
513
313k
                    };
514
313k
                    Ok((&full[skiplen + len..], msg))
515
                }
516
92
                Err(e) => Err(e),
517
            },
518
            MQTTTypeCode::PUBLISH => {
519
112k
                match parse_publish(protocol_version, header.qos_level > 0)(input) {
520
112k
                    Ok((_rem, publish)) => {
521
112k
                        let msg = MQTTMessage {
522
112k
                            header,
523
112k
                            op: MQTTOperation::PUBLISH(publish),
524
112k
                        };
525
112k
                        Ok((&full[skiplen + len..], msg))
526
                    }
527
179
                    Err(e) => Err(e),
528
                }
529
            }
530
            MQTTTypeCode::PUBACK
531
            | MQTTTypeCode::PUBREC
532
            | MQTTTypeCode::PUBREL
533
1.99M
            | MQTTTypeCode::PUBCOMP => match parse_msgidonly(protocol_version)(input) {
534
1.99M
                Ok((_rem, msgidonly)) => {
535
1.99M
                    let msg = MQTTMessage {
536
1.99M
                        header,
537
1.99M
                        op: match message_type {
538
120k
                            MQTTTypeCode::PUBACK => MQTTOperation::PUBACK(msgidonly),
539
1.50M
                            MQTTTypeCode::PUBREC => MQTTOperation::PUBREC(msgidonly),
540
201k
                            MQTTTypeCode::PUBREL => MQTTOperation::PUBREL(msgidonly),
541
162k
                            MQTTTypeCode::PUBCOMP => MQTTOperation::PUBCOMP(msgidonly),
542
0
                            _ => MQTTOperation::UNASSIGNED,
543
                        },
544
                    };
545
1.99M
                    Ok((&full[skiplen + len..], msg))
546
                }
547
269
                Err(e) => Err(e),
548
            },
549
85.9k
            MQTTTypeCode::SUBSCRIBE => match parse_subscribe(protocol_version)(input) {
550
85.8k
                Ok((_rem, subs)) => {
551
85.8k
                    let msg = MQTTMessage {
552
85.8k
                        header,
553
85.8k
                        op: MQTTOperation::SUBSCRIBE(subs),
554
85.8k
                    };
555
85.8k
                    Ok((&full[skiplen + len..], msg))
556
                }
557
129
                Err(e) => Err(e),
558
            },
559
332k
            MQTTTypeCode::SUBACK => match parse_suback(protocol_version)(input) {
560
332k
                Ok((_rem, suback)) => {
561
332k
                    let msg = MQTTMessage {
562
332k
                        header,
563
332k
                        op: MQTTOperation::SUBACK(suback),
564
332k
                    };
565
332k
                    Ok((&full[skiplen + len..], msg))
566
                }
567
25
                Err(e) => Err(e),
568
            },
569
249k
            MQTTTypeCode::UNSUBSCRIBE => match parse_unsubscribe(protocol_version)(input) {
570
249k
                Ok((_rem, unsub)) => {
571
249k
                    let msg = MQTTMessage {
572
249k
                        header,
573
249k
                        op: MQTTOperation::UNSUBSCRIBE(unsub),
574
249k
                    };
575
249k
                    Ok((&full[skiplen + len..], msg))
576
                }
577
25
                Err(e) => Err(e),
578
            },
579
420k
            MQTTTypeCode::UNSUBACK => match parse_unsuback(protocol_version)(input) {
580
420k
                Ok((_rem, unsuback)) => {
581
420k
                    let msg = MQTTMessage {
582
420k
                        header,
583
420k
                        op: MQTTOperation::UNSUBACK(unsuback),
584
420k
                    };
585
420k
                    Ok((&full[skiplen + len..], msg))
586
                }
587
23
                Err(e) => Err(e),
588
            },
589
            MQTTTypeCode::PINGREQ | MQTTTypeCode::PINGRESP => {
590
1.14M
                let msg = MQTTMessage {
591
1.14M
                    header,
592
1.14M
                    op: match message_type {
593
1.09M
                        MQTTTypeCode::PINGREQ => MQTTOperation::PINGREQ,
594
53.1k
                        MQTTTypeCode::PINGRESP => MQTTOperation::PINGRESP,
595
0
                        _ => MQTTOperation::UNASSIGNED,
596
                    },
597
                };
598
1.14M
                Ok((&full[skiplen + len..], msg))
599
            }
600
1.11M
            MQTTTypeCode::DISCONNECT => match parse_disconnect(len, protocol_version)(input) {
601
1.11M
                Ok((_rem, disco)) => {
602
1.11M
                    let msg = MQTTMessage {
603
1.11M
                        header,
604
1.11M
                        op: MQTTOperation::DISCONNECT(disco),
605
1.11M
                    };
606
1.11M
                    Ok((&full[skiplen + len..], msg))
607
                }
608
13
                Err(e) => Err(e),
609
            },
610
35.9k
            MQTTTypeCode::AUTH => match parse_auth(input) {
611
35.2k
                Ok((_rem, auth)) => {
612
35.2k
                    let msg = MQTTMessage {
613
35.2k
                        header,
614
35.2k
                        op: MQTTOperation::AUTH(auth),
615
35.2k
                    };
616
35.2k
                    Ok((&full[skiplen + len..], msg))
617
                }
618
680
                Err(e) => Err(e),
619
            },
620
            // Unassigned message type code. Unlikely to happen with
621
            // regular traffic, might be an indication for broken or
622
            // crafted MQTT traffic.
623
            _ => {
624
25.1M
                let msg = MQTTMessage {
625
25.1M
                    header,
626
25.1M
                    op: MQTTOperation::UNASSIGNED,
627
25.1M
                };
628
25.1M
                return Ok((&full[skiplen + len..], msg));
629
            }
630
        }
631
31.0M
    }
632
31.0M
}
633
634
31.2M
pub fn parse_message(
635
31.2M
    input: &[u8],
636
31.2M
    protocol_version: u8,
637
31.2M
    max_msg_size: usize,
638
31.2M
) -> IResult<&[u8], MQTTMessage> {
639
31.2M
    // Parse the fixed header first. This is identical across versions and can
640
31.2M
    // be between 2 and 5 bytes long.
641
31.2M
    match parse_fixed_header(input) {
642
31.1M
        Ok((fullrem, header)) => {
643
31.1M
            let len = header.remaining_length as usize;
644
31.1M
            // This is the length of the fixed header that we need to skip
645
31.1M
            // before returning the remainder. It is the sum of the length
646
31.1M
            // of the flag byte (1) and the length of the message length
647
31.1M
            // varint.
648
31.1M
            let skiplen = input.len() - fullrem.len();
649
31.1M
            let message_type = header.message_type;
650
31.1M
651
31.1M
            // If the remaining length (message length) exceeds the specified
652
31.1M
            // limit, we return a special truncation message type, containing
653
31.1M
            // no parsed metadata but just the skipped length and the message
654
31.1M
            // type.
655
31.1M
            if len > max_msg_size {
656
599
                let msg = MQTTMessage {
657
599
                    header,
658
599
                    op: MQTTOperation::TRUNCATED(MQTTTruncatedData {
659
599
                        original_message_type: message_type,
660
599
                        skipped_length: len + skiplen,
661
599
                    }),
662
599
                };
663
599
                // In this case we return the full input buffer, since this is
664
599
                // what the skipped_length value also refers to: header _and_
665
599
                // remaining length.
666
599
                return Ok((input, msg));
667
31.1M
            }
668
31.1M
669
31.1M
            // We have not exceeded the maximum length limit, but still do not
670
31.1M
            // have enough data in the input buffer to handle the full
671
31.1M
            // message. Signal this by returning an Incomplete IResult value.
672
31.1M
            if fullrem.len() < len {
673
157k
                return Err(Err::Incomplete(Needed::new(len - fullrem.len())));
674
31.0M
            }
675
31.0M
676
31.0M
            // Parse the contents of the buffer into a single message.
677
31.0M
            // We reslice the remainder into the portion that we are interested
678
31.0M
            // in, according to the length we just parsed. This helps with the
679
31.0M
            // complete() parsers, where we would otherwise need to keep track
680
31.0M
            // of the already parsed length.
681
31.0M
            let rem = &fullrem[..len];
682
31.0M
683
31.0M
            // Parse remaining message in buffer. We use complete() to ensure
684
31.0M
            // we do not request additional content in case of incomplete
685
31.0M
            // parsing, but raise an error instead as we should have all the
686
31.0M
            // data asked for in the header.
687
31.0M
            return complete(parse_remaining_message(
688
31.0M
                input,
689
31.0M
                len,
690
31.0M
                skiplen,
691
31.0M
                header,
692
31.0M
                message_type,
693
31.0M
                protocol_version,
694
31.0M
            ))(rem);
695
        }
696
21.4k
        Err(err) => {
697
21.4k
            return Err(err);
698
        }
699
    }
700
31.2M
}
701
702
#[cfg(test)]
703
mod tests {
704
    use super::*;
705
    use nom7::error::ErrorKind;
706
707
    fn test_mqtt_parse_variable_fail(buf0: &[u8]) {
708
        let r0 = parse_mqtt_variable_integer(buf0);
709
        match r0 {
710
            Ok((_, _)) => {
711
                panic!("Result should not have been ok.");
712
            }
713
            Err(Err::Error(err)) => {
714
                assert_eq!(err.code, ErrorKind::Verify);
715
            }
716
            _ => {
717
                panic!("Result should be an error.");
718
            }
719
        }
720
    }
721
722
    fn test_mqtt_parse_variable_check(buf0: &[u8], expected: u32) {
723
        let r0 = parse_mqtt_variable_integer(buf0);
724
        match r0 {
725
            Ok((_, val)) => {
726
                assert_eq!(val, expected);
727
            }
728
            Err(_) => {
729
                panic!("Result should have been ok.");
730
            }
731
        }
732
    }
733
734
    #[test]
735
    fn test_mqtt_parse_variable_integer_largest_input() {
736
        test_mqtt_parse_variable_fail(&[0xFF, 0xFF, 0xFF, 0xFF]);
737
    }
738
739
    #[test]
740
    fn test_mqtt_parse_variable_integer_boundary() {
741
        test_mqtt_parse_variable_fail(&[0xFF, 0xFF, 0xFF, 0x80]);
742
    }
743
744
    #[test]
745
    fn test_mqtt_parse_variable_integer_largest_valid() {
746
        test_mqtt_parse_variable_check(&[0xFF, 0xFF, 0xFF, 0x7F], 268435455);
747
    }
748
749
    #[test]
750
    fn test_mqtt_parse_variable_integer_smallest_valid() {
751
        test_mqtt_parse_variable_check(&[0x0], 0);
752
    }
753
754
    /// Parses a PUBLISH fixed header with a three-byte length field and
    /// checks every decoded header field plus the number of bytes left over.
    #[test]
    fn test_parse_fixed_header() {
        let buf = [
            0x30, /* Header Flags: 0x30, Message Type: Publish Message, QoS Level: At most once delivery (Fire and Forget) */
            0xb7, 0x97, 0x02, /* Msg Len: 35767 */
            0x00, 0xff, 0x00, 0xff, 0x00, 0xff, 0x10, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
            0x00, 0x01, 0xa0,
        ];

        // Bail out with a descriptive panic unless parsing succeeded.
        let (remainder, header) = match parse_fixed_header(&buf) {
            Ok(parsed) => parsed,
            Err(Err::Incomplete(_)) => {
                panic!("Result should not have been incomplete.");
            }
            Err(Err::Error(err)) | Err(Err::Failure(err)) => {
                panic!("Result should not be an error: {:?}.", err);
            }
        };

        assert_eq!(header.message_type, MQTTTypeCode::PUBLISH);
        assert!(!header.dup_flag);
        assert_eq!(header.qos_level, 0);
        assert!(!header.retain);
        assert_eq!(header.remaining_length, 35767);
        // 21 input bytes minus the 4 header bytes (flags + 3-byte length).
        assert_eq!(remainder.len(), 17);
    }
781
782
    /// Parses a property block holding a single Receive Maximum property and
    /// checks both the decoded property and the unconsumed trailing bytes.
    #[test]
    fn test_parse_properties() {
        let buf = [
            0x03, 0x21, 0x00, 0x14, /* Properties */
            0x00, 0xff, 0x00, 0xff, 0x00, 0xff, 0x10, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
            0x00, 0x01, 0xa0,
        ];

        let (remainder, parsed) = match parse_properties(&buf, true) {
            Ok(ok) => ok,
            Err(Err::Incomplete(_)) => {
                panic!("Result should not have been incomplete.");
            }
            Err(Err::Error(err)) | Err(Err::Failure(err)) => {
                panic!("Result should not be an error: {:?}.", err);
            }
        };

        let props = parsed.unwrap();
        assert_eq!(props[0], MQTTProperty::RECEIVE_MAXIMUM(20));
        // The 4 property bytes are consumed; the other 17 must remain.
        assert_eq!(remainder.len(), 17);
    }
805
    /// Parses the body of an MQTT v5 CONNECT message carrying a user name and
    /// a password, and checks the protocol name, version, connect flags and
    /// keep-alive, and that the whole buffer is consumed.
    #[test]
    fn test_parse_connect() {
        let buf = [
            0x00, 0x04, /* Protocol Name Length: 4 */
            0x4d, 0x51, 0x54, 0x54, /* Protocol Name: MQTT */
            0x05, /* Version: MQTT v5.0 (5) */
            0xc2, /*Connect Flags: 0xc2, User Name Flag, Password Flag, QoS Level: At most once delivery (Fire and Forget), Clean Session Flag */
            0x00, 0x3c, /* Keep Alive: 60 */
            0x03, 0x21, 0x00, 0x14, /* Properties */
            0x00, 0x00, /* Client ID Length: 0 */
            0x00, 0x04, /* User Name Length: 4 */
            0x75, 0x73, 0x65, 0x72, /* User Name: user */
            0x00, 0x04, /* Password Length: 4 */
            // These bytes now really spell "pass"; the fixture previously
            // held 0x71, 0x61, 0x71, 0x73 ("qaqs"), contradicting its label.
            0x70, 0x61, 0x73, 0x73, /* Password: pass */
        ];

        let result = parse_connect(&buf);
        match result {
            Ok((remainder, message)) => {
                assert_eq!(message.protocol_string, "MQTT");
                assert_eq!(message.protocol_version, 5);
                assert!(message.username_flag);
                assert!(message.password_flag);
                assert!(!message.will_retain);
                assert_eq!(message.will_qos, 0);
                assert!(!message.will_flag);
                assert!(message.clean_session);
                assert_eq!(message.keepalive, 60);
                // Every byte of the buffer belongs to the CONNECT body.
                assert_eq!(remainder.len(), 0);
            }
            Err(Err::Incomplete(_)) => {
                panic!("Result should not have been incomplete.");
            }
            Err(Err::Error(err)) | Err(Err::Failure(err)) => {
                panic!("Result should not be an error: {:?}.", err);
            }
        }
    }
843
844
    /// Parses an MQTT v5 CONNACK body with two properties and checks the
    /// properties, return code and session flag, and that nothing is left over.
    #[test]
    fn test_parse_connack() {
        let buf = [
            0x00, /* Acknowledge Flags: 0x00 (0000 000. = Reserved: Not set )(.... ...0 = Session Present: Not set) */
            0x00, /* Reason Code: Success (0) */
            0x2f, /* Total Length: 47 */
            0x22, /* ID: Topic Alias Maximum (0x22) */
            0x00, 0x0a, /* Value: 10 */
            0x12, /* ID: Assigned Client Identifier (0x12) */
            0x00, 0x29, /* Length: 41 */
            0x61, 0x75, 0x74, 0x6f, 0x2d, 0x31, 0x42, 0x34, 0x33, 0x45, 0x38, 0x30, 0x30, 0x2d,
            0x30, 0x38, 0x45, 0x33, 0x2d, 0x33, 0x42, 0x41, 0x31, 0x2d, 0x32, 0x45, 0x39, 0x37,
            0x2d, 0x45, 0x39, 0x41, 0x30, 0x42, 0x34, 0x30, 0x36, 0x34, 0x42, 0x46,
            0x35, /* 41 byte Value: auto-1B43E800-08E3-3BA1-2E97-E9A0B4064BF5 */
        ];
        let client_identifier = "auto-1B43E800-08E3-3BA1-2E97-E9A0B4064BF5";

        // Build the version-specific parser, then run it over the buffer.
        let parser = parse_connack(5);
        let (remainder, message) = match parser(&buf) {
            Ok(parsed) => parsed,
            Err(Err::Incomplete(_)) => {
                panic!("Result should not have been incomplete.");
            }
            Err(Err::Error(err)) | Err(Err::Failure(err)) => {
                panic!("Result should not be an error: {:?}.", err);
            }
        };

        let props = message.properties.unwrap();
        assert_eq!(props[0], MQTTProperty::TOPIC_ALIAS_MAXIMUM(10));
        assert_eq!(
            props[1],
            MQTTProperty::ASSIGNED_CLIENT_IDENTIFIER(String::from(client_identifier))
        );
        assert_eq!(message.return_code, 0);
        assert!(!message.session_present);
        assert_eq!(remainder.len(), 0);
    }
884
885
    /// Parses an MQTT v5 PUBLISH body (with message identifier) and checks
    /// the topic, the identifier and the size of the returned remainder.
    #[test]
    fn test_parse_publish() {
        let buf = [
            0x00, 0x06, /* Topic Length: 6 */
            0x74, 0x6f, 0x70, 0x69, 0x63, 0x58, /* Topic: topicX */
            0x00, 0x01, /* Message Identifier: 1 */
            0x00, /* Properties: length 0 */
            0x00, 0x61, 0x75, 0x74, 0x6f, 0x2d, 0x42, 0x34, 0x33, 0x45, 0x38, 0x30,
        ];

        let parser = parse_publish(5, true);
        let (remainder, message) = match parser(&buf) {
            Ok(parsed) => parsed,
            Err(Err::Incomplete(_)) => {
                panic!("Result should not have been incomplete.");
            }
            Err(Err::Error(err)) | Err(Err::Failure(err)) => {
                panic!("Result should not be an error: {:?}.", err);
            }
        };

        let message_id = message.message_id.unwrap();
        assert_eq!(message.topic, "topicX");
        assert_eq!(message_id, 1);
        // NOTE(review): 13 = 23 input bytes minus topic (8) and message
        // identifier (2); the properties byte appears to be counted in the
        // remainder - confirm against parse_publish.
        assert_eq!(remainder.len(), 13);
    }
913
914
    /// Parses a message-identifier-only body with protocol version 3 and
    /// checks that only the two identifier bytes are consumed.
    #[test]
    fn test_parse_msgidonly_v3() {
        let buf = [
            0x00, 0x01, /* Message Identifier: 1 */
            0x74, 0x6f, 0x70, 0x69, 0x63, 0x58, 0x00, 0x61, 0x75, 0x74, 0x6f, 0x2d, 0x42, 0x34,
            0x33, 0x45, 0x38, 0x30,
        ];

        let parser = parse_msgidonly(3);
        let (remainder, message) = match parser(&buf) {
            Ok(parsed) => parsed,
            Err(Err::Incomplete(_)) => {
                panic!("Result should not have been incomplete.");
            }
            Err(Err::Error(err)) | Err(Err::Failure(err)) => {
                panic!("Result should not be an error: {:?}.", err);
            }
        };

        assert_eq!(message.message_id, 1);
        // The 18 bytes after the identifier must be left untouched.
        assert_eq!(remainder.len(), 18);
    }
938
939
    /// Parses a message-identifier-only body with protocol version 5, where
    /// a reason code and a property block follow the identifier.
    #[test]
    fn test_parse_msgidonly_v5() {
        let buf = [
            0x00, 0x01, /* Message Identifier: 1 */
            0x00, /* Reason Code: 0 */
            0x00, /* Properties */
            0x00, 0x61, 0x75, 0x74, 0x6f, 0x2d, 0x42, 0x34, 0x33, 0x45, 0x38, 0x30,
        ];

        let parser = parse_msgidonly(5);
        let (remainder, message) = match parser(&buf) {
            Ok(parsed) => parsed,
            Err(Err::Incomplete(_)) => {
                panic!("Result should not have been incomplete.");
            }
            Err(Err::Error(err)) | Err(Err::Failure(err)) => {
                panic!("Result should not be an error: {:?}.", err);
            }
        };

        let reason_code = message.reason_code.unwrap();
        assert_eq!(message.message_id, 1);
        assert_eq!(reason_code, 0);
        // Identifier (2), reason code (1) and properties (1) are consumed.
        assert_eq!(remainder.len(), 12);
    }
966
967
    /// Parses an MQTT v5 SUBSCRIBE body with two topic filters and checks the
    /// topics, the first QoS, the message identifier and the trailing bytes.
    #[test]
    fn test_parse_subscribe() {
        let buf = [
            0x00, 0x01, /* Message Identifier: 1 */
            0x00, /* Properties: length 0 */
            0x00, 0x06, /* Topic Length: 6  */
            0x74, 0x6f, 0x70, 0x69, 0x63, 0x58, /* Topic: topicX */
            0x00, /*Subscription Options: 0x00, Retain Handling: Send msgs at subscription time, QoS: At most once delivery (Fire and Forget) */
            0x00, 0x06, /* Topic Length: 6  */
            0x74, 0x6f, 0x70, 0x69, 0x63, 0x59, /* Topic: topicY */
            0x00, /*Subscription Options: 0x00, Retain Handling: Send msgs at subscription time, QoS: At most once delivery (Fire and Forget) */
            0x00, 0x61, 0x75, 0x74, 0x6f, 0x2d, 0x42, 0x34, 0x33, 0x45, 0x38, 0x30,
        ];

        let parser = parse_subscribe(5);
        let (remainder, message) = match parser(&buf) {
            Ok(parsed) => parsed,
            Err(Err::Incomplete(_)) => {
                panic!("Result should not have been incomplete.");
            }
            Err(Err::Error(err)) | Err(Err::Failure(err)) => {
                panic!("Result should not be an error: {:?}.", err);
            }
        };

        assert_eq!(message.topics[0].topic_name, "topicX");
        assert_eq!(message.topics[1].topic_name, "topicY");
        assert_eq!(message.topics[0].qos, 0);
        assert_eq!(message.message_id, 1);
        // The final 12 bytes do not form another topic entry.
        assert_eq!(remainder.len(), 12);
    }
1000
    /// Parses an MQTT v5 SUBACK body and checks the first QoS entry, the
    /// message identifier and the size of the returned remainder.
    #[test]
    fn test_parse_suback() {
        let buf = [
            0x00, 0x01, /* Message Identifier: 1 */
            0x00, /* Properties: length 0 */
            // NOTE(review): these were labelled "Topic Length" before; the
            // assertions below only read them through `qoss` - confirm the
            // intended meaning against parse_suback.
            0x00, 0x00,
        ];

        let parser = parse_suback(5);
        let (remainder, message) = match parser(&buf) {
            Ok(parsed) => parsed,
            Err(Err::Incomplete(_)) => {
                panic!("Result should not have been incomplete.");
            }
            Err(Err::Error(err)) | Err(Err::Failure(err)) => {
                panic!("Result should not be an error: {:?}.", err);
            }
        };

        assert_eq!(message.qoss[0], 0);
        assert_eq!(message.message_id, 1);
        // Everything after the two identifier bytes is reported as remainder.
        assert_eq!(remainder.len(), 3);
    }
1025
    /// Parses an MQTT v5 UNSUBSCRIBE body with one topic filter and checks
    /// the topic, the message identifier and the single leftover byte.
    #[test]
    fn test_parse_unsubscribe() {
        let buf = [
            0x00, 0x01, /* Message Identifier: 1 */
            0x00, /* Properties: length 0 */
            0x00, 0x06, /* Topic Length: 6  */
            0x74, 0x6f, 0x70, 0x69, 0x63, 0x58, /* Topic: topicX */
            0x00, /* Trailing byte: expected to be left in the remainder */
        ];

        let parser = parse_unsubscribe(5);
        let (remainder, message) = match parser(&buf) {
            Ok(parsed) => parsed,
            Err(Err::Incomplete(_)) => {
                panic!("Result should not have been incomplete.");
            }
            Err(Err::Error(err)) | Err(Err::Failure(err)) => {
                panic!("Result should not be an error: {:?}.", err);
            }
        };

        assert_eq!(message.topics[0], "topicX");
        assert_eq!(message.message_id, 1);
        assert_eq!(remainder.len(), 1);
    }
1052
1053
    /// Parses an MQTT v5 UNSUBACK body with a single reason code and checks
    /// that the whole buffer is consumed.
    #[test]
    fn test_parse_unsuback() {
        let buf = [
            0x00, 0x01, /* Message Identifier: 1 */
            0x00, /* Properties: length 0 */
            0x00, /* Reason Code */
        ];

        let parser = parse_unsuback(5);
        let (remainder, message) = match parser(&buf) {
            Ok(parsed) => parsed,
            Err(Err::Incomplete(_)) => {
                panic!("Result should not have been incomplete.");
            }
            Err(Err::Error(err)) | Err(Err::Failure(err)) => {
                panic!("Result should not be an error: {:?}.", err);
            }
        };

        let reason_codes = message.reason_codes.unwrap();
        assert_eq!(reason_codes[0], 0);
        assert_eq!(message.message_id, 1);
        assert_eq!(remainder.len(), 0);
    }
1079
1080
    /// Runs the DISCONNECT parser with a first argument of 0 and protocol
    /// version 5, expecting a reason code of 0 and an untouched input buffer.
    #[test]
    fn test_parse_disconnect() {
        // NOTE(review): these two bytes resemble a DISCONNECT fixed header
        // (0xe0) followed by a zero length byte rather than a message body;
        // the test expects neither of them to be consumed.
        let buf = [
            0xe0, 0x00,
        ];

        // NOTE(review): the first argument is presumably the remaining
        // message length - confirm against parse_disconnect.
        let parser = parse_disconnect(0, 5);
        let (remainder, message) = match parser(&buf) {
            Ok(parsed) => parsed,
            Err(Err::Incomplete(_)) => {
                panic!("Result should not have been incomplete.");
            }
            Err(Err::Error(err)) | Err(Err::Failure(err)) => {
                panic!("Result should not be an error: {:?}.", err);
            }
        };

        let reason_code = message.reason_code.unwrap();
        assert_eq!(reason_code, 0);
        assert_eq!(remainder.len(), 2);
    }
1104
1105
    /// Feeds a complete MQTT v5 CONNECT message to `parse_message` with a
    /// size limit of 40 and checks the fixed-header fields and the remainder.
    #[test]
    fn test_parse_message() {
        let buf = [
            0x10, /* Header Flags: 0x10, Message Type: Connect Command */
            0x2f, 0x00, 0x04, 0x4d, 0x51, 0x54, 0x54, 0x05,
            0xc2, /* Connect Flags: 0xc2, User Name Flag, Password Flag, QoS Level: At most once delivery (Fire and Forget), Clean Session Flag */
            0x00, 0x3c, 0x03, 0x21, 0x00, 0x14, /* Properties */
            0x00, 0x13, 0x6d, 0x79, 0x76, 0x6f, 0x69, 0x63, 0x65, 0x69, 0x73, 0x6d, 0x79, 0x70,
            0x61, 0x73, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x00, 0x04, 0x75, 0x73, 0x65, 0x72, 0x00,
            0x04, 0x70, 0x61, 0x73, 0x73,
        ];

        let (remainder, message) = match parse_message(&buf, 5, 40) {
            Ok(parsed) => parsed,
            Err(Err::Incomplete(_)) => {
                panic!("Result should not have been incomplete.");
            }
            Err(Err::Error(err)) | Err(Err::Failure(err)) => {
                panic!("Result should not be an error: {:?}.", err);
            }
        };

        let header = message.header;
        assert_eq!(header.message_type, MQTTTypeCode::CONNECT);
        assert!(!header.dup_flag);
        assert_eq!(header.qos_level, 0);
        assert!(!header.retain);
        // NOTE(review): the full 49-byte input comes back as remainder,
        // presumably because the 47-byte message length (0x2f) exceeds the
        // limit of 40 passed above - confirm against parse_message.
        assert_eq!(remainder.len(), 49);
    }
1135
}