Coverage Report

Created: 2026-06-01 07:00

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/curl/lib/mqtt.c
Line
Count
Source
1
/***************************************************************************
2
 *                                  _   _ ____  _
3
 *  Project                     ___| | | |  _ \| |
4
 *                             / __| | | | |_) | |
5
 *                            | (__| |_| |  _ <| |___
6
 *                             \___|\___/|_| \_\_____|
7
 *
8
 * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
9
 * Copyright (C) Björn Stenberg, <bjorn@haxx.se>
10
 *
11
 * This software is licensed as described in the file COPYING, which
12
 * you should have received as part of this distribution. The terms
13
 * are also available at https://curl.se/docs/copyright.html.
14
 *
15
 * You may opt to use, copy, modify, merge, publish, distribute and/or sell
16
 * copies of the Software, and permit persons to whom the Software is
17
 * furnished to do so, under the terms of the COPYING file.
18
 *
19
 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
20
 * KIND, either express or implied.
21
 *
22
 * SPDX-License-Identifier: curl
23
 *
24
 ***************************************************************************/
25
#include "curl_setup.h"
26
#include "urldata.h"
27
28
#ifndef CURL_DISABLE_MQTT
29
30
#include "transfer.h"
31
#include "sendf.h"
32
#include "curl_trc.h"
33
#include "progress.h"
34
#include "mqtt.h"
35
#include "select.h"
36
#include "url.h"
37
#include "escape.h"
38
#include "rand.h"
39
#include "cfilters.h"
40
#include "connect.h"
41
42
/* first byte is command.
43
   second byte is for flags. */
44
1.33k
#define MQTT_MSG_CONNECT    0x10
45
/* #define MQTT_MSG_CONNACK    0x20 */
46
765
#define MQTT_MSG_PUBLISH    0x30
47
19
#define MQTT_MSG_SUBSCRIBE  0x82
48
57
#define MQTT_MSG_SUBACK     0x90
49
26.2k
#define MQTT_MSG_DISCONNECT 0xe0
50
/* #define MQTT_MSG_PINGREQ    0xC0 */
51
16.5k
#define MQTT_MSG_PINGRESP   0xD0
52
53
186
#define MQTT_CONNACK_LEN  2
54
49
#define MQTT_SUBACK_LEN   3
55
7.99k
#define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
56
57
/* meta key for storing protocol meta at easy handle */
58
35.9k
#define CURL_META_MQTT_EASY   "meta:proto:mqtt:easy"
59
/* meta key for storing protocol meta at connection */
60
54.0k
#define CURL_META_MQTT_CONN   "meta:proto:mqtt:conn"
61
62
enum mqttstate {
63
  MQTT_FIRST,             /* 0 */
64
  MQTT_REMAINING_LENGTH,  /* 1 */
65
  MQTT_CONNACK,           /* 2 */
66
  MQTT_SUBACK,            /* 3 */
67
  MQTT_SUBACK_COMING,     /* 4 - the SUBACK remainder */
68
  MQTT_PUBWAIT,    /* 5 - wait for publish */
69
  MQTT_PUB_REMAIN,  /* 6 - wait for the remainder of the publish */
70
71
  MQTT_NOSTATE /* 7 - never used an actual state */
72
};
73
74
struct mqtt_conn {
75
  enum mqttstate state;
76
  enum mqttstate nextstate; /* switch to this after remaining length is
77
                               done */
78
  unsigned int packetid;
79
};
80
81
/* protocol-specific transfer-related data */
82
struct MQTT {
83
  struct dynbuf sendbuf;
84
  /* when receiving */
85
  struct dynbuf recvbuf;
86
  size_t npacket; /* byte counter */
87
  size_t remaining_length;
88
  unsigned char pkt_hd[4]; /* for decoding the arriving packet length */
89
  struct curltime lastTime; /* last time we sent or received data */
90
  unsigned char firstbyte;
91
  BIT(pingsent); /* 1 while we wait for ping response */
92
};
93
94
static void mqtt_easy_dtor(void *key, size_t klen, void *entry)
95
6.01k
{
96
6.01k
  struct MQTT *mq = entry;
97
6.01k
  (void)key;
98
6.01k
  (void)klen;
99
6.01k
  curlx_dyn_free(&mq->sendbuf);
100
6.01k
  curlx_dyn_free(&mq->recvbuf);
101
6.01k
  curlx_free(mq);
102
6.01k
}
103
104
static void mqtt_conn_dtor(void *key, size_t klen, void *entry)
105
6.01k
{
106
6.01k
  (void)key;
107
6.01k
  (void)klen;
108
6.01k
  curlx_free(entry);
109
6.01k
}
110
111
static CURLcode mqtt_setup_conn(struct Curl_easy *data,
112
                                struct connectdata *conn)
113
6.01k
{
114
  /* setup MQTT specific meta data at easy handle and connection */
115
6.01k
  struct mqtt_conn *mqtt;
116
6.01k
  struct MQTT *mq;
117
118
6.01k
  mqtt = curlx_calloc(1, sizeof(*mqtt));
119
6.01k
  if(!mqtt ||
120
6.01k
     Curl_conn_meta_set(conn, CURL_META_MQTT_CONN, mqtt, mqtt_conn_dtor))
121
0
    return CURLE_OUT_OF_MEMORY;
122
123
6.01k
  mq = curlx_calloc(1, sizeof(struct MQTT));
124
6.01k
  if(!mq)
125
0
    return CURLE_OUT_OF_MEMORY;
126
6.01k
  curlx_dyn_init(&mq->recvbuf, DYN_MQTT_RECV);
127
6.01k
  curlx_dyn_init(&mq->sendbuf, DYN_MQTT_SEND);
128
6.01k
  if(Curl_meta_set(data, CURL_META_MQTT_EASY, mq, mqtt_easy_dtor))
129
0
    return CURLE_OUT_OF_MEMORY;
130
6.01k
  return CURLE_OK;
131
6.01k
}
132
133
static CURLcode mqtt_send(struct Curl_easy *data,
134
                          const char *buf, size_t len)
