Coverage Report

Created: 2026-02-14 06:46

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/mosquitto/lib/packet_mosq.c
Line
Count
Source
1
/*
2
Copyright (c) 2009-2021 Roger Light <roger@atchoo.org>
3
4
All rights reserved. This program and the accompanying materials
5
are made available under the terms of the Eclipse Public License 2.0
6
and Eclipse Distribution License v1.0 which accompany this distribution.
7
8
The Eclipse Public License is available at
9
   https://www.eclipse.org/legal/epl-2.0/
10
and the Eclipse Distribution License is available at
11
  http://www.eclipse.org/org/documents/edl-v10.php.
12
13
SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
14
15
Contributors:
16
   Roger Light - initial implementation and documentation.
17
*/
18
19
#include "config.h"
20
21
#include <assert.h>
22
#include <errno.h>
23
#include <string.h>
24
25
#ifdef WITH_BROKER
26
#  include "mosquitto_broker_internal.h"
27
#  if defined(WITH_WEBSOCKETS) && WITH_WEBSOCKETS == WS_IS_LWS
28
#    include <libwebsockets.h>
29
#  endif
30
#else
31
#  include "read_handle.h"
32
#endif
33
34
#include "callbacks.h"
35
#include "mosquitto/mqtt_protocol.h"
36
#include "net_mosq.h"
37
#include "packet_mosq.h"
38
#include "read_handle.h"
39
#include "util_mosq.h"
40
#ifdef WITH_BROKER
41
#  include "sys_tree.h"
42
#  include "send_mosq.h"
43
#else
44
#  define metrics__int_inc(stat, val)
45
#  define metrics__int_dec(stat, val)
46
#endif
47
48
49
int packet__alloc(struct mosquitto__packet **packet, uint8_t command, uint32_t remaining_length)
50
4.66k
{
51
4.66k
  uint8_t remaining_bytes[5] = {0}, byte;
52
4.66k
  int8_t remaining_count;
53
4.66k
  uint32_t packet_length;
54
4.66k
  uint32_t remaining_length_stored;
55
4.66k
  int i;
56
57
4.66k
  assert(packet);
58
59
4.66k
  remaining_length_stored = remaining_length;
60
4.66k
  remaining_count = 0;
61
4.88k
  do{
62
4.88k
    byte = remaining_length % 128;
63
4.88k
    remaining_length = remaining_length / 128;
64
    /* If there are more digits to encode, set the top bit of this digit */
65
4.88k
    if(remaining_length > 0){
66
228
      byte = byte | 0x80;
67
228
    }
68
4.88k
    remaining_bytes[remaining_count] = byte;
69
4.88k
    remaining_count++;
70
4.88k
  }while(remaining_length > 0 && remaining_count < 5);
71
4.66k
  if(remaining_count == 5){
72
0
    return MOSQ_ERR_PAYLOAD_SIZE;
73
0
  }
74
75
4.66k
  packet_length = remaining_length_stored + 1 + (uint8_t)remaining_count;
76
4.66k
  (*packet) = mosquitto_malloc(sizeof(struct mosquitto__packet) + packet_length + WS_PACKET_OFFSET);
77
4.66k
  if((*packet) == NULL){
78
0
    return MOSQ_ERR_NOMEM;
79
0
  }
80
81
  /* Clear memory for everything but the payload - that will be set to valid
82
   * values when the actual payload is copied in. */
83
4.66k
  memset((*packet), 0, sizeof(struct mosquitto__packet));
84
4.66k
  (*packet)->command = command;
85
4.66k
  (*packet)->remaining_length = remaining_length_stored;
86
4.66k
  (*packet)->remaining_count = remaining_count;
87
4.66k
  (*packet)->packet_length = packet_length + WS_PACKET_OFFSET;
88
89
4.66k
  (*packet)->payload[WS_PACKET_OFFSET] = (*packet)->command;
90
9.54k
  for(i=0; i<(*packet)->remaining_count; i++){
91
4.88k
    (*packet)->payload[WS_PACKET_OFFSET+i+1] = remaining_bytes[i];
92
4.88k
  }
93
4.66k
  (*packet)->pos = WS_PACKET_OFFSET + 1U + (uint8_t)(*packet)->remaining_count;
94
95
4.66k
  return MOSQ_ERR_SUCCESS;
96
4.66k
}
97
98
99
void packet__cleanup(struct mosquitto__packet_in *packet)
100
21.9k
{
101
21.9k
  if(!packet){
102
0
    return;
103
0
  }
104
105
  /* Free data and reset values */
106
21.9k
  packet->command = 0;
107
21.9k
  packet->remaining_count = 0;
108
21.9k
  packet->remaining_mult = 1;
109
21.9k
  packet->remaining_length = 0;
110
21.9k
  mosquitto_FREE(packet->payload);
111
21.9k
  packet->to_process = 0;
112
21.9k
  packet->pos = 0;
113
21.9k
}
114
115
116
void packet__cleanup_all_no_locks(struct mosquitto *mosq)
117
0
{
118
0
  struct mosquitto__packet *packet;
119
120
  /* Out packet cleanup */
121
0
  while(mosq->out_packet){
122
0
    packet = mosq->out_packet;
123
    /* Free data and reset values */
124
0
    mosq->out_packet = mosq->out_packet->next;
125
126
0
    mosquitto_FREE(packet);
127
0
  }
128
0
  metrics__int_dec(mosq_gauge_out_packets, mosq->out_packet_count);
129
0
  metrics__int_dec(mosq_gauge_out_packet_bytes, mosq->out_packet_bytes);
130
0
  mosq->out_packet_count = 0;
131
0
  mosq->out_packet_bytes = 0;
132
0
  mosq->out_packet_last = NULL;
133
134
0
  packet__cleanup(&mosq->in_packet);
135
0
}
136
137
138
void packet__cleanup_all(struct mosquitto *mosq)
139
0
{
140
0
  COMPAT_pthread_mutex_lock(&mosq->out_packet_mutex);
141
0
  packet__cleanup_all_no_locks(mosq);
142
0
  COMPAT_pthread_mutex_unlock(&mosq->out_packet_mutex);
143
0
}
144
145
146
static void packet__queue_append(struct mosquitto *mosq, struct mosquitto__packet *packet)
147
4.66k
{
148
4.66k
#ifdef WITH_BROKER
149
4.66k
  if(db.config->max_queued_messages > 0 && mosq->out_packet_count >= db.config->max_queued_messages){
150
0
    mosquitto_free(packet);
151
0
    if(mosq->is_dropping == false){
152
0
      mosq->is_dropping = true;
153
0
      log__printf(NULL, MOSQ_LOG_NOTICE,
154
0
          "Outgoing messages are being dropped for client %s.",
155
0
          mosq->id);
156
0
    }
157
0
    metrics__int_inc(mosq_counter_mqtt_publish_dropped, 1);
158
0
    return;
159
0
  }
160
4.66k
#endif
161
162
4.66k
  COMPAT_pthread_mutex_lock(&mosq->out_packet_mutex);
163
4.66k
  if(mosq->out_packet){
164
9
    mosq->out_packet_last->next = packet;
165
4.65k
  }else{
166
4.65k
    mosq->out_packet = packet;
167
4.65k
  }
168
4.66k
  mosq->out_packet_last = packet;
169
4.66k
  mosq->out_packet_count++;
170
4.66k
  mosq->out_packet_bytes += packet->packet_length;
171
4.66k
  metrics__int_inc(mosq_gauge_out_packets, 1);
172
4.66k
  metrics__int_inc(mosq_gauge_out_packet_bytes, packet->packet_length);
173
4.66k
  COMPAT_pthread_mutex_unlock(&mosq->out_packet_mutex);
174
4.66k
}
175
176
177
int packet__queue(struct mosquitto *mosq, struct mosquitto__packet *packet)
178
4.66k
{
179
#ifndef WITH_BROKER
180
  char sockpair_data = 0;
181
#endif
182
4.66k
  assert(mosq);
183
4.66k
  assert(packet);
184
185
#if defined(WITH_BROKER) && defined(WITH_WEBSOCKETS) && WITH_WEBSOCKETS == WS_IS_LWS
186
  if(mosq->wsi){
187
    packet->next = NULL;
188
    packet->pos = WS_PACKET_OFFSET;
189
    packet->to_process = packet->packet_length - WS_PACKET_OFFSET;
190
191
    packet__queue_append(mosq, packet);
192
193
    lws_callback_on_writable(mosq->wsi);
194
    return MOSQ_ERR_SUCCESS;
195
  }else
196
#elif defined(WITH_WEBSOCKETS) && WITH_WEBSOCKETS == WS_IS_BUILTIN
197
4.66k
  if(mosq->transport == mosq_t_ws){
198
0
    ws__prepare_packet(mosq, packet);
199
0
  }else
200
4.66k
#endif
201
4.66k
  {
202
    /* Normal TCP */
203
4.66k
    packet->next = NULL;
204
4.66k
    packet->pos = WS_PACKET_OFFSET;
205
4.66k
    packet->to_process = packet->packet_length - WS_PACKET_OFFSET;
206
4.66k
  }
207
208
4.66k
  packet__queue_append(mosq, packet);
209
210
4.66k
#ifdef WITH_BROKER
211
4.66k
  return packet__write(mosq);
212
#else
213
  /* Write a single byte to sockpairW (connected to sockpairR) to break out
214
   * of select() if in threaded mode. */
215
  if(mosq->sockpairW != INVALID_SOCKET){
216
#  ifndef WIN32
217
    if(write(mosq->sockpairW, &sockpair_data, 1)){
218
    }
219
#  else
220
    send(mosq->sockpairW, &sockpair_data, 1, 0);
221
#  endif
222
  }
223
224
  if(mosq->callback_depth == 0 && mosq->threaded == mosq_ts_none){
225
    return packet__write(mosq);
226
  }else{
227
    return MOSQ_ERR_SUCCESS;
228
  }
229
#endif
230
4.66k
}
231
232
233
int packet__check_oversize(struct mosquitto *mosq, uint32_t remaining_length)
234
1.23k
{
235
1.23k
  uint32_t len;
236
237
1.23k
  if(mosq->maximum_packet_size == 0){
238
1.10k
    return MOSQ_ERR_SUCCESS;
239
1.10k
  }
240
241
135
  len = remaining_length + mosquitto_varint_bytes(remaining_length);
242
135
  if(len > mosq->maximum_packet_size){
243
8
    return MOSQ_ERR_OVERSIZE_PACKET;
244
127
  }else{
245
127
    return MOSQ_ERR_SUCCESS;
246
127
  }
247
135
}
248
249
struct mosquitto__packet *packet__get_next_out(struct mosquitto *mosq)
250
0
{
251
0
  struct mosquitto__packet *packet = NULL;
252
253
0
  COMPAT_pthread_mutex_lock(&mosq->out_packet_mutex);
254
0
  if(mosq->out_packet){
255
0
    mosq->out_packet_count--;
256
0
    mosq->out_packet_bytes -= mosq->out_packet->packet_length;
257
0
    metrics__int_dec(mosq_gauge_out_packets, 1);
258
0
    metrics__int_dec(mosq_gauge_out_packet_bytes, mosq->out_packet->packet_length);
259
260
0
    mosq->out_packet = mosq->out_packet->next;
261
0
    if(!mosq->out_packet){
262
0
      mosq->out_packet_last = NULL;
263
0
    }
264
0
    packet = mosq->out_packet;
265
0
  }
266
0
  COMPAT_pthread_mutex_unlock(&mosq->out_packet_mutex);
267
268
0
  return packet;
269
0
}
270
271
272
int packet__write(struct mosquitto *mosq)
273
4.66k
{
274
4.66k
  ssize_t write_length;
275
4.66k
  struct mosquitto__packet *packet, *next_packet;
276
4.66k
  enum mosquitto_client_state state;
277
278
4.66k
  if(!mosq){
279
0
    return MOSQ_ERR_INVAL;
280
0
  }
281
4.66k
  if(!net__is_connected(mosq)){
282
4.66k
    return MOSQ_ERR_NO_CONN;
283
4.66k
  }
284
285
0
  COMPAT_pthread_mutex_lock(&mosq->out_packet_mutex);
286
0
  packet = mosq->out_packet;
287
0
  COMPAT_pthread_mutex_unlock(&mosq->out_packet_mutex);
288
289
0
  if(packet == NULL){
290
0
    return MOSQ_ERR_SUCCESS;
291
0
  }
292
293
0
#ifdef WITH_BROKER
294
0
  mux__add_out(mosq);
295
0
#endif
296
297
0
  state = mosquitto__get_state(mosq);
298
0
  if(state == mosq_cs_connect_pending){
299
0
    return MOSQ_ERR_SUCCESS;
300
0
  }
301
302
0
  while(packet){
303
0
    while(packet->to_process > 0){
304
0
      write_length = net__write(mosq, &(packet->payload[packet->pos]), packet->to_process);
305
0
      if(write_length > 0){
306
0
        metrics__int_inc(mosq_counter_bytes_sent, write_length);
307
0
        packet->to_process -= (uint32_t)write_length;
308
0
        packet->pos += (uint32_t)write_length;
309
0
      }else{
310
0
        WINDOWS_SET_ERRNO_RW();
311
0
        if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK
312
#ifdef WIN32
313
            || errno == WSAENOTCONN
314
#endif
315
0
            ){
316
0
          return MOSQ_ERR_SUCCESS;
317
0
        }else{
318
0
          switch(errno){
319
0
            case COMPAT_ECONNRESET:
320
0
              return MOSQ_ERR_CONN_LOST;
321
0
            case COMPAT_EINTR:
322
0
              return MOSQ_ERR_SUCCESS;
323
0
            case EPROTO:
324
0
              return MOSQ_ERR_TLS;
325
0
            default:
326
0
              return MOSQ_ERR_ERRNO;
327
0
          }
328
0
        }
329
0
      }
330
0
    }
331
332
0
    metrics__int_inc(mosq_counter_messages_sent, 1);
333
0
    if(((packet->command)&0xF6) == CMD_PUBLISH){
334
#ifndef WITH_BROKER
335
      callback__on_publish(mosq, packet->mid, 0, NULL);
336
    }else if(((packet->command)&0xF0) == CMD_DISCONNECT){
337
      net__socket_shutdown(mosq);
338
      return MOSQ_ERR_SUCCESS;
339
#endif
340
0
    }
341
342
0
    next_packet = packet__get_next_out(mosq);
343
0
    mosquitto_FREE(packet);
344
0
    packet = next_packet;
345
346
0
#ifdef WITH_BROKER
347
0
    mosq->next_msg_out = db.now_s + mosq->keepalive;
348
#else
349
    COMPAT_pthread_mutex_lock(&mosq->msgtime_mutex);
350
    mosq->next_msg_out = mosquitto_time() + mosq->keepalive;
351
    COMPAT_pthread_mutex_unlock(&mosq->msgtime_mutex);
352
#endif
353
0
  }
