Coverage Report

Created: 2023-06-07 06:10

/src/mosquitto/lib/packet_mosq.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
Copyright (c) 2009-2021 Roger Light <roger@atchoo.org>
3
4
All rights reserved. This program and the accompanying materials
5
are made available under the terms of the Eclipse Public License 2.0
6
and Eclipse Distribution License v1.0 which accompany this distribution.
7
8
The Eclipse Public License is available at
9
   https://www.eclipse.org/legal/epl-2.0/
10
and the Eclipse Distribution License is available at
11
  http://www.eclipse.org/org/documents/edl-v10.php.
12
13
SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
14
15
Contributors:
16
   Roger Light - initial implementation and documentation.
17
*/
18
19
#include "config.h"
20
21
#include <assert.h>
22
#include <errno.h>
23
#include <string.h>
24
25
#ifdef WITH_BROKER
26
#  include "mosquitto_broker_internal.h"
27
#  if defined(WITH_WEBSOCKETS) && WITH_WEBSOCKETS == WS_IS_LWS
28
#    include <libwebsockets.h>
29
#  endif
30
#else
31
#  include "read_handle.h"
32
#endif
33
34
#include "callbacks.h"
35
#include "memory_mosq.h"
36
#include "mqtt_protocol.h"
37
#include "net_mosq.h"
38
#include "packet_mosq.h"
39
#include "read_handle.h"
40
#include "util_mosq.h"
41
#ifdef WITH_BROKER
42
#  include "sys_tree.h"
43
#  include "send_mosq.h"
44
#else
45
#  define metrics__int_inc(stat, val)
46
#  define metrics__int_dec(stat, val)
47
#endif
48
49
int packet__alloc(struct mosquitto__packet **packet, uint8_t command, uint32_t remaining_length)
50
1.75k
{
51
1.75k
  uint8_t remaining_bytes[5], byte;
52
1.75k
  int8_t remaining_count;
53
1.75k
  uint32_t packet_length;
54
1.75k
  uint32_t remaining_length_stored;
55
1.75k
  int i;
56
57
1.75k
  assert(packet);
58
59
1.75k
  remaining_length_stored = remaining_length;
60
1.75k
  remaining_count = 0;
61
1.79k
  do{
62
1.79k
    byte = remaining_length % 128;
63
1.79k
    remaining_length = remaining_length / 128;
64
    /* If there are more digits to encode, set the top bit of this digit */
65
1.79k
    if(remaining_length > 0){
66
38
      byte = byte | 0x80;
67
38
    }
68
1.79k
    remaining_bytes[remaining_count] = byte;
69
1.79k
    remaining_count++;
70
1.79k
  }while(remaining_length > 0 && remaining_count < 5);
71
1.75k
  if(remaining_count == 5) return MOSQ_ERR_PAYLOAD_SIZE;
72
73
1.75k
  packet_length = remaining_length_stored + 1 + (uint8_t)remaining_count;
74
1.75k
  (*packet) = mosquitto__malloc(sizeof(struct mosquitto__packet) + packet_length + WS_PACKET_OFFSET);
75
1.75k
  if((*packet) == NULL) return MOSQ_ERR_NOMEM;
76
77
  /* Clear memory for everything but the payload - that will be set to valid
78
   * values when the actual payload is copied in. */
79
1.75k
  memset((*packet), 0, sizeof(struct mosquitto__packet));
80
1.75k
  (*packet)->command = command;
81
1.75k
  (*packet)->remaining_length = remaining_length_stored;
82
1.75k
  (*packet)->remaining_count = remaining_count;
83
1.75k
  (*packet)->packet_length = packet_length + WS_PACKET_OFFSET;
84
85
1.75k
  (*packet)->payload[WS_PACKET_OFFSET] = (*packet)->command;
86
3.55k
  for(i=0; i<(*packet)->remaining_count; i++){
87
1.79k
    (*packet)->payload[WS_PACKET_OFFSET+i+1] = remaining_bytes[i];
88
1.79k
  }
89
1.75k
  (*packet)->pos = WS_PACKET_OFFSET + 1U + (uint8_t)(*packet)->remaining_count;
90
91
1.75k
  return MOSQ_ERR_SUCCESS;
92
1.75k
}
93
94
void packet__cleanup(struct mosquitto__packet_in *packet)
95
6.29k
{
96
6.29k
  if(!packet) return;
97
98
  /* Free data and reset values */
99
6.29k
  packet->command = 0;
100
6.29k
  packet->remaining_count = 0;
101
6.29k
  packet->remaining_mult = 1;
102
6.29k
  packet->remaining_length = 0;
103
6.29k
  mosquitto__FREE(packet->payload);
104
6.29k
  packet->to_process = 0;
105
6.29k
  packet->pos = 0;
106
6.29k
}
107
108
109
void packet__cleanup_all_no_locks(struct mosquitto *mosq)
110
0
{
111
0
  struct mosquitto__packet *packet;
112
113
  /* Out packet cleanup */
114
0
  while(mosq->out_packet){
115
0
    packet = mosq->out_packet;
116
    /* Free data and reset values */
117
0
    mosq->out_packet = mosq->out_packet->next;
118
119
0
    mosquitto__FREE(packet);
120
0
  }
121
0
  metrics__int_dec(mosq_gauge_out_packets, mosq->out_packet_count);
122
0
  metrics__int_dec(mosq_gauge_out_packet_bytes, mosq->out_packet_bytes);
123
0
  mosq->out_packet_count = 0;
124
0
  mosq->out_packet_bytes = 0;
125
0
  mosq->out_packet_last = NULL;
126
127
0
  packet__cleanup(&mosq->in_packet);
128
0
}
129
130
void packet__cleanup_all(struct mosquitto *mosq)
131
0
{
132
0
  pthread_mutex_lock(&mosq->out_packet_mutex);
133
0
  packet__cleanup_all_no_locks(mosq);
134
0
  pthread_mutex_unlock(&mosq->out_packet_mutex);
135
0
}
136
137
138
static void packet__queue_append(struct mosquitto *mosq, struct mosquitto__packet *packet)
139
1.75k
{
140
1.75k
  pthread_mutex_lock(&mosq->out_packet_mutex);
141
1.75k
  if(mosq->out_packet){
142
18
    mosq->out_packet_last->next = packet;
143
1.73k
  }else{
144
1.73k
    mosq->out_packet = packet;
145
1.73k
  }
146
1.75k
  mosq->out_packet_last = packet;
147
1.75k
  mosq->out_packet_count++;
148
1.75k
  mosq->out_packet_bytes += packet->packet_length;
149
1.75k
  metrics__int_inc(mosq_gauge_out_packets, 1);
150
1.75k
  metrics__int_inc(mosq_gauge_out_packet_bytes, packet->packet_length);
151
1.75k
  pthread_mutex_unlock(&mosq->out_packet_mutex);
152
1.75k
}
153
154
155
int packet__queue(struct mosquitto *mosq, struct mosquitto__packet *packet)
156
1.75k
{
157
#ifndef WITH_BROKER
158
  char sockpair_data = 0;
159
#endif
160
1.75k
  assert(mosq);
161
1.75k
  assert(packet);
162
163
#if defined(WITH_BROKER) && defined(WITH_WEBSOCKETS) && WITH_WEBSOCKETS == WS_IS_LWS
164
  if(mosq->wsi){
165
    packet->next = NULL;
166
    packet->pos = WS_PACKET_OFFSET;
167
    packet->to_process = packet->packet_length - WS_PACKET_OFFSET;
168
169
    packet__queue_append(mosq, packet);
170
171
    lws_callback_on_writable(mosq->wsi);
172
    return MOSQ_ERR_SUCCESS;
173
  }else
174
#elif defined(WITH_WEBSOCKETS) && WITH_WEBSOCKETS == WS_IS_BUILTIN
175
1.75k
  if(mosq->transport == mosq_t_ws){
176
0
    ws__prepare_packet(mosq, packet);
177
0
  }else
178
1.75k
#endif
179
1.75k
  {
180
    /* Normal TCP */
181
1.75k
    packet->next = NULL;
182
1.75k
    packet->pos = WS_PACKET_OFFSET;
183
1.75k
    packet->to_process = packet->packet_length - WS_PACKET_OFFSET;
184
1.75k
  }
185
186
1.75k
  packet__queue_append(mosq, packet);
187
188
1.75k
#ifdef WITH_BROKER
189
1.75k
  return packet__write(mosq);
190
#else
191
  /* Write a single byte to sockpairW (connected to sockpairR) to break out
192
   * of select() if in threaded mode. */
193
  if(mosq->sockpairW != INVALID_SOCKET){
194
#  ifndef WIN32
195
    if(write(mosq->sockpairW, &sockpair_data, 1)){
196
    }
197
#  else
198
    send(mosq->sockpairW, &sockpair_data, 1, 0);
199
#  endif
200
  }
201
202
  if(mosq->callback_depth == 0 && mosq->threaded == mosq_ts_none){
203
    return packet__write(mosq);
204
  }else{
205
    return MOSQ_ERR_SUCCESS;
206
  }
207
#endif
208
1.75k
}
209
210
211
int packet__check_oversize(struct mosquitto *mosq, uint32_t remaining_length)
212
187
{
213
187
  uint32_t len;
214
215
187
  if(mosq->maximum_packet_size == 0) return MOSQ_ERR_SUCCESS;
216
217
66
  len = remaining_length + packet__varint_bytes(remaining_length);
218
66
  if(len > mosq->maximum_packet_size){
219
2
    return MOSQ_ERR_OVERSIZE_PACKET;
220
64
  }else{
221
64
    return MOSQ_ERR_SUCCESS;
222
64
  }
223
66
}
224
225
struct mosquitto__packet *packet__get_next_out(struct mosquitto *mosq)
226
0
{
227
0
  struct mosquitto__packet *packet = NULL;
228
229
0
  pthread_mutex_lock(&mosq->out_packet_mutex);
230
0
  if(mosq->out_packet){
231
0
    mosq->out_packet_count--;
232
0
    mosq->out_packet_bytes -= mosq->out_packet->packet_length;
233
0
    metrics__int_dec(mosq_gauge_out_packets, 1);
234
0
    metrics__int_dec(mosq_gauge_out_packet_bytes, mosq->out_packet->packet_length);
235
236
0
    mosq->out_packet = mosq->out_packet->next;
237
0
    if(!mosq->out_packet){
238
0
      mosq->out_packet_last = NULL;
239
0
    }
240
0
    packet = mosq->out_packet;
241
0
  }
242
0
  pthread_mutex_unlock(&mosq->out_packet_mutex);
243
244
0
  return packet;
245
0
}
246
247
248
int packet__write(struct mosquitto *mosq)
249
1.75k
{
250
1.75k
  ssize_t write_length;
251
1.75k
  struct mosquitto__packet *packet, *next_packet;
252
1.75k
  enum mosquitto_client_state state;
253
254
1.75k
  if(!mosq) return MOSQ_ERR_INVAL;
255
1.75k
  if(!net__is_connected(mosq)){
256
1.75k
    return MOSQ_ERR_NO_CONN;
257
1.75k
  }
258
259
0
  pthread_mutex_lock(&mosq->out_packet_mutex);
260
0
  packet = mosq->out_packet;
261
0
  pthread_mutex_unlock(&mosq->out_packet_mutex);
262
263
0
  if(packet == NULL){
264
0
    return MOSQ_ERR_SUCCESS;
265
0
  }
266
267
0
#ifdef WITH_BROKER
268
0
  mux__add_out(mosq);
269
0
#endif
270
271
0
  state = mosquitto__get_state(mosq);
272
0
  if(state == mosq_cs_connect_pending){
273
0
    return MOSQ_ERR_SUCCESS;
274
0
  }
275
276
0
  while(packet){
277
0
    while(packet->to_process > 0){
278
0
      write_length = net__write(mosq, &(packet->payload[packet->pos]), packet->to_process);
279
0
      if(write_length > 0){
280
0
        metrics__int_inc(mosq_counter_bytes_sent, write_length);
281
0
        packet->to_process -= (uint32_t)write_length;
282
0
        packet->pos += (uint32_t)write_length;
283
0
      }else{
284
#ifdef WIN32
285
        errno = WSAGetLastError();
286
#endif
287
0
        if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK
288
#ifdef WIN32
289
            || errno == WSAENOTCONN
290
#endif
291
0
            ){
292
0
          return MOSQ_ERR_SUCCESS;
293
0
        }else{
294
0
          switch(errno){
295
0
            case COMPAT_ECONNRESET:
296
0
              return MOSQ_ERR_CONN_LOST;
297
0
            case COMPAT_EINTR:
298
0
              return MOSQ_ERR_SUCCESS;
299
0
            default:
300
0
              return MOSQ_ERR_ERRNO;
301
0
          }
302
0
        }
303
0
      }
304
0
    }
305
306
0
    metrics__int_inc(mosq_counter_messages_sent, 1);
307
0
    if(((packet->command)&0xF6) == CMD_PUBLISH){
308
#ifndef WITH_BROKER
309
      callback__on_publish(mosq, packet->mid, 0, NULL);
310
    }else if(((packet->command)&0xF0) == CMD_DISCONNECT){
311
      do_client_disconnect(mosq, MOSQ_ERR_SUCCESS, NULL);
312
      return MOSQ_ERR_SUCCESS;
313
#endif
314
0
    }
315
316
0
    next_packet = packet__get_next_out(mosq);
317
0
    mosquitto__FREE(packet);
318
0
    packet = next_packet;
319
320
0
#ifdef WITH_BROKER
321
0
    mosq->next_msg_out = db.now_s + mosq->keepalive;
322
#else
323
    pthread_mutex_lock(&mosq->msgtime_mutex);
324
    mosq->next_msg_out = mosquitto_time() + mosq->keepalive;
325
    pthread_mutex_unlock(&mosq->msgtime_mutex);
326
#endif
327
0
  }
