Coverage Report

Created: 2025-11-02 06:47

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/ndpi/src/lib/protocols/mqtt.c
Line
Count
Source
1
/*
2
 * mqtt.c
3
 *
4
 * Copyright (C) 2016 Sorin Zamfir <sorin.zamfir@yahoo.com>
5
 *
6
 * This file is part of nDPI, an open source deep packet inspection
7
 * library based on the OpenDPI and PACE technology by ipoque GmbH
8
 *
9
 * nDPI is free software: you can redistribute it and/or modify
10
 * it under the terms of the GNU Lesser General Public License as published by
11
 * the Free Software Foundation, either version 3 of the License, or
12
 * (at your option) any later version.
13
 *
14
 * nDPI is distributed in the hope that it will be useful,
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
 * GNU Lesser General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU Lesser General Public License
20
 * along with nDPI.  If not, see <http://www.gnu.org/licenses/>.
21
 *
22
 */
23
24
#include "ndpi_protocol_ids.h"
25
26
#define NDPI_CURRENT_PROTO NDPI_PROTOCOL_MQTT
27
28
#include "ndpi_api.h"
29
#include "ndpi_private.h"
30
31
32
/**
 * Control packet types of MQTT version 3.1.1,
 * see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1
 *
 * ndpi_search_mqtt() compares these constants against the upper four bits
 * of the first payload byte. The values 0 and 15 deliberately have no
 * enumerator: the dissector rejects both as invalid packet types.
 */
enum MQTT_PACKET_TYPES {
  CONNECT     =  1,
  CONNACK     =  2,
  PUBLISH     =  3,
  PUBACK      =  4,
  PUBREC      =  5,
  PUBREL      =  6,
  PUBCOMP     =  7,
  SUBSCRIBE   =  8,
  SUBACK      =  9,
  UNSUBSCRIBE = 10,
  UNSUBACK    = 11,
  PINGREQ     = 12,
  PINGRESP    = 13,
  DISCONNECT  = 14
};
52
53
/**
54
 * Entry point when protocol is identified.
55
 */
56
static void ndpi_int_mqtt_add_connection (struct ndpi_detection_module_struct *ndpi_struct,
57
    struct ndpi_flow_struct *flow)
58
1.58k
{
59
1.58k
  ndpi_set_detected_protocol(ndpi_struct,flow,NDPI_PROTOCOL_MQTT,NDPI_PROTOCOL_UNKNOWN, NDPI_CONFIDENCE_DPI);
60
1.58k
  NDPI_LOG_INFO(ndpi_struct, "found Mqtt\n");
61
1.58k
}
62
63
/**
 * Decode an MQTT variable-length integer (the encoding used by the
 * "Remaining Length" field of the fixed header).
 *
 * Every byte contributes its low 7 bits, least significant group first;
 * a set high bit means that another byte follows. The encoding is at most
 * 4 bytes long.
 *
 * @param buf        bytes to decode
 * @param buf_len    number of readable bytes in buf
 * @param num_bytes  out: number of bytes of buf that were examined
 *                   (written on success and on failure)
 * @return the decoded value (0 .. 268435455) on success; -1 if buf ends
 *         before the terminating byte, or if the 4th byte still has the
 *         continuation bit set (malformed length, MQTT 3.1.1 section 2.2.3)
 */
static int64_t get_var_int(const unsigned char *buf, int buf_len, u_int8_t *num_bytes)
{
  u_int32_t value = 0;
  u_int32_t multiplier = 1;
  int i;

  *num_bytes = 0;
  for (i = 0; i < 4; i++) {
    u_int8_t encodedByte;

    if (i >= buf_len)
      return -1; /* truncated: the terminating byte is not in the buffer */

    encodedByte = buf[i];
    (*num_bytes)++;
    value += (u_int32_t)(encodedByte & 127) * multiplier;

    if ((encodedByte & 128) == 0)
      return value; /* high bit clear: this was the last byte */

    multiplier *= 128;
  }

  /* Four bytes consumed and a fifth one is still announced: not a valid
   * encoding. Reporting the partial value here would let malformed packets
   * pass the caller's length check. */
  return -1;
}
82
83
/**
84
 * Dissector function that searches Mqtt headers
85
 */
