Coverage Report

Created: 2026-03-07 07:03

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/curl/lib/mqtt.c
Line
Count
Source
1
/***************************************************************************
2
 *                                  _   _ ____  _
3
 *  Project                     ___| | | |  _ \| |
4
 *                             / __| | | | |_) | |
5
 *                            | (__| |_| |  _ <| |___
6
 *                             \___|\___/|_| \_\_____|
7
 *
8
 * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
9
 * Copyright (C) Björn Stenberg, <bjorn@haxx.se>
10
 *
11
 * This software is licensed as described in the file COPYING, which
12
 * you should have received as part of this distribution. The terms
13
 * are also available at https://curl.se/docs/copyright.html.
14
 *
15
 * You may opt to use, copy, modify, merge, publish, distribute and/or sell
16
 * copies of the Software, and permit persons to whom the Software is
17
 * furnished to do so, under the terms of the COPYING file.
18
 *
19
 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
20
 * KIND, either express or implied.
21
 *
22
 * SPDX-License-Identifier: curl
23
 *
24
 ***************************************************************************/
25
#include "curl_setup.h"
26
#include "urldata.h"
27
28
#ifndef CURL_DISABLE_MQTT
29
30
#include "transfer.h"
31
#include "sendf.h"
32
#include "curl_trc.h"
33
#include "progress.h"
34
#include "mqtt.h"
35
#include "select.h"
36
#include "url.h"
37
#include "escape.h"
38
#include "rand.h"
39
#include "cfilters.h"
40
#include "connect.h"
41
42
/* first byte is command.
43
   second byte is for flags. */
44
1.15k
#define MQTT_MSG_CONNECT    0x10
45
/* #define MQTT_MSG_CONNACK    0x20 */
46
520
#define MQTT_MSG_PUBLISH    0x30
47
4
#define MQTT_MSG_SUBSCRIBE  0x82
48
76
#define MQTT_MSG_SUBACK     0x90
49
5.06k
#define MQTT_MSG_DISCONNECT 0xe0
50
/* #define MQTT_MSG_PINGREQ    0xC0 */
51
5.03k
#define MQTT_MSG_PINGRESP   0xD0
52
53
82
#define MQTT_CONNACK_LEN  2
54
84
#define MQTT_SUBACK_LEN   3
55
6.91k
#define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
56
57
/* meta key for storing protocol meta at easy handle */
58
29.0k
#define CURL_META_MQTT_EASY   "meta:proto:mqtt:easy"
59
/* meta key for storing protocol meta at connection */
60
43.0k
#define CURL_META_MQTT_CONN   "meta:proto:mqtt:conn"
61
62
enum mqttstate {
63
  MQTT_FIRST,             /* 0 */
64
  MQTT_REMAINING_LENGTH,  /* 1 */
65
  MQTT_CONNACK,           /* 2 */
66
  MQTT_SUBACK,            /* 3 */
67
  MQTT_SUBACK_COMING,     /* 4 - the SUBACK remainder */
68
  MQTT_PUBWAIT,    /* 5 - wait for publish */
69
  MQTT_PUB_REMAIN,  /* 6 - wait for the remainder of the publish */
70
71
  MQTT_NOSTATE /* 7 - never used an actual state */
72
};
73
74
struct mqtt_conn {
75
  enum mqttstate state;
76
  enum mqttstate nextstate; /* switch to this after remaining length is
77
                               done */
78
  unsigned int packetid;
79
};
80
81
/* protocol-specific transfer-related data */
82
struct MQTT {
83
  struct dynbuf sendbuf;
84
  /* when receiving */
85
  struct dynbuf recvbuf;
86
  size_t npacket; /* byte counter */
87
  size_t remaining_length;
88
  unsigned char pkt_hd[4]; /* for decoding the arriving packet length */
89
  struct curltime lastTime; /* last time we sent or received data */
90
  unsigned char firstbyte;
91
  BIT(pingsent); /* 1 while we wait for ping response */
92
};
93
94
static void mqtt_easy_dtor(void *key, size_t klen, void *entry)
95
5.31k
{
96
5.31k
  struct MQTT *mq = entry;
97
5.31k
  (void)key;
98
5.31k
  (void)klen;
99
5.31k
  curlx_dyn_free(&mq->sendbuf);
100
5.31k
  curlx_dyn_free(&mq->recvbuf);
101
5.31k
  curlx_free(mq);
102
5.31k
}
103
104
static void mqtt_conn_dtor(void *key, size_t klen, void *entry)
105
5.31k
{
106
5.31k
  (void)key;
107
5.31k
  (void)klen;
108
5.31k
  curlx_free(entry);
109
5.31k
}
110
111
static CURLcode mqtt_setup_conn(struct Curl_easy *data,
112
                                struct connectdata *conn)
113
5.31k
{
114
  /* setup MQTT specific meta data at easy handle and connection */
115
5.31k
  struct mqtt_conn *mqtt;
116
5.31k
  struct MQTT *mq;
117
118
5.31k
  mqtt = curlx_calloc(1, sizeof(*mqtt));
119
5.31k
  if(!mqtt ||
120
5.31k
     Curl_conn_meta_set(conn, CURL_META_MQTT_CONN, mqtt, mqtt_conn_dtor))
121
0
    return CURLE_OUT_OF_MEMORY;
122
123
5.31k
  mq = curlx_calloc(1, sizeof(struct MQTT));
124
5.31k
  if(!mq)
125
0
    return CURLE_OUT_OF_MEMORY;
126
5.31k
  curlx_dyn_init(&mq->recvbuf, DYN_MQTT_RECV);
127
5.31k
  curlx_dyn_init(&mq->sendbuf, DYN_MQTT_SEND);
128
5.31k
  if(Curl_meta_set(data, CURL_META_MQTT_EASY, mq, mqtt_easy_dtor))
129
0
    return CURLE_OUT_OF_MEMORY;
130
5.31k
  return CURLE_OK;
131
5.31k
}
132
133
static CURLcode mqtt_send(struct Curl_easy *data,
134
                          const char *buf, size_t len)