328
0
#ifdef WITH_BROKER
329
0
  if (mosq->out_packet == NULL) {
330
0
    mux__remove_out(mosq);
331
0
  }
332
0
#endif
333
0
  return MOSQ_ERR_SUCCESS;
334
0
}
335
336
337
int packet__read(struct mosquitto *mosq)
338
0
{
339
0
  uint8_t byte;
340
0
  ssize_t read_length;
341
0
  int rc = 0;
342
0
  enum mosquitto_client_state state;
343
0
  ssize_t (*local__read)(struct mosquitto *, void *, size_t);
344
345
0
  if(!mosq){
346
0
    return MOSQ_ERR_INVAL;
347
0
  }
348
0
  if(!net__is_connected(mosq)){
349
0
    return MOSQ_ERR_NO_CONN;
350
0
  }
351
352
0
  state = mosquitto__get_state(mosq);
353
0
  if(state == mosq_cs_connect_pending){
354
0
    return MOSQ_ERR_SUCCESS;
355
0
  }
356
0
#if defined(WITH_WEBSOCKETS) && WITH_WEBSOCKETS == WS_IS_BUILTIN
357
0
  if(mosq->transport == mosq_t_ws){
358
0
    local__read = net__read_ws;
359
0
  }else
360
0
#endif
361
0
  {
362
0
    local__read = net__read;
363
0
  }
364
365
  /* This gets called if pselect() indicates that there is network data
366
   * available - ie. at least one byte.  What we do depends on what data we
367
   * already have.
368
   * If we've not got a command, attempt to read one and save it. This should
369
   * always work because it's only a single byte.
370
   * Then try to read the remaining length. This may fail because it is may
371
   * be more than one byte - will need to save data pending next read if it
372
   * does fail.
373
   * Then try to read the remaining payload, where 'payload' here means the
374
   * combined variable header and actual payload. This is the most likely to
375
   * fail due to longer length, so save current data and current position.
376
   * After all data is read, send to mosquitto__handle_packet() to deal with.
377
   * Finally, free the memory and reset everything to starting conditions.
378
   */
379
0
  if(!mosq->in_packet.command){
380
0
    read_length = local__read(mosq, &byte, 1);
381
0
    if(read_length == 1){
382
0
      mosq->in_packet.command = byte;
383
0
#ifdef WITH_BROKER
384
0
      metrics__int_inc(mosq_counter_bytes_received, 1);
385
      /* Clients must send CONNECT as their first command. */
386
0
      if(!(mosq->bridge) && state == mosq_cs_connected && (byte&0xF0) != CMD_CONNECT){
387
0
        return MOSQ_ERR_PROTOCOL;
388
0
      }
389
0
#endif
390
0
    }else{
391
0
      if(read_length == 0){
392
0
        return MOSQ_ERR_CONN_LOST; /* EOF */
393
0
      }
394
#ifdef WIN32
395
      errno = WSAGetLastError();
396
#endif
397
0
      if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
398
0
        return MOSQ_ERR_SUCCESS;
399
0
      }else{
400
0
        switch(errno){
401
0
          case COMPAT_ECONNRESET:
402
0
            return MOSQ_ERR_CONN_LOST;
403
0
          case COMPAT_EINTR:
404
0
            return MOSQ_ERR_SUCCESS;
405
0
          default:
406
0
            return MOSQ_ERR_ERRNO;
407
0
        }
408
0
      }
409
0
    }
410
0
  }
