Coverage Report

Created: 2025-09-04 06:10

/src/mosquitto/lib/packet_mosq.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
Copyright (c) 2009-2021 Roger Light <roger@atchoo.org>
3
4
All rights reserved. This program and the accompanying materials
5
are made available under the terms of the Eclipse Public License 2.0
6
and Eclipse Distribution License v1.0 which accompany this distribution.
7
8
The Eclipse Public License is available at
9
   https://www.eclipse.org/legal/epl-2.0/
10
and the Eclipse Distribution License is available at
11
  http://www.eclipse.org/org/documents/edl-v10.php.
12
13
SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
14
15
Contributors:
16
   Roger Light - initial implementation and documentation.
17
*/
18
19
#include "config.h"
20
21
#include <assert.h>
22
#include <errno.h>
23
#include <string.h>
24
25
#ifdef WITH_BROKER
26
#  include "mosquitto_broker_internal.h"
27
#  if defined(WITH_WEBSOCKETS) && WITH_WEBSOCKETS == WS_IS_LWS
28
#    include <libwebsockets.h>
29
#  endif
30
#else
31
#  include "read_handle.h"
32
#endif
33
34
#include "callbacks.h"
35
#include "mosquitto/mqtt_protocol.h"
36
#include "net_mosq.h"
37
#include "packet_mosq.h"
38
#include "read_handle.h"
39
#include "util_mosq.h"
40
#ifdef WITH_BROKER
41
#  include "sys_tree.h"
42
#  include "send_mosq.h"
43
#else
44
#  define metrics__int_inc(stat, val)
45
#  define metrics__int_dec(stat, val)
46
#endif
47
48
int packet__alloc(struct mosquitto__packet **packet, uint8_t command, uint32_t remaining_length)
49
4.41k
{
50
4.41k
  uint8_t remaining_bytes[5], byte;
51
4.41k
  int8_t remaining_count;
52
4.41k
  uint32_t packet_length;
53
4.41k
  uint32_t remaining_length_stored;
54
4.41k
  int i;
55
56
4.41k
  assert(packet);
57
58
4.41k
  remaining_length_stored = remaining_length;
59
4.41k
  remaining_count = 0;
60
4.62k
  do{
61
4.62k
    byte = remaining_length % 128;
62
4.62k
    remaining_length = remaining_length / 128;
63
    /* If there are more digits to encode, set the top bit of this digit */
64
4.62k
    if(remaining_length > 0){
65
209
      byte = byte | 0x80;
66
209
    }
67
4.62k
    remaining_bytes[remaining_count] = byte;
68
4.62k
    remaining_count++;
69
4.62k
  }while(remaining_length > 0 && remaining_count < 5);
70
4.41k
  if(remaining_count == 5) return MOSQ_ERR_PAYLOAD_SIZE;
71
72
4.41k
  packet_length = remaining_length_stored + 1 + (uint8_t)remaining_count;
73
4.41k
  (*packet) = mosquitto_malloc(sizeof(struct mosquitto__packet) + packet_length + WS_PACKET_OFFSET);
74
4.41k
  if((*packet) == NULL) return MOSQ_ERR_NOMEM;
75
76
  /* Clear memory for everything but the payload - that will be set to valid
77
   * values when the actual payload is copied in. */
78
4.41k
  memset((*packet), 0, sizeof(struct mosquitto__packet));
79
4.41k
  (*packet)->command = command;
80
4.41k
  (*packet)->remaining_length = remaining_length_stored;
81
4.41k
  (*packet)->remaining_count = remaining_count;
82
4.41k
  (*packet)->packet_length = packet_length + WS_PACKET_OFFSET;
83
84
4.41k
  (*packet)->payload[WS_PACKET_OFFSET] = (*packet)->command;
85
9.03k
  for(i=0; i<(*packet)->remaining_count; i++){
86
4.62k
    (*packet)->payload[WS_PACKET_OFFSET+i+1] = remaining_bytes[i];
87
4.62k
  }
88
4.41k
  (*packet)->pos = WS_PACKET_OFFSET + 1U + (uint8_t)(*packet)->remaining_count;
89
90
4.41k
  return MOSQ_ERR_SUCCESS;
91
4.41k
}
92
93
void packet__cleanup(struct mosquitto__packet_in *packet)
94
20.5k
{
95
20.5k
  if(!packet) return;
96
97
  /* Free data and reset values */
98
20.5k
  packet->command = 0;
99
20.5k
  packet->remaining_count = 0;
100
20.5k
  packet->remaining_mult = 1;
101
20.5k
  packet->remaining_length = 0;
102
20.5k
  mosquitto_FREE(packet->payload);
103
20.5k
  packet->to_process = 0;
104
20.5k
  packet->pos = 0;
105
20.5k
}
106
107
108
void packet__cleanup_all_no_locks(struct mosquitto *mosq)
109
0
{
110
0
  struct mosquitto__packet *packet;
111
112
  /* Out packet cleanup */
113
0
  while(mosq->out_packet){
114
0
    packet = mosq->out_packet;
115
    /* Free data and reset values */
116
0
    mosq->out_packet = mosq->out_packet->next;
117
118
0
    mosquitto_FREE(packet);
119
0
  }
120
0
  metrics__int_dec(mosq_gauge_out_packets, mosq->out_packet_count);
121
0
  metrics__int_dec(mosq_gauge_out_packet_bytes, mosq->out_packet_bytes);
122
0
  mosq->out_packet_count = 0;
123
0
  mosq->out_packet_bytes = 0;
124
0
  mosq->out_packet_last = NULL;
125
126
0
  packet__cleanup(&mosq->in_packet);
127
0
}
128
129
void packet__cleanup_all(struct mosquitto *mosq)
130
0
{
131
0
  COMPAT_pthread_mutex_lock(&mosq->out_packet_mutex);
132
0
  packet__cleanup_all_no_locks(mosq);
133
0
  COMPAT_pthread_mutex_unlock(&mosq->out_packet_mutex);
134
0
}
135
136
137
static void packet__queue_append(struct mosquitto *mosq, struct mosquitto__packet *packet)
138
4.41k
{
139
4.41k
#ifdef WITH_BROKER
140
4.41k
  if(db.config->max_queued_messages > 0 && mosq->out_packet_count >= db.config->max_queued_messages){
141
0
    mosquitto_free(packet);
142
0
    if(mosq->is_dropping == false){
143
0
      mosq->is_dropping = true;
144
0
      log__printf(NULL, MOSQ_LOG_NOTICE,
145
0
          "Outgoing messages are being dropped for client %s.",
146
0
          mosq->id);
147
0
    }
148
0
    metrics__int_inc(mosq_counter_mqtt_publish_dropped, 1);
149
0
    return;
150
0
  }
151
4.41k
#endif
152
153
4.41k
  COMPAT_pthread_mutex_lock(&mosq->out_packet_mutex);
154
4.41k
  if(mosq->out_packet){
155
11
    mosq->out_packet_last->next = packet;
156
4.40k
  }else{
157
4.40k
    mosq->out_packet = packet;
158
4.40k
  }
159
4.41k
  mosq->out_packet_last = packet;
160
4.41k
  mosq->out_packet_count++;
161
4.41k
  mosq->out_packet_bytes += packet->packet_length;
162
4.41k
  metrics__int_inc(mosq_gauge_out_packets, 1);
163
4.41k
  metrics__int_inc(mosq_gauge_out_packet_bytes, packet->packet_length);
164
4.41k
  COMPAT_pthread_mutex_unlock(&mosq->out_packet_mutex);
165
4.41k
}
166
167
168
int packet__queue(struct mosquitto *mosq, struct mosquitto__packet *packet)
169
4.41k
{
170
#ifndef WITH_BROKER
171
  char sockpair_data = 0;
172
#endif
173
4.41k
  assert(mosq);
174
4.41k
  assert(packet);
175
176
#if defined(WITH_BROKER) && defined(WITH_WEBSOCKETS) && WITH_WEBSOCKETS == WS_IS_LWS
177
  if(mosq->wsi){
178
    packet->next = NULL;
179
    packet->pos = WS_PACKET_OFFSET;
180
    packet->to_process = packet->packet_length - WS_PACKET_OFFSET;
181
182
    packet__queue_append(mosq, packet);
183
184
    lws_callback_on_writable(mosq->wsi);
185
    return MOSQ_ERR_SUCCESS;
186
  }else
187
#elif defined(WITH_WEBSOCKETS) && WITH_WEBSOCKETS == WS_IS_BUILTIN
188
4.41k
  if(mosq->transport == mosq_t_ws){
189
0
    ws__prepare_packet(mosq, packet);
190
0
  }else
191
4.41k
#endif
192
4.41k
  {
193
    /* Normal TCP */
194
4.41k
    packet->next = NULL;
195
4.41k
    packet->pos = WS_PACKET_OFFSET;
196
4.41k
    packet->to_process = packet->packet_length - WS_PACKET_OFFSET;
197
4.41k
  }
198
199
4.41k
  packet__queue_append(mosq, packet);
200
201
4.41k
#ifdef WITH_BROKER
202
4.41k
  return packet__write(mosq);
203
#else
204
  /* Write a single byte to sockpairW (connected to sockpairR) to break out
205
   * of select() if in threaded mode. */
206
  if(mosq->sockpairW != INVALID_SOCKET){
207
#  ifndef WIN32
208
    if(write(mosq->sockpairW, &sockpair_data, 1)){
209
    }
210
#  else
211
    send(mosq->sockpairW, &sockpair_data, 1, 0);
212
#  endif
213
  }
214
215
  if(mosq->callback_depth == 0 && mosq->threaded == mosq_ts_none){
216
    return packet__write(mosq);
217
  }else{
218
    return MOSQ_ERR_SUCCESS;
219
  }
220
#endif
221
4.41k
}
222
223
224
int packet__check_oversize(struct mosquitto *mosq, uint32_t remaining_length)
225
1.13k
{
226
1.13k
  uint32_t len;
227
228
1.13k
  if(mosq->maximum_packet_size == 0) return MOSQ_ERR_SUCCESS;
229
230
111
  len = remaining_length + mosquitto_varint_bytes(remaining_length);
231
111
  if(len > mosq->maximum_packet_size){
232
4
    return MOSQ_ERR_OVERSIZE_PACKET;
233
107
  }else{
234
107
    return MOSQ_ERR_SUCCESS;
235
107
  }
236
111
}
237
238
struct mosquitto__packet *packet__get_next_out(struct mosquitto *mosq)
239
0
{
240
0
  struct mosquitto__packet *packet = NULL;
241
242
0
  COMPAT_pthread_mutex_lock(&mosq->out_packet_mutex);
243
0
  if(mosq->out_packet){
244
0
    mosq->out_packet_count--;
245
0
    mosq->out_packet_bytes -= mosq->out_packet->packet_length;
246
0
    metrics__int_dec(mosq_gauge_out_packets, 1);
247
0
    metrics__int_dec(mosq_gauge_out_packet_bytes, mosq->out_packet->packet_length);
248
249
0
    mosq->out_packet = mosq->out_packet->next;
250
0
    if(!mosq->out_packet){
251
0
      mosq->out_packet_last = NULL;
252
0
    }
253
0
    packet = mosq->out_packet;
254
0
  }
255
0
  COMPAT_pthread_mutex_unlock(&mosq->out_packet_mutex);
256
257
0
  return packet;
258
0
}
259
260
261
int packet__write(struct mosquitto *mosq)
262
4.41k
{
263
4.41k
  ssize_t write_length;
264
4.41k
  struct mosquitto__packet *packet, *next_packet;
265
4.41k
  enum mosquitto_client_state state;
266
267
4.41k
  if(!mosq) return MOSQ_ERR_INVAL;
268
4.41k
  if(!net__is_connected(mosq)){
269
4.41k
    return MOSQ_ERR_NO_CONN;
270
4.41k
  }
271
272
0
  COMPAT_pthread_mutex_lock(&mosq->out_packet_mutex);
273
0
  packet = mosq->out_packet;
274
0
  COMPAT_pthread_mutex_unlock(&mosq->out_packet_mutex);
275
276
0
  if(packet == NULL){
277
0
    return MOSQ_ERR_SUCCESS;
278
0
  }
279
280
0
#ifdef WITH_BROKER
281
0
  mux__add_out(mosq);
282
0
#endif
283
284
0
  state = mosquitto__get_state(mosq);
285
0
  if(state == mosq_cs_connect_pending){
286
0
    return MOSQ_ERR_SUCCESS;
287
0
  }
288
289
0
  while(packet){
290
0
    while(packet->to_process > 0){
291
0
      write_length = net__write(mosq, &(packet->payload[packet->pos]), packet->to_process);
292
0
      if(write_length > 0){
293
0
        metrics__int_inc(mosq_counter_bytes_sent, write_length);
294
0
        packet->to_process -= (uint32_t)write_length;
295
0
        packet->pos += (uint32_t)write_length;
296
0
      }else{
297
0
        WINDOWS_SET_ERRNO();
298
299
0
        if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK
300
#ifdef WIN32
301
            || errno == WSAENOTCONN
302
#endif
303
0
            ){
304
0
          return MOSQ_ERR_SUCCESS;
305
0
        }else{
306
0
          switch(errno){
307
0
            case COMPAT_ECONNRESET:
308
0
              return MOSQ_ERR_CONN_LOST;
309
0
            case COMPAT_EINTR:
310
0
              return MOSQ_ERR_SUCCESS;
311
0
            case EPROTO:
312
0
              return MOSQ_ERR_TLS;
313
0
            default:
314
0
              return MOSQ_ERR_ERRNO;
315
0
          }
316
0
        }
317
0
      }
318
0
    }
319
320
0
    metrics__int_inc(mosq_counter_messages_sent, 1);
321
0
    if(((packet->command)&0xF6) == CMD_PUBLISH){
322
#ifndef WITH_BROKER
323
      callback__on_publish(mosq, packet->mid, 0, NULL);
324
    }else if(((packet->command)&0xF0) == CMD_DISCONNECT){
325
      net__socket_shutdown(mosq);
326
      return MOSQ_ERR_SUCCESS;
327
#endif
328
0
    }
329
330
0
    next_packet = packet__get_next_out(mosq);
331
0
    mosquitto_FREE(packet);
332
0
    packet = next_packet;
333
334
0
#ifdef WITH_BROKER
335
0
    mosq->next_msg_out = db.now_s + mosq->keepalive;
336
#else
337
    COMPAT_pthread_mutex_lock(&mosq->msgtime_mutex);
338
    mosq->next_msg_out = mosquitto_time() + mosq->keepalive;
339
    COMPAT_pthread_mutex_unlock(&mosq->msgtime_mutex);
340
#endif
341
0
  }