135
1.19k
{
136
1.19k
  size_t n;
137
1.19k
  CURLcode result;
138
1.19k
  struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
139
140
1.19k
  if(!mq)
141
0
    return CURLE_FAILED_INIT;
142
143
1.19k
  result = Curl_xfer_send(data, buf, len, FALSE, &n);
144
1.19k
  if(result)
145
0
    return result;
146
1.19k
  mq->lastTime = *Curl_pgrs_now(data);
147
1.19k
  Curl_debug(data, CURLINFO_HEADER_OUT, buf, n);
148
1.19k
  if(len != n) {
149
0
    size_t nsend = len - n;
150
0
    if(curlx_dyn_len(&mq->sendbuf)) {
151
0
      DEBUGASSERT(curlx_dyn_len(&mq->sendbuf) >= nsend);
152
0
      result = curlx_dyn_tail(&mq->sendbuf, nsend); /* keep this much */
153
0
    }
154
0
    else {
155
0
      result = curlx_dyn_addn(&mq->sendbuf, &buf[n], nsend);
156
0
    }
157
0
  }
158
1.19k
  else
159
1.19k
    curlx_dyn_reset(&mq->sendbuf);
160
1.19k
  return result;
161
1.19k
}
162
163
/* Generic function called by the multi interface to figure out what socket(s)
164
   to wait for and for what actions during the DOING and PROTOCONNECT
165
   states */
166
static CURLcode mqtt_pollset(struct Curl_easy *data,
167
                             struct easy_pollset *ps)
168
8.53k
{
169
8.53k
  return Curl_pollset_add_in(data, ps, data->conn->sock[FIRSTSOCKET]);
170
8.53k
}
171
172
static int mqtt_encode_len(char *buf, size_t len)
173
1.15k
{
174
1.15k
  int i;
175
176
2.40k
  for(i = 0; (len > 0) && (i < 4); i++) {
177
1.24k
    unsigned char encoded;
178
1.24k
    encoded = len % 0x80;
179
1.24k
    len /= 0x80;
180
1.24k
    if(len)
181
87
      encoded |= 0x80;
182
1.24k
    buf[i] = (char)encoded;
183
1.24k
  }
184
185
1.15k
  return i;
186
1.15k
}
187
188
/* add the passwd to the CONNECT packet */
189
static int add_passwd(const char *passwd, const size_t plen,
190
                      char *pkt, const size_t start, int remain_pos)
191
49
{
192
  /* magic number that need to be set properly */
193
49
  const size_t conn_flags_pos = remain_pos + 8;
194
49
  if(plen > 0xffff)
195
3
    return 1;
196
197
  /* set password flag */
198
46
  pkt[conn_flags_pos] |= 0x40;
199
200
  /* length of password provided */
201
46
  pkt[start] = (char)((plen >> 8) & 0xFF);
202
46
  pkt[start + 1] = (char)(plen & 0xFF);
203
46
  memcpy(&pkt[start + 2], passwd, plen);
204
46
  return 0;
205
49
}
206
207
/* add user to the CONNECT packet */
208
static int add_user(const char *username, const size_t ulen,
209
                    unsigned char *pkt, const size_t start, int remain_pos)
210
120
{
211
  /* magic number that need to be set properly */
212
120
  const size_t conn_flags_pos = remain_pos + 8;
213
120
  if(ulen > 0xffff)
214
7
    return 1;
215
216
  /* set username flag */
217
113
  pkt[conn_flags_pos] |= 0x80;
218
  /* length of username provided */
219
113
  pkt[start] = (unsigned char)((ulen >> 8) & 0xFF);
220
113
  pkt[start + 1] = (unsigned char)(ulen & 0xFF);
221
113
  memcpy(&pkt[start + 2], username, ulen);
222
113
  return 0;
223
120
}
224
225
/* add client ID to the CONNECT packet */
226
static int add_client_id(const char *client_id, const size_t client_id_len,
227
                         char *pkt, const size_t start)