135
1.46k
{
136
1.46k
  size_t n;
137
1.46k
  CURLcode result;
138
1.46k
  struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
139
140
1.46k
  if(!mq)
141
0
    return CURLE_FAILED_INIT;
142
143
1.46k
  result = Curl_xfer_send(data, buf, len, FALSE, &n);
144
1.46k
  if(result)
145
0
    return result;
146
1.46k
  mq->lastTime = *Curl_pgrs_now(data);
147
1.46k
  Curl_debug(data, CURLINFO_HEADER_OUT, buf, n);
148
1.46k
  if(len != n) {
149
0
    size_t nsend = len - n;
150
0
    if(curlx_dyn_len(&mq->sendbuf)) {
151
0
      DEBUGASSERT(curlx_dyn_len(&mq->sendbuf) >= nsend);
152
0
      result = curlx_dyn_tail(&mq->sendbuf, nsend); /* keep this much */
153
0
    }
154
0
    else {
155
0
      result = curlx_dyn_addn(&mq->sendbuf, &buf[n], nsend);
156
0
    }
157
0
  }
158
1.46k
  else
159
1.46k
    curlx_dyn_reset(&mq->sendbuf);
160
1.46k
  return result;
161
1.46k
}
162
163
/* Generic function called by the multi interface to figure out what socket(s)
164
   to wait for and for what actions during the DOING and PROTOCONNECT
165
   states */
166
static CURLcode mqtt_pollset(struct Curl_easy *data,
167
                             struct easy_pollset *ps)
168
11.0k
{
169
11.0k
  return Curl_pollset_add_in(data, ps, data->conn->sock[FIRSTSOCKET]);
170
11.0k
}
171
172
static int mqtt_encode_len(char *buf, size_t len)
173
1.37k
{
174
1.37k
  int i;
175
176
2.84k
  for(i = 0; (len > 0) && (i < 4); i++) {
177
1.47k
    unsigned char encoded;
178
1.47k
    encoded = len % 0x80;
179
1.47k
    len /= 0x80;
180
1.47k
    if(len)
181
107
      encoded |= 0x80;
182
1.47k
    buf[i] = (char)encoded;
183
1.47k
  }
184
185
1.37k
  return i;
186
1.37k
}
187
188
/* add the passwd to the CONNECT packet */
189
static int add_passwd(const char *passwd, const size_t plen,
190
                      char *pkt, const size_t start, int remain_pos)
191
29
{
192
  /* magic number that need to be set properly */
193
29
  const size_t conn_flags_pos = remain_pos + 8;
194
29
  if(plen > 0xffff)
195
2
    return 1;
196
197
  /* set password flag */
198
27
  pkt[conn_flags_pos] |= 0x40;
199
200
  /* length of password provided */
201
27
  pkt[start] = (char)((plen >> 8) & 0xFF);
202
27
  pkt[start + 1] = (char)(plen & 0xFF);
203
27
  memcpy(&pkt[start + 2], passwd, plen);
204
27
  return 0;
205
29
}
206
207
/* add user to the CONNECT packet */
208
static int add_user(const char *username, const size_t ulen,
209
                    unsigned char *pkt, const size_t start, int remain_pos)
210
51
{
211
  /* magic number that need to be set properly */
212
51
  const size_t conn_flags_pos = remain_pos + 8;
213
51
  if(ulen > 0xffff)
214
5
    return 1;
215
216
  /* set username flag */
217
46
  pkt[conn_flags_pos] |= 0x80;
218
  /* length of username provided */
219
46
  pkt[start] = (unsigned char)((ulen >> 8) & 0xFF);
220
46
  pkt[start + 1] = (unsigned char)(ulen & 0xFF);
221
46
  memcpy(&pkt[start + 2], username, ulen);
222
46
  return 0;
223
51
}
224
225
/* add client ID to the CONNECT packet */
226
static int add_client_id(const char *client_id, const size_t client_id_len,
227
                         char *pkt, const size_t start)
