Coverage Report

Created: 2026-03-31 07:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/suricata7/rust/src/mqtt/parser.rs
Line
Count
Source
1
/* Copyright (C) 2020-2022 Open Information Security Foundation
2
 *
3
 * You can copy, redistribute or modify this Program under the terms of
4
 * the GNU General Public License version 2 as published by the Free
5
 * Software Foundation.
6
 *
7
 * This program is distributed in the hope that it will be useful,
8
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 * GNU General Public License for more details.
11
 *
12
 * You should have received a copy of the GNU General Public License
13
 * version 2 along with this program; if not, write to the Free Software
14
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15
 * 02110-1301, USA.
16
 */
17
18
// written by Sascha Steinbiss <sascha@steinbiss.name>
19
20
use crate::common::nom7::bits;
21
use crate::mqtt::mqtt_message::*;
22
use crate::mqtt::mqtt_property::*;
23
use nom7::bits::streaming::take as take_bits;
24
use nom7::bytes::complete::take;
25
use nom7::bytes::streaming::take_while_m_n;
26
use nom7::combinator::{complete, cond, verify};
27
use nom7::multi::{length_data, many0, many1};
28
use nom7::number::streaming::*;
29
use nom7::sequence::tuple;
30
use nom7::{Err, IResult, Needed};
31
use num_traits::FromPrimitive;
32
33
/// Decoded MQTT fixed header, as produced by `parse_fixed_header`.
#[derive(Copy, Clone, Debug)]
pub struct FixedHeader {
    /// Message type, mapped from the first 4-bit field of the flags byte.
    pub message_type: MQTTTypeCode,
    /// Set when the 1-bit DUP field following the message type is non-zero.
    pub dup_flag: bool,
    /// Raw value of the 2-bit QoS field.
    pub qos_level: u8,
    /// Set when the final 1-bit RETAIN field is non-zero.
    pub retain: bool,
    /// Remaining length, decoded from the variable-length integer that
    /// follows the flags byte.
    pub remaining_length: u32,
}
41
42
// PARSING HELPERS
43
44
/// Reports whether the top bit (0x80) of `b` is set. In a variable-length
/// integer this marks a byte that is followed by at least one more byte.
#[inline]
fn is_continuation_bit_set(b: u8) -> bool {
    const CONTINUATION_BIT: u8 = 0x80;
    b & CONTINUATION_BIT != 0
}
48
49
/// Assembles a variable-length integer from its raw bytes.
///
/// `continued` holds the leading bytes and `last` the final byte. Each byte
/// contributes its low seven bits; the first byte is the least significant
/// group and every following byte is weighted by a further factor of 128.
#[inline]
fn convert_varint(continued: Vec<u8>, last: u8) -> u32 {
    let (partial, multiplier) = continued
        .iter()
        .fold((0u32, 1u32), |(sum, weight), byte| {
            (sum + u32::from(*byte & 0x7F) * weight, weight * 128u32)
        });
    partial + u32::from(last & 0x7F) * multiplier
}
60
61
// DATA TYPES
62
63
#[inline]
64
1.84M
pub fn parse_mqtt_string(i: &[u8]) -> IResult<&[u8], String> {
65
1.84M
    let (i, content) = length_data(be_u16)(i)?;
66
1.49M
    Ok((i, String::from_utf8_lossy(content).to_string()))
67
1.84M
}
68
69
#[inline]
70
41.1M
pub fn parse_mqtt_variable_integer(i: &[u8]) -> IResult<&[u8], u32> {
71
41.1M
    let (i, continued_part) = take_while_m_n(0, 3, is_continuation_bit_set)(i)?;
72
41.0M
    let (i, non_continued_part) = verify(be_u8, |&val| !is_continuation_bit_set(val))(i)?;
73
41.0M
    Ok((
74
41.0M
        i,
75
41.0M
        convert_varint(continued_part.to_vec(), non_continued_part),
76
41.0M
    ))
77
41.1M
}
78
79
#[inline]
80
118k
pub fn parse_mqtt_binary_data(i: &[u8]) -> IResult<&[u8], Vec<u8>> {
81
118k
    let (i, data) = length_data(be_u16)(i)?;
82
118k
    Ok((i, data.to_vec()))
83
118k
}
84
85
#[inline]
86
12.5k
pub fn parse_mqtt_string_pair(i: &[u8]) -> IResult<&[u8], (String, String)> {
87
12.5k
    let (i, name) = parse_mqtt_string(i)?;
88
12.5k
    let (i, value) = parse_mqtt_string(i)?;
89
12.5k
    Ok((i, (name, value)))
90
12.5k
}
91
92
// MESSAGE COMPONENTS
93
94
#[inline]
95
7.29M
fn parse_property(i: &[u8]) -> IResult<&[u8], MQTTProperty> {
96
7.29M
    let (i, identifier) = parse_mqtt_variable_integer(i)?;
97
7.29M
    let (i, value) = parse_qualified_property(i, identifier)?;
98
7.29M
    Ok((i, value))
99
7.29M
}
100
101
#[inline]
102
1.46M
fn parse_properties(input: &[u8], precond: bool) -> IResult<&[u8], Option<Vec<MQTTProperty>>> {
103
    // do not try to parse anything when precondition is not met
104
1.46M
    if !precond {
105
1.36M
        return Ok((input, None));
106
106k
    }
107
    // parse properties length
108
106k
    match parse_mqtt_variable_integer(input) {
109
106k
        Ok((rem, proplen)) => {
110
106k
            if proplen == 0 {
111
                // no properties
112
72.8k
                return Ok((rem, None));
113
33.9k
            }
114
            // parse properties
115
33.9k
            let mut props = Vec::<MQTTProperty>::new();
116
33.9k
            let (rem, mut newrem) = take(proplen as usize)(rem)?;
117
7.32M
            while !newrem.is_empty() {
118
7.29M
                match parse_property(newrem) {
119
7.29M
                    Ok((rem2, val)) => {
120
7.29M
                        props.push(val);
121
7.29M
                        newrem = rem2;
122
7.29M
                    }
123
685
                    Err(e) => return Err(e),
124
                }
125
            }
126
33.0k
            return Ok((rem, Some(props)));
127
        }
128
50
        Err(e) => return Err(e),
129
    }
130
1.46M
}
131
132
#[inline]
133
33.6M
fn parse_fixed_header_flags(i: &[u8]) -> IResult<&[u8], (u8, u8, u8, u8)> {
134
33.6M
    bits(tuple((
135
33.6M
        take_bits(4u8),
136
33.6M
        take_bits(1u8),
137
33.6M
        take_bits(2u8),
138
33.6M
        take_bits(1u8),
139
33.6M
    )))(i)
140
33.6M
}
141
142
#[inline]
143
33.6M
fn parse_message_type(code: u8) -> MQTTTypeCode {
144
33.6M
    match code {
145
33.6M
        0..=15 => {
146
33.6M
            if let Some(t) = FromPrimitive::from_u8(code) {
147
33.6M
                return t;
148
            } else {
149
0
                return MQTTTypeCode::UNASSIGNED;
150
            }
151
        }
152
        _ => {
153
            // unreachable state in parser: we only pass values parsed from take_bits!(4u8)
154
0
            debug_validate_fail!("can't have message codes >15 from 4 bits");
155
0
            MQTTTypeCode::UNASSIGNED
156
        }
157
    }
158
33.6M
}
159
160
#[inline]
161
33.6M
pub fn parse_fixed_header(i: &[u8]) -> IResult<&[u8], FixedHeader> {
162
33.6M
    let (i, flags) = parse_fixed_header_flags(i)?;
163
33.6M
    let (i, remaining_length) = parse_mqtt_variable_integer(i)?;
164
33.6M
    Ok((
165
33.6M
        i,
166
33.6M
        FixedHeader {
167
33.6M
            message_type: parse_message_type(flags.0),
168
33.6M
            dup_flag: flags.1 != 0,
169
33.6M
            qos_level: flags.2,
170
33.6M
            retain: flags.3 != 0,
171
33.6M
            remaining_length,
172
33.6M
        },
173
33.6M
    ))
174
33.6M
}
175
176
#[inline]
177
#[allow(clippy::type_complexity)]
178
39.2k
fn parse_connect_variable_flags(i: &[u8]) -> IResult<&[u8], (u8, u8, u8, u8, u8, u8, u8)> {
179
39.2k
    bits(tuple((
180
39.2k
        take_bits(1u8),
181
39.2k
        take_bits(1u8),
182
39.2k
        take_bits(1u8),
183
39.2k
        take_bits(2u8),
184
39.2k
        take_bits(1u8),
185
39.2k
        take_bits(1u8),
186
39.2k
        take_bits(1u8),
187
39.2k
    )))(i)
188
39.2k
}
189
190
#[inline]
191
39.4k
fn parse_connect(i: &[u8]) -> IResult<&[u8], MQTTConnectData> {
192
39.4k
    let (i, protocol_string) = parse_mqtt_string(i)?;
193
39.2k
    let (i, protocol_version) = be_u8(i)?;
194
39.2k
    let (i, flags) = parse_connect_variable_flags(i)?;
195
39.2k
    let (i, keepalive) = be_u16(i)?;
196
39.2k
    let (i, properties) = parse_properties(i, protocol_version == 5)?;
197
39.1k
    let (i, client_id) = parse_mqtt_string(i)?;
198
39.0k
    let (i, will_properties) = parse_properties(i, protocol_version == 5 && flags.4 != 0)?;
199
38.9k
    let (i, will_topic) = cond(flags.4 != 0, parse_mqtt_string)(i)?;
200
38.8k
    let (i, will_message) = cond(flags.4 != 0, parse_mqtt_binary_data)(i)?;
201
38.8k
    let (i, username) = cond(flags.0 != 0, parse_mqtt_string)(i)?;
202
38.7k
    let (i, password) = cond(flags.1 != 0, parse_mqtt_binary_data)(i)?;
203
38.7k
    Ok((
204
38.7k
        i,
205
38.7k
        MQTTConnectData {
206
38.7k
            protocol_string,
207
38.7k
            protocol_version,
208
38.7k
            username_flag: flags.0 != 0,
209
38.7k
            password_flag: flags.1 != 0,
210
38.7k
            will_retain: flags.2 != 0,
211
38.7k
            will_qos: flags.3,
212
38.7k
            will_flag: flags.4 != 0,
213
38.7k
            clean_session: flags.5 != 0,
214
38.7k
            keepalive,
215
38.7k
            client_id,
216
38.7k
            will_topic,
217
38.7k
            will_message,
218
38.7k
            username,
219
38.7k
            password,
220
38.7k
            properties,
221
38.7k
            will_properties,
222
38.7k
        },
223
38.7k
    ))
224
39.4k
}
225
226
#[inline]
227
277k
fn parse_connack(protocol_version: u8) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTConnackData> {
228
277k
    move |i: &[u8]| {
229
277k
        let (i, topic_name_compression_response) = be_u8(i)?;
230
277k
        let (i, return_code) = be_u8(i)?;
231
277k
        let (i, properties) = parse_properties(i, protocol_version == 5)?;
232
277k
        Ok((
233
277k
            i,
234
277k
            MQTTConnackData {
235
277k
                session_present: (topic_name_compression_response & 1) != 0,
236
277k
                return_code,
237
277k
                properties,
238
277k
            },
239
277k
        ))
240
277k
    }
241
277k
}
242
243
#[inline]
244
152k
fn parse_publish(
245
152k
    protocol_version: u8,
246
152k
    has_id: bool,
247
152k
) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTPublishData> {
248
152k
    move |i: &[u8]| {
249
152k
        let (i, topic) = parse_mqtt_string(i)?;
250
152k
        let (i, message_id) = cond(has_id, be_u16)(i)?;
251
152k
        let (message, properties) = parse_properties(i, protocol_version == 5)?;
252
152k
        Ok((
253
152k
            i,
254
152k
            MQTTPublishData {
255
152k
                topic,
256
152k
                message_id,
257
152k
                message: message.to_vec(),
258
152k
                properties,
259
152k
            },
260
152k
        ))
261
152k
    }
262
152k
}
263
264
#[inline]
265
1.50M
fn parse_msgidonly(protocol_version: u8) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTMessageIdOnly> {
266
1.50M
    move |input: &[u8]| {
267
1.50M
        if protocol_version < 5 {
268
            // before v5 we don't even have to care about reason codes
269
            // and properties, lucky us
270
1.45M
            return parse_msgidonly_v3(input);
271
54.2k
        }
272
54.2k
        let remaining_len = input.len();
273
54.2k
        match be_u16(input) {
274
54.2k
            Ok((rem, message_id)) => {
275
54.2k
                if remaining_len == 2 {
276
                    // from the spec: " The Reason Code and Property Length can be
277
                    // omitted if the Reason Code is 0x00 (Success) and there are
278
                    // no Properties. In this case the message has a Remaining
279
                    // Length of 2."
280
40.6k
                    return Ok((
281
40.6k
                        rem,
282
40.6k
                        MQTTMessageIdOnly {
283
40.6k
                            message_id,
284
40.6k
                            reason_code: Some(0),
285
40.6k
                            properties: None,
286
40.6k
                        },
287
40.6k
                    ));
288
13.5k
                }
289
13.5k
                match be_u8(rem) {
290
13.5k
                    Ok((rem, reason_code)) => {
291
                        // We are checking for 3 because in that case we have a
292
                        // header plus reason code, but no properties.
293
13.5k
                        if remaining_len == 3 {
294
                            // no properties
295
1.16k
                            return Ok((
296
1.16k
                                rem,
297
1.16k
                                MQTTMessageIdOnly {
298
1.16k
                                    message_id,
299
1.16k
                                    reason_code: Some(reason_code),
300
1.16k
                                    properties: None,
301
1.16k
                                },
302
1.16k
                            ));
303
12.3k
                        }
304
12.3k
                        match parse_properties(rem, true) {
305
12.2k
                            Ok((rem, properties)) => {
306
12.2k
                                return Ok((
307
12.2k
                                    rem,
308
12.2k
                                    MQTTMessageIdOnly {
309
12.2k
                                        message_id,
310
12.2k
                                        reason_code: Some(reason_code),
311
12.2k
                                        properties,
312
12.2k
                                    },
313
12.2k
                                ));
314
                            }
315
165
                            Err(e) => return Err(e),
316
                        }
317
                    }
318
0
                    Err(e) => return Err(e),
319
                }
320
            }
321
43
            Err(e) => return Err(e),
322
        }
323
1.50M
    }
324
1.50M
}
325
326
#[inline]
327
1.45M
fn parse_msgidonly_v3(i: &[u8]) -> IResult<&[u8], MQTTMessageIdOnly> {
328
1.45M
    let (i, message_id) = be_u16(i)?;
329
1.45M
    Ok((
330
1.45M
        i,
331
1.45M
        MQTTMessageIdOnly {
332
1.45M
            message_id,
333
1.45M
            reason_code: None,
334
1.45M
            properties: None,
335
1.45M
        },
336
1.45M
    ))
337
1.45M
}
338
339
#[inline]
340
590k
fn parse_subscribe_topic(i: &[u8]) -> IResult<&[u8], MQTTSubscribeTopicData> {
341
590k
    let (i, topic_name) = parse_mqtt_string(i)?;
342
528k
    let (i, qos) = be_u8(i)?;
343
399k
    Ok((i, MQTTSubscribeTopicData { topic_name, qos }))
344
590k
}
345
346
#[inline]
347
191k
fn parse_subscribe(protocol_version: u8) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTSubscribeData> {
348
191k
    move |i: &[u8]| {
349
191k
        let (i, message_id) = be_u16(i)?;
350
190k
        let (i, properties) = parse_properties(i, protocol_version == 5)?;
351
190k
        let (i, topics) = many1(complete(parse_subscribe_topic))(i)?;
352
190k
        Ok((
353
190k
            i,
354
190k
            MQTTSubscribeData {
355
190k
                message_id,
356
190k
                topics,
357
190k
                properties,
358
190k
            },
359
190k
        ))
360
191k
    }
361
191k
}
362
363
#[inline]
364
207k
fn parse_suback(protocol_version: u8) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTSubackData> {
365
207k
    move |i: &[u8]| {
366
207k
        let (i, message_id) = be_u16(i)?;
367
207k
        let (qoss, properties) = parse_properties(i, protocol_version == 5)?;
368
207k
        Ok((
369
207k
            i,
370
207k
            MQTTSubackData {
371
207k
                message_id,
372
207k
                qoss: qoss.to_vec(),
373
207k
                properties,
374
207k
            },
375
207k
        ))
376
207k
    }
377
207k
}
378
379
#[inline]
380
290k
fn parse_unsubscribe(
381
290k
    protocol_version: u8,
382
290k
) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTUnsubscribeData> {
383
290k
    move |i: &[u8]| {
384
290k
        let (i, message_id) = be_u16(i)?;
385
290k
        let (i, properties) = parse_properties(i, protocol_version == 5)?;
386
290k
        let (i, topics) = many0(complete(parse_mqtt_string))(i)?;
387
290k
        Ok((
388
290k
            i,
389
290k
            MQTTUnsubscribeData {
390
290k
                message_id,
391
290k
                topics,
392
290k
                properties,
393
290k
            },
394
290k
        ))
395
290k
    }
396
290k
}
397
398
#[inline]
399
246k
fn parse_unsuback(protocol_version: u8) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTUnsubackData> {
400
246k
    move |i: &[u8]| {
401
246k
        let (i, message_id) = be_u16(i)?;
402
246k
        let (i, properties) = parse_properties(i, protocol_version == 5)?;
403
246k
        let (i, reason_codes) = many0(complete(be_u8))(i)?;
404
246k
        Ok((
405
246k
            i,
406
246k
            MQTTUnsubackData {
407
246k
                message_id,
408
246k
                properties,
409
246k
                reason_codes: Some(reason_codes),
410
246k
            },
411
246k
        ))
412
246k
    }
413
246k
}
414
415
#[inline]
416
1.03M
fn parse_disconnect(
417
1.03M
    remaining_len: usize,
418
1.03M
    protocol_version: u8,
419
1.03M
) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTDisconnectData> {
420
1.03M
    move |input: &[u8]| {
421
1.03M
        if protocol_version < 5 {
422
823k
            return Ok((
423
823k
                input,
424
823k
                MQTTDisconnectData {
425
823k
                    reason_code: None,
426
823k
                    properties: None,
427
823k
                },
428
823k
            ));
429
210k
        }
430
210k
        if remaining_len == 0 {
431
            // The Reason Code and Property Length can be omitted if the Reason
432
            // Code is 0x00 (Normal disconnection) and there are no Properties.
433
            // In this case the DISCONNECT has a Remaining Length of 0.
434
204k
            return Ok((
435
204k
                input,
436
204k
                MQTTDisconnectData {
437
204k
                    reason_code: Some(0),
438
204k
                    properties: None,
439
204k
                },
440
204k
            ));
441
5.35k
        }
442
5.35k
        match be_u8(input) {
443
5.35k
            Ok((rem, reason_code)) => {
444
                // We are checking for 1 because in that case we have a
445
                // header plus reason code, but no properties.
446
5.35k
                if remaining_len == 1 {
447
                    // no properties
448
3.67k
                    return Ok((
449
3.67k
                        rem,
450
3.67k
                        MQTTDisconnectData {
451
3.67k
                            reason_code: Some(0),
452
3.67k
                            properties: None,
453
3.67k
                        },
454
3.67k
                    ));
455
1.67k
                }
456
1.67k
                match parse_properties(rem, true) {
457
1.66k
                    Ok((rem, properties)) => {
458
1.66k
                        return Ok((
459
1.66k
                            rem,
460
1.66k
                            MQTTDisconnectData {
461
1.66k
                                reason_code: Some(reason_code),
462
1.66k
                                properties,
463
1.66k
                            },
464
1.66k
                        ));
465
                    }
466
18
                    Err(e) => return Err(e),
467
                }
468
            }
469
0
            Err(e) => return Err(e),
470
        }
471
1.03M
    }
472
1.03M
}
473
474
#[inline]
475
11.6k
fn parse_auth(i: &[u8]) -> IResult<&[u8], MQTTAuthData> {
476
11.6k
    let (i, reason_code) = be_u8(i)?;
477
11.6k
    let (i, properties) = parse_properties(i, true)?;
478
11.0k
    Ok((
479
11.0k
        i,
480
11.0k
        MQTTAuthData {
481
11.0k
            reason_code,
482
11.0k
            properties,
483
11.0k
        },
484
11.0k
    ))
485
11.6k
}
486
487
#[inline]
488
33.5M
fn parse_remaining_message<'a>(
489
33.5M
    full: &'a [u8],