354
0
#ifdef WITH_BROKER
355
0
  if(mosq->out_packet == NULL){
356
0
    mux__remove_out(mosq);
357
0
  }
358
0
#endif
359
0
  return MOSQ_ERR_SUCCESS;
360
0
}
361
362
363
static int read_header(struct mosquitto *mosq, ssize_t (*func_read)(struct mosquitto *, void *, size_t))
364
0
{
365
0
  ssize_t read_length;
366
367
0
  mosq->in_packet.packet_buffer_pos = 0;
368
0
  mosq->in_packet.packet_buffer_to_process = 0;
369
0
  read_length = func_read(mosq, mosq->in_packet.packet_buffer, mosq->in_packet.packet_buffer_size);
370
0
  if(read_length > 0){
371
0
    mosq->in_packet.packet_buffer_to_process = (uint16_t)read_length;
372
0
#ifdef WITH_BROKER
373
0
    metrics__int_inc(mosq_counter_bytes_received, read_length);
374
0
#endif
375
0
  }else{
376
0
    if(read_length == 0){
377
0
      return MOSQ_ERR_CONN_LOST; /* EOF */
378
0
    }
379
0
    WINDOWS_SET_ERRNO_RW();
380
0
    if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
381
0
      return MOSQ_ERR_SUCCESS;
382
0
    }else{
383
0
      switch(errno){
384
0
        case COMPAT_ECONNRESET:
385
0
          return MOSQ_ERR_CONN_LOST;
386
0
        case COMPAT_EINTR:
387
0
          return MOSQ_ERR_SUCCESS;
388
0
        default:
389
0
          return MOSQ_ERR_ERRNO;
390
0
      }
391
0
    }
392
0
  }
