Coverage Report

Created: 2025-11-07 06:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/mosquitto/lib/packet_mosq.c
Line
Count
Source
1
/*
2
Copyright (c) 2009-2021 Roger Light <roger@atchoo.org>
3
4
All rights reserved. This program and the accompanying materials
5
are made available under the terms of the Eclipse Public License 2.0
6
and Eclipse Distribution License v1.0 which accompany this distribution.
7
8
The Eclipse Public License is available at
9
   https://www.eclipse.org/legal/epl-2.0/
10
and the Eclipse Distribution License is available at
11
  http://www.eclipse.org/org/documents/edl-v10.php.
12
13
SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
14
15
Contributors:
16
   Roger Light - initial implementation and documentation.
17
*/
18
19
#include "config.h"
20
21
#include <assert.h>
22
#include <errno.h>
23
#include <string.h>
24
25
#ifdef WITH_BROKER
26
#  include "mosquitto_broker_internal.h"
27
#  if defined(WITH_WEBSOCKETS) && WITH_WEBSOCKETS == WS_IS_LWS
28
#    include <libwebsockets.h>
29
#  endif
30
#else
31
#  include "read_handle.h"
32
#endif
33
34
#include "callbacks.h"
35
#include "mosquitto/mqtt_protocol.h"
36
#include "net_mosq.h"
37
#include "packet_mosq.h"
38
#include "read_handle.h"
39
#include "util_mosq.h"
40
#ifdef WITH_BROKER
41
#  include "sys_tree.h"
42
#  include "send_mosq.h"
43
#else
44
#  define metrics__int_inc(stat, val)
45
#  define metrics__int_dec(stat, val)
46
#endif
47
48
49
int packet__alloc(struct mosquitto__packet **packet, uint8_t command, uint32_t remaining_length)
50
2.16k
{
51
2.16k
  uint8_t remaining_bytes[5], byte;
52
2.16k
  int8_t remaining_count;
53
2.16k
  uint32_t packet_length;
54
2.16k
  uint32_t remaining_length_stored;
55
2.16k
  int i;
56
57
2.16k
  assert(packet);
58
59
2.16k
  remaining_length_stored = remaining_length;
60
2.16k
  remaining_count = 0;
61
2.38k
  do{
62
2.38k
    byte = remaining_length % 128;
63
2.38k
    remaining_length = remaining_length / 128;
64
    /* If there are more digits to encode, set the top bit of this digit */
65
2.38k
    if(remaining_length > 0){
66
226
      byte = byte | 0x80;
67
226
    }
68
2.38k
    remaining_bytes[remaining_count] = byte;
69
2.38k
    remaining_count++;
70
2.38k
  }while(remaining_length > 0 && remaining_count < 5);
71
2.16k
  if(remaining_count == 5){
72
0
    return MOSQ_ERR_PAYLOAD_SIZE;
73
0
  }
74
75
2.16k
  packet_length = remaining_length_stored + 1 + (uint8_t)remaining_count;
76
2.16k
  (*packet) = mosquitto_malloc(sizeof(struct mosquitto__packet) + packet_length + WS_PACKET_OFFSET);
77
2.16k
  if((*packet) == NULL){
78
0
    return MOSQ_ERR_NOMEM;
79
0
  }
80
81
  /* Clear memory for everything but the payload - that will be set to valid
82
   * values when the actual payload is copied in. */
83
2.16k
  memset((*packet), 0, sizeof(struct mosquitto__packet));
84
2.16k
  (*packet)->command = command;
85
2.16k
  (*packet)->remaining_length = remaining_length_stored;
86
2.16k
  (*packet)->remaining_count = remaining_count;
87
2.16k
  (*packet)->packet_length = packet_length + WS_PACKET_OFFSET;
88
89
2.16k
  (*packet)->payload[WS_PACKET_OFFSET] = (*packet)->command;
90
4.55k
  for(i=0; i<(*packet)->remaining_count; i++){
91
2.38k
    (*packet)->payload[WS_PACKET_OFFSET+i+1] = remaining_bytes[i];
92
2.38k
  }
93
2.16k
  (*packet)->pos = WS_PACKET_OFFSET + 1U + (uint8_t)(*packet)->remaining_count;
94
95
2.16k
  return MOSQ_ERR_SUCCESS;
96
2.16k
}
97
98
99
void packet__cleanup(struct mosquitto__packet_in *packet)
100
15.2k
{
101
15.2k
  if(!packet){
102
0
    return;
103
0
  }
104
105
  /* Free data and reset values */
106
15.2k
  packet->command = 0;
107
15.2k
  packet->remaining_count = 0;
108
15.2k
  packet->remaining_mult = 1;
109
15.2k
  packet->remaining_length = 0;
110
15.2k
  mosquitto_FREE(packet->payload);
111
15.2k
  packet->to_process = 0;
112
15.2k
  packet->pos = 0;
113
15.2k
}
114
115
116
void packet__cleanup_all_no_locks(struct mosquitto *mosq)
117
0
{
118
0
  struct mosquitto__packet *packet;
119
120
  /* Out packet cleanup */
121
0
  while(mosq->out_packet){
122
0
    packet = mosq->out_packet;
123
    /* Free data and reset values */
124
0
    mosq->out_packet = mosq->out_packet->next;
125
126
0
    mosquitto_FREE(packet);
127
0
  }
128
0
  metrics__int_dec(mosq_gauge_out_packets, mosq->out_packet_count);
129
0
  metrics__int_dec(mosq_gauge_out_packet_bytes, mosq->out_packet_bytes);
130
0
  mosq->out_packet_count = 0;
131
0
  mosq->out_packet_bytes = 0;
132
0
  mosq->out_packet_last = NULL;
133
134
0
  packet__cleanup(&mosq->in_packet);
135
0
}
136
137
138
void packet__cleanup_all(struct mosquitto *mosq)
139
0
{
140
0
  COMPAT_pthread_mutex_lock(&mosq->out_packet_mutex);
141
0
  packet__cleanup_all_no_locks(mosq);
142
0
  COMPAT_pthread_mutex_unlock(&mosq->out_packet_mutex);
143
0
}
144
145
146
static void packet__queue_append(struct mosquitto *mosq, struct mosquitto__packet *packet)
147
2.16k
{
148
2.16k
#ifdef WITH_BROKER
149
2.16k
  if(db.config->max_queued_messages > 0 && mosq->out_packet_count >= db.config->max_queued_messages){
150
0
    mosquitto_free(packet);
151
0
    if(mosq->is_dropping == false){
152
0
      mosq->is_dropping = true;
153
0
      log__printf(NULL, MOSQ_LOG_NOTICE,
154
0
          "Outgoing messages are being dropped for client %s.",
155
0
          mosq->id);
156
0
    }
157
0
    metrics__int_inc(mosq_counter_mqtt_publish_dropped, 1);
158
0
    return;
159
0
  }
160
2.16k
#endif
161
162
2.16k
  COMPAT_pthread_mutex_lock(&mosq->out_packet_mutex);
163
2.16k
  if(mosq->out_packet){
164
0
    mosq->out_packet_last->next = packet;
165
2.16k
  }else{
166
2.16k
    mosq->out_packet = packet;
167
2.16k
  }
168
2.16k
  mosq->out_packet_last = packet;
169
2.16k
  mosq->out_packet_count++;
170
2.16k
  mosq->out_packet_bytes += packet->packet_length;
171
2.16k
  metrics__int_inc(mosq_gauge_out_packets, 1);
172
2.16k
  metrics__int_inc(mosq_gauge_out_packet_bytes, packet->packet_length);
173
2.16k
  COMPAT_pthread_mutex_unlock(&mosq->out_packet_mutex);
174
2.16k
}
175
176
177
int packet__queue(struct mosquitto *mosq, struct mosquitto__packet *packet)
178
2.16k
{
179
#ifndef WITH_BROKER
180
  char sockpair_data = 0;
181
#endif
182
2.16k
  assert(mosq);
183
2.16k
  assert(packet);
184
185
#if defined(WITH_BROKER) && defined(WITH_WEBSOCKETS) && WITH_WEBSOCKETS == WS_IS_LWS
186
  if(mosq->wsi){
187
    packet->next = NULL;
188
    packet->pos = WS_PACKET_OFFSET;
189
    packet->to_process = packet->packet_length - WS_PACKET_OFFSET;
190
191
    packet__queue_append(mosq, packet);
192
193
    lws_callback_on_writable(mosq->wsi);
194
    return MOSQ_ERR_SUCCESS;
195
  }else
196
#elif defined(WITH_WEBSOCKETS) && WITH_WEBSOCKETS == WS_IS_BUILTIN
197
2.16k
  if(mosq->transport == mosq_t_ws){
198
0
    ws__prepare_packet(mosq, packet);
199
0
  }else
200
2.16k
#endif
201
2.16k
  {
202
    /* Normal TCP */
203
2.16k
    packet->next = NULL;
204
2.16k
    packet->pos = WS_PACKET_OFFSET;
205
2.16k
    packet->to_process = packet->packet_length - WS_PACKET_OFFSET;
206
2.16k
  }
207
208
2.16k
  packet__queue_append(mosq, packet);
209
210
2.16k
#ifdef WITH_BROKER
211
2.16k
  return packet__write(mosq);
212
#else
213
  /* Write a single byte to sockpairW (connected to sockpairR) to break out
214
   * of select() if in threaded mode. */
215
  if(mosq->sockpairW != INVALID_SOCKET){
216
#  ifndef WIN32
217
    if(write(mosq->sockpairW, &sockpair_data, 1)){
218
    }
219
#  else
220
    send(mosq->sockpairW, &sockpair_data, 1, 0);
221
#  endif
222
  }
223
224
  if(mosq->callback_depth == 0 && mosq->threaded == mosq_ts_none){
225
    return packet__write(mosq);
226
  }else{
227
    return MOSQ_ERR_SUCCESS;
228
  }
229
#endif
230
2.16k
}
231
232
233
int packet__check_oversize(struct mosquitto *mosq, uint32_t remaining_length)
234
1.45k
{
235
1.45k
  uint32_t len;
236
237
1.45k
  if(mosq->maximum_packet_size == 0){
238
1.34k
    return MOSQ_ERR_SUCCESS;
239
1.34k
  }
240
241
103
  len = remaining_length + mosquitto_varint_bytes(remaining_length);
242
103
  if(len > mosq->maximum_packet_size){
243
6
    return MOSQ_ERR_OVERSIZE_PACKET;
244
97
  }else{
245
97
    return MOSQ_ERR_SUCCESS;
246
97
  }
247
103
}
248
249
struct mosquitto__packet *packet__get_next_out(struct mosquitto *mosq)
250
0
{
251
0
  struct mosquitto__packet *packet = NULL;
252
253
0
  COMPAT_pthread_mutex_lock(&mosq->out_packet_mutex);
254
0
  if(mosq->out_packet){
255
0
    mosq->out_packet_count--;
256
0
    mosq->out_packet_bytes -= mosq->out_packet->packet_length;
257
0
    metrics__int_dec(mosq_gauge_out_packets, 1);
258
0
    metrics__int_dec(mosq_gauge_out_packet_bytes, mosq->out_packet->packet_length);
259
260
0
    mosq->out_packet = mosq->out_packet->next;
261
0
    if(!mosq->out_packet){
262
0
      mosq->out_packet_last = NULL;
263
0
    }
264
0
    packet = mosq->out_packet;
265
0
  }
266
0
  COMPAT_pthread_mutex_unlock(&mosq->out_packet_mutex);
267
268
0
  return packet;
269
0
}
270
271
272
int packet__write(struct mosquitto *mosq)
273
2.16k
{
274
2.16k
  ssize_t write_length;
275
2.16k
  struct mosquitto__packet *packet, *next_packet;
276
2.16k
  enum mosquitto_client_state state;
277
278
2.16k
  if(!mosq){
279
0
    return MOSQ_ERR_INVAL;
280
0
  }
281
2.16k
  if(!net__is_connected(mosq)){
282
2.16k
    return MOSQ_ERR_NO_CONN;
283
2.16k
  }
284
285
0
  COMPAT_pthread_mutex_lock(&mosq->out_packet_mutex);
286
0
  packet = mosq->out_packet;
287
0
  COMPAT_pthread_mutex_unlock(&mosq->out_packet_mutex);
288
289
0
  if(packet == NULL){
290
0
    return MOSQ_ERR_SUCCESS;
291
0
  }
292
293
0
#ifdef WITH_BROKER
294
0
  mux__add_out(mosq);
295
0
#endif
296
297
0
  state = mosquitto__get_state(mosq);
298
0
  if(state == mosq_cs_connect_pending){
299
0
    return MOSQ_ERR_SUCCESS;
300
0
  }
301
302
0
  while(packet){
303
0
    while(packet->to_process > 0){
304
0
      write_length = net__write(mosq, &(packet->payload[packet->pos]), packet->to_process);
305
0
      if(write_length > 0){
306
0
        metrics__int_inc(mosq_counter_bytes_sent, write_length);
307
0
        packet->to_process -= (uint32_t)write_length;
308
0
        packet->pos += (uint32_t)write_length;
309
0
      }else{
310
0
        WINDOWS_SET_ERRNO();
311
312
0
        if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK
313
#ifdef WIN32
314
            || errno == WSAENOTCONN
315
#endif
316
0
            ){
317
0
          return MOSQ_ERR_SUCCESS;
318
0
        }else{
319
0
          switch(errno){
320
0
            case COMPAT_ECONNRESET:
321
0
              return MOSQ_ERR_CONN_LOST;
322
0
            case COMPAT_EINTR:
323
0
              return MOSQ_ERR_SUCCESS;
324
0
            case EPROTO:
325
0
              return MOSQ_ERR_TLS;
326
0
            default:
327
0
              return MOSQ_ERR_ERRNO;
328
0
          }
329
0
        }
330
0
      }
331
0
    }
332
333
0
    metrics__int_inc(mosq_counter_messages_sent, 1);
334
0
    if(((packet->command)&0xF6) == CMD_PUBLISH){
335
#ifndef WITH_BROKER
336
      callback__on_publish(mosq, packet->mid, 0, NULL);
337
    }else if(((packet->command)&0xF0) == CMD_DISCONNECT){
338
      net__socket_shutdown(mosq);
339
      return MOSQ_ERR_SUCCESS;
340
#endif
341
0
    }
342
343
0
    next_packet = packet__get_next_out(mosq);
344
0
    mosquitto_FREE(packet);
345
0
    packet = next_packet;
346
347
0
#ifdef WITH_BROKER
348
0
    mosq->next_msg_out = db.now_s + mosq->keepalive;
349
#else
350
    COMPAT_pthread_mutex_lock(&mosq->msgtime_mutex);
351
    mosq->next_msg_out = mosquitto_time() + mosq->keepalive;
352
    COMPAT_pthread_mutex_unlock(&mosq->msgtime_mutex);
353
#endif
354
0
  }