228
1.15k
{
229
1.15k
  if(client_id_len != MQTT_CLIENTID_LEN)
230
0
    return 1;
231
1.15k
  pkt[start] = 0x00;
232
1.15k
  pkt[start + 1] = MQTT_CLIENTID_LEN;
233
1.15k
  memcpy(&pkt[start + 2], client_id, MQTT_CLIENTID_LEN);
234
1.15k
  return 0;
235
1.15k
}
236
237
/* Set initial values of CONNECT packet */
238
static int init_connpack(char *packet, char *remain, int remain_pos)
239
1.15k
{
240
  /* Fixed header starts */
241
  /* packet type */
242
1.15k
  packet[0] = MQTT_MSG_CONNECT;
243
  /* remaining length field */
244
1.15k
  memcpy(&packet[1], remain, remain_pos);
245
  /* Fixed header ends */
246
247
  /* Variable header starts */
248
  /* protocol length */
249
1.15k
  packet[remain_pos + 1] = 0x00;
250
1.15k
  packet[remain_pos + 2] = 0x04;
251
  /* protocol name */
252
1.15k
  packet[remain_pos + 3] = 'M';
253
1.15k
  packet[remain_pos + 4] = 'Q';
254
1.15k
  packet[remain_pos + 5] = 'T';
255
1.15k
  packet[remain_pos + 6] = 'T';
256
  /* protocol level */
257
1.15k
  packet[remain_pos + 7] = 0x04;
258
  /* CONNECT flag: CleanSession */
259
1.15k
  packet[remain_pos + 8] = 0x02;
260
  /* keep-alive 0 = disabled */
261
1.15k
  packet[remain_pos + 9] = 0x00;
262
1.15k
  packet[remain_pos + 10] = 0x3c;
263
  /* end of variable header */
264
1.15k
  return remain_pos + 10;
265
1.15k
}
266
267
static CURLcode mqtt_connect(struct Curl_easy *data)
268
1.15k
{
269
1.15k
  CURLcode result = CURLE_OK;
270
1.15k
  int pos = 0;
271
1.15k
  int rc = 0;
272
  /* remain length */
273
1.15k
  int remain_pos = 0;
274
1.15k
  char remain[4] = { 0 };
275
1.15k
  size_t packetlen = 0;
276
1.15k
  size_t start_user = 0;
277
1.15k
  size_t start_pwd = 0;
278
1.15k
  char client_id[MQTT_CLIENTID_LEN + 1] = "curl";
279
1.15k
  const size_t clen = strlen("curl");
280
1.15k
  char *packet = NULL;
281
282
  /* extracting username from request */
283
1.15k
  const char *username = data->state.aptr.user ? data->state.aptr.user : "";
284
1.15k
  const size_t ulen = strlen(username);
285
  /* extracting password from request */
286
1.15k
  const char *passwd = data->state.aptr.passwd ? data->state.aptr.passwd : "";
287
1.15k
  const size_t plen = strlen(passwd);
288
1.15k
  const size_t payloadlen = ulen + plen + MQTT_CLIENTID_LEN + 2 +
289
  /* The plus 2s below are for the MSB and LSB describing the length of the
290
     string to be added on the payload. Refer to spec 1.5.2 and 1.5.4 */
291
1.15k
    (ulen ? 2 : 0) +
292
1.15k
    (plen ? 2 : 0);
293
294
  /* getting how much occupy the remain length */
295
1.15k
  remain_pos = mqtt_encode_len(remain, payloadlen + 10);
296
297
  /* 10 length of variable header and 1 the first byte of the fixed header */
298
1.15k
  packetlen = payloadlen + 10 + remain_pos + 1;
299
300
  /* allocating packet */
301
1.15k
  if(packetlen > 0xFFFFFFF)
302
0
    return CURLE_WEIRD_SERVER_REPLY;
303
1.15k
  packet = curlx_calloc(1, packetlen);
304
1.15k
  if(!packet)
305
0
    return CURLE_OUT_OF_MEMORY;
306
307
  /* set initial values for the CONNECT packet */
308
1.15k
  pos = init_connpack(packet, remain, remain_pos);
309
310
1.15k
  result = Curl_rand_alnum(data, (unsigned char *)&client_id[clen],
311
1.15k
                           MQTT_CLIENTID_LEN - clen + 1);
312
  /* add client id */
313
1.15k
  rc = add_client_id(client_id, strlen(client_id), packet, pos + 1);
314
1.15k
  if(rc) {
315
0
    failf(data, "Client ID length mismatched: [%zu]", strlen(client_id));
316
0
    result = CURLE_WEIRD_SERVER_REPLY;
317
0
    goto end;
318
0
  }
319
1.15k
  infof(data, "Using client id '%s'", client_id);
320
321
  /* position where the user payload starts */
322
1.15k
  start_user = pos + 3 + MQTT_CLIENTID_LEN;
323
  /* position where the password payload starts */
324
1.15k
  start_pwd = start_user + ulen;
325
  /* if username was provided, add it to the packet */
326
1.15k
  if(ulen) {
327
120
    start_pwd += 2;
328
329
120
    rc = add_user(username, ulen,
330
120
                  (unsigned char *)packet, start_user, remain_pos);
331
120
    if(rc) {
332
7
      failf(data, "Username too long: [%zu]", ulen);
333
7
      result = CURLE_WEIRD_SERVER_REPLY;
334
7
      goto end;
335
7
    }
336
120
  }
337
338
  /* if passwd was provided, add it to the packet */
339
1.14k
  if(plen) {
340
49
    rc = add_passwd(passwd, plen, packet, start_pwd, remain_pos);
341
49
    if(rc) {
342
3
      failf(data, "Password too long: [%zu]", plen);
343
3
      result = CURLE_WEIRD_SERVER_REPLY;
344
3
      goto end;
345
3
    }
346
49
  }
347
348
1.14k
  if(!result)
349
1.14k
    result = mqtt_send(data, packet, packetlen);
350
351
1.15k
end:
352
1.15k
  if(packet)
353
1.15k
    curlx_free(packet);
354
1.15k
  Curl_safefree(data->state.aptr.user);
355
1.15k
  Curl_safefree(data->state.aptr.passwd);
356
1.15k
  return result;
357
1.14k
}
358
359
static CURLcode mqtt_disconnect(struct Curl_easy *data)
360
1
{
361
1
  return mqtt_send(data, "\xe0\x00", 2);
362
1
}
363
364
static CURLcode mqtt_recv_atleast(struct Curl_easy *data, size_t nbytes)
365
97
{
366
97
  struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
367
97
  size_t rlen;
368
97
  CURLcode result;
369
370
97
  if(!mq)
371
0
    return CURLE_FAILED_INIT;
372
97
  rlen = curlx_dyn_len(&mq->recvbuf);
373
374
97
  if(rlen < nbytes) {
375
97
    unsigned char readbuf[1024];
376
97
    size_t nread;
377
378
97
    DEBUGASSERT(nbytes - rlen < sizeof(readbuf));
379
97
    result = Curl_xfer_recv(data, (char *)readbuf, nbytes - rlen, &nread);
380
97
    if(result)
381
2
      return result;
382
95
    if(!nread) /* EOF */
383
43
       return CURLE_RECV_ERROR;
384
52
    if(curlx_dyn_addn(&mq->recvbuf, readbuf, nread))
385
0
      return CURLE_OUT_OF_MEMORY;
386
52
    rlen = curlx_dyn_len(&mq->recvbuf);
387
52
  }
388
52
  return (rlen >= nbytes) ? CURLE_OK : CURLE_AGAIN;
389
97
}
390
391
static void mqtt_recv_consume(struct Curl_easy *data, size_t nbytes)
392
27
{
393
27
  struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
394
27
  DEBUGASSERT(mq);
395
27
  if(mq) {
396
27
    size_t rlen = curlx_dyn_len(&mq->recvbuf);
397
27
    if(rlen <= nbytes)
398
27
      curlx_dyn_reset(&mq->recvbuf);
399
0
    else
400
0
      curlx_dyn_tail(&mq->recvbuf, rlen - nbytes);
401
27
  }
402
27
}
403
404
static CURLcode mqtt_verify_connack(struct Curl_easy *data)
405
100
{
406
100
  struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
407
100
  CURLcode result;
408
100
  const char *ptr;
409
410
100
  DEBUGASSERT(mq);
411
100
  if(!mq)
412
0
    return CURLE_FAILED_INIT;
413
100
  if(mq->remaining_length != 2) {
414
36
    failf(data, "CONNACK expected Remaining Length 2, got %zu",
415
36
          mq->remaining_length);
416
36
    return CURLE_WEIRD_SERVER_REPLY;
417
36
  }
418
419
64
  result = mqtt_recv_atleast(data, MQTT_CONNACK_LEN);
420
64
  if(result)
421
54
    return result;
422
423
  /* verify CONNACK */
424
10
  DEBUGASSERT(curlx_dyn_len(&mq->recvbuf) >= MQTT_CONNACK_LEN);
425
10
  ptr = curlx_dyn_ptr(&mq->recvbuf);
426
10
  Curl_debug(data, CURLINFO_HEADER_IN, ptr, MQTT_CONNACK_LEN);
427
428
10
  if(ptr[0] != 0x00 || ptr[1] != 0x00) {
429
2
    failf(data, "Expected %02x%02x but got %02x%02x",
430
2
          0x00, 0x00, ptr[0], ptr[1]);
431
2
    curlx_dyn_reset(&mq->recvbuf);
432
2
    return CURLE_WEIRD_SERVER_REPLY;
433
2
  }
434
8
  mqtt_recv_consume(data, MQTT_CONNACK_LEN);
435
8
  return CURLE_OK;
436
10
}
437
438
static CURLcode mqtt_get_topic(struct Curl_easy *data,
439
                               char **topic, size_t *topiclen)