342
0
#ifdef WITH_BROKER
343
0
  if (mosq->out_packet == NULL) {
344
0
    mux__remove_out(mosq);
345
0
  }
346
0
#endif
347
0
  return MOSQ_ERR_SUCCESS;
348
0
}
349
350
351
static int read_header(struct mosquitto *mosq, ssize_t (*func_read)(struct mosquitto *, void *, size_t))
352
0
{
353
0
  ssize_t read_length;
354
355
0
  read_length = func_read(mosq, &mosq->in_packet.packet_buffer[mosq->in_packet.packet_buffer_pos], mosq->in_packet.packet_buffer_size-mosq->in_packet.packet_buffer_pos);
356
0
  if(read_length > 0){
357
0
    mosq->in_packet.packet_buffer_to_process = (uint16_t)read_length;
358
0
#ifdef WITH_BROKER
359
0
    metrics__int_inc(mosq_counter_bytes_received, read_length);
360
0
#endif
361
0
  }else{
362
0
    if(read_length == 0){
363
0
      return MOSQ_ERR_CONN_LOST; /* EOF */
364
0
    }
365
0
    WINDOWS_SET_ERRNO();
366
0
    if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
367
0
      return MOSQ_ERR_SUCCESS;
368
0
    }else{
369
0
      switch(errno){
370
0
        case COMPAT_ECONNRESET:
371
0
          return MOSQ_ERR_CONN_LOST;
372
0
        case COMPAT_EINTR:
373
0
          return MOSQ_ERR_SUCCESS;
374
0
        default:
375
0
          return MOSQ_ERR_ERRNO;
376
0
      }
377
0
    }
378
0
  }