355
0
#ifdef WITH_BROKER
356
0
  if(mosq->out_packet == NULL){
357
0
    mux__remove_out(mosq);
358
0
  }
359
0
#endif
360
0
  return MOSQ_ERR_SUCCESS;
361
0
}
362
363
364
static int read_header(struct mosquitto *mosq, ssize_t (*func_read)(struct mosquitto *, void *, size_t))
365
0
{
366
0
  ssize_t read_length;
367
368
0
  read_length = func_read(mosq, &mosq->in_packet.packet_buffer[mosq->in_packet.packet_buffer_pos], mosq->in_packet.packet_buffer_size-mosq->in_packet.packet_buffer_pos);
369
0
  if(read_length > 0){
370
0
    mosq->in_packet.packet_buffer_to_process = (uint16_t)read_length;
371
0
#ifdef WITH_BROKER
372
0
    metrics__int_inc(mosq_counter_bytes_received, read_length);
373
0
#endif
374
0
  }else{
375
0
    if(read_length == 0){
376
0
      return MOSQ_ERR_CONN_LOST; /* EOF */
377
0
    }
378
0
    WINDOWS_SET_ERRNO();
379
0
    if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
380
0
      return MOSQ_ERR_SUCCESS;
381
0
    }else{
382
0
      switch(errno){
383
0
        case COMPAT_ECONNRESET:
384
0
          return MOSQ_ERR_CONN_LOST;
385
0
        case COMPAT_EINTR:
386
0
          return MOSQ_ERR_SUCCESS;
387
0
        default:
388
0
          return MOSQ_ERR_ERRNO;
389
0
      }
390
0
    }
391
0
  }