440
7
{
441
7
  const char *path = data->state.up.path;
442
7
  CURLcode result = CURLE_URL_MALFORMAT;
443
7
  if(strlen(path) > 1) {
444
5
    result = Curl_urldecode(path + 1, 0, topic, topiclen, REJECT_NADA);
445
5
    if(!result && (*topiclen > 0xffff)) {
446
0
      failf(data, "Too long MQTT topic");
447
0
      result = CURLE_URL_MALFORMAT;
448
0
    }
449
5
  }
450
2
  else
451
2
    failf(data, "No MQTT topic found. Forgot to URL encode it?");
452
453
7
  return result;
454
7
}
455
456
static CURLcode mqtt_subscribe(struct Curl_easy *data)
457
5
{
458
5
  CURLcode result = CURLE_OK;
459
5
  char *topic = NULL;
460
5
  size_t topiclen;
461
5
  unsigned char *packet = NULL;
462
5
  size_t packetlen;
463
5
  char encodedsize[4];
464
5
  size_t n;
465
5
  struct connectdata *conn = data->conn;
466
5
  struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
467
468
5
  if(!mqtt)
469
0
    return CURLE_FAILED_INIT;
470
471
5
  result = mqtt_get_topic(data, &topic, &topiclen);
472
5
  if(result)
473
1
    goto fail;
474
475
4
  mqtt->packetid++;
476
477
4
  packetlen = topiclen + 5; /* packetid + topic (has a two byte length field)
478
                               + 2 bytes topic length + QoS byte */
479
4
  n = mqtt_encode_len((char *)encodedsize, packetlen);
480
4
  packetlen += n + 1; /* add one for the control packet type byte */
481
482
4
  packet = curlx_malloc(packetlen);
483
4
  if(!packet) {
484
0
    result = CURLE_OUT_OF_MEMORY;
485
0
    goto fail;
486
0
  }
487
488
4
  packet[0] = MQTT_MSG_SUBSCRIBE;
489
4
  memcpy(&packet[1], encodedsize, n);
490
4
  packet[1 + n] = (mqtt->packetid >> 8) & 0xff;
491
4
  packet[2 + n] = mqtt->packetid & 0xff;
492
4
  packet[3 + n] = (topiclen >> 8) & 0xff;
493
4
  packet[4 + n] = topiclen & 0xff;
494
4
  memcpy(&packet[5 + n], topic, topiclen);
495
4
  packet[5 + n + topiclen] = 0; /* QoS zero */
496
497
4
  result = mqtt_send(data, (const char *)packet, packetlen);
498
499
5
fail:
500
5
  curlx_free(topic);
501
5
  curlx_free(packet);
502
5
  return result;
503
4
}
504
505
/*
506
 * Called when the first byte was already read.
507
 */