393
0
  return MOSQ_ERR_SUCCESS;
394
0
}
395
396
397
#ifdef WITH_BROKER
398
399
400
static int packet__check_in_packet_oversize(struct mosquitto *mosq)
401
0
{
402
0
  switch(mosq->in_packet.command & 0xF0){
403
0
    case CMD_CONNECT:
404
0
      if(mosq->in_packet.remaining_length > db.config->packet_max_connect){
405
0
        return MOSQ_ERR_OVERSIZE_PACKET;
406
0
      }
407
0
      break;
408
409
0
    case CMD_PUBACK:
410
0
    case CMD_PUBREC:
411
0
    case CMD_PUBREL:
412
0
    case CMD_PUBCOMP:
413
0
    case CMD_UNSUBACK:
414
0
      if(mosq->protocol == mosq_p_mqtt5){
415
0
        if(mosq->in_packet.remaining_length > db.config->packet_max_simple){
416
0
          return MOSQ_ERR_OVERSIZE_PACKET;
417
0
        }
418
0
      }else{
419
0
        if(mosq->in_packet.remaining_length != 2){
420
0
          return MOSQ_ERR_MALFORMED_PACKET;
421
0
        }
422
0
      }
423
0
      break;
424
425
0
    case CMD_PINGREQ:
426
0
    case CMD_PINGRESP:
427
0
      if(mosq->in_packet.remaining_length != 0){
428
0
        return MOSQ_ERR_MALFORMED_PACKET;
429
0
      }
430
0
      break;
431
432
0
    case CMD_DISCONNECT:
433
0
      if(mosq->protocol == mosq_p_mqtt5){
434
0
        if(mosq->in_packet.remaining_length > db.config->packet_max_simple){
435
0
          return MOSQ_ERR_OVERSIZE_PACKET;
436
0
        }
437
0
      }else{
438
0
        if(mosq->in_packet.remaining_length != 0){
439
0
          return MOSQ_ERR_MALFORMED_PACKET;
440
0
        }
441
0
      }
442
0
      break;
443
444
0
    case CMD_SUBSCRIBE:
445
0
    case CMD_UNSUBSCRIBE:
446
0
      if(mosq->protocol == mosq_p_mqtt5 && mosq->in_packet.remaining_length > db.config->packet_max_sub){
447
0
        return MOSQ_ERR_OVERSIZE_PACKET;
448
0
      }
449
0
      break;
450
451
0
    case CMD_AUTH:
452
0
      if(mosq->in_packet.remaining_length > db.config->packet_max_auth){
453
0
        return MOSQ_ERR_OVERSIZE_PACKET;
454
0
      }
455
0
      break;
456
457
0
  }
458
459
0
  if(db.config->max_packet_size > 0 && mosq->in_packet.remaining_length+1 > db.config->max_packet_size){
460
0
    if(mosq->protocol == mosq_p_mqtt5){
461
0
      send__disconnect(mosq, MQTT_RC_PACKET_TOO_LARGE, NULL);
462
0
    }
463
0
    return MOSQ_ERR_OVERSIZE_PACKET;
464
0
  }
465
466
0
  return MOSQ_ERR_SUCCESS;
467
0
}
468
#endif
469
470
471
static int packet__read_single(struct mosquitto *mosq, enum mosquitto_client_state state, ssize_t (*local__read)(struct mosquitto *, void *, size_t))
472
0
{
473
0
  ssize_t read_length;
474
0
  int rc = 0;
475
476
0
  if(!mosq->in_packet.command){
477
0
    if(mosq->in_packet.packet_buffer_to_process == 0){
478
0
      rc = read_header(mosq, local__read);
479
0
      if(rc){
480
0
        return rc;
481
0
      }
482
0
    }
483
484
0
    if(mosq->in_packet.packet_buffer_to_process > 0){
485
0
      mosq->in_packet.command = mosq->in_packet.packet_buffer[mosq->in_packet.packet_buffer_pos];
486
0
      mosq->in_packet.packet_buffer_to_process--;
487
0
      mosq->in_packet.packet_buffer_pos++;
488
0
#ifdef WITH_BROKER
489
      /* Clients must send CONNECT as their first command. */
490
0
      if(!(mosq->bridge) && state == mosq_cs_new && (mosq->in_packet.command&0xF0) != CMD_CONNECT){
491
0
        log__printf(NULL, MOSQ_LOG_INFO, "Protocol error from %s:%d: First packet not CONNECT (%02X).",
492
0
            mosq->address, mosq->remote_port, mosq->in_packet.command);
493
0
        return MOSQ_ERR_PROTOCOL;
494
0
      }else if((mosq->in_packet.command&0xF0) == CMD_RESERVED){
495
0
        if(mosq->protocol == mosq_p_mqtt5){
496
0
          send__disconnect(mosq, MQTT_RC_PROTOCOL_ERROR, NULL);
497
0
        }
498
0
        log__printf(NULL, MOSQ_LOG_INFO, "Protocol error from %s:%d: RESERVED packet.",
499
0
            mosq->address, mosq->remote_port);
500
0
        return MOSQ_ERR_PROTOCOL;
501
0
      }
502
#else
503
      UNUSED(state);
504
#endif
505
0
    }else{
506
0
      return MOSQ_ERR_SUCCESS;
507
0
    }
508
0
  }
509
  /* remaining_count is the number of bytes that the remaining_length
510
   * parameter occupied in this incoming packet. We don't use it here as such
511
   * (it is used when allocating an outgoing packet), but we must be able to
512
   * determine whether all of the remaining_length parameter has been read.
513
   * remaining_count has three states here:
514
   *   0 means that we haven't read any remaining_length bytes
515
   *   <0 means we have read some remaining_length bytes but haven't finished
516
   *   >0 means we have finished reading the remaining_length bytes.
517
   */
518
0
  if(mosq->in_packet.remaining_count <= 0){
519
0
    uint8_t byte;
520
0
    do{
521
0
      if(mosq->in_packet.packet_buffer_to_process == 0){
522
0
        rc = read_header(mosq, local__read);
523
0
        if(rc){
524
0
          return rc;
525
0
        }
526
0
      }
527
528
0
      if(mosq->in_packet.packet_buffer_to_process > 0){
529
0
        mosq->in_packet.remaining_count--;
530
        /* Max 4 bytes length for remaining length as defined by protocol.
531
         * Anything more likely means a broken/malicious client.
532
         */
533
0
        if(mosq->in_packet.remaining_count < -4){
534
0
          return MOSQ_ERR_MALFORMED_PACKET;
535
0
        }
536
537
0
        byte = mosq->in_packet.packet_buffer[mosq->in_packet.packet_buffer_pos];
538
0
        mosq->in_packet.packet_buffer_pos++;
539
0
        mosq->in_packet.packet_buffer_to_process--;
540
0
        mosq->in_packet.remaining_length += (byte & 127) * mosq->in_packet.remaining_mult;
541
0
        mosq->in_packet.remaining_mult *= 128;
542
0
      }else{
543
0
        return MOSQ_ERR_SUCCESS;
544
0
      }
545
0
    }while((byte & 128) != 0);
546
    /* We have finished reading remaining_length, so make remaining_count
547
     * positive. */
548
0
    mosq->in_packet.remaining_count = (int8_t)(mosq->in_packet.remaining_count * -1);
549
550
0
#ifdef WITH_BROKER
551
0
    rc = packet__check_in_packet_oversize(mosq);
552
0
    if(rc){
553
0
      return rc;
554
0
    }
555
#else
556
    /* FIXME - client case for incoming message received from broker too large */
557
#endif
558
0
    if(mosq->in_packet.remaining_length > 0){
559
0
      mosq->in_packet.payload = mosquitto_malloc(mosq->in_packet.remaining_length*sizeof(uint8_t));
560
0
      if(!mosq->in_packet.payload){
561
0
        return MOSQ_ERR_NOMEM;
562
0
      }
563
564
0
      mosq->in_packet.pos = 0;
565
0
      mosq->in_packet.to_process = mosq->in_packet.remaining_length;
566
567
0
      if(mosq->in_packet.packet_buffer_to_process > 0){
568
0
        uint32_t len;
569
0
        if(mosq->in_packet.packet_buffer_to_process > mosq->in_packet.remaining_length){
570
0
          len = mosq->in_packet.remaining_length;
571
0
        }else{
572
0
          len = mosq->in_packet.packet_buffer_to_process;
573
0
        }
574
0
        memcpy(mosq->in_packet.payload, &mosq->in_packet.packet_buffer[mosq->in_packet.packet_buffer_pos], len);
575
0
        if(len < mosq->in_packet.packet_buffer_to_process){
576
0
          mosq->in_packet.packet_buffer_pos = (uint16_t)(mosq->in_packet.packet_buffer_pos + len);
577
0
          mosq->in_packet.packet_buffer_to_process = (uint16_t)(mosq->in_packet.packet_buffer_to_process - len);
578
0
        }else{
579
0
          mosq->in_packet.packet_buffer_pos = 0;
580
0
          mosq->in_packet.packet_buffer_to_process = 0;
581
0
        }
582
0
        mosq->in_packet.pos += len;
583
0
        mosq->in_packet.to_process -= len;
584
0
      }
585
0
    }
586
0
  }
587
0
  while(mosq->in_packet.to_process>0){
588
0
    read_length = local__read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);
589
0
    if(read_length > 0){
590
0
      metrics__int_inc(mosq_counter_bytes_received, read_length);
591
0
      mosq->in_packet.to_process -= (uint32_t)read_length;
592
0
      mosq->in_packet.pos += (uint32_t)read_length;
593
0
    }else{
594
0
      WINDOWS_SET_ERRNO_RW();
595
0
      if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
596
0
        if(mosq->in_packet.to_process > 1000){
597
          /* Update last_msg_in time if more than 1000 bytes left to
598
           * receive. Helps when receiving large messages.
599
           * This is an arbitrary limit, but with some consideration.
600
           * If a client can't send 1000 bytes in a second it
601
           * probably shouldn't be using a 1 second keep alive. */
602
0
#ifdef WITH_BROKER
603
0
          keepalive__update(mosq);
604
#else
605
          COMPAT_pthread_mutex_lock(&mosq->msgtime_mutex);
606
          mosq->last_msg_in = mosquitto_time();
607
          COMPAT_pthread_mutex_unlock(&mosq->msgtime_mutex);
608
#endif
609
0
        }
610
0
        return MOSQ_ERR_SUCCESS;
611
0
      }else{
612
0
        switch(errno){
613
0
          case COMPAT_ECONNRESET:
614
0
            return MOSQ_ERR_CONN_LOST;
615
0
          case COMPAT_EINTR:
616
0
            return MOSQ_ERR_SUCCESS;
617
0
          default:
618
0
            return MOSQ_ERR_ERRNO;
619
0
        }
620
0
      }
621
0
    }