86
static void ndpi_search_mqtt(struct ndpi_detection_module_struct *ndpi_struct,
87
           struct ndpi_flow_struct *flow)
88
2.29M
{
89
2.29M
  u_int8_t pt,flags, rl_len;
90
2.29M
  int64_t rl;
91
92
2.29M
  NDPI_LOG_DBG(ndpi_struct, "search Mqtt\n");
93
2.29M
  struct ndpi_packet_struct *packet = &ndpi_struct->packet;
94
2.29M
  if (flow->packet_counter > 10) {
95
59
    NDPI_LOG_DBG(ndpi_struct, "Excluding Mqtt .. mandatory header not found!\n");
96
59
    NDPI_EXCLUDE_DISSECTOR(ndpi_struct, flow);
97
59
    return;
98
59
  }
99
100
2.29M
  NDPI_LOG_DBG2(ndpi_struct, "====>>>> Mqtt header: %4x%4x%4x%4x [len: %u]\n",
101
2.29M
          packet->payload_packet_len > 0 ? packet->payload[0] : '.',
102
2.29M
          packet->payload_packet_len > 1 ? packet->payload[1] : '.',
103
2.29M
          packet->payload_packet_len > 2 ? packet->payload[2] : '.',
104
2.29M
          packet->payload_packet_len > 3 ? packet->payload[3] : '.',
105
2.29M
          packet->payload_packet_len);
106
2.29M
  if (packet->payload_packet_len < 2) {
107
280k
    NDPI_LOG_DBG(ndpi_struct, "Excluding Mqtt .. mandatory header not found!\n");
108
280k
    NDPI_EXCLUDE_DISSECTOR(ndpi_struct, flow);
109
280k
    return;
110
280k
  }
111
  // we extract the remaining length
112
2.01M
  rl = get_var_int(&packet->payload[1], packet->payload_packet_len - 1, &rl_len);
113
2.01M
  if (rl < 0) {
114
36.2k
    NDPI_LOG_DBG(ndpi_struct, "Excluding Mqtt .. invalid length!\n");
115
36.2k
    NDPI_EXCLUDE_DISSECTOR(ndpi_struct, flow);
116
36.2k
    return;
117
36.2k
  }
118
1.97M
  NDPI_LOG_DBG(ndpi_struct, "Mqtt: msg_len %d\n", (unsigned long long)rl);
119
1.97M
  if (packet->payload_packet_len != rl + 1 + rl_len) {
120
1.94M
    NDPI_LOG_DBG(ndpi_struct, "Excluding Mqtt .. maximum packet size exceeded!\n");
121
1.94M
    NDPI_EXCLUDE_DISSECTOR(ndpi_struct, flow);
122
1.94M
    return;
123
1.94M
  }
124
  // we extract the packet type
125
27.9k
  pt = (u_int8_t) ((packet->payload[0] & 0xF0) >> 4);
126
27.9k
  NDPI_LOG_DBG2(ndpi_struct,"====>>>> Mqtt packet type: [%d]\n",pt);
127
27.9k
  if ((pt == 0) || (pt == 15)) {
128
8.79k
    NDPI_LOG_DBG(ndpi_struct, "Excluding Mqtt .. invalid packet type!\n");
129
8.79k
    NDPI_EXCLUDE_DISSECTOR(ndpi_struct, flow);
130
8.79k
    return;
131
8.79k
  }
132
  // we extract the flags
133
19.1k
  flags = (u_int8_t) (packet->payload[0] & 0x0F);
134
19.1k
  NDPI_LOG_DBG2(ndpi_struct,"====>>>> Mqtt flags type: [%d]\n",flags);
135
  // first stage verification
136
19.1k
  if (((pt == CONNECT) || (pt == CONNACK) || (pt == PUBACK) || (pt == PUBREC) ||
137
12.1k
          (pt == PUBCOMP) || (pt == SUBACK) || (pt == UNSUBACK) || (pt == PINGREQ) ||
138
11.5k
          (pt == PINGRESP) || (pt == DISCONNECT)) && (flags > 0)) {
139
9.77k
    NDPI_LOG_DBG(ndpi_struct, "Excluding Mqtt invalid Packet-Flag combination flag!=0\n");
140
9.77k
    NDPI_EXCLUDE_DISSECTOR(ndpi_struct, flow);
141
9.77k
    return;
142
9.77k
  }
143
9.36k
  if (((pt == PUBREL) || (pt == SUBSCRIBE) || (pt == UNSUBSCRIBE)) && (flags != 2)) {
144
4.39k
    NDPI_LOG_DBG(ndpi_struct, "Excluding Mqtt invalid Packet-Flag combination flag!=2\n");
145
4.39k
    NDPI_EXCLUDE_DISSECTOR(ndpi_struct, flow);
146
4.39k
    return;
147
4.39k
  }
148
4.96k
  NDPI_LOG_DBG2(ndpi_struct,"====>>>> Passed first stage of identification\n");
149
  // second stage verification (no payload, just variable headers)
150
4.96k
  if ((pt == CONNACK) || (pt == PUBACK) || (pt == PUBREL) ||
151
4.20k
      (pt == PUBREC) || (pt == PUBCOMP) || (pt == UNSUBACK)) {
152
1.17k
    if (packet->payload_packet_len != 4) { // these packets are always 4 bytes long
153
852
      NDPI_LOG_DBG(ndpi_struct, "Excluding Mqtt invalid Packet-Length < 4 \n");
154
852
      NDPI_EXCLUDE_DISSECTOR(ndpi_struct, flow);
155
852
      return;
156
852
    } else {
157
325
      NDPI_LOG_INFO(ndpi_struct, "found Mqtt CONNACK/PUBACK/PUBREL/PUBREC/PUBCOMP/UNSUBACK\n");
158
325
      ndpi_int_mqtt_add_connection(ndpi_struct,flow);
159
325
      return;
160
325
    }
161
1.17k
  }
162
3.79k
  if ((pt == PINGREQ) || (pt == PINGRESP) || (pt == DISCONNECT)) {
163
437
    if (packet->payload_packet_len != 2) { // these packets are always 2 bytes long
164
134
      NDPI_LOG_DBG(ndpi_struct, "Excluding Mqtt invalid Packet-Length <2 \n");
165
134
      NDPI_EXCLUDE_DISSECTOR(ndpi_struct, flow);
166
134
      return;
167
303
    } else {
168
303
      NDPI_LOG_INFO(ndpi_struct, "found Mqtt PING/PINGRESP/DISCONNECT\n");
169
303
      ndpi_int_mqtt_add_connection(ndpi_struct,flow);
170
303
      return;
171
303
    }
172
437
  }
173
3.35k
  NDPI_LOG_DBG2(ndpi_struct,"====>>>> Passed second stage of identification\n");
174
  // third stage verification (payload)
175
3.35k
  if (pt == CONNECT) {
176
148
    NDPI_LOG_DBG(ndpi_struct, "found Mqtt CONNECT\n");
177
148
    ndpi_int_mqtt_add_connection(ndpi_struct,flow);
178
148
    return;
179
148
  }
180
3.20k
  if (pt == PUBLISH) {
181
    // payload CAN be zero bytes length (section 3.3.3 of MQTT standard)
182
2.59k
    u_int8_t qos = (u_int8_t) (flags & 0x06) >> 1;
183
2.59k
    u_int8_t dup = (u_int8_t) (flags & 0x08) >> 3;
184
2.59k
    if (qos > 2) { // qos values possible are 0,1,2
185
393
      NDPI_LOG_DBG(ndpi_struct, "Excluding Mqtt invalid PUBLISH qos\n");
186
393
      NDPI_EXCLUDE_DISSECTOR(ndpi_struct, flow);
187
393
      return;
188
393
    }
189
2.20k
    if (qos == 0) {
190
735
      if (dup != 0) {
191
244
        NDPI_LOG_DBG(ndpi_struct, "Excluding Mqtt invalid PUBLISH qos0 and dup combination\n");
192
244
        NDPI_EXCLUDE_DISSECTOR(ndpi_struct, flow);
193
244
        return;
194
244
      }
195
491
      if (packet->payload_packet_len < 5) { // at least topic (3Bytes + 2Bytes fixed header)
196
336
        NDPI_LOG_DBG(ndpi_struct, "Excluding Mqtt invalid PUBLISH qos0 size\n");
197
336
        NDPI_EXCLUDE_DISSECTOR(ndpi_struct, flow);
198
336
        return;
199
336
      }
200
491
    }
201
1.62k
    if ((qos == 1) || (qos == 2)) {
202
1.46k
      if (packet->payload_packet_len < 7 ) { // at least topic + pkt identifier (3Bytes + 2Bytes + 2Bytes fixed header)
203
1.01k
        NDPI_LOG_DBG(ndpi_struct, "Excluding Mqtt invalid PUBLISH qos1&2\n");
204
1.01k
        NDPI_EXCLUDE_DISSECTOR(ndpi_struct, flow);
205
1.01k
        return;
206
1.01k
      }
207
1.46k
    }
208
608
    NDPI_LOG_INFO(ndpi_struct, "found Mqtt PUBLISH\n");
209
608
    ndpi_int_mqtt_add_connection(ndpi_struct,flow);
210
608
    return;
211
1.62k
  }
212
613
  if (pt == SUBSCRIBE) {
213
218
    if (packet->payload_packet_len < 8) { // at least one topic+filter is required in the payload
214
139
      NDPI_LOG_DBG(ndpi_struct, "Excluding Mqtt invalid SUBSCRIBE\n");
215
139
      NDPI_EXCLUDE_DISSECTOR(ndpi_struct, flow);
216
139
      return;
217
139
    } else {
218
79
      NDPI_LOG_INFO(ndpi_struct, "found Mqtt SUBSCRIBE\n");
219
79
      ndpi_int_mqtt_add_connection(ndpi_struct,flow);
220
79
      return;
221
79
    }
222
218
  }
223
395
  if (pt == SUBACK ) {
224
193
    if (packet->payload_packet_len <5 ) { // must have at least a response code
225
129
      NDPI_LOG_DBG(ndpi_struct, "Excluding Mqtt invalid SUBACK\n");
226
129
      NDPI_EXCLUDE_DISSECTOR(ndpi_struct, flow);
227
129
      return;
228
129
    } else {
229
64
      NDPI_LOG_INFO(ndpi_struct, "found Mqtt SUBACK\n");
230
64
      ndpi_int_mqtt_add_connection(ndpi_struct,flow);
231
64
      return;
232
64
    }
233
193
  }
234
202
  if (pt == UNSUBSCRIBE) {
235
202
    if (packet->payload_packet_len < 7) { // at least a topic
236
146
      NDPI_LOG_DBG(ndpi_struct, "Excluding Mqtt invalid UNSUBSCRIBE\n");
237
146
      NDPI_EXCLUDE_DISSECTOR(ndpi_struct, flow);
238
146
      return;
239
146
    } else {
240
56
      NDPI_LOG_INFO(ndpi_struct, "found Mqtt UNSUBSCRIBE\n");
241
56
      ndpi_int_mqtt_add_connection(ndpi_struct,flow);
242
56
      return;
243
56
    }
244
202
  }
245
  /* We already checked every possible values of pt: we are never here */
246
202
}
247
/**
248
 * Entry point for the ndpi library
249
 */
250
void init_mqtt_dissector (struct ndpi_detection_module_struct *ndpi_struct)
251
8.03k
{
252
8.03k
  register_dissector("MQTT", ndpi_struct,
253
8.03k
                     ndpi_search_mqtt,
254
8.03k
                     NDPI_SELECTION_BITMASK_PROTOCOL_V4_V6_TCP_WITH_PAYLOAD_WITHOUT_RETRANSMISSION,
255
8.03k
                      1, NDPI_PROTOCOL_MQTT);
256
8.03k
}
257
258