508
static CURLcode mqtt_verify_suback(struct Curl_easy *data)
509
48
{
510
48
  struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
511
48
  struct connectdata *conn = data->conn;
512
48
  struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
513
48
  CURLcode result;
514
48
  const char *ptr;
515
516
48
  if(!mqtt || !mq)
517
0
    return CURLE_FAILED_INIT;
518
519
48
  if(mq->remaining_length != 3) {
520
15
    failf(data, "SUBACK expected Remaining Length 3, got %zu",
521
15
          mq->remaining_length);
522
15
    return CURLE_WEIRD_SERVER_REPLY;
523
15
  }
524
525
33
  result = mqtt_recv_atleast(data, MQTT_SUBACK_LEN);
526
33
  if(result)
527
1
    goto fail;
528
529
  /* verify SUBACK */
530
32
  DEBUGASSERT(curlx_dyn_len(&mq->recvbuf) >= MQTT_SUBACK_LEN);
531
32
  ptr = curlx_dyn_ptr(&mq->recvbuf);
532
32
  Curl_debug(data, CURLINFO_HEADER_IN, ptr, MQTT_SUBACK_LEN);
533
534
32
  if(((unsigned char)ptr[0]) != ((mqtt->packetid >> 8) & 0xff) ||
535
25
     ((unsigned char)ptr[1]) != (mqtt->packetid & 0xff) ||
536
20
     ptr[2] != 0x00) {
537
13
    curlx_dyn_reset(&mq->recvbuf);
538
13
    result = CURLE_WEIRD_SERVER_REPLY;
539
13
    goto fail;
540
13
  }
541
19
  mqtt_recv_consume(data, MQTT_SUBACK_LEN);
542
33
fail:
543
33
  return result;
544
19
}
545
546
1
#define MAX_MQTT_MESSAGE_SIZE 0xFFFFFFF
547
548
static CURLcode mqtt_publish(struct Curl_easy *data)
549
3
{
550
3
  CURLcode result;
551
3
  char *payload = data->set.postfields;
552
3
  size_t payloadlen;
553
3
  char *topic = NULL;
554
3
  size_t topiclen;
555
3
  unsigned char *pkt = NULL;
556
3
  size_t i = 0;
557
3
  size_t remaininglength;
558
3
  size_t encodelen;
559
3
  char encodedbytes[4];
560
3
  curl_off_t postfieldsize = data->set.postfieldsize;
561
562
3
  if(!payload) {
563
1
    DEBUGF(infof(data, "mqtt_publish without payload, return bad arg"));
564
1
    return CURLE_BAD_FUNCTION_ARGUMENT;
565
1
  }
566
2
  if(!curlx_sotouz_fits(postfieldsize, &payloadlen)) {
567
2
    if(postfieldsize > 0) /* off_t does not fit into size_t */
568
0
      return CURLE_BAD_FUNCTION_ARGUMENT;
569
2
    payloadlen = strlen(payload);
570
2
  }
571
572
2
  result = mqtt_get_topic(data, &topic, &topiclen);
573
2
  if(result)
574
1
    goto fail;
575
576
1
  remaininglength = payloadlen + 2 + topiclen;
577
1
  encodelen = mqtt_encode_len(encodedbytes, remaininglength);
578
1
  if(remaininglength > (MAX_MQTT_MESSAGE_SIZE - encodelen - 1)) {
579
0
    result = CURLE_TOO_LARGE;
580
0
    goto fail;
581
0
  }
582
583
  /* add the control byte and the encoded remaining length */
584
1
  pkt = curlx_malloc(remaininglength + 1 + encodelen);
585
1
  if(!pkt) {
586
0
    result = CURLE_OUT_OF_MEMORY;
587
0
    goto fail;
588
0
  }
589
590
  /* assemble packet */
591
1
  pkt[i++] = MQTT_MSG_PUBLISH;
592
1
  memcpy(&pkt[i], encodedbytes, encodelen);
593
1
  i += encodelen;
594
1
  pkt[i++] = (topiclen >> 8) & 0xff;
595
1
  pkt[i++] = (topiclen & 0xff);
596
1
  memcpy(&pkt[i], topic, topiclen);
597
1
  i += topiclen;
598
1
  memcpy(&pkt[i], payload, payloadlen);
599
1
  i += payloadlen;
600
1
  result = mqtt_send(data, (const char *)pkt, i);
601
602
2
fail:
603
2
  curlx_free(pkt);
604
2
  curlx_free(topic);
605
2
  return result;
606
1
}
607
608
/* return 0 on success, non-zero on error */
609
static int mqtt_decode_len(size_t *lenp, const unsigned char *buf,
610
                           size_t buflen)
611
7.64k
{
612
7.64k
  size_t len = 0;
613
7.64k
  size_t mult = 1;
614
7.64k
  size_t i;
615
7.64k
  unsigned char encoded = 128;
616
617
16.9k
  for(i = 0; (i < buflen) && (encoded & 128); i++) {
618
9.26k
    if(i == 4)
619
0
      return 1; /* bad size */
620
9.26k
    encoded = buf[i];
621
9.26k
    len += (encoded & 127) * mult;
622
9.26k
    mult *= 128;
623
9.26k
  }
624
625
7.64k
  *lenp = len;
626
7.64k
  return 0;
627
7.64k
}
628
629
#if defined(DEBUGBUILD) && defined(CURLVERBOSE)
630
static const char *statenames[] = {
631
  "MQTT_FIRST",
632
  "MQTT_REMAINING_LENGTH",
633
  "MQTT_CONNACK",
634
  "MQTT_SUBACK",
635
  "MQTT_SUBACK_COMING",
636
  "MQTT_PUBWAIT",
637
  "MQTT_PUB_REMAIN",
638
639
  "NOT A STATE"
640
};
641
#endif
642
643
/* The only way to change state */
644
static void mqstate(struct Curl_easy *data,
645
                    enum mqttstate state,
646
                    enum mqttstate nextstate) /* used if state == FIRST */
