Coverage Report

Created: 2026-01-13 07:08

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/mosquitto/lib/packet_mosq.c
Line
Count
Source
1
/*
2
Copyright (c) 2009-2021 Roger Light <roger@atchoo.org>
3
4
All rights reserved. This program and the accompanying materials
5
are made available under the terms of the Eclipse Public License 2.0
6
and Eclipse Distribution License v1.0 which accompany this distribution.
7
8
The Eclipse Public License is available at
9
   https://www.eclipse.org/legal/epl-2.0/
10
and the Eclipse Distribution License is available at
11
  http://www.eclipse.org/org/documents/edl-v10.php.
12
13
SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
14
15
Contributors:
16
   Roger Light - initial implementation and documentation.
17
*/
18
19
#include "config.h"
20
21
#include <assert.h>
22
#include <errno.h>
23
#include <string.h>
24
25
#ifdef WITH_BROKER
26
#  include "mosquitto_broker_internal.h"
27
#  if defined(WITH_WEBSOCKETS) && WITH_WEBSOCKETS == WS_IS_LWS
28
#    include <libwebsockets.h>
29
#  endif
30
#else
31
#  include "read_handle.h"
32
#endif
33
34
#include "callbacks.h"
35
#include "mosquitto/mqtt_protocol.h"
36
#include "net_mosq.h"
37
#include "packet_mosq.h"
38
#include "read_handle.h"
39
#include "util_mosq.h"
40
#ifdef WITH_BROKER
41
#  include "sys_tree.h"
42
#  include "send_mosq.h"
43
#else
44
#  define metrics__int_inc(stat, val)
45
#  define metrics__int_dec(stat, val)
46
#endif
47
48
49
int packet__alloc(struct mosquitto__packet **packet, uint8_t command, uint32_t remaining_length)
50
4.44k
{
51
4.44k
  uint8_t remaining_bytes[5], byte;
52
4.44k
  int8_t remaining_count;
53
4.44k
  uint32_t packet_length;
54
4.44k
  uint32_t remaining_length_stored;
55
4.44k
  int i;
56
57
4.44k
  assert(packet);
58
59
4.44k
  remaining_length_stored = remaining_length;
60
4.44k
  remaining_count = 0;
61
4.64k
  do{
62
4.64k
    byte = remaining_length % 128;
63
4.64k
    remaining_length = remaining_length / 128;
64
    /* If there are more digits to encode, set the top bit of this digit */
65
4.64k
    if(remaining_length > 0){
66
204
      byte = byte | 0x80;
67
204
    }
68
4.64k
    remaining_bytes[remaining_count] = byte;
69
4.64k
    remaining_count++;
70
4.64k
  }while(remaining_length > 0 && remaining_count < 5);
71
4.44k
  if(remaining_count == 5){
72
0
    return MOSQ_ERR_PAYLOAD_SIZE;
73
0
  }
74
75
4.44k
  packet_length = remaining_length_stored + 1 + (uint8_t)remaining_count;
76
4.44k
  (*packet) = mosquitto_malloc(sizeof(struct mosquitto__packet) + packet_length + WS_PACKET_OFFSET);
77
4.44k
  if((*packet) == NULL){
78
0
    return MOSQ_ERR_NOMEM;
79
0
  }
80
81
  /* Clear memory for everything but the payload - that will be set to valid
82
   * values when the actual payload is copied in. */
83
4.44k
  memset((*packet), 0, sizeof(struct mosquitto__packet));
84
4.44k
  (*packet)->command = command;
85
4.44k
  (*packet)->remaining_length = remaining_length_stored;
86
4.44k
  (*packet)->remaining_count = remaining_count;
87
4.44k
  (*packet)->packet_length = packet_length + WS_PACKET_OFFSET;
88
89
4.44k
  (*packet)->payload[WS_PACKET_OFFSET] = (*packet)->command;
90
9.09k
  for(i=0; i<(*packet)->remaining_count; i++){
91
4.64k
    (*packet)->payload[WS_PACKET_OFFSET+i+1] = remaining_bytes[i];
92
4.64k
  }
93
4.44k
  (*packet)->pos = WS_PACKET_OFFSET + 1U + (uint8_t)(*packet)->remaining_count;
94
95
4.44k
  return MOSQ_ERR_SUCCESS;
96
4.44k
}
97
98
99
void packet__cleanup(struct mosquitto__packet_in *packet)
100
21.0k
{
101
21.0k
  if(!packet){
102
0
    return;
103
0
  }
104
105
  /* Free data and reset values */
106
21.0k
  packet->command = 0;
107
21.0k
  packet->remaining_count = 0;
108
21.0k
  packet->remaining_mult = 1;
109
21.0k
  packet->remaining_length = 0;
110
21.0k
  mosquitto_FREE(packet->payload);
111
21.0k
  packet->to_process = 0;
112
21.0k
  packet->pos = 0;
113
21.0k
}
114
115
116
void packet__cleanup_all_no_locks(struct mosquitto *mosq)
117
0
{
118
0
  struct mosquitto__packet *packet;
119
120
  /* Out packet cleanup */
121
0
  while(mosq->out_packet){
122
0
    packet = mosq->out_packet;
123
    /* Free data and reset values */
124
0
    mosq->out_packet = mosq->out_packet->next;
125
126
0
    mosquitto_FREE(packet);
127
0
  }
128
0
  metrics__int_dec(mosq_gauge_out_packets, mosq->out_packet_count);
129
0
  metrics__int_dec(mosq_gauge_out_packet_bytes, mosq->out_packet_bytes);
130
0
  mosq->out_packet_count = 0;
131
0
  mosq->out_packet_bytes = 0;
132
0
  mosq->out_packet_last = NULL;
133
134
0
  packet__cleanup(&mosq->in_packet);
135
0
}
136
137
138
void packet__cleanup_all(struct mosquitto *mosq)
139
0
{
140
0
  COMPAT_pthread_mutex_lock(&mosq->out_packet_mutex);
141
0
  packet__cleanup_all_no_locks(mosq);
142
0
  COMPAT_pthread_mutex_unlock(&mosq->out_packet_mutex);
143
0
}
144
145
146
static void packet__queue_append(struct mosquitto *mosq, struct mosquitto__packet *packet)
147
4.44k
{
148
4.44k
#ifdef WITH_BROKER
149
4.44k
  if(db.config->max_queued_messages > 0 && mosq->out_packet_count >= db.config->max_queued_messages){
150
0
    mosquitto_free(packet);
151
0
    if(mosq->is_dropping == false){
152
0
      mosq->is_dropping = true;
153
0
      log__printf(NULL, MOSQ_LOG_NOTICE,
154
0
          "Outgoing messages are being dropped for client %s.",
155
0
          mosq->id);
156
0
    }
157
0
    metrics__int_inc(mosq_counter_mqtt_publish_dropped, 1);
158
0
    return;
159
0
  }
160
4.44k
#endif
161
162
4.44k
  COMPAT_pthread_mutex_lock(&mosq->out_packet_mutex);
163
4.44k
  if(mosq->out_packet){
164
8
    mosq->out_packet_last->next = packet;
165
4.43k
  }else{
166
4.43k
    mosq->out_packet = packet;
167
4.43k
  }
168
4.44k
  mosq->out_packet_last = packet;
169
4.44k
  mosq->out_packet_count++;
170
4.44k
  mosq->out_packet_bytes += packet->packet_length;
171
4.44k
  metrics__int_inc(mosq_gauge_out_packets, 1);
172
4.44k
  metrics__int_inc(mosq_gauge_out_packet_bytes, packet->packet_length);
173
4.44k
  COMPAT_pthread_mutex_unlock(&mosq->out_packet_mutex);
174
4.44k
}
175
176
177
int packet__queue(struct mosquitto *mosq, struct mosquitto__packet *packet)
178
4.44k
{
179
#ifndef WITH_BROKER
180
  char sockpair_data = 0;
181
#endif
182
4.44k
  assert(mosq);
183
4.44k
  assert(packet);
184
185
#if defined(WITH_BROKER) && defined(WITH_WEBSOCKETS) && WITH_WEBSOCKETS == WS_IS_LWS
186
  if(mosq->wsi){
187
    packet->next = NULL;
188
    packet->pos = WS_PACKET_OFFSET;
189
    packet->to_process = packet->packet_length - WS_PACKET_OFFSET;
190
191
    packet__queue_append(mosq, packet);
192
193
    lws_callback_on_writable(mosq->wsi);
194
    return MOSQ_ERR_SUCCESS;
195
  }else
196
#elif defined(WITH_WEBSOCKETS) && WITH_WEBSOCKETS == WS_IS_BUILTIN
197
4.44k
  if(mosq->transport == mosq_t_ws){
198
0
    ws__prepare_packet(mosq, packet);
199
0
  }else
200
4.44k
#endif
201
4.44k
  {
202
    /* Normal TCP */
203
4.44k
    packet->next = NULL;
204
4.44k
    packet->pos = WS_PACKET_OFFSET;
205
4.44k
    packet->to_process = packet->packet_length - WS_PACKET_OFFSET;
206
4.44k
  }
207
208
4.44k
  packet__queue_append(mosq, packet);
209
210
4.44k
#ifdef WITH_BROKER
211
4.44k
  return packet__write(mosq);
212
#else
213
  /* Write a single byte to sockpairW (connected to sockpairR) to break out
214
   * of select() if in threaded mode. */
215
  if(mosq->sockpairW != INVALID_SOCKET){
216
#  ifndef WIN32
217
    if(write(mosq->sockpairW, &sockpair_data, 1)){
218
    }
219
#  else
220
    send(mosq->sockpairW, &sockpair_data, 1, 0);
221
#  endif
222
  }
223
224
  if(mosq->callback_depth == 0 && mosq->threaded == mosq_ts_none){
225
    return packet__write(mosq);
226
  }else{
227
    return MOSQ_ERR_SUCCESS;
228
  }
229
#endif
230
4.44k
}
231
232
233
int packet__check_oversize(struct mosquitto *mosq, uint32_t remaining_length)
234
1.19k
{
235
1.19k
  uint32_t len;
236
237
1.19k
  if(mosq->maximum_packet_size == 0){
238
1.08k
    return MOSQ_ERR_SUCCESS;
239
1.08k
  }
240
241
114
  len = remaining_length + mosquitto_varint_bytes(remaining_length);
242
114
  if(len > mosq->maximum_packet_size){
243
5
    return MOSQ_ERR_OVERSIZE_PACKET;
244
109
  }else{
245
109
    return MOSQ_ERR_SUCCESS;
246
109
  }
247
114
}
248
249
struct mosquitto__packet *packet__get_next_out(struct mosquitto *mosq)
250
0
{
251
0
  struct mosquitto__packet *packet = NULL;
252
253
0
  COMPAT_pthread_mutex_lock(&mosq->out_packet_mutex);
254
0
  if(mosq->out_packet){
255
0
    mosq->out_packet_count--;
256
0
    mosq->out_packet_bytes -= mosq->out_packet->packet_length;
257
0
    metrics__int_dec(mosq_gauge_out_packets, 1);
258
0
    metrics__int_dec(mosq_gauge_out_packet_bytes, mosq->out_packet->packet_length);
259
260
0
    mosq->out_packet = mosq->out_packet->next;
261
0
    if(!mosq->out_packet){
262
0
      mosq->out_packet_last = NULL;
263
0
    }
264
0
    packet = mosq->out_packet;
265
0
  }
266
0
  COMPAT_pthread_mutex_unlock(&mosq->out_packet_mutex);
267
268
0
  return packet;
269
0
}
270
271
272
int packet__write(struct mosquitto *mosq)
273
4.44k
{
274
4.44k
  ssize_t write_length;
275
4.44k
  struct mosquitto__packet *packet, *next_packet;
276
4.44k
  enum mosquitto_client_state state;
277
278
4.44k
  if(!mosq){
279
0
    return MOSQ_ERR_INVAL;
280
0
  }
281
4.44k
  if(!net__is_connected(mosq)){
282
4.44k
    return MOSQ_ERR_NO_CONN;
283
4.44k
  }
284
285
0
  COMPAT_pthread_mutex_lock(&mosq->out_packet_mutex);
286
0
  packet = mosq->out_packet;
287
0
  COMPAT_pthread_mutex_unlock(&mosq->out_packet_mutex);
288
289
0
  if(packet == NULL){
290
0
    return MOSQ_ERR_SUCCESS;
291
0
  }
292
293
0
#ifdef WITH_BROKER
294
0
  mux__add_out(mosq);
295
0
#endif
296
297
0
  state = mosquitto__get_state(mosq);
298
0
  if(state == mosq_cs_connect_pending){
299
0
    return MOSQ_ERR_SUCCESS;
300
0
  }
301
302
0
  while(packet){
303
0
    while(packet->to_process > 0){
304
0
      write_length = net__write(mosq, &(packet->payload[packet->pos]), packet->to_process);
305
0
      if(write_length > 0){
306
0
        metrics__int_inc(mosq_counter_bytes_sent, write_length);
307
0
        packet->to_process -= (uint32_t)write_length;
308
0
        packet->pos += (uint32_t)write_length;
309
0
      }else{
310
0
        WINDOWS_SET_ERRNO_RW();
311
0
        if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK
312
#ifdef WIN32
313
            || errno == WSAENOTCONN
314
#endif
315
0
            ){
316
0
          return MOSQ_ERR_SUCCESS;
317
0
        }else{
318
0
          switch(errno){
319
0
            case COMPAT_ECONNRESET:
320
0
              return MOSQ_ERR_CONN_LOST;
321
0
            case COMPAT_EINTR:
322
0
              return MOSQ_ERR_SUCCESS;
323
0
            case EPROTO:
324
0
              return MOSQ_ERR_TLS;
325
0
            default:
326
0
              return MOSQ_ERR_ERRNO;
327
0
          }
328
0
        }
329
0
      }
330
0
    }
331
332
0
    metrics__int_inc(mosq_counter_messages_sent, 1);
333
0
    if(((packet->command)&0xF6) == CMD_PUBLISH){
334
#ifndef WITH_BROKER
335
      callback__on_publish(mosq, packet->mid, 0, NULL);
336
    }else if(((packet->command)&0xF0) == CMD_DISCONNECT){
337
      net__socket_shutdown(mosq);
338
      return MOSQ_ERR_SUCCESS;
339
#endif
340
0
    }
341
342
0
    next_packet = packet__get_next_out(mosq);
343
0
    mosquitto_FREE(packet);
344
0
    packet = next_packet;
345
346
0
#ifdef WITH_BROKER
347
0
    mosq->next_msg_out = db.now_s + mosq->keepalive;
348
#else
349
    COMPAT_pthread_mutex_lock(&mosq->msgtime_mutex);
350
    mosq->next_msg_out = mosquitto_time() + mosq->keepalive;
351
    COMPAT_pthread_mutex_unlock(&mosq->msgtime_mutex);
352
#endif
353
0
  }