392
0
  return MOSQ_ERR_SUCCESS;
393
0
}
394
395
396
static int packet__read_single(struct mosquitto *mosq, enum mosquitto_client_state state, ssize_t (*local__read)(struct mosquitto *, void *, size_t))
397
0
{
398
0
  ssize_t read_length;
399
0
  int rc = 0;
400
401
0
  if(!mosq->in_packet.command){
402
0
    if(mosq->in_packet.packet_buffer_to_process == 0){
403
0
      rc = read_header(mosq, local__read);
404
0
      if(rc){
405
0
        return rc;
406
0
      }
407
0
    }
408
409
0
    if(mosq->in_packet.packet_buffer_to_process > 0){
410
0
      mosq->in_packet.command = mosq->in_packet.packet_buffer[mosq->in_packet.packet_buffer_pos];
411
0
      mosq->in_packet.packet_buffer_to_process--;
412
0
      mosq->in_packet.packet_buffer_pos++;
413
0
#ifdef WITH_BROKER
414
      /* Clients must send CONNECT as their first command. */
415
0
      if(!(mosq->bridge) && state == mosq_cs_new && (mosq->in_packet.command&0xF0) != CMD_CONNECT){
416
0
        return MOSQ_ERR_PROTOCOL;
417
0
      }else if((mosq->in_packet.command&0xF0) == CMD_RESERVED){
418
0
        if(mosq->protocol == mosq_p_mqtt5){
419
0
          send__disconnect(mosq, MQTT_RC_PROTOCOL_ERROR, NULL);
420
0
        }
421
0
        return MOSQ_ERR_PROTOCOL;
422
0
      }
423
#else
424
      UNUSED(state);
425
#endif
426
0
    }else{
427
0
      return MOSQ_ERR_SUCCESS;
428
0
    }
429
0
  }
430
  /* remaining_count is the number of bytes that the remaining_length
431
   * parameter occupied in this incoming packet. We don't use it here as such
432
   * (it is used when allocating an outgoing packet), but we must be able to
433
   * determine whether all of the remaining_length parameter has been read.
434
   * remaining_count has three states here:
435
   *   0 means that we haven't read any remaining_length bytes
436
   *   <0 means we have read some remaining_length bytes but haven't finished
437
   *   >0 means we have finished reading the remaining_length bytes.
438
   */
439
0
  if(mosq->in_packet.remaining_count <= 0){
440
0
    uint8_t byte;
441
0
    do{
442
0
      if(mosq->in_packet.packet_buffer_to_process == 0){
443
0
        rc = read_header(mosq, local__read);
444
0
        if(rc){
445
0
          return rc;
446
0
        }
447
0
      }
448
449
0
      if(mosq->in_packet.packet_buffer_to_process > 0){
450
0
        mosq->in_packet.remaining_count--;
451
        /* Max 4 bytes length for remaining length as defined by protocol.
452
         * Anything more likely means a broken/malicious client.
453
         */
454
0
        if(mosq->in_packet.remaining_count < -4){
455
0
          return MOSQ_ERR_MALFORMED_PACKET;
456
0
        }
457
458
0
        byte = mosq->in_packet.packet_buffer[mosq->in_packet.packet_buffer_pos];
459
0
        mosq->in_packet.packet_buffer_pos++;
460
0
        mosq->in_packet.packet_buffer_to_process--;
461
0
        mosq->in_packet.remaining_length += (byte & 127) * mosq->in_packet.remaining_mult;
462
0
        mosq->in_packet.remaining_mult *= 128;
463
0
      }else{
464
0
        return MOSQ_ERR_SUCCESS;
465
0
      }
466
0
    }while((byte & 128) != 0);
467
    /* We have finished reading remaining_length, so make remaining_count
468
     * positive. */
469
0
    mosq->in_packet.remaining_count = (int8_t)(mosq->in_packet.remaining_count * -1);
470
471
0
#ifdef WITH_BROKER
472
0
    switch(mosq->in_packet.command & 0xF0){
473
0
      case CMD_CONNECT:
474
0
        if(mosq->in_packet.remaining_length > db.config->packet_max_connect){
475
0
          return MOSQ_ERR_OVERSIZE_PACKET;
476
0
        }
477
0
        break;
478
479
0
      case CMD_PUBACK:
480
0
      case CMD_PUBREC:
481
0
      case CMD_PUBREL:
482
0
      case CMD_PUBCOMP:
483
0
      case CMD_UNSUBACK:
484
0
        if(mosq->protocol == mosq_p_mqtt5){
485
0
          if(mosq->in_packet.remaining_length > db.config->packet_max_simple){
486
0
            return MOSQ_ERR_OVERSIZE_PACKET;
487
0
          }
488
0
        }else{
489
0
          if(mosq->in_packet.remaining_length != 2){
490
0
            return MOSQ_ERR_MALFORMED_PACKET;
491
0
          }
492
0
        }
493
0
        break;
494
495
0
      case CMD_PINGREQ:
496
0
      case CMD_PINGRESP:
497
0
        if(mosq->in_packet.remaining_length != 0){
498
0
          return MOSQ_ERR_MALFORMED_PACKET;
499
0
        }
500
0
        break;
501
502
0
      case CMD_DISCONNECT:
503
0
        if(mosq->protocol == mosq_p_mqtt5){
504
0
          if(mosq->in_packet.remaining_length > db.config->packet_max_simple){
505
0
            return MOSQ_ERR_OVERSIZE_PACKET;
506
0
          }
507
0
        }else{
508
0
          if(mosq->in_packet.remaining_length != 0){
509
0
            return MOSQ_ERR_MALFORMED_PACKET;
510
0
          }
511
0
        }
512
0
        break;
513
514
0
      case CMD_SUBSCRIBE:
515
0
      case CMD_UNSUBSCRIBE:
516
0
        if(mosq->protocol == mosq_p_mqtt5 && mosq->in_packet.remaining_length > db.config->packet_max_sub){
517
0
          return MOSQ_ERR_OVERSIZE_PACKET;
518
0
        }
519
0
        break;
520
521
0
      case CMD_AUTH:
522
0
        if(mosq->in_packet.remaining_length > db.config->packet_max_auth){
523
0
          return MOSQ_ERR_OVERSIZE_PACKET;
524
0
        }
525
0
        break;
526
527
0
    }
528
529
0
    if(db.config->max_packet_size > 0 && mosq->in_packet.remaining_length+1 > db.config->max_packet_size){
530
0
      if(mosq->protocol == mosq_p_mqtt5){
531
0
        send__disconnect(mosq, MQTT_RC_PACKET_TOO_LARGE, NULL);
532
0
      }
533
0
      return MOSQ_ERR_OVERSIZE_PACKET;
534
0
    }
535
#else
536
    /* FIXME - client case for incoming message received from broker too large */
537
#endif
538
0
    if(mosq->in_packet.remaining_length > 0){
539
0
      mosq->in_packet.payload = mosquitto_malloc(mosq->in_packet.remaining_length*sizeof(uint8_t));
540
0
      if(!mosq->in_packet.payload){
541
0
        return MOSQ_ERR_NOMEM;
542
0
      }
543
544
0
      mosq->in_packet.pos = 0;
545
0
      mosq->in_packet.to_process = mosq->in_packet.remaining_length;
546
547
0
      if(mosq->in_packet.packet_buffer_to_process > 0){
548
0
        uint32_t len;
549
0
        if(mosq->in_packet.packet_buffer_to_process > mosq->in_packet.remaining_length){
550
0
          len = mosq->in_packet.remaining_length;
551
0
        }else{
552
0
          len = mosq->in_packet.packet_buffer_to_process;
553
0
        }
554
0
        memcpy(mosq->in_packet.payload, &mosq->in_packet.packet_buffer[mosq->in_packet.packet_buffer_pos], len);
555
0
        if(len < mosq->in_packet.packet_buffer_to_process){
556
0
          mosq->in_packet.packet_buffer_pos += (uint16_t)len;
557
0
          mosq->in_packet.packet_buffer_to_process -= (uint16_t)len;
558
0
        }else{
559
0
          mosq->in_packet.packet_buffer_pos = 0;
560
0
          mosq->in_packet.packet_buffer_to_process = 0;
561
0
        }
562
0
        mosq->in_packet.pos += len;
563
0
        mosq->in_packet.to_process -= len;
564
0
      }
565
0
    }
566
0
  }
567
0
  while(mosq->in_packet.to_process>0){
568
0
    read_length = local__read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);
569
0
    if(read_length > 0){
570
0
      metrics__int_inc(mosq_counter_bytes_received, read_length);
571
0
      mosq->in_packet.to_process -= (uint32_t)read_length;
572
0
      mosq->in_packet.pos += (uint32_t)read_length;
573
0
    }else{
574
0
      WINDOWS_SET_ERRNO();
575
0
      if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
576
0
        if(mosq->in_packet.to_process > 1000){
577
          /* Update last_msg_in time if more than 1000 bytes left to
578
           * receive. Helps when receiving large messages.
579
           * This is an arbitrary limit, but with some consideration.
580
           * If a client can't send 1000 bytes in a second it
581
           * probably shouldn't be using a 1 second keep alive. */
582
0
#ifdef WITH_BROKER
583
0
          keepalive__update(mosq);
584
#else
585
          COMPAT_pthread_mutex_lock(&mosq->msgtime_mutex);
586
          mosq->last_msg_in = mosquitto_time();
587
          COMPAT_pthread_mutex_unlock(&mosq->msgtime_mutex);
588
#endif
589
0
        }
590
0
        return MOSQ_ERR_SUCCESS;
591
0
      }else{
592
0
        switch(errno){
593
0
          case COMPAT_ECONNRESET:
594
0
            return MOSQ_ERR_CONN_LOST;
595
0
          case COMPAT_EINTR:
596
0
            return MOSQ_ERR_SUCCESS;
597
0
          default:
598
0
            return MOSQ_ERR_ERRNO;
599
0
        }
600
0
      }
601
0
    }