228
1.33k
{
229
1.33k
  if(client_id_len != MQTT_CLIENTID_LEN)
230
0
    return 1;
231
1.33k
  pkt[start] = 0x00;
232
1.33k
  pkt[start + 1] = MQTT_CLIENTID_LEN;
233
1.33k
  memcpy(&pkt[start + 2], client_id, MQTT_CLIENTID_LEN);
234
1.33k
  return 0;
235
1.33k
}
236
237
/* Set initial values of CONNECT packet */
238
static int init_connpack(char *packet, char *remain, int remain_pos)
239
1.33k
{
240
  /* Fixed header starts */
241
  /* packet type */
242
1.33k
  packet[0] = MQTT_MSG_CONNECT;
243
  /* remaining length field */
244
1.33k
  memcpy(&packet[1], remain, remain_pos);
245
  /* Fixed header ends */
246
247
  /* Variable header starts */
248
  /* protocol length */
249
1.33k
  packet[remain_pos + 1] = 0x00;
250
1.33k
  packet[remain_pos + 2] = 0x04;
251
  /* protocol name */
252
1.33k
  packet[remain_pos + 3] = 'M';
253
1.33k
  packet[remain_pos + 4] = 'Q';
254
1.33k
  packet[remain_pos + 5] = 'T';
255
1.33k
  packet[remain_pos + 6] = 'T';
256
  /* protocol level */
257
1.33k
  packet[remain_pos + 7] = 0x04;
258
  /* CONNECT flag: CleanSession */
259
1.33k
  packet[remain_pos + 8] = 0x02;
260
  /* keep-alive 0 = disabled */
261
1.33k
  packet[remain_pos + 9] = 0x00;
262
1.33k
  packet[remain_pos + 10] = 0x3c;
263
  /* end of variable header */
264
1.33k
  return remain_pos + 10;
265
1.33k
}
266
267
static CURLcode mqtt_connect(struct Curl_easy *data)
268
1.33k
{
269
1.33k
  CURLcode result = CURLE_OK;
270
1.33k
  int pos = 0;
271
1.33k
  int rc = 0;
272
  /* remain length */
273
1.33k
  int remain_pos = 0;
274
1.33k
  char remain[4] = { 0 };
275
1.33k
  size_t packetlen = 0;
276
1.33k
  size_t start_user = 0;
277
1.33k
  size_t start_pwd = 0;
278
1.33k
  char client_id[MQTT_CLIENTID_LEN + 1] = "curl";
279
1.33k
  const size_t clen = strlen("curl");
280
1.33k
  char *packet = NULL;
281
282
  /* extracting username from request */
283
1.33k
  struct Curl_creds *creds = data->state.creds;
284
1.33k
  const size_t ulen = creds ? strlen(creds->user) : 0;
285
1.33k
  const size_t plen = creds ? strlen(creds->passwd) : 0;
286
1.33k
  const size_t payloadlen = ulen + plen + MQTT_CLIENTID_LEN + 2 +
287
  /* The plus 2s below are for the MSB and LSB describing the length of the
288
     string to be added on the payload. Refer to spec 1.5.2 and 1.5.4 */
289
1.33k
    (ulen ? 2 : 0) +
290
1.33k
    (plen ? 2 : 0);
291
292
  /* getting how much occupy the remain length */
293
1.33k
  remain_pos = mqtt_encode_len(remain, payloadlen + 10);
294
295
  /* 10 length of variable header and 1 the first byte of the fixed header */
296
1.33k
  packetlen = payloadlen + 10 + remain_pos + 1;
297
298
  /* allocating packet */
299
1.33k
  if(packetlen > 0xFFFFFFF)
300
0
    return CURLE_WEIRD_SERVER_REPLY;
301
1.33k
  packet = curlx_calloc(1, packetlen);
302
1.33k
  if(!packet)
303
0
    return CURLE_OUT_OF_MEMORY;
304
305
  /* set initial values for the CONNECT packet */
306
1.33k
  pos = init_connpack(packet, remain, remain_pos);
307
308
1.33k
  result = Curl_rand_alnum(data, (unsigned char *)&client_id[clen],
309
1.33k
                           MQTT_CLIENTID_LEN - clen + 1);
310
  /* add client id */
311
1.33k
  rc = add_client_id(client_id, strlen(client_id), packet, pos + 1);
312
1.33k
  if(rc) {
313
0
    failf(data, "Client ID length mismatched: [%zu]", strlen(client_id));
314
0
    result = CURLE_WEIRD_SERVER_REPLY;
315
0
    goto end;
316
0
  }
317
1.33k
  infof(data, "Using client id '%s'", client_id);
318
319
  /* position where the user payload starts */
320
1.33k
  start_user = pos + 3 + MQTT_CLIENTID_LEN;
321
  /* position where the password payload starts */
322
1.33k
  start_pwd = start_user + ulen;
323
  /* if username was provided, add it to the packet */
324
1.33k
  if(ulen) {
325
51
    start_pwd += 2;
326
327
51
    rc = add_user(creds->user, ulen,
328
51
                  (unsigned char *)packet, start_user, remain_pos);
329
51
    if(rc) {
330
5
      failf(data, "Username too long: [%zu]", ulen);
331
5
      result = CURLE_WEIRD_SERVER_REPLY;
332
5
      goto end;
333
5
    }
334
51
  }
335
336
  /* if passwd was provided, add it to the packet */
337
1.32k
  if(plen) {
338
29
    rc = add_passwd(creds->passwd, plen, packet, start_pwd, remain_pos);
339
29
    if(rc) {
340
2
      failf(data, "Password too long: [%zu]", plen);
341
2
      result = CURLE_WEIRD_SERVER_REPLY;
342
2
      goto end;
343
2
    }
344
29
  }
345
346
1.32k
  if(!result)
347
1.32k
    result = mqtt_send(data, packet, packetlen);
348
349
1.33k
end:
350
1.33k
  if(packet)
351
1.33k
    curlx_free(packet);
352
1.33k
  Curl_creds_unlink(&data->state.creds);
353
1.33k
  return result;
354
1.32k
}
355
356
static CURLcode mqtt_disconnect(struct Curl_easy *data)
357
19
{
358
19
  return mqtt_send(data, "\xe0\x00", 2);
359
19
}
360
361
static CURLcode mqtt_recv_atleast(struct Curl_easy *data, size_t nbytes)
362
88
{
363
88
  struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
364
88
  size_t rlen;
365
88
  CURLcode result;
366
367
88
  if(!mq)
368
0
    return CURLE_FAILED_INIT;
369
88
  rlen = curlx_dyn_len(&mq->recvbuf);
370
371
88
  if(rlen < nbytes) {
372
88
    unsigned char readbuf[1024];
373
88
    size_t nread;
374
375
88
    DEBUGASSERT(nbytes - rlen < sizeof(readbuf));
376
88
    result = Curl_xfer_recv(data, (char *)readbuf, nbytes - rlen, &nread);
377
88
    if(result)
378
3
      return result;
379
85
    if(!nread) /* EOF */
380
4
       return CURLE_RECV_ERROR;
381
81
    if(curlx_dyn_addn(&mq->recvbuf, readbuf, nread))
382
0
      return CURLE_OUT_OF_MEMORY;
383
81
    rlen = curlx_dyn_len(&mq->recvbuf);
384
81
  }
385
81
  return (rlen >= nbytes) ? CURLE_OK : CURLE_AGAIN;
386
88
}
387
388
static void mqtt_recv_consume(struct Curl_easy *data, size_t nbytes)
389
68
{
390
68
  struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
391
68
  DEBUGASSERT(mq);
392
68
  if(mq) {
393
68
    size_t rlen = curlx_dyn_len(&mq->recvbuf);
394
68
    if(rlen <= nbytes)
395
68
      curlx_dyn_reset(&mq->recvbuf);
396
0
    else
397
0
      curlx_dyn_tail(&mq->recvbuf, rlen - nbytes);
398
68
  }
399
68
}
400
401
static CURLcode mqtt_verify_connack(struct Curl_easy *data)
402
103
{
403
103
  struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
404
103
  CURLcode result;
405
103
  const char *ptr;
406
407
103
  DEBUGASSERT(mq);
408
103
  if(!mq)
409
0
    return CURLE_FAILED_INIT;
410
103
  if(mq->remaining_length != 2) {
411
34
    failf(data, "CONNACK expected Remaining Length 2, got %zu",
412
34
          mq->remaining_length);
413
34
    return CURLE_WEIRD_SERVER_REPLY;
414
34
  }
415
416
69
  result = mqtt_recv_atleast(data, MQTT_CONNACK_LEN);
417
69
  if(result)
418
8
    return result;
419
420
  /* verify CONNACK */
421
61
  DEBUGASSERT(curlx_dyn_len(&mq->recvbuf) >= MQTT_CONNACK_LEN);
422
61
  ptr = curlx_dyn_ptr(&mq->recvbuf);
423
61
  Curl_debug(data, CURLINFO_HEADER_IN, ptr, MQTT_CONNACK_LEN);
424
425
61
  if(ptr[0] != 0x00 || ptr[1] != 0x00) {
426
5
    failf(data, "Expected %02x%02x but got %02x%02x",
427
5
          0x00, 0x00, ptr[0], ptr[1]);
428
5
    curlx_dyn_reset(&mq->recvbuf);
429
5
    return CURLE_WEIRD_SERVER_REPLY;
430
5
  }
431
56
  mqtt_recv_consume(data, MQTT_CONNACK_LEN);
432
56
  return CURLE_OK;
433
61
}
434
435
static CURLcode mqtt_get_topic(struct Curl_easy *data,
436
                               char **topic, size_t *topiclen)