379
0
  return MOSQ_ERR_SUCCESS;
380
0
}
381
382
static int packet__read_single(struct mosquitto *mosq, enum mosquitto_client_state state, ssize_t (*local__read)(struct mosquitto *, void *, size_t))
383
0
{
384
0
  ssize_t read_length;
385
0
  int rc = 0;
386
387
0
  if(!mosq->in_packet.command){
388
0
    if(mosq->in_packet.packet_buffer_to_process == 0){
389
0
      rc = read_header(mosq, local__read);
390
0
      if(rc) return rc;
391
0
    }
392
393
0
    if(mosq->in_packet.packet_buffer_to_process > 0){
394
0
      mosq->in_packet.command = mosq->in_packet.packet_buffer[mosq->in_packet.packet_buffer_pos];
395
0
      mosq->in_packet.packet_buffer_to_process--;
396
0
      mosq->in_packet.packet_buffer_pos++;
397
0
#ifdef WITH_BROKER
398
      /* Clients must send CONNECT as their first command. */
399
0
      if(!(mosq->bridge) && state == mosq_cs_new && (mosq->in_packet.command&0xF0) != CMD_CONNECT){
400
0
        return MOSQ_ERR_PROTOCOL;
401
0
      }else if((mosq->in_packet.command&0xF0) == CMD_RESERVED){
402
0
        if(mosq->protocol == mosq_p_mqtt5){
403
0
          send__disconnect(mosq, MQTT_RC_PROTOCOL_ERROR, NULL);
404
0
        }
405
0
        return MOSQ_ERR_PROTOCOL;
406
0
      }
407
#else
408
      UNUSED(state);
409
#endif
410
0
    }else{
411
0
      return MOSQ_ERR_SUCCESS;
412
0
    }
413
0
  }
414
  /* remaining_count is the number of bytes that the remaining_length
415
   * parameter occupied in this incoming packet. We don't use it here as such
416
   * (it is used when allocating an outgoing packet), but we must be able to
417
   * determine whether all of the remaining_length parameter has been read.
418
   * remaining_count has three states here:
419
   *   0 means that we haven't read any remaining_length bytes
420
   *   <0 means we have read some remaining_length bytes but haven't finished
421
   *   >0 means we have finished reading the remaining_length bytes.
422
   */
423
0
  if(mosq->in_packet.remaining_count <= 0){
424
0
    uint8_t byte;
425
0
    do{
426
0
      if(mosq->in_packet.packet_buffer_to_process == 0){
427
0
        rc = read_header(mosq, local__read);
428
0
        if(rc) return rc;
429
0
      }
430
431
0
      if(mosq->in_packet.packet_buffer_to_process > 0){
432
0
        mosq->in_packet.remaining_count--;
433
        /* Max 4 bytes length for remaining length as defined by protocol.
434
         * Anything more likely means a broken/malicious client.
435
         */
436
0
        if(mosq->in_packet.remaining_count < -4){
437
0
          return MOSQ_ERR_MALFORMED_PACKET;
438
0
        }
439
440
0
        byte = mosq->in_packet.packet_buffer[mosq->in_packet.packet_buffer_pos];
441
0
        mosq->in_packet.packet_buffer_pos++;
442
0
        mosq->in_packet.packet_buffer_to_process--;
443
0
        mosq->in_packet.remaining_length += (byte & 127) * mosq->in_packet.remaining_mult;
444
0
        mosq->in_packet.remaining_mult *= 128;
445
0
      }else{
446
0
        return MOSQ_ERR_SUCCESS;
447
0
      }
448
0
    }while((byte & 128) != 0);
449
    /* We have finished reading remaining_length, so make remaining_count
450
     * positive. */
451
0
    mosq->in_packet.remaining_count = (int8_t)(mosq->in_packet.remaining_count * -1);
452
453
0
#ifdef WITH_BROKER
454
0
    switch(mosq->in_packet.command & 0xF0){
455
0
      case CMD_CONNECT:
456
0
        if(mosq->in_packet.remaining_length > db.config->packet_max_connect){
457
0
          return MOSQ_ERR_OVERSIZE_PACKET;
458
0
        }
459
0
        break;
460
461
0
      case CMD_PUBACK:
462
0
      case CMD_PUBREC:
463
0
      case CMD_PUBREL:
464
0
      case CMD_PUBCOMP:
465
0
      case CMD_UNSUBACK:
466
0
        if(mosq->protocol == mosq_p_mqtt5){
467
0
          if(mosq->in_packet.remaining_length > db.config->packet_max_simple){
468
0
            return MOSQ_ERR_OVERSIZE_PACKET;
469
0
          }
470
0
        }else{
471
0
          if(mosq->in_packet.remaining_length != 2){
472
0
            return MOSQ_ERR_MALFORMED_PACKET;
473
0
          }
474
0
        }
475
0
        break;
476
477
0
      case CMD_PINGREQ:
478
0
      case CMD_PINGRESP:
479
0
        if(mosq->in_packet.remaining_length != 0){
480
0
          return MOSQ_ERR_MALFORMED_PACKET;
481
0
        }
482
0
        break;
483
484
0
      case CMD_DISCONNECT:
485
0
        if(mosq->protocol == mosq_p_mqtt5){
486
0
          if(mosq->in_packet.remaining_length > db.config->packet_max_simple){
487
0
            return MOSQ_ERR_OVERSIZE_PACKET;
488
0
          }
489
0
        }else{
490
0
          if(mosq->in_packet.remaining_length != 0){
491
0
            return MOSQ_ERR_MALFORMED_PACKET;
492
0
          }
493
0
        }
494
0
        break;
495
496
0
      case CMD_SUBSCRIBE:
497
0
      case CMD_UNSUBSCRIBE:
498
0
        if(mosq->protocol == mosq_p_mqtt5 && mosq->in_packet.remaining_length > db.config->packet_max_sub){
499
0
          return MOSQ_ERR_OVERSIZE_PACKET;
500
0
        }
501
0
        break;
502
503
0
      case CMD_AUTH:
504
0
        if(mosq->in_packet.remaining_length > db.config->packet_max_auth){
505
0
          return MOSQ_ERR_OVERSIZE_PACKET;
506
0
        }
507
0
        break;
508
509
0
    }
510
511
0
    if(db.config->max_packet_size > 0 && mosq->in_packet.remaining_length+1 > db.config->max_packet_size){
512
0
      if(mosq->protocol == mosq_p_mqtt5){
513
0
        send__disconnect(mosq, MQTT_RC_PACKET_TOO_LARGE, NULL);
514
0
      }
515
0
      return MOSQ_ERR_OVERSIZE_PACKET;
516
0
    }
517
#else
518
    /* FIXME - client case for incoming message received from broker too large */
519
#endif
520
0
    if(mosq->in_packet.remaining_length > 0){
521
0
      mosq->in_packet.payload = mosquitto_malloc(mosq->in_packet.remaining_length*sizeof(uint8_t));
522
0
      if(!mosq->in_packet.payload){
523
0
        return MOSQ_ERR_NOMEM;
524
0
      }
525
526
0
      mosq->in_packet.pos = 0;
527
0
      mosq->in_packet.to_process = mosq->in_packet.remaining_length;
528
529
0
      if(mosq->in_packet.packet_buffer_to_process > 0){
530
0
        uint32_t len;
531
0
        if(mosq->in_packet.packet_buffer_to_process > mosq->in_packet.remaining_length){
532
0
          len = mosq->in_packet.remaining_length;
533
0
        }else{
534
0
          len = mosq->in_packet.packet_buffer_to_process;
535
0
        }
536
0
        memcpy(mosq->in_packet.payload, &mosq->in_packet.packet_buffer[mosq->in_packet.packet_buffer_pos], len);
537
0
        if(len < mosq->in_packet.packet_buffer_to_process){
538
0
          mosq->in_packet.packet_buffer_pos += (uint16_t)len;
539
0
          mosq->in_packet.packet_buffer_to_process -= (uint16_t)len;
540
0
        }else{
541
0
          mosq->in_packet.packet_buffer_pos = 0;
542
0
          mosq->in_packet.packet_buffer_to_process = 0;
543
0
        }
544
0
        mosq->in_packet.pos += len;
545
0
        mosq->in_packet.to_process -= len;
546
0
      }
547
0
    }
548
0
  }
549
0
  while(mosq->in_packet.to_process>0){
550
0
    read_length = local__read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);
551
0
    if(read_length > 0){
552
0
      metrics__int_inc(mosq_counter_bytes_received, read_length);
553
0
      mosq->in_packet.to_process -= (uint32_t)read_length;
554
0
      mosq->in_packet.pos += (uint32_t)read_length;
555
0
    }else{
556
0
      WINDOWS_SET_ERRNO();
557
0
      if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
558
0
        if(mosq->in_packet.to_process > 1000){
559
          /* Update last_msg_in time if more than 1000 bytes left to
560
           * receive. Helps when receiving large messages.
561
           * This is an arbitrary limit, but with some consideration.
562
           * If a client can't send 1000 bytes in a second it
563
           * probably shouldn't be using a 1 second keep alive. */
564
0
#ifdef WITH_BROKER
565
0
          keepalive__update(mosq);
566
#else
567
          COMPAT_pthread_mutex_lock(&mosq->msgtime_mutex);
568
          mosq->last_msg_in = mosquitto_time();
569
          COMPAT_pthread_mutex_unlock(&mosq->msgtime_mutex);
570
#endif
571
0
        }
572
0
        return MOSQ_ERR_SUCCESS;
573
0
      }else{
574
0
        switch(errno){
575
0
          case COMPAT_ECONNRESET:
576
0
            return MOSQ_ERR_CONN_LOST;
577
0
          case COMPAT_EINTR:
578
0
            return MOSQ_ERR_SUCCESS;
579
0
          default:
580
0
            return MOSQ_ERR_ERRNO;
581
0
        }
582
0
      }
583
0
    }