622
0
  }
623
624
  /* All data for this packet is read. */
625
0
  mosq->in_packet.pos = 0;
626
0
#ifdef WITH_BROKER
627
0
  metrics__int_inc(mosq_counter_messages_received, 1);
628
0
#endif
629
0
  rc = handle__packet(mosq);
630
631
  /* Free data and reset values */
632
0
  packet__cleanup(&mosq->in_packet);
633
634
0
#ifdef WITH_BROKER
635
0
  keepalive__update(mosq);
636
#else
637
  COMPAT_pthread_mutex_lock(&mosq->msgtime_mutex);
638
  mosq->last_msg_in = mosquitto_time();
639
  COMPAT_pthread_mutex_unlock(&mosq->msgtime_mutex);
640
#endif
641
0
  return rc;
642
0
}
643
644
645
int packet__read(struct mosquitto *mosq)
646
0
{
647
0
  int rc = 0;
648
0
  enum mosquitto_client_state state;
649
0
  ssize_t (*local__read)(struct mosquitto *, void *, size_t);
650
651
0
  if(!mosq){
652
0
    return MOSQ_ERR_INVAL;
653
0
  }
654
0
  if(!net__is_connected(mosq)){
655
0
    return MOSQ_ERR_NO_CONN;
656
0
  }
657
658
0
#if defined(WITH_WEBSOCKETS) && WITH_WEBSOCKETS == WS_IS_BUILTIN
659
0
  if(mosq->transport == mosq_t_ws){
660
0
    local__read = net__read_ws;
661
0
  }else
662
0
#endif
663
0
  {
664
0
    local__read = net__read;
665
0
  }
666
667
  /* This gets called if pselect() indicates that there is network data
668
   * available - ie. at least one byte.  What we do depends on what data we
669
   * already have.
670
   * If we've not got a command, attempt to read one and save it. This should
671
   * always work because it's only a single byte.
672
   * Then try to read the remaining length. This may fail because it is may
673
   * be more than one byte - will need to save data pending next read if it
674
   * does fail.
675
   * Then try to read the remaining payload, where 'payload' here means the
676
   * combined variable packet_buffer and actual payload. This is the most likely to
677
   * fail due to longer length, so save current data and current position.
678
   * After all data is read, send to mosquitto__handle_packet() to deal with.
679
   * Finally, free the memory and reset everything to starting conditions.
680
   */
681
0
  do{
682
0
    state = mosquitto__get_state(mosq);
683
0
    if(state == mosq_cs_connect_pending){
684
0
      return MOSQ_ERR_SUCCESS;
685
0
    }
686
0
    rc = packet__read_single(mosq, state, local__read);
687
0
    if(rc){
688
0
      return rc;
689
0
    }
690
0
  }while(mosq->in_packet.packet_buffer_to_process > 0);
691
692
0
  return MOSQ_ERR_SUCCESS;
693
0
}