647
17.7k
{
648
17.7k
  struct connectdata *conn = data->conn;
649
17.7k
  struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
650
17.7k
  DEBUGASSERT(mqtt);
651
17.7k
  if(!mqtt)
652
0
    return;
653
17.7k
#ifdef DEBUGBUILD
654
17.7k
  infof(data, "%s (from %s) (next is %s)",
655
17.7k
        statenames[state],
656
17.7k
        statenames[mqtt->state],
657
17.7k
        (state == MQTT_FIRST) ? statenames[nextstate] : "");
658
17.7k
#endif
659
17.7k
  mqtt->state = state;
660
17.7k
  if(state == MQTT_FIRST)
661
8.86k
    mqtt->nextstate = nextstate;
662
17.7k
}
663
664
static CURLcode mqtt_read_publish(struct Curl_easy *data, bool *done)
665
734
{
666
734
  CURLcode result = CURLE_OK;
667
734
  struct connectdata *conn = data->conn;
668
734
  size_t nread;
669
734
  size_t remlen;
670
734
  struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
671
734
  struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
672
734
  unsigned char packet;
673
674
734
  DEBUGASSERT(mqtt);
675
734
  if(!mqtt || !mq)
676
0
    return CURLE_FAILED_INIT;
677
678
734
  switch(mqtt->state) {
679
48
MQTT_SUBACK_COMING:
680
48
  case MQTT_SUBACK_COMING:
681
48
    result = mqtt_verify_suback(data);
682
48
    if(result)
683
29
      break;
684
685
19
    mqstate(data, MQTT_FIRST, MQTT_PUBWAIT);
686
19
    break;
687
688
3
  case MQTT_SUBACK:
689
519
  case MQTT_PUBWAIT:
690
    /* we are expecting PUBLISH or SUBACK */
691
519
    packet = mq->firstbyte & 0xf0;
692
519
    if(packet == MQTT_MSG_PUBLISH)
693
443
      mqstate(data, MQTT_PUB_REMAIN, MQTT_NOSTATE);
694
76
    else if(packet == MQTT_MSG_SUBACK) {
695
48
      mqstate(data, MQTT_SUBACK_COMING, MQTT_NOSTATE);
696
48
      goto MQTT_SUBACK_COMING;
697
48
    }
698
28
    else if(packet == MQTT_MSG_DISCONNECT) {
699
9
      infof(data, "Got DISCONNECT");
700
9
      *done = TRUE;
701
9
      goto end;
702
9
    }
703
19
    else {
704
19
      result = CURLE_WEIRD_SERVER_REPLY;
705
19
      goto end;
706
19
    }
707
708
    /* -- switched state -- */
709
443
    remlen = mq->remaining_length;
710
443
    infof(data, "Remaining length: %zu bytes", remlen);
711
443
    if(data->set.max_filesize &&
712
115
       (curl_off_t)remlen > data->set.max_filesize) {
713
15
      failf(data, "Maximum file size exceeded");
714
15
      result = CURLE_FILESIZE_EXCEEDED;
715
15
      goto end;
716
15
    }
717
428
    Curl_pgrsSetDownloadSize(data, remlen);
718
428
    data->req.bytecount = 0;
719
428
    data->req.size = remlen;
720
428
    mq->npacket = remlen; /* get this many bytes */
721
428
    FALLTHROUGH();
722
643
  case MQTT_PUB_REMAIN: {
723
    /* read rest of packet, but no more. Cap to buffer size */
724
643
    char buffer[4 * 1024];
725
643
    size_t rest = mq->npacket;
726
643
    if(rest > sizeof(buffer))
727
288
      rest = sizeof(buffer);
728
643
    result = Curl_xfer_recv(data, buffer, rest, &nread);
729
643
    if(result) {
730
45
      if(result == CURLE_AGAIN) {
731
45
        infof(data, "EEEE AAAAGAIN");
732
45
      }
733
45
      goto end;
734
45
    }
735
598
    if(!nread) {
736
177
      infof(data, "server disconnected");
737
177
      result = CURLE_PARTIAL_FILE;
738
177
      goto end;
739
177
    }
740
741
    /* we received something */
742
421
    mq->lastTime = *Curl_pgrs_now(data);
743
744
    /* if QoS is set, message contains packet id */
745
421
    result = Curl_client_write(data, CLIENTWRITE_BODY, buffer, nread);
746
421
    if(result)
747
1
      goto end;
748
749
420
    mq->npacket -= nread;
750
420
    if(!mq->npacket)
751
      /* no more PUBLISH payload, back to subscribe wait state */
752
205
      mqstate(data, MQTT_FIRST, MQTT_PUBWAIT);
753
420
    break;
754
421
  }
755
0
  default:
756
0
    DEBUGASSERT(NULL); /* illegal state */
757
0
    result = CURLE_WEIRD_SERVER_REPLY;
758
0
    goto end;
759
734
  }
760
734
end:
761
734
  return result;
762
734
}
763
764
static CURLcode mqtt_do(struct Curl_easy *data, bool *done)
765
1.15k
{
766
1.15k
  struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
767
1.15k
  CURLcode result = CURLE_OK;
768
1.15k
  *done = FALSE; /* unconditionally */
769
770
1.15k
  if(!mq)
771
0
    return CURLE_FAILED_INIT;
772
1.15k
  mq->lastTime = *Curl_pgrs_now(data);
773
1.15k
  mq->pingsent = FALSE;
774
775
1.15k
  result = mqtt_connect(data);
776
1.15k
  if(result) {
777
10
    failf(data, "Error %d sending MQTT CONNECT request", result);
778
10
    return result;
779
10
  }
780
1.14k
  mqstate(data, MQTT_FIRST, MQTT_CONNACK);
781
1.14k
  return CURLE_OK;
782
1.15k
}
783
784
static CURLcode mqtt_done(struct Curl_easy *data,
785
                          CURLcode status, bool premature)