584
0
  }
585
586
  /* All data for this packet is read. */
587
0
  mosq->in_packet.pos = 0;
588
0
#ifdef WITH_BROKER
589
0
  metrics__int_inc(mosq_counter_messages_received, 1);
590
0
#endif
591
0
  rc = handle__packet(mosq);
592
593
  /* Free data and reset values */
594
0
  packet__cleanup(&mosq->in_packet);
595
596
0
#ifdef WITH_BROKER
597
0
  keepalive__update(mosq);
598
#else
599
  COMPAT_pthread_mutex_lock(&mosq->msgtime_mutex);
600
  mosq->last_msg_in = mosquitto_time();
601
  COMPAT_pthread_mutex_unlock(&mosq->msgtime_mutex);
602
#endif
603
0
  return rc;
604
0
}
605
606
607
int packet__read(struct mosquitto *mosq)
608
0
{
609
0
  int rc = 0;
610
0
  enum mosquitto_client_state state;
611
0
  ssize_t (*local__read)(struct mosquitto *, void *, size_t);
612
613
0
  if(!mosq){
614
0
    return MOSQ_ERR_INVAL;
615
0
  }
616
0
  if(!net__is_connected(mosq)){
617
0
    return MOSQ_ERR_NO_CONN;
618
0
  }
619
620
0
#if defined(WITH_WEBSOCKETS) && WITH_WEBSOCKETS == WS_IS_BUILTIN
621
0
  if(mosq->transport == mosq_t_ws){
622
0
    local__read = net__read_ws;
623
0
  }else
624
0
#endif
625
0
  {
626
0
    local__read = net__read;
627
0
  }
628
629
  /* This gets called if pselect() indicates that there is network data
630
   * available - ie. at least one byte.  What we do depends on what data we
631
   * already have.
632
   * If we've not got a command, attempt to read one and save it. This should
633
   * always work because it's only a single byte.
634
   * Then try to read the remaining length. This may fail because it is may
635
   * be more than one byte - will need to save data pending next read if it
636
   * does fail.
637
   * Then try to read the remaining payload, where 'payload' here means the
638
   * combined variable packet_buffer and actual payload. This is the most likely to
639
   * fail due to longer length, so save current data and current position.
640
   * After all data is read, send to mosquitto__handle_packet() to deal with.
641
   * Finally, free the memory and reset everything to starting conditions.
642
   */
643
0
  do{
644
0
    state = mosquitto__get_state(mosq);
645
0
    if(state == mosq_cs_connect_pending){
646
0
      return MOSQ_ERR_SUCCESS;
647
0
    }
648
0
    rc = packet__read_single(mosq, state, local__read);
649
0
    if(rc) return rc;
650
0
  }while(mosq->in_packet.packet_buffer_to_process > 0);
651
652
0
  return MOSQ_ERR_SUCCESS;
653
0
}