354
0
#ifdef WITH_BROKER
355
0
  if(mosq->out_packet == NULL){
356
0
    mux__remove_out(mosq);
357
0
  }
358
0
#endif
359
0
  return MOSQ_ERR_SUCCESS;
360
0
}
361
362
363
static int read_header(struct mosquitto *mosq, ssize_t (*func_read)(struct mosquitto *, void *, size_t))
364
0
{
365
0
  ssize_t read_length;
366
367
0
  mosq->in_packet.packet_buffer_pos = 0;
368
0
  mosq->in_packet.packet_buffer_to_process = 0;
369
0
  read_length = func_read(mosq, mosq->in_packet.packet_buffer, mosq->in_packet.packet_buffer_size);
370
0
  if(read_length > 0){
371
0
    mosq->in_packet.packet_buffer_to_process = (uint16_t)read_length;
372
0
#ifdef WITH_BROKER
373
0
    metrics__int_inc(mosq_counter_bytes_received, read_length);
374
0
#endif
375
0
  }else{
376
0
    if(read_length == 0){
377
0
      return MOSQ_ERR_CONN_LOST; /* EOF */
378
0
    }
379
0
    WINDOWS_SET_ERRNO_RW();
380
0
    if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
381
0
      return MOSQ_ERR_SUCCESS;
382
0
    }else{
383
0
      switch(errno){
384
0
        case COMPAT_ECONNRESET:
385
0
          return MOSQ_ERR_CONN_LOST;
386
0
        case COMPAT_EINTR:
387
0
          return MOSQ_ERR_SUCCESS;
388
0
        default:
389
0
          return MOSQ_ERR_ERRNO;
390
0
      }
391
0
    }
392
0
  }