437
55
{
438
55
  const char *path = data->state.up.path;
439
55
  CURLcode result = CURLE_URL_MALFORMAT;
440
55
  if(strlen(path) > 1) {
441
53
    result = Curl_urldecode(path + 1, 0, topic, topiclen, REJECT_NADA);
442
53
    if(!result && (*topiclen > 0xffff)) {
443
15
      failf(data, "Too long MQTT topic");
444
15
      result = CURLE_URL_MALFORMAT;
445
15
    }
446
53
  }
447
2
  else
448
2
    failf(data, "No MQTT topic found. Forgot to URL encode it?");
449
450
55
  return result;
451
55
}
452
453
static CURLcode mqtt_subscribe(struct Curl_easy *data)
454
22
{
455
22
  CURLcode result = CURLE_OK;
456
22
  char *topic = NULL;
457
22
  size_t topiclen;
458
22
  unsigned char *packet = NULL;
459
22
  size_t packetlen;
460
22
  char encodedsize[4];
461
22
  size_t n;
462
22
  struct connectdata *conn = data->conn;
463
22
  struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
464
465
22
  if(!mqtt)
466
0
    return CURLE_FAILED_INIT;
467
468
22
  result = mqtt_get_topic(data, &topic, &topiclen);
469
22
  if(result)
470
3
    goto fail;
471
472
19
  mqtt->packetid++;
473
474
19
  packetlen = topiclen + 5; /* packetid + topic (has a two byte length field)
475
                               + 2 bytes topic length + QoS byte */
476
19
  n = mqtt_encode_len((char *)encodedsize, packetlen);
477
19
  packetlen += n + 1; /* add one for the control packet type byte */
478
479
19
  packet = curlx_malloc(packetlen);
480
19
  if(!packet) {
481
0
    result = CURLE_OUT_OF_MEMORY;
482
0
    goto fail;
483
0
  }
484
485
19
  packet[0] = MQTT_MSG_SUBSCRIBE;
486
19
  memcpy(&packet[1], encodedsize, n);
487
19
  packet[1 + n] = (mqtt->packetid >> 8) & 0xff;
488
19
  packet[2 + n] = mqtt->packetid & 0xff;
489
19
  packet[3 + n] = (topiclen >> 8) & 0xff;
490
19
  packet[4 + n] = topiclen & 0xff;
491
19
  memcpy(&packet[5 + n], topic, topiclen);
492
19
  packet[5 + n + topiclen] = 0; /* QoS zero */
493
494
19
  result = mqtt_send(data, (const char *)packet, packetlen);
495
496
22
fail:
497
22
  curlx_free(topic);
498
22
  curlx_free(packet);
499
22
  return result;
500
19
}
501
502
/*
503
 * Called when the first byte was already read.
504
 */