786
1.15k
{
787
1.15k
  struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
788
1.15k
  (void)status;
789
1.15k
  (void)premature;
790
1.15k
  if(mq) {
791
1.15k
    curlx_dyn_free(&mq->sendbuf);
792
1.15k
    curlx_dyn_free(&mq->recvbuf);
793
1.15k
  }
794
1.15k
  return CURLE_OK;
795
1.15k
}
796
797
/* we ping regularly to avoid being disconnected by the server */
798
static CURLcode mqtt_ping(struct Curl_easy *data)
799
9.60k
{
800
9.60k
  struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
801
9.60k
  CURLcode result = CURLE_OK;
802
9.60k
  struct connectdata *conn = data->conn;
803
9.60k
  struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
804
805
9.60k
  if(!mqtt || !mq)
806
0
    return CURLE_FAILED_INIT;
807
808
9.60k
  if(mqtt->state == MQTT_FIRST &&
809
8.67k
     !mq->pingsent &&
810
8.50k
     data->set.upkeep_interval_ms > 0) {
811
8.43k
    struct curltime t = *Curl_pgrs_now(data);
812
8.43k
    timediff_t diff = curlx_ptimediff_ms(&t, &mq->lastTime);
813
814
8.43k
    if(diff > data->set.upkeep_interval_ms) {
815
      /* 0xC0 is PINGREQ, and 0x00 is remaining length */
816
46
      unsigned char packet[2] = { 0xC0, 0x00 };
817
46
      size_t packetlen = sizeof(packet);
818
819
46
      result = mqtt_send(data, (char *)packet, packetlen);
820
46
      if(!result) {
821
46
        mq->pingsent = TRUE;
822
46
      }
823
46
      infof(data, "mqtt_ping: sent ping request.");
824
46
    }
825
8.43k
  }
826
9.60k
  return result;
827
9.60k
}
828
829
static CURLcode mqtt_doing(struct Curl_easy *data, bool *done)
830
9.60k
{
831
9.60k
  struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
832
9.60k
  CURLcode result = CURLE_OK;
833
9.60k
  size_t nread;
834
9.60k
  unsigned char recvbyte;
835
9.60k
  struct mqtt_conn *mqtt = Curl_conn_meta_get(data->conn, CURL_META_MQTT_CONN);
836
837
9.60k
  if(!mqtt || !mq)
838
0
    return CURLE_FAILED_INIT;
839
840
9.60k
  *done = FALSE;
841
842
9.60k
  if(curlx_dyn_len(&mq->sendbuf)) {
843
    /* send the remainder of an outgoing packet */
844
0
    result = mqtt_send(data, curlx_dyn_ptr(&mq->sendbuf),
845
0
                       curlx_dyn_len(&mq->sendbuf));
846
0
    if(result)
847
0
      return result;
848
0
  }
849
850
9.60k
  result = mqtt_ping(data);
851
9.60k
  if(result)
852
0
    return result;
853
854
9.60k
  infof(data, "mqtt_doing: state [%d]", (int)mqtt->state);
855
9.60k
  switch(mqtt->state) {
856
8.67k
  case MQTT_FIRST:
857
    /* Read the initial byte only */
858
8.67k
    result = Curl_xfer_recv(data, (char *)&mq->firstbyte, 1, &nread);
859
8.67k
    if(result)
860
405
      break;
861
8.27k
    else if(!nread) {
862
619
      failf(data, "Connection disconnected");
863
619
      *done = TRUE;
864
619
      result = CURLE_RECV_ERROR;
865
619
      break;
866
619
    }
867
7.65k
    Curl_debug(data, CURLINFO_HEADER_IN, (const char *)&mq->firstbyte, 1);
868
869
    /* we received something */
870
7.65k
    mq->lastTime = *Curl_pgrs_now(data);
871
872
    /* remember the first byte */
873
7.65k
    mq->npacket = 0;
874
7.65k
    mqstate(data, MQTT_REMAINING_LENGTH, MQTT_NOSTATE);
875
7.65k
    FALLTHROUGH();
876
7.66k
  case MQTT_REMAINING_LENGTH:
877
9.37k
    do {
878
9.37k
      result = Curl_xfer_recv(data, (char *)&recvbyte, 1, &nread);
879
9.37k
      if(result || !nread)
880
78
        break;
881
9.30k
      Curl_debug(data, CURLINFO_HEADER_IN, (const char *)&recvbyte, 1);
882
9.30k
      mq->pkt_hd[mq->npacket++] = recvbyte;
883
9.30k
    } while((recvbyte & 0x80) && (mq->npacket < 4));
884
7.66k
    if(!result && nread && (recvbyte & 0x80))
885
      /* MQTT supports up to 127 * 128^0 + 127 * 128^1 + 127 * 128^2 +
886
         127 * 128^3 bytes. server tried to send more */
887
8
      result = CURLE_WEIRD_SERVER_REPLY;
888
7.66k
    if(result)
889
19
      break;
890
7.64k
    if(mqtt_decode_len(&mq->remaining_length, mq->pkt_hd, mq->npacket)) {
891
0
      result = CURLE_WEIRD_SERVER_REPLY;
892
0
      break;
893
0
    }
894
7.64k
    mq->npacket = 0;
895
7.64k
    if(mq->remaining_length) {
896
2.60k
      mqstate(data, mqtt->nextstate, MQTT_NOSTATE);
897
2.60k
      break;
898
2.60k
    }
899
5.03k
    mqstate(data, MQTT_FIRST, MQTT_FIRST);
900
901
5.03k
    if(mq->firstbyte == MQTT_MSG_DISCONNECT) {
902
25
      infof(data, "Got DISCONNECT");
903
25
      *done = TRUE;
904
25
    }
905
906
    /* ping response */
907
5.03k
    if(mq->firstbyte == MQTT_MSG_PINGRESP) {
908
547
      infof(data, "Received ping response.");
909
547
      mq->pingsent = FALSE;
910
547
      mqstate(data, MQTT_FIRST, MQTT_PUBWAIT);
911
547
    }
912
5.03k
    break;
913
100
  case MQTT_CONNACK:
914
100
    result = mqtt_verify_connack(data);
915
100
    if(result)
916
92
      break;
917
918
8
    if(data->state.httpreq == HTTPREQ_POST) {
919
3
      result = mqtt_publish(data);
920
3
      if(!result) {
921
1
        result = mqtt_disconnect(data);
922
1
        *done = TRUE;
923
1
      }
924
3
      mqtt->nextstate = MQTT_FIRST;
925
3
    }
926
5
    else {
927
5
      result = mqtt_subscribe(data);
928
5
      if(!result) {
929
4
        mqstate(data, MQTT_FIRST, MQTT_SUBACK);
930
4
      }
931
5
    }
932
8
    break;
933
934
3
  case MQTT_SUBACK:
935
519
  case MQTT_PUBWAIT:
936
734
  case MQTT_PUB_REMAIN:
937
734
    result = mqtt_read_publish(data, done);
938
734
    break;
939
940
86
  default:
941
86
    failf(data, "State not handled yet");
942
86
    *done = TRUE;
943
86
    break;
944
9.60k
  }
945
946
9.60k
  if(result == CURLE_AGAIN)
947
473
    result = CURLE_OK;
948
9.60k
  return result;
949
9.60k
}
950
951
#ifdef USE_SSL
952
953
static CURLcode mqtts_connecting(struct Curl_easy *data, bool *done)
954
0
{
955
0
  struct connectdata *conn = data->conn;
956
0
  CURLcode result;
957
958
0
  result = Curl_conn_connect(data, FIRSTSOCKET, TRUE, done);
959
0
  if(result)
960
0
    connclose(conn, "Failed TLS connection");
961
0
  return result;
962
0
}
963
964
/*
965
 * MQTTS protocol.
966
 */