393
0
  return MOSQ_ERR_SUCCESS;
394
0
}
395
396
397
#ifdef WITH_BROKER
398
static int packet__check_in_packet_oversize(struct mosquitto *mosq)
399
0
{
400
0
  switch(mosq->in_packet.command & 0xF0){
401
0
    case CMD_CONNECT:
402
0
      if(mosq->in_packet.remaining_length > db.config->packet_max_connect){
403
0
        return MOSQ_ERR_OVERSIZE_PACKET;
404
0
      }
405
0
      break;
406
407
0
    case CMD_PUBACK:
408
0
    case CMD_PUBREC:
409
0
    case CMD_PUBREL:
410
0
    case CMD_PUBCOMP:
411
0
    case CMD_UNSUBACK:
412
0
      if(mosq->protocol == mosq_p_mqtt5){
413
0
        if(mosq->in_packet.remaining_length > db.config->packet_max_simple){
414
0
          return MOSQ_ERR_OVERSIZE_PACKET;
415
0
        }
416
0
      }else{
417
0
        if(mosq->in_packet.remaining_length != 2){
418
0
          return MOSQ_ERR_MALFORMED_PACKET;
419
0
        }
420
0
      }
421
0
      break;
422
423
0
    case CMD_PINGREQ:
424
0
    case CMD_PINGRESP:
425
0
      if(mosq->in_packet.remaining_length != 0){
426
0
        return MOSQ_ERR_MALFORMED_PACKET;
427
0
      }
428
0
      break;
429
430
0
    case CMD_DISCONNECT:
431
0
      if(mosq->protocol == mosq_p_mqtt5){
432
0
        if(mosq->in_packet.remaining_length > db.config->packet_max_simple){
433
0
          return MOSQ_ERR_OVERSIZE_PACKET;
434
0
        }
435
0
      }else{
436
0
        if(mosq->in_packet.remaining_length != 0){
437
0
          return MOSQ_ERR_MALFORMED_PACKET;
438
0
        }
439
0
      }
440
0
      break;
441
442
0
    case CMD_SUBSCRIBE:
443
0
    case CMD_UNSUBSCRIBE:
444
0
      if(mosq->protocol == mosq_p_mqtt5 && mosq->in_packet.remaining_length > db.config->packet_max_sub){
445
0
        return MOSQ_ERR_OVERSIZE_PACKET;
446
0
      }
447
0
      break;
448
449
0
    case CMD_AUTH:
450
0
      if(mosq->in_packet.remaining_length > db.config->packet_max_auth){
451
0
        return MOSQ_ERR_OVERSIZE_PACKET;
452
0
      }
453
0
      break;
454
455
0
  }
456
457
0
  if(db.config->max_packet_size > 0 && mosq->in_packet.remaining_length+1 > db.config->max_packet_size){
458
0
    if(mosq->protocol == mosq_p_mqtt5){
459
0
      send__disconnect(mosq, MQTT_RC_PACKET_TOO_LARGE, NULL);
460
0
    }
461
0
    return MOSQ_ERR_OVERSIZE_PACKET;
462
0
  }
463
464
0
  return MOSQ_ERR_SUCCESS;
465
0
}
466
#endif
467
468
469
static int packet__read_single(struct mosquitto *mosq, enum mosquitto_client_state state, ssize_t (*local__read)(struct mosquitto *, void *, size_t))
470
0
{
471
0
  ssize_t read_length;
472
0
  int rc = 0;
473
474
0
  if(!mosq->in_packet.command){
475
0
    if(mosq->in_packet.packet_buffer_to_process == 0){
476
0
      rc = read_header(mosq, local__read);
477
0
      if(rc){
478
0
        return rc;
479
0
      }
480
0
    }
481
482
0
    if(mosq->in_packet.packet_buffer_to_process > 0){
483
0
      mosq->in_packet.command = mosq->in_packet.packet_buffer[mosq->in_packet.packet_buffer_pos];
484
0
      mosq->in_packet.packet_buffer_to_process--;
485
0
      mosq->in_packet.packet_buffer_pos++;
486
0
#ifdef WITH_BROKER
487
      /* Clients must send CONNECT as their first command. */
488
0
      if(!(mosq->bridge) && state == mosq_cs_new && (mosq->in_packet.command&0xF0) != CMD_CONNECT){
489
0
        return MOSQ_ERR_PROTOCOL;
490
0
      }else if((mosq->in_packet.command&0xF0) == CMD_RESERVED){
491
0
        if(mosq->protocol == mosq_p_mqtt5){
492
0
          send__disconnect(mosq, MQTT_RC_PROTOCOL_ERROR, NULL);
493
0
        }
494
0
        return MOSQ_ERR_PROTOCOL;
495
0
      }
496
#else
497
      UNUSED(state);
498
#endif
499
0
    }else{
500
0
      return MOSQ_ERR_SUCCESS;
501
0
    }
502
0
  }
503
  /* remaining_count is the number of bytes that the remaining_length
504
   * parameter occupied in this incoming packet. We don't use it here as such
505
   * (it is used when allocating an outgoing packet), but we must be able to
506
   * determine whether all of the remaining_length parameter has been read.
507
   * remaining_count has three states here:
508
   *   0 means that we haven't read any remaining_length bytes
509
   *   <0 means we have read some remaining_length bytes but haven't finished
510
   *   >0 means we have finished reading the remaining_length bytes.
511
   */
512
0
  if(mosq->in_packet.remaining_count <= 0){
513
0
    uint8_t byte;
514
0
    do{
515
0
      if(mosq->in_packet.packet_buffer_to_process == 0){
516
0
        rc = read_header(mosq, local__read);
517
0
        if(rc){
518
0
          return rc;
519
0
        }
520
0
      }
521
522
0
      if(mosq->in_packet.packet_buffer_to_process > 0){
523
0
        mosq->in_packet.remaining_count--;
524
        /* Max 4 bytes length for remaining length as defined by protocol.
525
         * Anything more likely means a broken/malicious client.
526
         */
527
0
        if(mosq->in_packet.remaining_count < -4){
528
0
          return MOSQ_ERR_MALFORMED_PACKET;
529
0
        }
530
531
0
        byte = mosq->in_packet.packet_buffer[mosq->in_packet.packet_buffer_pos];
532
0
        mosq->in_packet.packet_buffer_pos++;
533
0
        mosq->in_packet.packet_buffer_to_process--;
534
0
        mosq->in_packet.remaining_length += (byte & 127) * mosq->in_packet.remaining_mult;
535
0
        mosq->in_packet.remaining_mult *= 128;
536
0
      }else{
537
0
        return MOSQ_ERR_SUCCESS;
538
0
      }
539
0
    }while((byte & 128) != 0);
540
    /* We have finished reading remaining_length, so make remaining_count
541
     * positive. */
542
0
    mosq->in_packet.remaining_count = (int8_t)(mosq->in_packet.remaining_count * -1);
543
544
0
#ifdef WITH_BROKER
545
0
    rc = packet__check_in_packet_oversize(mosq);
546
0
    if(rc){
547
0
      return rc;
548
0
    }
549
#else
550
    /* FIXME - client case for incoming message received from broker too large */
551
#endif
552
0
    if(mosq->in_packet.remaining_length > 0){
553
0
      mosq->in_packet.payload = mosquitto_malloc(mosq->in_packet.remaining_length*sizeof(uint8_t));
554
0
      if(!mosq->in_packet.payload){
555
0
        return MOSQ_ERR_NOMEM;
556
0
      }
557
558
0
      mosq->in_packet.pos = 0;
559
0
      mosq->in_packet.to_process = mosq->in_packet.remaining_length;
560
561
0
      if(mosq->in_packet.packet_buffer_to_process > 0){
562
0
        uint32_t len;
563
0
        if(mosq->in_packet.packet_buffer_to_process > mosq->in_packet.remaining_length){
564
0
          len = mosq->in_packet.remaining_length;
565
0
        }else{
566
0
          len = mosq->in_packet.packet_buffer_to_process;
567
0
        }
568
0
        memcpy(mosq->in_packet.payload, &mosq->in_packet.packet_buffer[mosq->in_packet.packet_buffer_pos], len);
569
0
        if(len < mosq->in_packet.packet_buffer_to_process){
570
0
          mosq->in_packet.packet_buffer_pos += (uint16_t)len;
571
0
          mosq->in_packet.packet_buffer_to_process -= (uint16_t)len;
572
0
        }else{
573
0
          mosq->in_packet.packet_buffer_pos = 0;
574
0
          mosq->in_packet.packet_buffer_to_process = 0;
575
0
        }
576
0
        mosq->in_packet.pos += len;
577
0
        mosq->in_packet.to_process -= len;
578
0
      }
579
0
    }
580
0
  }
581
0
  while(mosq->in_packet.to_process>0){
582
0
    read_length = local__read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);
583
0
    if(read_length > 0){
584
0
      metrics__int_inc(mosq_counter_bytes_received, read_length);
585
0
      mosq->in_packet.to_process -= (uint32_t)read_length;
586
0
      mosq->in_packet.pos += (uint32_t)read_length;
587
0
    }else{
588
0
      WINDOWS_SET_ERRNO_RW();
589
0
      if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
590
0
        if(mosq->in_packet.to_process > 1000){
591
          /* Update last_msg_in time if more than 1000 bytes left to
592
           * receive. Helps when receiving large messages.
593
           * This is an arbitrary limit, but with some consideration.
594
           * If a client can't send 1000 bytes in a second it
595
           * probably shouldn't be using a 1 second keep alive. */
596
0
#ifdef WITH_BROKER
597
0
          keepalive__update(mosq);
598
#else
599
          COMPAT_pthread_mutex_lock(&mosq->msgtime_mutex);
600
          mosq->last_msg_in = mosquitto_time();
601
          COMPAT_pthread_mutex_unlock(&mosq->msgtime_mutex);
602
#endif
603
0
        }
604
0
        return MOSQ_ERR_SUCCESS;
605
0
      }else{
606
0
        switch(errno){
607
0
          case COMPAT_ECONNRESET:
608
0
            return MOSQ_ERR_CONN_LOST;
609
0
          case COMPAT_EINTR:
610
0
            return MOSQ_ERR_SUCCESS;
611
0
          default:
612
0
            return MOSQ_ERR_ERRNO;
613
0
        }
614
0
      }
615
0
    }