411
  /* remaining_count is the number of bytes that the remaining_length
412
   * parameter occupied in this incoming packet. We don't use it here as such
413
   * (it is used when allocating an outgoing packet), but we must be able to
414
   * determine whether all of the remaining_length parameter has been read.
415
   * remaining_count has three states here:
416
   *   0 means that we haven't read any remaining_length bytes
417
   *   <0 means we have read some remaining_length bytes but haven't finished
418
   *   >0 means we have finished reading the remaining_length bytes.
419
   */
420
0
  if(mosq->in_packet.remaining_count <= 0){
421
0
    do{
422
0
      read_length = local__read(mosq, &byte, 1);
423
0
      if(read_length == 1){
424
0
        mosq->in_packet.remaining_count--;
425
        /* Max 4 bytes length for remaining length as defined by protocol.
426
         * Anything more likely means a broken/malicious client.
427
         */
428
0
        if(mosq->in_packet.remaining_count < -4){
429
0
          return MOSQ_ERR_MALFORMED_PACKET;
430
0
        }
431
432
0
        metrics__int_inc(mosq_counter_bytes_received, 1);
433
0
        mosq->in_packet.remaining_length += (byte & 127) * mosq->in_packet.remaining_mult;
434
0
        mosq->in_packet.remaining_mult *= 128;
435
0
      }else{
436
0
        if(read_length == 0){
437
0
          return MOSQ_ERR_CONN_LOST; /* EOF */
438
0
        }
439
#ifdef WIN32
440
        errno = WSAGetLastError();
441
#endif
442
0
        if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
443
0
          return MOSQ_ERR_SUCCESS;
444
0
        }else{
445
0
          switch(errno){
446
0
            case COMPAT_ECONNRESET:
447
0
              return MOSQ_ERR_CONN_LOST;
448
0
            case COMPAT_EINTR:
449
0
              return MOSQ_ERR_SUCCESS;
450
0
            default:
451
0
              return MOSQ_ERR_ERRNO;
452
0
          }
453
0
        }
454
0
      }
455
0
    }while((byte & 128) != 0);