490
33.5M
    len: usize,
491
33.5M
    skiplen: usize,
492
33.5M
    header: FixedHeader,
493
33.5M
    message_type: MQTTTypeCode,
494
33.5M
    protocol_version: u8,
495
33.5M
) -> impl Fn(&'a [u8]) -> IResult<&'a [u8], MQTTMessage> {
496
33.5M
    move |input: &'a [u8]| {
497
33.5M
        match message_type {
498
39.4k
            MQTTTypeCode::CONNECT => match parse_connect(input) {
499
38.7k
                Ok((_rem, conn)) => {
500
38.7k
                    let msg = MQTTMessage {
501
38.7k
                        header,
502
38.7k
                        op: MQTTOperation::CONNECT(conn),
503
38.7k
                    };
504
38.7k
                    Ok((&full[skiplen + len..], msg))
505
                }
506
683
                Err(e) => Err(e),
507
            },
508
277k
            MQTTTypeCode::CONNACK => match parse_connack(protocol_version)(input) {
509
277k
                Ok((_rem, connack)) => {
510
277k
                    let msg = MQTTMessage {
511
277k
                        header,
512
277k
                        op: MQTTOperation::CONNACK(connack),
513
277k
                    };
514
277k
                    Ok((&full[skiplen + len..], msg))
515
                }
516
133
                Err(e) => Err(e),
517
            },
518
            MQTTTypeCode::PUBLISH => {
519
152k
                match parse_publish(protocol_version, header.qos_level > 0)(input) {
520
152k
                    Ok((_rem, publish)) => {
521
152k
                        let msg = MQTTMessage {
522
152k
                            header,
523
152k
                            op: MQTTOperation::PUBLISH(publish),
524
152k
                        };
525
152k
                        Ok((&full[skiplen + len..], msg))
526
                    }
527
180
                    Err(e) => Err(e),
528
                }
529
            }
530
            MQTTTypeCode::PUBACK
531
            | MQTTTypeCode::PUBREC
532
            | MQTTTypeCode::PUBREL
533
1.50M
            | MQTTTypeCode::PUBCOMP => match parse_msgidonly(protocol_version)(input) {
534
1.50M
                Ok((_rem, msgidonly)) => {
535
1.50M
                    let msg = MQTTMessage {
536
1.50M
                        header,
537
1.50M
                        op: match message_type {
538
97.3k
                            MQTTTypeCode::PUBACK => MQTTOperation::PUBACK(msgidonly),
539
1.27M
                            MQTTTypeCode::PUBREC => MQTTOperation::PUBREC(msgidonly),
540
24.7k
                            MQTTTypeCode::PUBREL => MQTTOperation::PUBREL(msgidonly),
541
110k
                            MQTTTypeCode::PUBCOMP => MQTTOperation::PUBCOMP(msgidonly),
542
0
                            _ => MQTTOperation::UNASSIGNED,
543
                        },
544
                    };
545
1.50M
                    Ok((&full[skiplen + len..], msg))
546
                }
547
328
                Err(e) => Err(e),
548
            },
549
191k
            MQTTTypeCode::SUBSCRIBE => match parse_subscribe(protocol_version)(input) {
550
190k
                Ok((_rem, subs)) => {
551
190k
                    let msg = MQTTMessage {
552
190k
                        header,
553
190k
                        op: MQTTOperation::SUBSCRIBE(subs),
554
190k
                    };
555
190k
                    Ok((&full[skiplen + len..], msg))
556
                }
557
261
                Err(e) => Err(e),
558
            },
559
207k
            MQTTTypeCode::SUBACK => match parse_suback(protocol_version)(input) {
560
207k
                Ok((_rem, suback)) => {
561
207k
                    let msg = MQTTMessage {
562
207k
                        header,
563
207k
                        op: MQTTOperation::SUBACK(suback),
564
207k
                    };
565
207k
                    Ok((&full[skiplen + len..], msg))
566
                }
567
52
                Err(e) => Err(e),
568
            },
569
290k
            MQTTTypeCode::UNSUBSCRIBE => match parse_unsubscribe(protocol_version)(input) {
570
290k
                Ok((_rem, unsub)) => {
571
290k
                    let msg = MQTTMessage {
572
290k
                        header,
573
290k
                        op: MQTTOperation::UNSUBSCRIBE(unsub),
574
290k
                    };
575
290k
                    Ok((&full[skiplen + len..], msg))
576
                }
577
38
                Err(e) => Err(e),
578
            },
579
246k
            MQTTTypeCode::UNSUBACK => match parse_unsuback(protocol_version)(input) {
580
246k
                Ok((_rem, unsuback)) => {
581
246k
                    let msg = MQTTMessage {
582
246k
                        header,
583
246k
                        op: MQTTOperation::UNSUBACK(unsuback),
584
246k
                    };
585
246k
                    Ok((&full[skiplen + len..], msg))
586
                }
587
28
                Err(e) => Err(e),
588
            },
589
            MQTTTypeCode::PINGREQ | MQTTTypeCode::PINGRESP => {
590
1.53M
                let msg = MQTTMessage {
591
1.53M
                    header,
592
1.53M
                    op: match message_type {
593
1.43M
                        MQTTTypeCode::PINGREQ => MQTTOperation::PINGREQ,
594
102k
                        MQTTTypeCode::PINGRESP => MQTTOperation::PINGRESP,
595
0
                        _ => MQTTOperation::UNASSIGNED,
596
                    },
597
                };
598
1.53M
                Ok((&full[skiplen + len..], msg))
599
            }
600
1.03M
            MQTTTypeCode::DISCONNECT => match parse_disconnect(len, protocol_version)(input) {
601
1.03M
                Ok((_rem, disco)) => {
602
1.03M
                    let msg = MQTTMessage {
603
1.03M
                        header,
604
1.03M
                        op: MQTTOperation::DISCONNECT(disco),
605
1.03M
                    };
606
1.03M
                    Ok((&full[skiplen + len..], msg))
607
                }
608
18
                Err(e) => Err(e),
609
            },
610
11.6k
            MQTTTypeCode::AUTH => match parse_auth(input) {
611
11.0k
                Ok((_rem, auth)) => {
612
11.0k
                    let msg = MQTTMessage {
613
11.0k
                        header,
614
11.0k
                        op: MQTTOperation::AUTH(auth),
615
11.0k
                    };
616
11.0k
                    Ok((&full[skiplen + len..], msg))
617
                }
618
575
                Err(e) => Err(e),
619
            },
620
            // Unassigned message type code. Unlikely to happen with
621
            // regular traffic, might be an indication for broken or
622
            // crafted MQTT traffic.
623
            _ => {
624
28.0M
                let msg = MQTTMessage {
625
28.0M
                    header,
626
28.0M
                    op: MQTTOperation::UNASSIGNED,
627
28.0M
                };
628
28.0M
                return Ok((&full[skiplen + len..], msg));
629
            }
630
        }
631
33.5M
    }
632
33.5M
}
633
634
33.6M
pub fn parse_message(
635
33.6M
    input: &[u8],
636
33.6M
    protocol_version: u8,
637
33.6M
    max_msg_size: usize,
638
33.6M
) -> IResult<&[u8], MQTTMessage> {
639
    // Parse the fixed header first. This is identical across versions and can
640
    // be between 2 and 5 bytes long.
641
33.6M
    match parse_fixed_header(input) {
642
33.6M
        Ok((fullrem, header)) => {
643
33.6M
            let len = header.remaining_length as usize;
644
            // This is the length of the fixed header that we need to skip
645
            // before returning the remainder. It is the sum of the length
646
            // of the flag byte (1) and the length of the message length
647
            // varint.
648
33.6M
            let skiplen = input.len() - fullrem.len();
649
33.6M
            let message_type = header.message_type;
650
651
            // If the remaining length (message length) exceeds the specified
652
            // limit, we return a special truncation message type, containing
653
            // no parsed metadata but just the skipped length and the message
654
            // type.
655
33.6M
            if len > max_msg_size {
656
602
                let msg = MQTTMessage {
657
602
                    header,
658
602
                    op: MQTTOperation::TRUNCATED(MQTTTruncatedData {
659
602
                        original_message_type: message_type,
660
602
                        skipped_length: len + skiplen,
661
602
                    }),
662
602
                };
663
                // In this case we return the full input buffer, since this is
664
                // what the skipped_length value also refers to: header _and_
665
                // remaining length.
666
602
                return Ok((input, msg));
667
33.6M
            }
668
669
            // We have not exceeded the maximum length limit, but still do not
670
            // have enough data in the input buffer to handle the full
671
            // message. Signal this by returning an Incomplete IResult value.
672
33.6M
            if fullrem.len() < len {
673
78.8k
                return Err(Err::Incomplete(Needed::new(len - fullrem.len())));
674
33.5M
            }
675
676
            // Parse the contents of the buffer into a single message.
677
            // We reslice the remainder into the portion that we are interested
678
            // in, according to the length we just parsed. This helps with the
679
            // complete() parsers, where we would otherwise need to keep track
680
            // of the already parsed length.
681
33.5M
            let rem = &fullrem[..len];
682
683
            // Parse remaining message in buffer. We use complete() to ensure
684
            // we do not request additional content in case of incomplete
685
            // parsing, but raise an error instead as we should have all the
686
            // data asked for in the header.
687
33.5M
            return complete(parse_remaining_message(
688
33.5M
                input,
689
33.5M
                len,
690
33.5M
                skiplen,
691
33.5M
                header,
692
33.5M
                message_type,
693
33.5M
                protocol_version,
694
33.5M
            ))(rem);
695
        }
696
25.3k
        Err(err) => {
697
25.3k
            return Err(err);
698
        }
699
    }
700
33.6M
}
701
702
#[cfg(test)]
703
mod tests {
704
    use super::*;
705
    use nom7::error::ErrorKind;
706
707
    fn test_mqtt_parse_variable_fail(buf0: &[u8]) {
708
        let r0 = parse_mqtt_variable_integer(buf0);
709
        match r0 {
710
            Ok((_, _)) => {
711
                panic!("Result should not have been ok.");
712
            }
713
            Err(Err::Error(err)) => {
714
                assert_eq!(err.code, ErrorKind::Verify);
715
            }
716
            _ => {
717
                panic!("Result should be an error.");
718
            }
719
        }
720
    }
721
722
    fn test_mqtt_parse_variable_check(buf0: &[u8], expected: u32) {
723
        let r0 = parse_mqtt_variable_integer(buf0);
724
        match r0 {
725
            Ok((_, val)) => {
726
                assert_eq!(val, expected);
727
            }
728
            Err(_) => {
729
                panic!("Result should have been ok.");
730
            }
731
        }
732
    }
733
734
    #[test]
735
    fn test_mqtt_parse_variable_integer_largest_input() {
736
        test_mqtt_parse_variable_fail(&[0xFF, 0xFF, 0xFF, 0xFF]);
737
    }
738
739
    #[test]
740
    fn test_mqtt_parse_variable_integer_boundary() {
741
        test_mqtt_parse_variable_fail(&[0xFF, 0xFF, 0xFF, 0x80]);
742
    }
743
744
    #[test]
745
    fn test_mqtt_parse_variable_integer_largest_valid() {
746
        test_mqtt_parse_variable_check(&[0xFF, 0xFF, 0xFF, 0x7F], 268435455);
747
    }
748
749
    #[test]
750
    fn test_mqtt_parse_variable_integer_smallest_valid() {
751
        test_mqtt_parse_variable_check(&[0x0], 0);
752
    }
753
754
    /// Parses a PUBLISH fixed header with a three-byte length field and checks
    /// every decoded header field as well as the number of bytes left over.
    #[test]
    fn test_parse_fixed_header() {
        let buf = [
            0x30, /* Header Flags: 0x30, Message Type: Publish Message, QoS Level: At most once delivery (Fire and Forget) */
            0xb7, 0x97, 0x02, /* Msg Len: 35767 */
            /* 17 trailing bytes that are expected to come back as the remainder */
            0x00, 0xff, 0x00, 0xff, 0x00, 0xff, 0x10, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
            0x00, 0x01, 0xa0,
        ];

        // Bail out with the original diagnostics on any parser error, then run
        // the field checks on the successfully parsed header.
        let (rest, header) = match parse_fixed_header(&buf) {
            Ok(parsed) => parsed,
            Err(Err::Incomplete(_)) => panic!("Result should not have been incomplete."),
            Err(Err::Error(err)) | Err(Err::Failure(err)) => {
                panic!("Result should not be an error: {:?}.", err)
            }
        };

        assert_eq!(header.message_type, MQTTTypeCode::PUBLISH);
        assert!(!header.dup_flag);
        assert_eq!(header.qos_level, 0);
        assert!(!header.retain);
        assert_eq!(header.remaining_length, 35767);
        assert_eq!(rest.len(), 17);
    }
781
782
    /// Parses a property block holding a single property and checks that it is
    /// reported as `RECEIVE_MAXIMUM(20)` with 17 bytes left unconsumed.
    #[test]
    fn test_parse_properties() {
        let buf = [
            0x03, 0x21, 0x00, 0x14, /* Properties */
            /* 17 trailing bytes that are expected to come back as the remainder */
            0x00, 0xff, 0x00, 0xff, 0x00, 0xff, 0x10, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
            0x00, 0x01, 0xa0,
        ];

        let (rest, maybe_props) = match parse_properties(&buf, true) {
            Ok(parsed) => parsed,
            Err(Err::Incomplete(_)) => panic!("Result should not have been incomplete."),
            Err(Err::Error(err)) | Err(Err::Failure(err)) => {
                panic!("Result should not be an error: {:?}.", err)
            }
        };

        // The parser hands back an Option; a missing property list is a failure.
        let props = maybe_props.unwrap();
        assert_eq!(props[0], MQTTProperty::RECEIVE_MAXIMUM(20));
        assert_eq!(rest.len(), 17);
    }
805
    /// Parses an MQTT v5 CONNECT body that carries a user name and a password,
    /// then checks the protocol name/version, the connect flags, the keep-alive
    /// value and that the whole buffer was consumed.
    #[test]
    fn test_parse_connect() {
        let buf = [
            0x00, 0x04, /* Protocol Name Length: 4 */
            0x4d, 0x51, 0x54, 0x54, /* Protocol Name: MQTT */
            0x05, /* Version: MQTT v5.0 (5) */
            0xc2, /*Connect Flags: 0xc2, User Name Flag, Password Flag, QoS Level: At most once delivery (Fire and Forget), Clean Session Flag */
            0x00, 0x3c, /* Keep Alive: 60 */
            0x03, 0x21, 0x00, 0x14, /* Properties */
            0x00, 0x00, /* Client ID Length: 0 */
            0x00, 0x04, /* User Name Length: 4 */
            0x75, 0x73, 0x65, 0x72, /* User Name: user */
            0x00, 0x04, /* Password Length: 4 */
            // The fixture previously held 0x71 0x61 0x71 0x73 ("qaqs"), which
            // contradicted the documented value; these bytes spell "pass".
            0x70, 0x61, 0x73, 0x73, /* Password: pass */
        ];

        let (rest, connect) = match parse_connect(&buf) {
            Ok(parsed) => parsed,
            Err(Err::Incomplete(_)) => panic!("Result should not have been incomplete."),
            Err(Err::Error(err)) | Err(Err::Failure(err)) => {
                panic!("Result should not be an error: {:?}.", err)
            }
        };

        assert_eq!(connect.protocol_string, "MQTT");
        assert_eq!(connect.protocol_version, 5);
        assert!(connect.username_flag);
        assert!(connect.password_flag);
        assert!(!connect.will_retain);
        assert_eq!(connect.will_qos, 0);
        assert!(!connect.will_flag);
        assert!(connect.clean_session);
        assert_eq!(connect.keepalive, 60);
        assert_eq!(rest.len(), 0);
    }
843
844
    /// Parses an MQTT v5 CONNACK body with two properties (topic alias maximum
    /// and assigned client identifier) and checks the decoded properties, the
    /// return code, the session-present flag and that nothing is left over.
    #[test]
    fn test_parse_connack() {
        let buf = [
            0x00, /* Acknowledge Flags: 0x00 (0000 000. = Reserved: Not set )(.... ...0 = Session Present: Not set) */
            0x00, /* Reason Code: Success (0) */
            0x2f, /* Total Length: 47 */
            0x22, /* ID: Topic Alias Maximum (0x22) */
            0x00, 0x0a, /* Value: 10 */
            0x12, /* ID: Assigned Client Identifier (0x12) */
            0x00, 0x29, /* Length: 41 */
            0x61, 0x75, 0x74, 0x6f, 0x2d, 0x31, 0x42, 0x34, 0x33, 0x45, 0x38, 0x30, 0x30, 0x2d,
            0x30, 0x38, 0x45, 0x33, 0x2d, 0x33, 0x42, 0x41, 0x31, 0x2d, 0x32, 0x45, 0x39, 0x37,
            0x2d, 0x45, 0x39, 0x41, 0x30, 0x42, 0x34, 0x30, 0x36, 0x34, 0x42, 0x46,
            0x35, /* 41 byte Value: auto-1B43E800-08E3-3BA1-2E97-E9A0B4064BF5 */
        ];
        let client_identifier = "auto-1B43E800-08E3-3BA1-2E97-E9A0B4064BF5";

        // parse_connack returns a parser closure configured for protocol version 5.
        let parser = parse_connack(5);
        let (rest, connack) = match parser(&buf) {
            Ok(parsed) => parsed,
            Err(Err::Incomplete(_)) => panic!("Result should not have been incomplete."),
            Err(Err::Error(err)) | Err(Err::Failure(err)) => {
                panic!("Result should not be an error: {:?}.", err)
            }
        };

        let props = connack.properties.unwrap();
        assert_eq!(props[0], MQTTProperty::TOPIC_ALIAS_MAXIMUM(10));
        assert_eq!(
            props[1],
            MQTTProperty::ASSIGNED_CLIENT_IDENTIFIER(client_identifier.to_string())
        );
        assert_eq!(connack.return_code, 0);
        assert!(!connack.session_present);
        assert_eq!(rest.len(), 0);
    }
884
885
    /// Parses an MQTT v5 PUBLISH body that includes a message identifier and
    /// checks the topic, the identifier and the length of the returned remainder.
    #[test]
    fn test_parse_publish() {
        let buf = [
            0x00, 0x06, /* Topic Length: 6 */
            0x74, 0x6f, 0x70, 0x69, 0x63, 0x58, /* Topic: topicX */
            0x00, 0x01, /* Message Identifier: 1 */
            0x00, /* Properties: length byte is 0 */
            0x00, 0x61, 0x75, 0x74, 0x6f, 0x2d, 0x42, 0x34, 0x33, 0x45, 0x38, 0x30,
        ];

        let parser = parse_publish(5, true);
        let (rest, publish) = match parser(&buf) {
            Ok(parsed) => parsed,
            Err(Err::Incomplete(_)) => panic!("Result should not have been incomplete."),
            Err(Err::Error(err)) | Err(Err::Failure(err)) => {
                panic!("Result should not be an error: {:?}.", err)
            }
        };

        let message_id = publish.message_id.unwrap();
        assert_eq!(publish.topic, "topicX");
        assert_eq!(message_id, 1);
        // 13 of the 23 input bytes are expected back, i.e. everything that
        // follows the message identifier.
        assert_eq!(rest.len(), 13);
    }
913
914
    /// Parses a message-identifier-only body with protocol version 3 and checks
    /// that just the two identifier bytes are consumed.
    #[test]
    fn test_parse_msgidonly_v3() {
        let buf = [
            0x00, 0x01, /* Message Identifier: 1 */
            /* 18 trailing bytes that are expected to come back as the remainder */
            0x74, 0x6f, 0x70, 0x69, 0x63, 0x58, 0x00, 0x61, 0x75, 0x74, 0x6f, 0x2d, 0x42, 0x34,
            0x33, 0x45, 0x38, 0x30,
        ];

        let parser = parse_msgidonly(3);
        let (rest, msg) = match parser(&buf) {
            Ok(parsed) => parsed,
            Err(Err::Incomplete(_)) => panic!("Result should not have been incomplete."),
            Err(Err::Error(err)) | Err(Err::Failure(err)) => {
                panic!("Result should not be an error: {:?}.", err)
            }
        };

        assert_eq!(msg.message_id, 1);
        assert_eq!(rest.len(), 18);
    }
938
939
    /// Parses a message-identifier-only body with protocol version 5, where a
    /// reason code and a property length byte follow the identifier, and checks
    /// the decoded identifier, the reason code and the remainder length.
    #[test]
    fn test_parse_msgidonly_v5() {
        let buf = [
            0x00, 0x01,   /* Message Identifier: 1 */
            0x00, /* Reason Code: 0 */
            0x00, /* Properties */
            /* 12 trailing bytes that are expected to come back as the remainder */
            0x00, 0x61, 0x75, 0x74, 0x6f, 0x2d, 0x42, 0x34, 0x33, 0x45, 0x38, 0x30,
        ];

        let parser = parse_msgidonly(5);
        let (rest, msg) = match parser(&buf) {
            Ok(parsed) => parsed,
            Err(Err::Incomplete(_)) => panic!("Result should not have been incomplete."),
            Err(Err::Error(err)) | Err(Err::Failure(err)) => {
                panic!("Result should not be an error: {:?}.", err)
            }
        };

        let reason_code = msg.reason_code.unwrap();
        assert_eq!(msg.message_id, 1);
        assert_eq!(reason_code, 0);
        assert_eq!(rest.len(), 12);
    }
966
967
    /// Parses an MQTT v5 SUBSCRIBE body with two topic filters and checks the
    /// topic names, the first QoS, the message identifier and the remainder.
    #[test]
    fn test_parse_subscribe() {
        let buf = [
            0x00, 0x01, /* Message Identifier: 1 */
            0x00, /* Properties: length byte is 0 */
            0x00, 0x06, /* Topic Length: 6  */
            0x74, 0x6f, 0x70, 0x69, 0x63, 0x58, /* Topic: topicX */
            0x00, /*Subscription Options: 0x00, Retain Handling: Send msgs at subscription time, QoS: At most once delivery (Fire and Forget) */
            0x00, 0x06, /* Topic Length: 6  */
            0x74, 0x6f, 0x70, 0x69, 0x63, 0x59, /* Topic: topicY */
            0x00, /*Subscription Options: 0x00, Retain Handling: Send msgs at subscription time, QoS: At most once delivery (Fire and Forget) */
            /* 12 trailing bytes that are expected to come back as the remainder */
            0x00, 0x61, 0x75, 0x74, 0x6f, 0x2d, 0x42, 0x34, 0x33, 0x45, 0x38, 0x30,
        ];

        let parser = parse_subscribe(5);
        let (rest, subscribe) = match parser(&buf) {
            Ok(parsed) => parsed,
            Err(Err::Incomplete(_)) => panic!("Result should not have been incomplete."),
            Err(Err::Error(err)) | Err(Err::Failure(err)) => {
                panic!("Result should not be an error: {:?}.", err)
            }
        };

        assert_eq!(subscribe.topics[0].topic_name, "topicX");
        assert_eq!(subscribe.topics[1].topic_name, "topicY");
        assert_eq!(subscribe.topics[0].qos, 0);
        assert_eq!(subscribe.message_id, 1);
        assert_eq!(rest.len(), 12);
    }
1000
    /// Parses an MQTT v5 SUBACK body and checks the first returned QoS value, the
    /// message identifier and the remainder length.
    #[test]
    fn test_parse_suback() {
        let buf = [
            0x00, 0x01, /* Message Identifier: 1 */
            0x00, /* Properties: length byte is 0 */
            0x00, 0x00, /* presumably the per-topic return codes (first one asserted as QoS 0) - TODO confirm */
        ];

        let parser = parse_suback(5);
        let (rest, suback) = match parser(&buf) {
            Ok(parsed) => parsed,
            Err(Err::Incomplete(_)) => panic!("Result should not have been incomplete."),
            Err(Err::Error(err)) | Err(Err::Failure(err)) => {
                panic!("Result should not be an error: {:?}.", err)
            }
        };

        assert_eq!(suback.qoss[0], 0);
        assert_eq!(suback.message_id, 1);
        // NOTE(review): 3 of the 5 input bytes are expected back, so the returned
        // slice starts right after the message identifier - confirm against
        // parse_suback that this is intended.
        assert_eq!(rest.len(), 3);
    }
1025
    /// Parses an MQTT v5 UNSUBSCRIBE body with one topic filter and checks the
    /// topic, the message identifier and that one trailing byte is left over.
    #[test]
    fn test_parse_unsubscribe() {
        let buf = [
            0x00, 0x01, /* Message Identifier: 1 */
            0x00, /* Properties: length byte is 0 */
            0x00, 0x06, /* Topic Length: 6  */
            0x74, 0x6f, 0x70, 0x69, 0x63, 0x58, /* Topic: topicX */
            0x00, /* Extra trailing byte; the test expects it to be left in the remainder */
        ];

        let parser = parse_unsubscribe(5);
        let (rest, unsubscribe) = match parser(&buf) {
            Ok(parsed) => parsed,
            Err(Err::Incomplete(_)) => panic!("Result should not have been incomplete."),
            Err(Err::Error(err)) | Err(Err::Failure(err)) => {
                panic!("Result should not be an error: {:?}.", err)
            }
        };

        assert_eq!(unsubscribe.topics[0], "topicX");
        assert_eq!(unsubscribe.message_id, 1);
        assert_eq!(rest.len(), 1);
    }
1052
1053
    /// Parses an MQTT v5 UNSUBACK body with a single reason code and checks the
    /// reason code, the message identifier and that the input is fully consumed.
    #[test]
    fn test_parse_unsuback() {
        let buf = [
            0x00, 0x01, /* Message Identifier: 1 */
            0x00, /* Properties: length byte is 0 */
            0x00, /* Reason Code */
        ];

        let parser = parse_unsuback(5);
        let (rest, unsuback) = match parser(&buf) {
            Ok(parsed) => parsed,
            Err(Err::Incomplete(_)) => panic!("Result should not have been incomplete."),
            Err(Err::Error(err)) | Err(Err::Failure(err)) => {
                panic!("Result should not be an error: {:?}.", err)
            }
        };

        let reason_codes = unsuback.reason_codes.unwrap();
        assert_eq!(reason_codes[0], 0);
        assert_eq!(unsuback.message_id, 1);
        assert_eq!(rest.len(), 0);
    }
1079
1080
    /// Runs the DISCONNECT parser built with arguments `(0, 5)` over a two-byte
    /// buffer and checks that it reports reason code 0 while returning both
    /// input bytes as the remainder.
    #[test]
    fn test_parse_disconnect() {
        // NOTE(review): these two bytes look like a DISCONNECT fixed header (type
        // nibble 0xe, length 0) rather than a message body - confirm. The
        // assertions below only require that neither byte is consumed.
        let buf = [
            0xe0, /* not consumed by the parser in this test */
            0x00, /* not consumed by the parser in this test */
        ];

        let parser = parse_disconnect(0, 5);
        let (rest, disconnect) = match parser(&buf) {
            Ok(parsed) => parsed,
            Err(Err::Incomplete(_)) => panic!("Result should not have been incomplete."),
            Err(Err::Error(err)) | Err(Err::Failure(err)) => {
                panic!("Result should not be an error: {:?}.", err)
            }
        };

        let reason_code = disconnect.reason_code.unwrap();
        assert_eq!(reason_code, 0);
        assert_eq!(rest.len(), 2);
    }
1104
1105
    /// Feeds a complete 49-byte CONNECT message to `parse_message` and checks the
    /// decoded fixed-header fields and the length of the returned remainder.
    #[test]
    fn test_parse_message() {
        let buf = [
            0x10, /* Header Flags: 0x10, Message Type: CONNECT (asserted below) */
            0x2f, 0x00, 0x04, 0x4d, 0x51, 0x54, 0x54, 0x05,
            0xc2, /* Connect Flags: 0xc2, User Name Flag, Password Flag, QoS Level: At most once delivery (Fire and Forget), Clean Session Flag */
            0x00, 0x3c, 0x03, 0x21, 0x00, 0x14, /* Properties */
            0x00, 0x13, 0x6d, 0x79, 0x76, 0x6f, 0x69, 0x63, 0x65, 0x69, 0x73, 0x6d, 0x79, 0x70,
            0x61, 0x73, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x00, 0x04, 0x75, 0x73, 0x65, 0x72, 0x00,
            0x04, 0x70, 0x61, 0x73, 0x73,
        ];

        let (rest, parsed_msg) = match parse_message(&buf, 5, 40) {
            Ok(parsed) => parsed,
            Err(Err::Incomplete(_)) => panic!("Result should not have been incomplete."),
            Err(Err::Error(err)) | Err(Err::Failure(err)) => {
                panic!("Result should not be an error: {:?}.", err)
            }
        };

        assert_eq!(parsed_msg.header.message_type, MQTTTypeCode::CONNECT);
        assert!(!parsed_msg.header.dup_flag);
        assert_eq!(parsed_msg.header.qos_level, 0);
        assert!(!parsed_msg.header.retain);
        // NOTE(review): the whole 49-byte input is expected back. The length byte
        // 0x2f (47) exceeds the third argument (40), presumably a maximum message
        // size - confirm against parse_message.
        assert_eq!(rest.len(), 49);
    }
1135
}