616
0
  }
617
618
  /* All data for this packet is read. */
619
0
  mosq->in_packet.pos = 0;
620
0
#ifdef WITH_BROKER
621
0
  metrics__int_inc(mosq_counter_messages_received, 1);
622
0
#endif
623
0
  rc = handle__packet(mosq);
624
625
  /* Free data and reset values */
626
0
  packet__cleanup(&mosq->in_packet);
627
628
0
#ifdef WITH_BROKER
629
0
  keepalive__update(mosq);
630
#else
631
  COMPAT_pthread_mutex_lock(&mosq->msgtime_mutex);
632
  mosq->last_msg_in = mosquitto_time();
633
  COMPAT_pthread_mutex_unlock(&mosq->msgtime_mutex);
634
#endif
635
0
  return rc;
636
0
}
637
638
639
int packet__read(struct mosquitto *mosq)
640
0
{
641
0
  int rc = 0;
642
0
  enum mosquitto_client_state state;
643
0
  ssize_t (*local__read)(struct mosquitto *, void *, size_t);
644
645
0
  if(!mosq){
646
0
    return MOSQ_ERR_INVAL;
647
0
  }
648
0
  if(!net__is_connected(mosq)){
649
0
    return MOSQ_ERR_NO_CONN;
650
0
  }
651
652
0
#if defined(WITH_WEBSOCKETS) && WITH_WEBSOCKETS == WS_IS_BUILTIN
653
0
  if(mosq->transport == mosq_t_ws){
654
0
    local__read = net__read_ws;
655
0
  }else
656
0
#endif
657
0
  {
658
0
    local__read = net__read;
659
0
  }
660
661
  /* This gets called if pselect() indicates that there is network data
662
   * available - ie. at least one byte.  What we do depends on what data we
663
   * already have.
664
   * If we've not got a command, attempt to read one and save it. This should
665
   * always work because it's only a single byte.
666
   * Then try to read the remaining length. This may fail because it is may
667
   * be more than one byte - will need to save data pending next read if it
668
   * does fail.
669
   * Then try to read the remaining payload, where 'payload' here means the
670
   * combined variable packet_buffer and actual payload. This is the most likely to
671
   * fail due to longer length, so save current data and current position.
672
   * After all data is read, send to mosquitto__handle_packet() to deal with.
673
   * Finally, free the memory and reset everything to starting conditions.
674
   */
675
0
  do{
676
0
    state = mosquitto__get_state(mosq);
677
0
    if(state == mosq_cs_connect_pending){
678
0
      return MOSQ_ERR_SUCCESS;
679
0
    }
680
0
    rc = packet__read_single(mosq, state, local__read);
681
0
    if(rc){
682
0
      return rc;
683
0
    }
684
0
  }while(mosq->in_packet.packet_buffer_to_process > 0);
685
686
0
  return MOSQ_ERR_SUCCESS;
687
0
}