602
0
  }
603
604
  /* All data for this packet is read. */
605
0
  mosq->in_packet.pos = 0;
606
0
#ifdef WITH_BROKER
607
0
  metrics__int_inc(mosq_counter_messages_received, 1);
608
0
#endif
609
0
  rc = handle__packet(mosq);
610
611
  /* Free data and reset values */
612
0
  packet__cleanup(&mosq->in_packet);
613
614
0
#ifdef WITH_BROKER
615
0
  keepalive__update(mosq);
616
#else
617
  COMPAT_pthread_mutex_lock(&mosq->msgtime_mutex);
618
  mosq->last_msg_in = mosquitto_time();
619
  COMPAT_pthread_mutex_unlock(&mosq->msgtime_mutex);
620
#endif
621
0
  return rc;
622
0
}
623
624
625
int packet__read(struct mosquitto *mosq)
626
0
{
627
0
  int rc = 0;
628
0
  enum mosquitto_client_state state;
629
0
  ssize_t (*local__read)(struct mosquitto *, void *, size_t);
630
631
0
  if(!mosq){
632
0
    return MOSQ_ERR_INVAL;
633
0
  }
634
0
  if(!net__is_connected(mosq)){
635
0
    return MOSQ_ERR_NO_CONN;
636
0
  }
637
638
0
#if defined(WITH_WEBSOCKETS) && WITH_WEBSOCKETS == WS_IS_BUILTIN
639
0
  if(mosq->transport == mosq_t_ws){
640
0
    local__read = net__read_ws;
641
0
  }else
642
0
#endif
643
0
  {
644
0
    local__read = net__read;
645
0
  }
646
647
  /* This gets called if pselect() indicates that there is network data
648
   * available - ie. at least one byte.  What we do depends on what data we
649
   * already have.
650
   * If we've not got a command, attempt to read one and save it. This should
651
   * always work because it's only a single byte.
652
   * Then try to read the remaining length. This may fail because it is may
653
   * be more than one byte - will need to save data pending next read if it
654
   * does fail.
655
   * Then try to read the remaining payload, where 'payload' here means the
656
   * combined variable packet_buffer and actual payload. This is the most likely to
657
   * fail due to longer length, so save current data and current position.
658
   * After all data is read, send to mosquitto__handle_packet() to deal with.
659
   * Finally, free the memory and reset everything to starting conditions.
660
   */
661
0
  do{
662
0
    state = mosquitto__get_state(mosq);
663
0
    if(state == mosq_cs_connect_pending){
664
0
      return MOSQ_ERR_SUCCESS;
665
0
    }
666
0
    rc = packet__read_single(mosq, state, local__read);
667
0
    if(rc){
668
0
      return rc;
669
0
    }
670
0
  }while(mosq->in_packet.packet_buffer_to_process > 0);
671
672
0
  return MOSQ_ERR_SUCCESS;
673
0
}