505
static CURLcode mqtt_verify_suback(struct Curl_easy *data)
506
37
{
507
37
  struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
508
37
  struct connectdata *conn = data->conn;
509
37
  struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
510
37
  CURLcode result;
511
37
  const char *ptr;
512
513
37
  if(!mqtt || !mq)
514
0
    return CURLE_FAILED_INIT;
515
516
37
  if(mq->remaining_length != 3) {
517
18
    failf(data, "SUBACK expected Remaining Length 3, got %zu",
518
18
          mq->remaining_length);
519
18
    return CURLE_WEIRD_SERVER_REPLY;
520
18
  }
521
522
19
  result = mqtt_recv_atleast(data, MQTT_SUBACK_LEN);
523
19
  if(result)
524
1
    goto fail;
525
526
  /* verify SUBACK */
527
18
  DEBUGASSERT(curlx_dyn_len(&mq->recvbuf) >= MQTT_SUBACK_LEN);
528
18
  ptr = curlx_dyn_ptr(&mq->recvbuf);
529
18
  Curl_debug(data, CURLINFO_HEADER_IN, ptr, MQTT_SUBACK_LEN);
530
531
18
  if(((unsigned char)ptr[0]) != ((mqtt->packetid >> 8) & 0xff) ||
532
16
     ((unsigned char)ptr[1]) != (mqtt->packetid & 0xff) ||
533
13
     ptr[2] != 0x00) {
534
6
    curlx_dyn_reset(&mq->recvbuf);
535
6
    result = CURLE_WEIRD_SERVER_REPLY;
536
6
    goto fail;
537
6
  }
538
12
  mqtt_recv_consume(data, MQTT_SUBACK_LEN);
539
19
fail:
540
19
  return result;
541
12
}
542
543
19
#define MAX_MQTT_MESSAGE_SIZE 0xFFFFFFF
544
545
static CURLcode mqtt_publish(struct Curl_easy *data)
546
34
{
547
34
  CURLcode result;
548
34
  char *payload = data->set.postfields;
549
34
  size_t payloadlen;
550
34
  char *topic = NULL;
551
34
  size_t topiclen;
552
34
  unsigned char *pkt = NULL;
553
34
  size_t i = 0;
554
34
  size_t remaininglength;
555
34
  size_t encodelen;
556
34
  char encodedbytes[4];
557
34
  curl_off_t postfieldsize = data->set.postfieldsize;
558
559
34
  if(!payload) {
560
1
    DEBUGF(infof(data, "mqtt_publish without payload, return bad arg"));
561
1
    return CURLE_BAD_FUNCTION_ARGUMENT;
562
1
  }
563
33
  if(!curlx_sotouz_fits(postfieldsize, &payloadlen)) {
564
33
    if(postfieldsize > 0) /* off_t does not fit into size_t */
565
0
      return CURLE_BAD_FUNCTION_ARGUMENT;
566
33
    payloadlen = strlen(payload);
567
33
  }
568
569
33
  result = mqtt_get_topic(data, &topic, &topiclen);
570
33
  if(result)
571
14
    goto fail;
572
573
19
  remaininglength = payloadlen + 2 + topiclen;
574
19
  encodelen = mqtt_encode_len(encodedbytes, remaininglength);
575
19
  if(remaininglength > (MAX_MQTT_MESSAGE_SIZE - encodelen - 1)) {
576
0
    result = CURLE_TOO_LARGE;
577
0
    goto fail;
578
0
  }
579
580
  /* add the control byte and the encoded remaining length */
581
19
  pkt = curlx_malloc(remaininglength + 1 + encodelen);
582
19
  if(!pkt) {
583
0
    result = CURLE_OUT_OF_MEMORY;
584
0
    goto fail;
585
0
  }
586
587
  /* assemble packet */
588
19
  pkt[i++] = MQTT_MSG_PUBLISH;
589
19
  memcpy(&pkt[i], encodedbytes, encodelen);
590
19
  i += encodelen;
591
19
  pkt[i++] = (topiclen >> 8) & 0xff;
592
19
  pkt[i++] = (topiclen & 0xff);
593
19
  memcpy(&pkt[i], topic, topiclen);
594
19
  i += topiclen;
595
19
  memcpy(&pkt[i], payload, payloadlen);
596
19
  i += payloadlen;
597
19
  result = mqtt_send(data, (const char *)pkt, i);
598
599
33
fail:
600
33
  curlx_free(pkt);
601
33
  curlx_free(topic);
602
33
  return result;
603
19
}
604
605
/* return 0 on success, non-zero on error */
606
static int mqtt_decode_len(size_t *lenp, const unsigned char *buf,
607
                           size_t buflen)
608
9.61k
{
609
9.61k
  size_t len = 0;
610
9.61k
  size_t mult = 1;
611
9.61k
  size_t i;
612
9.61k
  unsigned char encoded = 128;
613
614
21.0k
  for(i = 0; (i < buflen) && (encoded & 128); i++) {
615
11.4k
    if(i == 4)
616
0
      return 1; /* bad size */
617
11.4k
    encoded = buf[i];
618
11.4k
    len += (encoded & 127) * mult;
619
11.4k
    mult *= 128;
620
11.4k
  }
621
622
9.61k
  *lenp = len;
623
9.61k
  return 0;
624
9.61k
}
625
626
#if defined(DEBUGBUILD) && defined(CURLVERBOSE)
627
static const char *statenames[] = {
628
  "MQTT_FIRST",
629
  "MQTT_REMAINING_LENGTH",
630
  "MQTT_CONNACK",
631
  "MQTT_SUBACK",
632
  "MQTT_SUBACK_COMING",
633
  "MQTT_PUBWAIT",
634
  "MQTT_PUB_REMAIN",
635
636
  "NOT A STATE"
637
};
638
#endif
639
640
/* The only way to change state */
641
static void mqstate(struct Curl_easy *data,
642
                    enum mqttstate state,
643
                    enum mqttstate nextstate) /* used if state == FIRST */