456
    /* We have finished reading remaining_length, so make remaining_count
457
     * positive. */
458
0
    mosq->in_packet.remaining_count = (int8_t)(mosq->in_packet.remaining_count * -1);
459
460
0
#ifdef WITH_BROKER
461
0
    switch(mosq->in_packet.command & 0xF0){
462
0
      case CMD_CONNECT:
463
0
        if(mosq->in_packet.remaining_length > 100000){ /* Arbitrary limit, make configurable */
464
0
          return MOSQ_ERR_MALFORMED_PACKET;
465
0
        }
466
0
        break;
467
468
0
      case CMD_PUBACK:
469
0
      case CMD_PUBREC:
470
0
      case CMD_PUBREL:
471
0
      case CMD_PUBCOMP:
472
0
      case CMD_UNSUBACK:
473
0
        if(mosq->protocol != mosq_p_mqtt5 && mosq->in_packet.remaining_length != 2){
474
0
          return MOSQ_ERR_MALFORMED_PACKET;
475
0
        }
476
0
        break;
477
478
0
      case CMD_PINGREQ:
479
0
      case CMD_PINGRESP:
480
0
        if(mosq->in_packet.remaining_length != 0){
481
0
          return MOSQ_ERR_MALFORMED_PACKET;
482
0
        }
483
0
        break;
484
485
0
      case CMD_DISCONNECT:
486
0
        if(mosq->protocol != mosq_p_mqtt5 && mosq->in_packet.remaining_length != 0){
487
0
          return MOSQ_ERR_MALFORMED_PACKET;
488
0
        }
489
0
        break;
490
0
    }