967
static const struct Curl_protocol Curl_protocol_mqtts = {
968
  mqtt_setup_conn,                    /* setup_connection */
969
  mqtt_do,                            /* do_it */
970
  mqtt_done,                          /* done */
971
  ZERO_NULL,                          /* do_more */
972
  ZERO_NULL,                          /* connect_it */
973
  mqtts_connecting,                   /* connecting */
974
  mqtt_doing,                         /* doing */
975
  ZERO_NULL,                          /* proto_pollset */
976
  mqtt_pollset,                       /* doing_pollset */
977
  ZERO_NULL,                          /* domore_pollset */
978
  ZERO_NULL,                          /* perform_pollset */
979
  ZERO_NULL,                          /* disconnect */
980
  ZERO_NULL,                          /* write_resp */
981
  ZERO_NULL,                          /* write_resp_hd */
982
  ZERO_NULL,                          /* connection_check */
983
  ZERO_NULL,                          /* attach connection */
984
  ZERO_NULL,                          /* follow */
985
};
986
987
#endif
988
989
/*
990
 * MQTT protocol.
991
 */
992
static const struct Curl_protocol Curl_protocol_mqtt = {
993
  mqtt_setup_conn,                    /* setup_connection */
994
  mqtt_do,                            /* do_it */
995
  mqtt_done,                          /* done */
996
  ZERO_NULL,                          /* do_more */
997
  ZERO_NULL,                          /* connect_it */
998
  ZERO_NULL,                          /* connecting */
999
  mqtt_doing,                         /* doing */
1000
  ZERO_NULL,                          /* proto_pollset */
1001
  mqtt_pollset,                       /* doing_pollset */
1002
  ZERO_NULL,                          /* domore_pollset */
1003
  ZERO_NULL,                          /* perform_pollset */
1004
  ZERO_NULL,                          /* disconnect */
1005
  ZERO_NULL,                          /* write_resp */
1006
  ZERO_NULL,                          /* write_resp_hd */
1007
  ZERO_NULL,                          /* connection_check */
1008
  ZERO_NULL,                          /* attach connection */
1009
  ZERO_NULL,                          /* follow */
1010
};
1011
1012
#endif /* CURL_DISABLE_MQTT */
1013
1014
const struct Curl_scheme Curl_scheme_mqtts = {
1015
  "mqtts",                            /* scheme */
1016
#if defined(CURL_DISABLE_MQTT) || !defined(USE_SSL)
1017
  ZERO_NULL,
1018
#else
1019
  &Curl_protocol_mqtts,
1020
#endif
1021
  CURLPROTO_MQTTS,                    /* protocol */
1022
  CURLPROTO_MQTT,                     /* family */
1023
  PROTOPT_SSL,                        /* flags */
1024
  PORT_MQTTS,                         /* defport */
1025
};
1026
1027
/*
1028
 * MQTT protocol.
1029
 */
1030
const struct Curl_scheme Curl_scheme_mqtt = {
1031
  "mqtt",                             /* scheme */
1032
#ifdef CURL_DISABLE_MQTT
1033
  ZERO_NULL,
1034
#else
1035
  &Curl_protocol_mqtt,
1036
#endif
1037
  CURLPROTO_MQTT,                     /* protocol */
1038
  CURLPROTO_MQTT,                     /* family */
1039
  PROTOPT_NONE,                       /* flags */
1040
  PORT_MQTT,                          /* defport */
1041
};