644
22.4k
{
645
22.4k
  struct connectdata *conn = data->conn;
646
22.4k
  struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
647
22.4k
  DEBUGASSERT(mqtt);
648
22.4k
  if(!mqtt)
649
0
    return;
650
22.4k
#ifdef DEBUGBUILD
651
22.4k
  infof(data, "%s (from %s) (next is %s)",
652
22.4k
        statenames[state],
653
22.4k
        statenames[mqtt->state],
654
22.4k
        (state == MQTT_FIRST) ? statenames[nextstate] : "");
655
22.4k
#endif
656
22.4k
  mqtt->state = state;
657
22.4k
  if(state == MQTT_FIRST)
658
11.0k
    mqtt->nextstate = nextstate;
659
22.4k
}
660
661
static CURLcode mqtt_read_publish(struct Curl_easy *data, bool *done)
662
1.06k
{
663
1.06k
  CURLcode result = CURLE_OK;
664
1.06k
  struct connectdata *conn = data->conn;
665
1.06k
  size_t nread;
666
1.06k
  size_t remlen;
667
1.06k
  struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
668
1.06k
  struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
669
1.06k
  unsigned char packet;
670
671
1.06k
  DEBUGASSERT(mqtt);
672
1.06k
  if(!mqtt || !mq)
673
0
    return CURLE_FAILED_INIT;
674
675
1.06k
  switch(mqtt->state) {
676
37
MQTT_SUBACK_COMING:
677
37
  case MQTT_SUBACK_COMING:
678
37
    result = mqtt_verify_suback(data);
679
37
    if(result)
680
25
      break;
681
682
12
    mqstate(data, MQTT_FIRST, MQTT_PUBWAIT);
683
12
    break;
684
685
7
  case MQTT_SUBACK:
686
746
  case MQTT_PUBWAIT:
687
    /* we are expecting PUBLISH or SUBACK */
688
746
    packet = mq->firstbyte & 0xf0;
689
746
    if(packet == MQTT_MSG_PUBLISH)
690
689
      mqstate(data, MQTT_PUB_REMAIN, MQTT_NOSTATE);
691
57
    else if(packet == MQTT_MSG_SUBACK) {
692
37
      mqstate(data, MQTT_SUBACK_COMING, MQTT_NOSTATE);
693
37
      goto MQTT_SUBACK_COMING;
694
37
    }
695
20
    else if(packet == MQTT_MSG_DISCONNECT) {
696
0
      infof(data, "Got DISCONNECT");
697
0
      *done = TRUE;
698
0
      goto end;
699
0
    }
700
20
    else {
701
20
      result = CURLE_WEIRD_SERVER_REPLY;
702
20
      goto end;
703
20
    }
704
705
    /* -- switched state -- */
706
689
    remlen = mq->remaining_length;
707
689
    infof(data, "Remaining length: %zu bytes", remlen);
708
689
    if(data->set.max_filesize &&
709
131
       (curl_off_t)remlen > data->set.max_filesize) {
710
19
      failf(data, "Maximum file size exceeded");
711
19
      result = CURLE_FILESIZE_EXCEEDED;
712
19
      goto end;
713
19
    }
714
670
    Curl_pgrsSetDownloadSize(data, remlen);
715
670
    data->req.bytecount = 0;
716
670
    data->req.size = remlen;
717
670
    mq->npacket = remlen; /* get this many bytes */
718
670
    FALLTHROUGH();
719
991
  case MQTT_PUB_REMAIN: {
720
    /* read rest of packet, but no more. Cap to buffer size */
721
991
    char buffer[4 * 1024];
722
991
    size_t rest = mq->npacket;
723
991
    if(rest > sizeof(buffer))
724
417
      rest = sizeof(buffer);
725
991
    result = Curl_xfer_recv(data, buffer, rest, &nread);
726
991
    if(result) {
727
48
      if(result == CURLE_AGAIN) {
728
48
        infof(data, "EEEE AAAAGAIN");
729
48
      }
730
48
      goto end;
731
48
    }
732
943
    if(!nread) {
733
217
      infof(data, "server disconnected");
734
217
      result = CURLE_PARTIAL_FILE;
735
217
      goto end;
736
217
    }
737
738
    /* we received something */
739
726
    mq->lastTime = *Curl_pgrs_now(data);
740
741
    /* if QoS is set, message contains packet id */
742
726
    result = Curl_client_write(data, CLIENTWRITE_BODY, buffer, nread);
743
726
    if(result)
744
2
      goto end;
745
746
724
    mq->npacket -= nread;
747
724
    if(!mq->npacket)
748
      /* no more PUBLISH payload, back to subscribe wait state */
749
402
      mqstate(data, MQTT_FIRST, MQTT_PUBWAIT);
750
724
    break;
751
726
  }
752
0
  default:
753
0
    DEBUGASSERT(NULL); /* illegal state */
754
0
    result = CURLE_WEIRD_SERVER_REPLY;
755
0
    goto end;
756
1.06k
  }
757
1.06k
end:
758
1.06k
  return result;
759
1.06k
}
760
761
static CURLcode mqtt_do(struct Curl_easy *data, bool *done)
762
1.33k
{
763
1.33k
  struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
764
1.33k
  CURLcode result = CURLE_OK;
765
1.33k
  *done = FALSE; /* unconditionally */
766
767
1.33k
  if(!mq)
768
0
    return CURLE_FAILED_INIT;
769
1.33k
  mq->lastTime = *Curl_pgrs_now(data);
770
1.33k
  mq->pingsent = FALSE;
771
772
1.33k
  result = mqtt_connect(data);
773
1.33k
  if(result) {
774
7
    failf(data, "Error %d sending MQTT CONNECT request", result);
775
7
    return result;
776
7
  }
777
1.32k
  mqstate(data, MQTT_FIRST, MQTT_CONNACK);
778
1.32k
  return CURLE_OK;
779
1.33k
}
780
781
static CURLcode mqtt_done(struct Curl_easy *data,
782
                          CURLcode status, bool premature)