491
492
0
    if(db.config->max_packet_size > 0 && mosq->in_packet.remaining_length+1 > db.config->max_packet_size){
493
0
      if(mosq->protocol == mosq_p_mqtt5){
494
0
        send__disconnect(mosq, MQTT_RC_PACKET_TOO_LARGE, NULL);
495
0
      }
496
0
      return MOSQ_ERR_OVERSIZE_PACKET;
497
0
    }
498
#else
499
    /* FIXME - client case for incoming message received from broker too large */
500
#endif
501
0
    if(mosq->in_packet.remaining_length > 0){
502
0
      mosq->in_packet.payload = mosquitto__malloc(mosq->in_packet.remaining_length*sizeof(uint8_t));
503
0
      if(!mosq->in_packet.payload){
504
0
        return MOSQ_ERR_NOMEM;
505
0
      }
506
0
      mosq->in_packet.to_process = mosq->in_packet.remaining_length;
507
0
    }
508
0
  }
509
0
  while(mosq->in_packet.to_process>0){
510
0
    read_length = local__read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);
511
0
    if(read_length > 0){
512
0
      metrics__int_inc(mosq_counter_bytes_received, read_length);
513
0
      mosq->in_packet.to_process -= (uint32_t)read_length;
514
0
      mosq->in_packet.pos += (uint32_t)read_length;
515
0
    }else{
516
#ifdef WIN32
517
      errno = WSAGetLastError();
518
#endif
519
0
      if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
520
0
        if(mosq->in_packet.to_process > 1000){
521
          /* Update last_msg_in time if more than 1000 bytes left to
522
           * receive. Helps when receiving large messages.
523
           * This is an arbitrary limit, but with some consideration.
524
           * If a client can't send 1000 bytes in a second it
525
           * probably shouldn't be using a 1 second keep alive. */
526
0
#ifdef WITH_BROKER
527
0
          keepalive__update(mosq);
528
#else
529
          pthread_mutex_lock(&mosq->msgtime_mutex);
530
          mosq->last_msg_in = mosquitto_time();
531
          pthread_mutex_unlock(&mosq->msgtime_mutex);
532
#endif
533
0
        }
534
0
        return MOSQ_ERR_SUCCESS;
535
0
      }else{
536
0
        switch(errno){
537
0
          case COMPAT_ECONNRESET:
538
0
            return MOSQ_ERR_CONN_LOST;
539
0
          case COMPAT_EINTR:
540
0
            return MOSQ_ERR_SUCCESS;
541
0
          default:
542
0
            return MOSQ_ERR_ERRNO;
543
0
        }
544
0
      }
545
0
    }
546
0
  }
547
548
  /* All data for this packet is read. */
549
0
  mosq->in_packet.pos = 0;
550
0
#ifdef WITH_BROKER
551
0
  metrics__int_inc(mosq_counter_messages_received, 1);
552
0
#endif
553
0
  rc = handle__packet(mosq);
554
555
  /* Free data and reset values */
556
0
  packet__cleanup(&mosq->in_packet);
557
558
0
#ifdef WITH_BROKER
559
0
  keepalive__update(mosq);
560
#else
561
  pthread_mutex_lock(&mosq->msgtime_mutex);
562
  mosq->last_msg_in = mosquitto_time();
563
  pthread_mutex_unlock(&mosq->msgtime_mutex);
564
#endif
565
0
  return rc;
566
0
}