783
1.33k
{
784
1.33k
  struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
785
1.33k
  (void)status;
786
1.33k
  (void)premature;
787
1.33k
  if(mq) {
788
1.33k
    curlx_dyn_free(&mq->sendbuf);
789
1.33k
    curlx_dyn_free(&mq->recvbuf);
790
1.33k
  }
791
1.33k
  return CURLE_OK;
792
1.33k
}
793
794
/* we ping regularly to avoid being disconnected by the server */
795
static CURLcode mqtt_ping(struct Curl_easy *data)
796
12.2k
{
797
12.2k
  struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
798
12.2k
  CURLcode result = CURLE_OK;
799
12.2k
  struct connectdata *conn = data->conn;
800
12.2k
  struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
801
802
12.2k
  if(!mqtt || !mq)
803
0
    return CURLE_FAILED_INIT;
804
805
12.2k
  if(mqtt->state == MQTT_FIRST &&
806
10.9k
     !mq->pingsent &&
807
10.6k
     data->set.upkeep_interval_ms > 0) {
808
10.6k
    struct curltime t = *Curl_pgrs_now(data);
809
10.6k
    timediff_t diff = curlx_ptimediff_ms(&t, &mq->lastTime);
810
811
10.6k
    if(diff > data->set.upkeep_interval_ms) {
812
      /* 0xC0 is PINGREQ, and 0x00 is remaining length */
813
83
      unsigned char packet[2] = { 0xC0, 0x00 };
814
83
      size_t packetlen = sizeof(packet);
815
816
83
      result = mqtt_send(data, (char *)packet, packetlen);
817
83
      if(!result) {
818
83
        mq->pingsent = TRUE;
819
83
      }
820
83
      infof(data, "mqtt_ping: sent ping request.");
821
83
    }
822
10.6k
  }
823
12.2k
  return result;
824
12.2k
}
825
826
static CURLcode mqtt_doing(struct Curl_easy *data, bool *done)
827
12.2k
{
828
12.2k
  struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
829
12.2k
  CURLcode result = CURLE_OK;
830
12.2k
  size_t nread;
831
12.2k
  unsigned char recvbyte;
832
12.2k
  struct mqtt_conn *mqtt = Curl_conn_meta_get(data->conn, CURL_META_MQTT_CONN);
833
834
12.2k
  if(!mqtt || !mq)
835
0
    return CURLE_FAILED_INIT;
836
837
12.2k
  *done = FALSE;
838
839
12.2k
  if(curlx_dyn_len(&mq->sendbuf)) {
840
    /* send the remainder of an outgoing packet */
841
0
    result = mqtt_send(data, curlx_dyn_ptr(&mq->sendbuf),
842
0
                       curlx_dyn_len(&mq->sendbuf));
843
0
    if(result)
844
0
      return result;
845
0
  }
846
847
12.2k
  result = mqtt_ping(data);
848
12.2k
  if(result)
849
0
    return result;
850
851
12.2k
  infof(data, "mqtt_doing: state [%d]", (int)mqtt->state);
852
12.2k
  switch(mqtt->state) {
853
10.9k
  case MQTT_FIRST:
854
    /* Read the initial byte only */
855
10.9k
    result = Curl_xfer_recv(data, (char *)&mq->firstbyte, 1, &nread);
856
10.9k
    if(result)
857
629
      break;
858
10.3k
    else if(!nread) {
859
678
      failf(data, "Connection disconnected");
860
678
      *done = TRUE;
861
678
      result = CURLE_RECV_ERROR;
862
678
      break;
863
678
    }
864
9.63k
    Curl_debug(data, CURLINFO_HEADER_IN, (const char *)&mq->firstbyte, 1);
865
866
    /* we received something */
867
9.63k
    mq->lastTime = *Curl_pgrs_now(data);
868
869
    /* remember the first byte */
870
9.63k
    mq->npacket = 0;
871
9.63k
    mqstate(data, MQTT_REMAINING_LENGTH, MQTT_NOSTATE);
872
9.63k
    FALLTHROUGH();
873
9.65k
  case MQTT_REMAINING_LENGTH:
874
11.6k
    do {
875
11.6k
      result = Curl_xfer_recv(data, (char *)&recvbyte, 1, &nread);
876
11.6k
      if(result || !nread)
877
123
        break;
878
11.4k
      Curl_debug(data, CURLINFO_HEADER_IN, (const char *)&recvbyte, 1);
879
11.4k
      mq->pkt_hd[mq->npacket++] = recvbyte;
880
11.4k
    } while((recvbyte & 0x80) && (mq->npacket < 4));
881
9.65k
    if(!result && nread && (recvbyte & 0x80))
882
      /* MQTT supports up to 127 * 128^0 + 127 * 128^1 + 127 * 128^2 +
883
         127 * 128^3 bytes. server tried to send more */
884
12
      result = CURLE_WEIRD_SERVER_REPLY;
885
9.65k
    if(result)
886
35
      break;
887
9.61k
    if(mqtt_decode_len(&mq->remaining_length, mq->pkt_hd, mq->npacket)) {
888
0
      result = CURLE_WEIRD_SERVER_REPLY;
889
0
      break;
890
0
    }
891
9.61k
    mq->npacket = 0;
892
    /* PINGRESP and DISCONNECT must have remaining_length == 0 and
893
     * reserved bits (low nibble) must be zero per MQTT 3.1.1
894
     * sections 2.2.2, 3.13.1 and 3.14.1. Reject before state
895
     * dispatch to prevent nextstate confusion. */
896
9.61k
    {
897
9.61k
      const unsigned char type = mq->firstbyte & 0xF0;
898
9.61k
      const unsigned char reserved = mq->firstbyte & 0x0F;
899
9.61k
      if((type == MQTT_MSG_DISCONNECT || type == MQTT_MSG_PINGRESP) &&
900
778
         (mq->remaining_length || reserved)) {
901
30
        failf(data,
902
30
              "Broker sent malformed %s "
903
30
              "(remaining_length=%zu, header byte=0x%02x)",
904
30
              type == MQTT_MSG_DISCONNECT ? "DISCONNECT" : "PINGRESP",
905
30
              mq->remaining_length, mq->firstbyte);
906
30
        result = CURLE_WEIRD_SERVER_REPLY;
907
30
        break;
908
30
      }
909
9.61k
    }
910
9.58k
    if(mq->remaining_length) {
911
2.59k
      mqstate(data, mqtt->nextstate, MQTT_NOSTATE);
912
2.59k
      break;
913
2.59k
    }
914
6.99k
    mqstate(data, MQTT_FIRST, MQTT_FIRST);
915
916
6.99k
    if(mq->firstbyte == MQTT_MSG_DISCONNECT) {
917
34
      infof(data, "Got DISCONNECT");
918
34
      *done = TRUE;
919
34
    }
920
921
    /* ping response */
922
6.99k
    if(mq->firstbyte == MQTT_MSG_PINGRESP) {
923
714
      infof(data, "Received ping response.");
924
714
      mq->pingsent = FALSE;
925
714
      mqstate(data, MQTT_FIRST, MQTT_PUBWAIT);
926
714
    }
927
6.99k
    break;
928
103
  case MQTT_CONNACK:
929
103
    result = mqtt_verify_connack(data);
930
103
    if(result)
931
47
      break;
932
933
56
    if(data->state.httpreq == HTTPREQ_POST) {
934
34
      result = mqtt_publish(data);
935
34
      if(!result) {
936
19
        result = mqtt_disconnect(data);
937
19
        *done = TRUE;
938
19
      }
939
34
      mqtt->nextstate = MQTT_FIRST;
940
34
    }
941
22
    else {
942
22
      result = mqtt_subscribe(data);
943
22
      if(!result) {
944
19
        mqstate(data, MQTT_FIRST, MQTT_SUBACK);
945
19
      }
946
22
    }
947
56
    break;
948
949
7
  case MQTT_SUBACK:
950
746
  case MQTT_PUBWAIT:
951
1.06k
  case MQTT_PUB_REMAIN:
952
1.06k
    result = mqtt_read_publish(data, done);
953
1.06k
    break;
954
955
114
  default:
956
114
    failf(data, "State not handled yet");
957
114
    *done = TRUE;
958
114
    break;
959
12.2k
  }
960
961
12.2k
  if(result == CURLE_AGAIN)
962
705
    result = CURLE_OK;
963
12.2k
  return result;
964
12.2k
}
965
966
#ifdef USE_SSL
967
968
static CURLcode mqtts_connecting(struct Curl_easy *data, bool *done)
969
0
{
970
0
  struct connectdata *conn = data->conn;
971
0
  CURLcode result;
972
973
0
  result = Curl_conn_connect(data, FIRSTSOCKET, TRUE, done);
974
0
  if(result)
975
0
    connclose(conn, "Failed TLS connection");
976
0
  return result;
977
0
}
978
979
/*
980
 * MQTTS protocol.
981
 */
982
const struct Curl_protocol Curl_protocol_mqtts = {
983
  mqtt_setup_conn,                    /* setup_connection */
984
  mqtt_do,                            /* do_it */
985
  mqtt_done,                          /* done */
986
  ZERO_NULL,                          /* do_more */
987
  ZERO_NULL,                          /* connect_it */
988
  mqtts_connecting,                   /* connecting */
989
  mqtt_doing,                         /* doing */
990
  ZERO_NULL,                          /* proto_pollset */
991
  mqtt_pollset,                       /* doing_pollset */
992
  ZERO_NULL,                          /* domore_pollset */
993
  ZERO_NULL,                          /* perform_pollset */
994
  ZERO_NULL,                          /* disconnect */
995
  ZERO_NULL,                          /* write_resp */
996
  ZERO_NULL,                          /* write_resp_hd */
997
  ZERO_NULL,                          /* connection_is_dead */
998
  ZERO_NULL,                          /* attach connection */
999
  ZERO_NULL,                          /* follow */
1000
};
1001
1002
#endif
1003
1004
/*
1005
 * MQTT protocol.
1006
 */
1007
const struct Curl_protocol Curl_protocol_mqtt = {
1008
  mqtt_setup_conn,                    /* setup_connection */
1009
  mqtt_do,                            /* do_it */
1010
  mqtt_done,                          /* done */
1011
  ZERO_NULL,                          /* do_more */
1012
  ZERO_NULL,                          /* connect_it */
1013
  ZERO_NULL,                          /* connecting */
1014
  mqtt_doing,                         /* doing */
1015
  ZERO_NULL,                          /* proto_pollset */
1016
  mqtt_pollset,                       /* doing_pollset */
1017
  ZERO_NULL,                          /* domore_pollset */
1018
  ZERO_NULL,                          /* perform_pollset */
1019
  ZERO_NULL,                          /* disconnect */
1020
  ZERO_NULL,                          /* write_resp */
1021
  ZERO_NULL,                          /* write_resp_hd */
1022
  ZERO_NULL,                          /* connection_is_dead */
1023
  ZERO_NULL,                          /* attach connection */
1024
  ZERO_NULL,                          /* follow */
1025
};
1026
1027
#endif /* CURL_DISABLE_MQTT */