Coverage Report

Created: 2025-07-31 06:22

/src/wolfmqtt/src/mqtt_client.c
Line
Count
Source (jump to first uncovered line)
1
/* mqtt_client.c
2
 *
3
 * Copyright (C) 2006-2025 wolfSSL Inc.
4
 *
5
 * This file is part of wolfMQTT.
6
 *
7
 * wolfMQTT is free software; you can redistribute it and/or modify
8
 * it under the terms of the GNU General Public License as published by
9
 * the Free Software Foundation; either version 2 of the License, or
10
 * (at your option) any later version.
11
 *
12
 * wolfMQTT is distributed in the hope that it will be useful,
13
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15
 * GNU General Public License for more details.
16
 *
17
 * You should have received a copy of the GNU General Public License
18
 * along with this program; if not, write to the Free Software
19
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335, USA
20
 */
21
22
/* Include the autoconf generated config.h */
23
#ifdef HAVE_CONFIG_H
24
    #include <config.h>
25
#endif
26
27
#include "wolfmqtt/mqtt_client.h"
28
29
/* DOCUMENTED BUILD OPTIONS:
30
 *
31
 * WOLFMQTT_MULTITHREAD: Enables multi-thread support with mutex protection on
32
 *  client struct, write and read. When a pending response is needed its added
33
 *  to a linked list and if another thread reads the expected response it is
34
 *  flagged, so the other thread knows it completed.
35
 *
36
 * WOLFMQTT_NONBLOCK: Enabled transport support for returning WANT READ/WRITE,
37
 *  which becomes WOLFMQTT_CODE_CONTINUE. This prevents blocking if the
38
 *  transport (socket) has no data.
39
 *
40
 * WOLFMQTT_V5: Enables MQTT v5.0 support
41
 *
42
 * WOLFMQTT_ALLOW_NODATA_UNLOCK: Used with multi-threading and non-blocking to
43
 *   allow unlock if no data was sent/received. Note the TLS stack typically
44
 *   requires an attempt to write to continue with same write, not different.
45
 *   By default if we attempt a write we keep the mutex locked and return
46
 *   MQTT_CODE_CONTINUE
47
 *
48
 * WOLFMQTT_USER_THREADING: Allows custom mutex functions to be defined by the
49
 *  user. Example: wm_SemInit
50
 *
51
 * WOLFMQTT_DEBUG_CLIENT: Enables verbose PRINTF for the client code.
52
 */
53
54
55
/* Private functions */
56
57
/* forward declarations */
58
static int MqttClient_Publish_ReadPayload(MqttClient* client,
59
    MqttPublish* publish, int timeout_ms);
60
#if !defined(WOLFMQTT_MULTITHREAD) && !defined(WOLFMQTT_NONBLOCK)
61
static int MqttClient_CancelMessage(MqttClient *client, MqttObject* msg);
62
#endif
63
64
65
#ifdef WOLFMQTT_MULTITHREAD
66
67
#ifdef WOLFMQTT_USER_THREADING
68
69
    /* User will supply their own semaphore functions.
70
     * int wm_SemInit(wm_Sem *s)
71
     * int wm_SemFree(wm_Sem *s)
72
     * int wm_SemLock(wm_Sem *s)
73
     * int wm_SemUnlock(wm_Sem *s)
74
     */
75
76
#elif defined(__MACH__)
77
78
    /* Apple style dispatch semaphore */
79
    int wm_SemInit(wm_Sem *s) {
80
        /* dispatch_release() fails hard, with Trace/BPT trap signal, if the
81
         * sem's internal count is less than the value passed in with
82
         * dispatch_semaphore_create().  work around this by initializing
83
         * with 0, then incrementing it afterwards.
84
         */
85
        s->sem = dispatch_semaphore_create(0);
86
        if (s->sem == NULL)
87
            return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_MEMORY);
88
        if (dispatch_semaphore_signal(s->sem) < 0) {
89
            dispatch_release(s->sem);
90
            return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_SYSTEM);
91
        }
92
93
        return 0;
94
    }
95
    int wm_SemFree(wm_Sem *s) {
96
        if ((s == NULL) ||
97
            (s->sem == NULL))
98
            return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG);
99
        dispatch_release(s->sem);
100
        s->sem = NULL;
101
        return 0;
102
    }
103
    int wm_SemLock(wm_Sem *s) {
104
        dispatch_semaphore_wait(s->sem, DISPATCH_TIME_FOREVER);
105
        return 0;
106
    }
107
    int wm_SemUnlock(wm_Sem *s){
108
        dispatch_semaphore_signal(s->sem);
109
        return 0;
110
    }
111
#elif defined(WOLFMQTT_POSIX_SEMAPHORES)
112
    /* Posix style semaphore */
113
    int wm_SemInit(wm_Sem *s) {
114
    #ifndef WOLFMQTT_NO_COND_SIGNAL
115
        s->lockCount = 0;
116
        pthread_cond_init(&s->cond, NULL);
117
    #endif
118
        pthread_mutex_init(&s->mutex, NULL);
119
        return 0;
120
    }
121
    int wm_SemFree(wm_Sem *s) {
122
        pthread_mutex_destroy(&s->mutex);
123
    #ifndef WOLFMQTT_NO_COND_SIGNAL
124
        pthread_cond_destroy(&s->cond);
125
    #endif
126
        return 0;
127
    }
128
    int wm_SemLock(wm_Sem *s) {
129
        pthread_mutex_lock(&s->mutex);
130
    #ifndef WOLFMQTT_NO_COND_SIGNAL
131
        while (s->lockCount > 0)
132
            pthread_cond_wait(&s->cond, &s->mutex);
133
        s->lockCount++;
134
        pthread_mutex_unlock(&s->mutex);
135
    #endif
136
        return 0;
137
    }
138
    int wm_SemUnlock(wm_Sem *s) {
139
    #ifndef WOLFMQTT_NO_COND_SIGNAL
140
        pthread_mutex_lock(&s->mutex);
141
        if (s->lockCount > 0) {
142
            s->lockCount--;
143
            pthread_cond_signal(&s->cond);
144
        }
145
    #endif
146
        pthread_mutex_unlock(&s->mutex);
147
        return 0;
148
    }
149
#elif defined(FREERTOS)
150
    /* FreeRTOS binary semaphore */
151
    int wm_SemInit(wm_Sem *s) {
152
        *s = xSemaphoreCreateBinary();
153
        xSemaphoreGive(*s);
154
        return 0;
155
    }
156
    int wm_SemFree(wm_Sem *s) {
157
        vSemaphoreDelete(*s);
158
        *s = NULL;
159
        return 0;
160
    }
161
    int wm_SemLock(wm_Sem *s) {
162
        xSemaphoreTake(*s, portMAX_DELAY);
163
        return 0;
164
    }
165
    int wm_SemUnlock(wm_Sem *s) {
166
        xSemaphoreGive(*s);
167
        return 0;
168
    }
169
#elif defined(USE_WINDOWS_API)
170
    /* Windows semaphore object */
171
    int wm_SemInit(wm_Sem *s) {
172
        *s = CreateSemaphoreW( NULL, 1, 1, NULL);
173
        return 0;
174
    }
175
    int wm_SemFree(wm_Sem *s) {
176
        CloseHandle(*s);
177
        *s = NULL;
178
        return 0;
179
    }
180
    int wm_SemLock(wm_Sem *s) {
181
        WaitForSingleObject(*s, INFINITE);
182
        return 0;
183
    }
184
    int wm_SemUnlock(wm_Sem *s) {
185
        ReleaseSemaphore(*s, 1, NULL);
186
        return 0;
187
    }
188
189
#elif defined(THREADX)
190
    /* ThreadX semaphore */
191
    int wm_SemInit(wm_Sem *s) {
192
        if (tx_semaphore_create(s, NULL, 1) != TX_SUCCESS) {
193
            return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_SYSTEM);
194
        }
195
        return 0;
196
    }
197
    int wm_SemFree(wm_Sem *s) {
198
        if (tx_semaphore_delete(s) != TX_SUCCESS) {
199
            return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_SYSTEM);
200
        }
201
        return 0;
202
    }
203
204
    static UINT semstatus;
205
    int wm_SemLock(wm_Sem *s) {
206
        semstatus = tx_semaphore_get(s, TX_WAIT_FOREVER);
207
        if (semstatus != TX_SUCCESS) {
208
            return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_SYSTEM);
209
        }
210
        return 0;
211
    }
212
    int wm_SemUnlock(wm_Sem *s) {
213
        if (tx_semaphore_put(s) != TX_SUCCESS) {
214
            return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_SYSTEM);
215
        }
216
        return 0;
217
    }
218
#endif /* MUTEX */
219
#endif /* WOLFMQTT_MULTITHREAD */
220
221
static int MqttWriteStart(MqttClient* client, MqttMsgStat* stat)
222
7.31k
{
223
7.31k
    int rc = MQTT_CODE_SUCCESS;
224
225
7.31k
#if defined(WOLFMQTT_DEBUG_CLIENT) || !defined(WOLFMQTT_ALLOW_NODATA_UNLOCK)
226
  #ifdef WOLFMQTT_DEBUG_CLIENT
227
    if (stat->isWriteActive) {
228
        MQTT_TRACE_MSG("Warning, send already locked!");
229
        rc = MQTT_CODE_ERROR_SYSTEM;
230
    }
231
  #endif
232
7.31k
  #ifndef WOLFMQTT_ALLOW_NODATA_UNLOCK
233
    /* detect if a write is already in progress */
234
    #ifdef WOLFMQTT_MULTITHREAD
235
    if (wm_SemLock(&client->lockClient) == 0)
236
    #endif
237
7.31k
    {
238
7.31k
        if (client->write.isActive) {
239
0
            MQTT_TRACE_MSG("Partial write in progress!");
240
0
            rc = MQTT_CODE_CONTINUE; /* can't write yet */
241
0
        }
242
    #ifdef WOLFMQTT_MULTITHREAD
243
        wm_SemUnlock(&client->lockClient);
244
    #endif
245
7.31k
    }
246
7.31k
  #endif /* WOLFMQTT_ALLOW_NODATA_UNLOCK */
247
7.31k
    if (rc != MQTT_CODE_SUCCESS) {
248
0
        return rc;
249
0
    }
250
7.31k
#endif
251
252
#ifdef WOLFMQTT_MULTITHREAD
253
    rc = wm_SemLock(&client->lockSend);
254
#endif
255
7.31k
    if (rc == MQTT_CODE_SUCCESS) {
256
7.31k
        stat->isWriteActive = 1;
257
258
    #ifdef WOLFMQTT_MULTITHREAD
259
        if (wm_SemLock(&client->lockClient) == 0)
260
    #endif
261
7.31k
        {
262
7.31k
            client->write.isActive = 1;
263
        #ifdef WOLFMQTT_MULTITHREAD
264
            wm_SemUnlock(&client->lockClient);
265
        #endif
266
7.31k
        }
267
268
7.31k
        MQTT_TRACE_MSG("lockSend");
269
7.31k
    }
270
271
7.31k
    return rc;
272
7.31k
}
273
static void MqttWriteStop(MqttClient* client, MqttMsgStat* stat)
274
7.31k
{
275
#ifdef WOLFMQTT_DEBUG_CLIENT
276
    if (!stat->isWriteActive) {
277
        MQTT_TRACE_MSG("Warning, send not locked!");
278
        return;
279
    }
280
#endif
281
282
#ifdef WOLFMQTT_MULTITHREAD
283
    if (wm_SemLock(&client->lockClient) == 0)
284
#endif
285
7.31k
    {
286
        /* reset write */
287
7.31k
        XMEMSET(&client->write, 0, sizeof(client->write));
288
    #ifdef WOLFMQTT_MULTITHREAD
289
        wm_SemUnlock(&client->lockClient);
290
    #endif
291
7.31k
    }
292
293
7.31k
    if (stat->isWriteActive) {
294
7.31k
        MQTT_TRACE_MSG("unlockSend");
295
7.31k
        stat->isWriteActive = 0;
296
    #ifdef WOLFMQTT_MULTITHREAD
297
        wm_SemUnlock(&client->lockSend);
298
    #endif
299
7.31k
    }
300
7.31k
}
301
302
static int MqttReadStart(MqttClient* client, MqttMsgStat* stat)
303
9.08k
{
304
9.08k
    int rc = MQTT_CODE_SUCCESS;
305
306
9.08k
#if defined(WOLFMQTT_DEBUG_CLIENT) || !defined(WOLFMQTT_ALLOW_NODATA_UNLOCK)
307
  #ifdef WOLFMQTT_DEBUG_CLIENT
308
    if (stat->isReadActive) {
309
        MQTT_TRACE_MSG("Warning, recv already locked!");
310
        rc = MQTT_CODE_ERROR_SYSTEM;
311
    }
312
  #endif /* WOLFMQTT_DEBUG_CLIENT */
313
9.08k
  #ifndef WOLFMQTT_ALLOW_NODATA_UNLOCK
314
    /* detect if a read is already in progress */
315
    #ifdef WOLFMQTT_MULTITHREAD
316
    if (wm_SemLock(&client->lockClient) == 0)
317
    #endif
318
9.08k
    {
319
9.08k
        if (client->read.isActive) {
320
0
            MQTT_TRACE_MSG("Partial read in progress!");
321
0
            rc = MQTT_CODE_CONTINUE; /* can't read yet */
322
0
        }
323
    #ifdef WOLFMQTT_MULTITHREAD
324
        wm_SemUnlock(&client->lockClient);
325
    #endif
326
9.08k
    }
327
9.08k
  #endif /* WOLFMQTT_ALLOW_NODATA_UNLOCK */
328
9.08k
    if (rc != MQTT_CODE_SUCCESS) {
329
0
        return rc;
330
0
    }
331
9.08k
#endif /* WOLFMQTT_DEBUG_CLIENT || !WOLFMQTT_ALLOW_NODATA_UNLOCK */
332
333
#ifdef WOLFMQTT_MULTITHREAD
334
    rc = wm_SemLock(&client->lockRecv);
335
#endif
336
9.08k
    if (rc == MQTT_CODE_SUCCESS) {
337
9.08k
        stat->isReadActive = 1;
338
339
    #ifdef WOLFMQTT_MULTITHREAD
340
        if (wm_SemLock(&client->lockClient) == 0)
341
    #endif
342
9.08k
        {
343
            /* mark read active */
344
9.08k
            client->read.isActive = 1;
345
346
            /* reset the packet state used by MqttPacket_Read */
347
9.08k
            client->packet.stat = MQTT_PK_BEGIN;
348
349
        #ifdef WOLFMQTT_MULTITHREAD
350
            wm_SemUnlock(&client->lockClient);
351
        #endif
352
9.08k
        }
353
354
9.08k
        MQTT_TRACE_MSG("lockRecv");
355
9.08k
    }
356
357
9.08k
    return rc;
358
9.08k
}
359
static void MqttReadStop(MqttClient* client, MqttMsgStat* stat)
360
18.3k
{
361
#ifdef WOLFMQTT_DEBUG_CLIENT
362
    if (!stat->isReadActive) {
363
        MQTT_TRACE_MSG("Warning, recv not locked!");
364
        return;
365
    }
366
#endif
367
368
#ifdef WOLFMQTT_MULTITHREAD
369
    if (wm_SemLock(&client->lockClient) == 0)
370
#endif
371
18.3k
    {
372
        /* reset read */
373
18.3k
        XMEMSET(&client->read, 0, sizeof(client->read));
374
    #ifdef WOLFMQTT_MULTITHREAD
375
        wm_SemUnlock(&client->lockClient);
376
    #endif
377
18.3k
    }
378
379
18.3k
    if (stat->isReadActive) {
380
9.08k
        MQTT_TRACE_MSG("unlockRecv");
381
9.08k
        stat->isReadActive = 0;
382
    #ifdef WOLFMQTT_MULTITHREAD
383
        wm_SemUnlock(&client->lockRecv);
384
    #endif
385
9.08k
    }
386
18.3k
}
387
388
#ifdef WOLFMQTT_MULTITHREAD
389
390
/* These RespList functions assume caller has locked client->lockClient mutex */
391
int MqttClient_RespList_Add(MqttClient *client,
392
    MqttPacketType packet_type, word16 packet_id, MqttPendResp *newResp,
393
    void *packet_obj)
394
{
395
    MqttPendResp *tmpResp;
396
397
    if (client == NULL)
398
        return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG);
399
400
#ifdef WOLFMQTT_DEBUG_CLIENT
401
    PRINTF("PendResp Add: %p, Type %s (%d), ID %d",
402
        newResp, MqttPacket_TypeDesc(packet_type), packet_type, packet_id);
403
#endif
404
405
    /* verify newResp is not already in the list */
406
    for (tmpResp = client->firstPendResp;
407
         tmpResp != NULL;
408
         tmpResp = tmpResp->next)
409
    {
410
        if (tmpResp == newResp) {
411
        #ifdef WOLFMQTT_DEBUG_CLIENT
412
            PRINTF("Pending Response already in list!");
413
        #endif
414
            return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG);
415
        }
416
    }
417
418
    /* Initialize new response */
419
    XMEMSET(newResp, 0, sizeof(MqttPendResp));
420
    newResp->packet_id = packet_id;
421
    newResp->packet_type = packet_type;
422
    /* opaque pointer to struct based on type */
423
    newResp->packet_obj = packet_obj;
424
425
    if (client->lastPendResp == NULL) {
426
        /* This is the only list item */
427
        client->firstPendResp = newResp;
428
        client->lastPendResp = newResp;
429
    }
430
    else {
431
        /* Append to end of list */
432
        newResp->prev = client->lastPendResp;
433
        client->lastPendResp->next = newResp;
434
        client->lastPendResp = newResp;
435
    }
436
    return MQTT_CODE_SUCCESS;
437
}
438
439
void MqttClient_RespList_Remove(MqttClient *client, MqttPendResp *rmResp)
440
{
441
    MqttPendResp *tmpResp;
442
443
    if (client == NULL)
444
        return;
445
446
#ifdef WOLFMQTT_DEBUG_CLIENT
447
    PRINTF("PendResp Remove: %p", rmResp);
448
#endif
449
450
    /* Find the response entry */
451
    for (tmpResp = client->firstPendResp;
452
         tmpResp != NULL;
453
         tmpResp = tmpResp->next)
454
    {
455
        if (tmpResp == rmResp) {
456
            break;
457
        }
458
    }
459
    if (tmpResp) {
460
        /* Fix up the first and last pointers */
461
        if (client->firstPendResp == tmpResp) {
462
            client->firstPendResp = tmpResp->next;
463
        }
464
        if (client->lastPendResp == tmpResp) {
465
            client->lastPendResp = tmpResp->prev;
466
        }
467
468
        /* Remove the entry from the list */
469
        if (tmpResp->next != NULL) {
470
            tmpResp->next->prev = tmpResp->prev;
471
        }
472
        if (tmpResp->prev != NULL) {
473
            tmpResp->prev->next = tmpResp->next;
474
        }
475
    }
476
#ifdef WOLFMQTT_DEBUG_CLIENT
477
    else {
478
        PRINTF("\tPendResp not found");
479
    }
480
#endif
481
}
482
483
/* return codes: 0=not found, 1=found */
484
int MqttClient_RespList_Find(MqttClient *client,
485
    MqttPacketType packet_type, word16 packet_id, MqttPendResp **retResp)
486
{
487
    int rc = 0;
488
    MqttPendResp *tmpResp;
489
490
    if (client == NULL)
491
        return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG);
492
493
#ifdef WOLFMQTT_DEBUG_CLIENT
494
    #ifdef WOLFMQTT_NONBLOCK
495
    if (client->lastRc != MQTT_CODE_CONTINUE)
496
    #endif
497
    {
498
        PRINTF("PendResp Find: Type %s (%d), ID %d",
499
            MqttPacket_TypeDesc(packet_type), packet_type, packet_id);
500
    }
501
#endif
502
503
    if (retResp)
504
        *retResp = NULL; /* clear */
505
506
    /* Find pending response entry */
507
    for (tmpResp = client->firstPendResp;
508
         tmpResp != NULL;
509
         tmpResp = tmpResp->next)
510
    {
511
        if (packet_type == tmpResp->packet_type &&
512
           (packet_id == tmpResp->packet_id))
513
        {
514
        #ifdef WOLFMQTT_DEBUG_CLIENT
515
            #if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_DEBUG_CLIENT)
516
            if (client->lastRc != MQTT_CODE_CONTINUE)
517
            #endif
518
            {
519
            PRINTF("PendResp Found: %p, Type %s (%d), ID %d, InProc %d, Done %d",
520
                tmpResp, MqttPacket_TypeDesc(tmpResp->packet_type),
521
                tmpResp->packet_type, tmpResp->packet_id,
522
                tmpResp->packetProcessing, tmpResp->packetDone);
523
            }
524
        #endif
525
526
            if (retResp)
527
                *retResp = tmpResp;
528
            rc = 1;
529
            break;
530
        }
531
    }
532
    return rc;
533
}
534
#endif /* WOLFMQTT_MULTITHREAD */
535
536
#ifdef WOLFMQTT_V5
537
static int Handle_Props(MqttClient* client, MqttProp* props, byte use_cb,
538
                        byte free_props)
539
12.3k
{
540
12.3k
    int rc = MQTT_CODE_SUCCESS;
541
542
    /* If no properties, just return */
543
12.3k
    if (props != NULL) {
544
151
    #ifdef WOLFMQTT_PROPERTY_CB
545
        /* Check for properties set by the server */
546
151
        if ((use_cb == 1) && (client->property_cb != NULL)) {
547
            /* capture error if returned */
548
0
            int rc_err = client->property_cb(client, props,
549
0
                    client->property_ctx);
550
0
            if (rc_err < 0) {
551
0
                rc = rc_err;
552
0
            }
553
0
        }
554
    #else
555
        (void)client;
556
        (void)use_cb;
557
    #endif
558
151
        if (free_props) {
559
            /* Free the properties */
560
151
            MqttProps_Free(props);
561
151
        }
562
151
    }
563
12.3k
    return rc;
564
12.3k
}
565
#endif
566
567
568
/* Returns length decoded or error (as negative) */
569
/*! \brief      Take a received MQTT packet and try and decode it
570
 *  \param      client       MQTT client context
571
 *  \param      rx_buf       Incoming buffer data
572
 *  \param      rx_len       Incoming buffer length
573
 *  \param      p_decode     Opaque pointer to packet structure based on type
574
 *  \param      ppacket_type Decoded packet type
575
 *  \param      ppacket_qos  Decoded QoS level
576
 *  \param      ppacket_id   Decoded packet id
577
 *  \param      doProps      True: Call Handle_Props to free prop struct
578
579
 *  \return     Returns length decoded or error (as negative) MQTT_CODE_ERROR_*
580
                (see enum MqttPacketResponseCodes)
581
 */
582
static int MqttClient_DecodePacket(MqttClient* client, byte* rx_buf,
583
    word32 rx_len, void *packet_obj, MqttPacketType* ppacket_type,
584
    MqttQoS* ppacket_qos, word16* ppacket_id, int doProps)
585
15.7k
{
586
15.7k
    int rc = MQTT_CODE_SUCCESS;
587
15.7k
    MqttPacket* header;
588
15.7k
    MqttPacketType packet_type;
589
15.7k
    MqttQoS packet_qos;
590
15.7k
    word16 packet_id = 0;
591
592
    /* must have rx buffer with at least 2 byes for header */
593
15.7k
    if (rx_buf == NULL || rx_len < MQTT_PACKET_HEADER_MIN_SIZE) {
594
0
        return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG);
595
0
    }
596
597
    /* Decode header */
598
15.7k
    header = (MqttPacket*)rx_buf;
599
15.7k
    packet_type = (MqttPacketType)MQTT_PACKET_TYPE_GET(header->type_flags);
600
15.7k
    if (ppacket_type) {
601
15.7k
        *ppacket_type = packet_type;
602
15.7k
    }
603
15.7k
    packet_qos = (MqttQoS)MQTT_PACKET_FLAGS_GET_QOS(header->type_flags);
604
15.7k
    if (ppacket_qos) {
605
15.7k
        *ppacket_qos = packet_qos;
606
15.7k
    }
607
608
    /* Decode packet specific data (if requested) */
609
15.7k
    if (ppacket_id || packet_obj) {
610
15.7k
        switch (packet_type) {
611
1.69k
        case MQTT_PACKET_TYPE_CONNECT_ACK:
612
1.69k
        {
613
1.69k
            MqttConnectAck connect_ack, *p_connect_ack = &connect_ack;
614
1.69k
            if (packet_obj) {
615
904
                p_connect_ack = (MqttConnectAck*)packet_obj;
616
904
            }
617
792
            else {
618
792
                XMEMSET(p_connect_ack, 0, sizeof(MqttConnectAck));
619
792
            }
620
1.69k
        #ifdef WOLFMQTT_V5
621
1.69k
            p_connect_ack->protocol_level = client->protocol_level;
622
1.69k
        #endif
623
1.69k
            rc = MqttDecode_ConnectAck(rx_buf, rx_len, p_connect_ack);
624
1.69k
        #ifdef WOLFMQTT_V5
625
1.69k
            if (rc >= 0 && doProps) {
626
1.62k
                int tmp = Handle_Props(client, p_connect_ack->props,
627
1.62k
                                       (packet_obj != NULL), 1);
628
1.62k
                p_connect_ack->props = NULL;
629
1.62k
                if (tmp != MQTT_CODE_SUCCESS) {
630
0
                    rc = tmp;
631
0
                }
632
1.62k
            }
633
1.69k
        #endif
634
1.69k
            break;
635
0
        }
636
3.02k
        case MQTT_PACKET_TYPE_PUBLISH:
637
3.02k
        {
638
3.02k
            MqttPublish publish, *p_publish;
639
3.02k
            if (packet_obj) {
640
1.66k
                p_publish = (MqttPublish*)packet_obj;
641
1.66k
            #ifdef WOLFMQTT_V5
642
                /* setting the protocol level will enable parsing of the
643
                 * properties. The properties are allocated from a list,
644
                 * so only parse if we are using a return packet object */
645
1.66k
                p_publish->protocol_level = client->protocol_level;
646
1.66k
            #endif
647
1.66k
            }
648
1.35k
            else {
649
1.35k
                p_publish = &publish;
650
1.35k
                XMEMSET(p_publish, 0, sizeof(MqttPublish));
651
1.35k
            }
652
3.02k
            rc = MqttDecode_Publish(rx_buf, rx_len, p_publish);
653
3.02k
            if (rc >= 0) {
654
2.83k
                packet_id = p_publish->packet_id;
655
2.83k
            #ifdef WOLFMQTT_V5
656
2.83k
                if (doProps) {
657
                    /* Do not free property list here. It will be freed
658
                       after the message callback. */
659
2.83k
                    int tmp = Handle_Props(client, p_publish->props,
660
2.83k
                                           (packet_obj != NULL), 0);
661
2.83k
                    if (tmp != MQTT_CODE_SUCCESS) {
662
0
                        rc = tmp;
663
0
                    }
664
2.83k
                }
665
2.83k
            #endif
666
2.83k
            }
667
3.02k
            break;
668
0
        }
669
752
        case MQTT_PACKET_TYPE_PUBLISH_ACK:
670
1.44k
        case MQTT_PACKET_TYPE_PUBLISH_REC:
671
2.36k
        case MQTT_PACKET_TYPE_PUBLISH_REL:
672
3.13k
        case MQTT_PACKET_TYPE_PUBLISH_COMP:
673
3.13k
        {
674
3.13k
            MqttPublishResp publish_resp, *p_publish_resp = &publish_resp;
675
3.13k
            if (packet_obj) {
676
1.63k
                p_publish_resp = (MqttPublishResp*)packet_obj;
677
1.63k
            }
678
1.49k
            else {
679
1.49k
                XMEMSET(p_publish_resp, 0, sizeof(MqttPublishResp));
680
1.49k
            }
681
682
3.13k
        #ifdef WOLFMQTT_V5
683
3.13k
            p_publish_resp->protocol_level = client->protocol_level;
684
3.13k
        #endif
685
3.13k
            rc = MqttDecode_PublishResp(rx_buf, rx_len, packet_type,
686
3.13k
                p_publish_resp);
687
3.13k
            if (rc >= 0) {
688
2.93k
                packet_id = p_publish_resp->packet_id;
689
2.93k
            #ifdef WOLFMQTT_V5
690
2.93k
                if (doProps) {
691
2.93k
                    int tmp = Handle_Props(client, p_publish_resp->props,
692
2.93k
                                           (packet_obj != NULL), 1);
693
2.93k
                    p_publish_resp->props = NULL;
694
2.93k
                    if (tmp != MQTT_CODE_SUCCESS) {
695
0
                        rc = tmp;
696
0
                    }
697
2.93k
                }
698
2.93k
            #endif
699
2.93k
            }
700
3.13k
            break;
701
2.36k
        }
702
1.95k
        case MQTT_PACKET_TYPE_SUBSCRIBE_ACK:
703
1.95k
        {
704
1.95k
            MqttSubscribeAck subscribe_ack, *p_subscribe_ack = &subscribe_ack;
705
1.95k
            if (packet_obj) {
706
1.03k
                p_subscribe_ack = (MqttSubscribeAck*)packet_obj;
707
1.03k
            }
708
916
            else {
709
916
                XMEMSET(p_subscribe_ack, 0, sizeof(MqttSubscribeAck));
710
916
            }
711
1.95k
        #ifdef WOLFMQTT_V5
712
1.95k
            p_subscribe_ack->protocol_level = client->protocol_level;
713
1.95k
        #endif
714
1.95k
            rc = MqttDecode_SubscribeAck(rx_buf, rx_len, p_subscribe_ack);
715
1.95k
            if (rc >= 0) {
716
1.80k
                packet_id = p_subscribe_ack->packet_id;
717
1.80k
            #ifdef WOLFMQTT_V5
718
1.80k
                if (doProps) {
719
1.80k
                    int tmp = Handle_Props(client, p_subscribe_ack->props,
720
1.80k
                                           (packet_obj != NULL), 1);
721
1.80k
                    p_subscribe_ack->props = NULL;
722
1.80k
                    if (tmp != MQTT_CODE_SUCCESS) {
723
0
                        rc = tmp;
724
0
                    }
725
1.80k
                }
726
1.80k
            #endif
727
1.80k
            }
728
1.95k
            break;
729
2.36k
        }
730
1.34k
        case MQTT_PACKET_TYPE_UNSUBSCRIBE_ACK:
731
1.34k
        {
732
1.34k
            MqttUnsubscribeAck unsubscribe_ack,
733
1.34k
                               *p_unsubscribe_ack = &unsubscribe_ack;
734
1.34k
            if (packet_obj) {
735
724
                p_unsubscribe_ack = (MqttUnsubscribeAck*)packet_obj;
736
724
            }
737
625
            else {
738
625
                XMEMSET(p_unsubscribe_ack, 0, sizeof(MqttUnsubscribeAck));
739
625
            }
740
1.34k
        #ifdef WOLFMQTT_V5
741
1.34k
            p_unsubscribe_ack->protocol_level = client->protocol_level;
742
1.34k
        #endif
743
1.34k
            rc = MqttDecode_UnsubscribeAck(rx_buf, rx_len, p_unsubscribe_ack);
744
1.34k
            if (rc >= 0) {
745
1.16k
                packet_id = p_unsubscribe_ack->packet_id;
746
1.16k
            #ifdef WOLFMQTT_V5
747
1.16k
                if (doProps) {
748
1.16k
                    int tmp = Handle_Props(client, p_unsubscribe_ack->props,
749
1.16k
                                           (packet_obj != NULL), 1);
750
1.16k
                    p_unsubscribe_ack->props = NULL;
751
1.16k
                    if (tmp != MQTT_CODE_SUCCESS) {
752
0
                        rc = tmp;
753
0
                    }
754
1.16k
                }
755
1.16k
            #endif
756
1.16k
            }
757
1.34k
            break;
758
2.36k
        }
759
1.35k
        case MQTT_PACKET_TYPE_PING_RESP:
760
1.35k
        {
761
1.35k
            MqttPing ping, *p_ping = &ping;
762
1.35k
            if (packet_obj) {
763
714
                p_ping = (MqttPing*)packet_obj;
764
714
            }
765
642
            else {
766
642
                XMEMSET(p_ping, 0, sizeof(MqttPing));
767
642
            }
768
1.35k
            rc = MqttDecode_Ping(rx_buf, rx_len, p_ping);
769
1.35k
            break;
770
2.36k
        }
771
765
        case MQTT_PACKET_TYPE_AUTH:
772
765
        {
773
765
        #ifdef WOLFMQTT_V5
774
765
            MqttAuth auth, *p_auth = &auth;
775
765
            if (packet_obj) {
776
403
                p_auth = (MqttAuth*)packet_obj;
777
403
            }
778
362
            else {
779
362
                XMEMSET(p_auth, 0, sizeof(MqttAuth));
780
362
            }
781
765
            rc = MqttDecode_Auth(rx_buf, rx_len, p_auth);
782
765
            if (rc >= 0 && doProps) {
783
88
                int tmp = Handle_Props(client, p_auth->props,
784
88
                                       (packet_obj != NULL), 1);
785
88
                p_auth->props = NULL;
786
88
                if (tmp != MQTT_CODE_SUCCESS) {
787
0
                    rc = tmp;
788
0
                }
789
88
            }
790
        #else
791
            rc = MQTT_TRACE_ERROR(MQTT_CODE_ERROR_PACKET_TYPE);
792
        #endif /* WOLFMQTT_V5 */
793
765
            break;
794
2.36k
        }
795
2.34k
        case MQTT_PACKET_TYPE_DISCONNECT:
796
2.34k
        {
797
2.34k
        #ifdef WOLFMQTT_V5
798
2.34k
            MqttDisconnect disc, *p_disc = &disc;
799
2.34k
            if (packet_obj) {
800
1.24k
                p_disc = (MqttDisconnect*)packet_obj;
801
1.24k
            }
802
1.09k
            else {
803
1.09k
                XMEMSET(p_disc, 0, sizeof(MqttDisconnect));
804
1.09k
            }
805
2.34k
            rc = MqttDecode_Disconnect(rx_buf, rx_len, p_disc);
806
2.34k
            if (rc >= 0 && doProps) {
807
1.86k
                int tmp = Handle_Props(client, p_disc->props,
808
1.86k
                                       (packet_obj != NULL), 1);
809
1.86k
                p_disc->props = NULL;
810
1.86k
                if (tmp != MQTT_CODE_SUCCESS) {
811
0
                    rc = tmp;
812
0
                }
813
1.86k
            }
814
2.34k
            #ifdef WOLFMQTT_DISCONNECT_CB
815
            /* Call disconnect callback with reason code */
816
2.34k
            if ((packet_obj != NULL) && client->disconnect_cb) {
817
0
                client->disconnect_cb(client, p_disc->reason_code,
818
0
                    client->disconnect_ctx);
819
0
            }
820
2.34k
            #endif
821
        #else
822
            rc = MQTT_TRACE_ERROR(MQTT_CODE_ERROR_PACKET_TYPE);
823
        #endif /* WOLFMQTT_V5 */
824
2.34k
            break;
825
2.36k
        }
826
2
        case MQTT_PACKET_TYPE_CONNECT:
827
2
        case MQTT_PACKET_TYPE_SUBSCRIBE:
828
21
        case MQTT_PACKET_TYPE_UNSUBSCRIBE:
829
21
        case MQTT_PACKET_TYPE_PING_REQ:
830
21
        case MQTT_PACKET_TYPE_ANY:
831
99
        case MQTT_PACKET_TYPE_RESERVED:
832
99
        default:
833
            /* these type are only encoded by client */
834
99
            rc = MQTT_TRACE_ERROR(MQTT_CODE_ERROR_PACKET_TYPE);
835
99
            break;
836
15.7k
        } /* switch (packet_type) */
837
15.7k
    }
838
839
15.7k
    if (ppacket_id) {
840
15.7k
        *ppacket_id = packet_id;
841
15.7k
    }
842
843
15.7k
    (void)client;
844
15.7k
    (void)doProps;
845
846
#ifdef WOLFMQTT_DEBUG_CLIENT
847
    PRINTF("MqttClient_DecodePacket: Rc %d, Len %d, Type %s (%d), ID %d,"
848
            " QoS %d, doProps %d",
849
        rc, rx_len, MqttPacket_TypeDesc(packet_type), packet_type, packet_id,
850
        packet_qos, doProps);
851
#endif
852
853
15.7k
    return rc;
854
15.7k
}
855
856
static int MqttClient_HandlePacket(MqttClient* client,
857
    MqttPacketType packet_type, void *packet_obj, MqttPublishResp* resp,
858
    int timeout_ms)
859
8.49k
{
860
8.49k
    int rc = MQTT_CODE_SUCCESS;
861
8.49k
    MqttQoS packet_qos = MQTT_QOS_0;
862
8.49k
    word16 packet_id = 0;
863
864
8.49k
    if (client == NULL || packet_obj == NULL) {
865
0
        return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG);
866
0
    }
867
868
    /* make sure the response defaults to no ACK */
869
8.49k
    resp->packet_type = MQTT_PACKET_TYPE_RESERVED;
870
871
8.49k
    switch (packet_type)
872
8.49k
    {
873
792
        case MQTT_PACKET_TYPE_CONNECT_ACK:
874
792
        {
875
792
            rc = MqttClient_DecodePacket(client, client->rx_buf,
876
792
                client->packet.buf_len, packet_obj, &packet_type, &packet_qos,
877
792
                &packet_id, 1);
878
792
            break;
879
0
        }
880
3.19k
        case MQTT_PACKET_TYPE_PUBLISH:
881
3.19k
        {
882
3.19k
            MqttPublish* publish = (MqttPublish*)packet_obj;
883
3.19k
            if (publish->stat.read != MQTT_MSG_PAYLOAD2) {
884
3.06k
                rc = MqttClient_DecodePacket(client, client->rx_buf,
885
3.06k
                    client->packet.buf_len, packet_obj, &packet_type,
886
3.06k
                    &packet_qos, &packet_id, 1);
887
3.06k
                if (rc <= 0) {
888
1.34k
                    return rc;
889
1.34k
                }
890
3.06k
            }
891
135
            else {
892
                /* packet ID and QoS were already established */
893
135
                packet_id =  publish->packet_id;
894
135
                packet_qos = publish->qos;
895
135
            }
896
897
1.85k
            rc = MqttClient_Publish_ReadPayload(client, publish, timeout_ms);
898
1.85k
            if (rc < 0) {
899
297
                break;
900
297
            }
901
            /* Note: Getting here means the Publish Read is done */
902
1.55k
            publish->stat.read = MQTT_MSG_BEGIN; /* reset state */
903
904
1.55k
        #ifdef WOLFMQTT_V5
905
            /* Free the properties */
906
1.55k
            MqttProps_Free(publish->props);
907
1.55k
            publish->props = NULL;
908
1.55k
        #endif
909
910
            /* Handle QoS */
911
1.55k
            if (packet_qos == MQTT_QOS_0) {
912
                /* we are done, no QoS response */
913
450
                break;
914
450
            }
915
916
1.10k
        #ifdef WOLFMQTT_V5
917
            /* Copy response code in case changed by callback */
918
1.10k
            resp->reason_code = publish->resp.reason_code;
919
1.10k
        #endif
920
            /* Populate information needed for ack */
921
1.10k
            resp->packet_type = (packet_qos == MQTT_QOS_1) ?
922
358
                MQTT_PACKET_TYPE_PUBLISH_ACK :
923
1.10k
                MQTT_PACKET_TYPE_PUBLISH_REC;
924
1.10k
            resp->packet_id = packet_id;
925
1.10k
            break;
926
1.55k
        }
927
361
        case MQTT_PACKET_TYPE_PUBLISH_ACK:
928
685
        case MQTT_PACKET_TYPE_PUBLISH_REC:
929
1.11k
        case MQTT_PACKET_TYPE_PUBLISH_REL:
930
1.45k
        case MQTT_PACKET_TYPE_PUBLISH_COMP:
931
1.45k
        {
932
        #if defined(WOLFMQTT_V5) && defined(WOLFMQTT_DEBUG_CLIENT)
933
            MqttPublishResp* publish_resp = (MqttPublishResp*)packet_obj;
934
        #endif
935
1.45k
            rc = MqttClient_DecodePacket(client, client->rx_buf,
936
1.45k
                client->packet.buf_len, packet_obj, &packet_type,
937
1.45k
                &packet_qos, &packet_id, 1);
938
1.45k
            if (rc <= 0) {
939
0
                return rc;
940
0
            }
941
942
        #if defined(WOLFMQTT_V5) && defined(WOLFMQTT_DEBUG_CLIENT)
943
            PRINTF("\tPublish response: reason code %d, Type %s (%d),"
944
                    " ID %d, QoS %d",
945
                    publish_resp->reason_code,
946
                    MqttPacket_TypeDesc(packet_type),
947
                    packet_type, packet_id, packet_qos);
948
        #endif
949
950
            /* Only ACK publish Received or Release QoS levels */
951
1.45k
            if (packet_type != MQTT_PACKET_TYPE_PUBLISH_REC &&
952
1.45k
                packet_type != MQTT_PACKET_TYPE_PUBLISH_REL) {
953
697
                break;
954
697
            }
955
956
            /* Populate information needed for ack */
957
756
            resp->packet_type = packet_type+1; /* next ack */
958
756
            resp->packet_id = packet_id;
959
756
            break;
960
1.45k
        }
961
885
        case MQTT_PACKET_TYPE_SUBSCRIBE_ACK:
962
885
        {
963
885
            rc = MqttClient_DecodePacket(client, client->rx_buf,
964
885
                client->packet.buf_len, packet_obj, &packet_type, &packet_qos,
965
885
                &packet_id, 1);
966
885
            break;
967
1.45k
        }
968
573
        case MQTT_PACKET_TYPE_UNSUBSCRIBE_ACK:
969
573
        {
970
573
            rc = MqttClient_DecodePacket(client, client->rx_buf,
971
573
                client->packet.buf_len, packet_obj, &packet_type, &packet_qos,
972
573
                &packet_id, 1);
973
573
            break;
974
1.45k
        }
975
642
        case MQTT_PACKET_TYPE_PING_RESP:
976
642
        {
977
642
            rc = MqttClient_DecodePacket(client, client->rx_buf,
978
642
                client->packet.buf_len, packet_obj, &packet_type, &packet_qos,
979
642
                &packet_id, 1);
980
642
            break;
981
1.45k
        }
982
39
        case MQTT_PACKET_TYPE_AUTH:
983
39
        {
984
39
        #ifdef WOLFMQTT_V5
985
39
            rc = MqttClient_DecodePacket(client, client->rx_buf,
986
39
                client->packet.buf_len, packet_obj, &packet_type, &packet_qos,
987
39
                &packet_id, 1);
988
        #else
989
            rc = MQTT_TRACE_ERROR(MQTT_CODE_ERROR_PACKET_TYPE);
990
        #endif
991
39
            break;
992
1.45k
        }
993
994
914
        case MQTT_PACKET_TYPE_DISCONNECT:
995
914
        {
996
914
        #ifdef WOLFMQTT_V5
997
914
            rc = MqttClient_DecodePacket(client, client->rx_buf,
998
914
                client->packet.buf_len, packet_obj, &packet_type, &packet_qos,
999
914
                &packet_id, 1);
1000
        #else
1001
            rc = MQTT_TRACE_ERROR(MQTT_CODE_ERROR_PACKET_TYPE);
1002
        #endif
1003
914
            break;
1004
1.45k
        }
1005
0
        case MQTT_PACKET_TYPE_CONNECT:
1006
0
        case MQTT_PACKET_TYPE_SUBSCRIBE:
1007
0
        case MQTT_PACKET_TYPE_UNSUBSCRIBE:
1008
0
        case MQTT_PACKET_TYPE_PING_REQ:
1009
0
        case MQTT_PACKET_TYPE_ANY:
1010
0
        case MQTT_PACKET_TYPE_RESERVED:
1011
0
        default:
1012
            /* these types are only sent from client and should not be sent
1013
             * by broker */
1014
0
            rc = MQTT_TRACE_ERROR(MQTT_CODE_ERROR_PACKET_TYPE);
1015
0
            break;
1016
8.49k
    } /* switch (packet_type) */
1017
1018
#ifdef WOLFMQTT_DEBUG_CLIENT
1019
    if (rc < 0) {
1020
        PRINTF("MqttClient_HandlePacket: Rc %d, Type %s (%d), QoS %d, ID %d",
1021
            rc, MqttPacket_TypeDesc(packet_type), packet_type, packet_qos,
1022
            packet_id);
1023
    }
1024
#endif
1025
1026
7.14k
    return rc;
1027
8.49k
}
1028
1029
static inline int MqttIsPubRespPacket(int packet_type)
1030
13.3k
{
1031
13.3k
    return (packet_type == MQTT_PACKET_TYPE_PUBLISH_ACK /* Acknowledgment */ ||
1032
13.3k
            packet_type == MQTT_PACKET_TYPE_PUBLISH_REC /* Received */ ||
1033
13.3k
            packet_type == MQTT_PACKET_TYPE_PUBLISH_REL /* Release */ ||
1034
13.3k
            packet_type == MQTT_PACKET_TYPE_PUBLISH_COMP /* Complete */);
1035
13.3k
}
1036
1037
#ifdef WOLFMQTT_MULTITHREAD
1038
/* this function will return:
1039
 * MQTT_CODE_CONTINUE indicating found, but not marked done
1040
 * MQTT_CODE_ERROR_NOT_FOUND: Not found
1041
 * Any other response is from the the packet_ret
1042
 */
1043
static int MqttClient_CheckPendResp(MqttClient *client, byte wait_type,
1044
    word16 wait_packet_id)
1045
{
1046
    int rc;
1047
    MqttPendResp *pendResp = NULL;
1048
1049
    /* Check to see if packet type and id have already completed */
1050
    rc = wm_SemLock(&client->lockClient);
1051
    if (rc == 0) {
1052
        if (MqttClient_RespList_Find(client, (MqttPacketType)wait_type,
1053
            wait_packet_id, &pendResp))
1054
        {
1055
            if ((pendResp != NULL) && (pendResp->packetDone)) {
1056
                /* pending response is already done, so return */
1057
                rc = pendResp->packet_ret;
1058
            #ifdef WOLFMQTT_DEBUG_CLIENT
1059
                PRINTF("PendResp Check Done %p: Rc %d", pendResp, rc);
1060
            #endif
1061
                MqttClient_RespList_Remove(client, pendResp);
1062
            }
1063
            else {
1064
                /* item not done */
1065
                rc = MQTT_CODE_CONTINUE;
1066
            }
1067
        }
1068
        else {
1069
            /* item not found */
1070
            rc = MQTT_CODE_ERROR_NOT_FOUND;
1071
        }
1072
        wm_SemUnlock(&client->lockClient);
1073
    }
1074
    return rc;
1075
}
1076
#endif /* WOLFMQTT_MULTITHREAD */
1077
1078
/* Helper for clearing the contents of an object buffer based on packet type */
1079
static void MqttClient_PacketReset(MqttPacketType packet_type, void* packet_obj)
1080
6.59k
{
1081
6.59k
    size_t objSz = 0;
1082
6.59k
    size_t offset = sizeof(MqttMsgStat);
1083
#ifdef WOLFMQTT_MULTITHREAD
1084
    offset += sizeof(MqttPendResp);
1085
#endif
1086
6.59k
    switch (packet_type) {
1087
0
        case MQTT_PACKET_TYPE_CONNECT:
1088
0
            objSz = sizeof(MqttConnect);
1089
0
            break;
1090
792
        case MQTT_PACKET_TYPE_CONNECT_ACK:
1091
792
            objSz = sizeof(MqttConnectAck);
1092
792
            break;
1093
1.30k
        case MQTT_PACKET_TYPE_PUBLISH:
1094
1.30k
            objSz = sizeof(MqttPublish);
1095
1.30k
            break;
1096
361
        case MQTT_PACKET_TYPE_PUBLISH_ACK:
1097
685
        case MQTT_PACKET_TYPE_PUBLISH_REC:
1098
1.11k
        case MQTT_PACKET_TYPE_PUBLISH_REL:
1099
1.45k
        case MQTT_PACKET_TYPE_PUBLISH_COMP:
1100
1.45k
            objSz = sizeof(MqttPublishResp);
1101
1.45k
            break;
1102
0
        case MQTT_PACKET_TYPE_SUBSCRIBE:
1103
0
            objSz = sizeof(MqttSubscribe);
1104
0
            break;
1105
885
        case MQTT_PACKET_TYPE_SUBSCRIBE_ACK:
1106
885
            objSz = sizeof(MqttSubscribeAck);
1107
885
            break;
1108
0
        case MQTT_PACKET_TYPE_UNSUBSCRIBE:
1109
0
            objSz = sizeof(MqttUnsubscribe);
1110
0
            break;
1111
573
        case MQTT_PACKET_TYPE_UNSUBSCRIBE_ACK:
1112
573
            objSz = sizeof(MqttUnsubscribeAck);
1113
573
            break;
1114
0
        case MQTT_PACKET_TYPE_PING_REQ:
1115
642
        case MQTT_PACKET_TYPE_PING_RESP:
1116
642
            objSz = sizeof(MqttPing);
1117
642
            break;
1118
39
        case MQTT_PACKET_TYPE_AUTH:
1119
39
        #ifdef WOLFMQTT_V5
1120
39
            objSz = sizeof(MqttAuth);
1121
39
        #endif
1122
39
            break;
1123
914
        case MQTT_PACKET_TYPE_DISCONNECT:
1124
914
        #ifdef WOLFMQTT_V5
1125
914
            objSz = sizeof(MqttDisconnect);
1126
914
        #endif
1127
914
            break;
1128
0
        case MQTT_PACKET_TYPE_ANY:
1129
0
        case MQTT_PACKET_TYPE_RESERVED:
1130
0
        default:
1131
0
            break;
1132
6.59k
    } /* switch (packet_type) */
1133
6.59k
    if (objSz > offset) {
1134
5.95k
        XMEMSET((byte*)packet_obj + offset, 0, objSz - offset);
1135
5.95k
    }
1136
6.59k
}
1137
1138
static int MqttClient_WaitType(MqttClient *client, void *packet_obj,
1139
    byte wait_type, word16 wait_packet_id, int timeout_ms)
1140
5.42k
{
1141
5.42k
    int rc = MQTT_CODE_SUCCESS;
1142
5.42k
    word16         packet_id;
1143
5.42k
    MqttPacketType packet_type;
1144
5.42k
    MqttQoS        packet_qos = MQTT_QOS_0;
1145
#ifdef WOLFMQTT_MULTITHREAD
1146
    MqttPendResp *pendResp;
1147
#endif
1148
5.42k
    MqttMsgStat* mms_stat;
1149
5.42k
    int waitMatchFound;
1150
5.42k
    void* use_packet_obj = NULL;
1151
1152
5.42k
    if (client == NULL || packet_obj == NULL) {
1153
0
        return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG);
1154
0
    }
1155
1156
    /* all packet type structures must have MqttMsgStat at top */
1157
5.42k
    mms_stat = (MqttMsgStat*)packet_obj;
1158
1159
11.4k
wait_again:
1160
1161
    /* initialize variables */
1162
11.4k
    packet_id = 0;
1163
11.4k
    packet_type = MQTT_PACKET_TYPE_RESERVED;
1164
#ifdef WOLFMQTT_MULTITHREAD
1165
    pendResp = NULL;
1166
#endif
1167
11.4k
    waitMatchFound = 0;
1168
1169
#ifdef WOLFMQTT_DEBUG_CLIENT
1170
    #ifdef WOLFMQTT_NONBLOCK
1171
    if (client->lastRc != MQTT_CODE_CONTINUE)
1172
    #endif
1173
    {
1174
        PRINTF("MqttClient_WaitType: Type %s (%d), ID %d, State %d-%d",
1175
            MqttPacket_TypeDesc((MqttPacketType)wait_type),
1176
                wait_type, wait_packet_id, mms_stat->read, mms_stat->write);
1177
    }
1178
#endif
1179
1180
11.4k
    switch (mms_stat->read)
1181
11.4k
    {
1182
9.08k
        case MQTT_MSG_BEGIN:
1183
9.08k
        {
1184
        #ifdef WOLFMQTT_MULTITHREAD
1185
            /* Check to see if packet type and id have already completed */
1186
            rc = MqttClient_CheckPendResp(client, wait_type, wait_packet_id);
1187
            if (rc != MQTT_CODE_ERROR_NOT_FOUND && rc != MQTT_CODE_CONTINUE) {
1188
                return rc;
1189
            }
1190
        #endif
1191
1192
9.08k
            if ((rc = MqttReadStart(client, mms_stat)) != 0) {
1193
0
                return rc;
1194
0
            }
1195
1196
9.08k
            mms_stat->read = MQTT_MSG_WAIT;
1197
9.08k
        }
1198
9.08k
        FALL_THROUGH;
1199
1200
9.08k
        case MQTT_MSG_WAIT:
1201
9.47k
        case MQTT_MSG_HEADER:
1202
9.47k
        {
1203
            /* Wait for packet */
1204
9.47k
            rc = MqttPacket_Read(client, client->rx_buf, client->rx_buf_len,
1205
9.47k
                    timeout_ms);
1206
            /* handle failure */
1207
9.47k
            if (rc <= 0) {
1208
            #ifdef WOLFMQTT_NONBLOCK
1209
                if (rc == MQTT_CODE_CONTINUE &&
1210
                    (client->packet.stat > MQTT_PK_BEGIN ||
1211
                     client->read.total > 0)
1212
                ) {
1213
                    /* advance state, since we received some data */
1214
                    mms_stat->read = MQTT_MSG_HEADER;
1215
                }
1216
            #endif
1217
2.12k
                break;
1218
2.12k
            }
1219
1220
            /* advance state, since we received some data */
1221
7.35k
            mms_stat->read = MQTT_MSG_HEADER;
1222
1223
            /* capture length read */
1224
7.35k
            client->packet.buf_len = rc;
1225
1226
            /* Decode Packet - get type, qos and id */
1227
7.35k
            rc = MqttClient_DecodePacket(client, client->rx_buf,
1228
7.35k
                client->packet.buf_len, NULL, &packet_type, &packet_qos,
1229
7.35k
                &packet_id, 1);
1230
7.35k
            if (rc < 0) {
1231
758
                break;
1232
758
            }
1233
1234
6.59k
            MqttClient_PacketReset(packet_type, &client->msg);
1235
1236
        #ifdef WOLFMQTT_DEBUG_CLIENT
1237
            PRINTF("Read Packet: Len %d, Type %d, ID %d",
1238
                client->packet.buf_len, packet_type, packet_id);
1239
        #endif
1240
1241
            /* Ping response is special case, no payload */
1242
6.59k
            if (packet_type != MQTT_PACKET_TYPE_PING_RESP) {
1243
5.95k
                mms_stat->read = MQTT_MSG_PAYLOAD;
1244
5.95k
            }
1245
642
            else {
1246
642
                mms_stat->read = MQTT_MSG_WAIT;
1247
642
            }
1248
6.59k
        }
1249
6.59k
        FALL_THROUGH;
1250
1251
8.35k
        case MQTT_MSG_PAYLOAD:
1252
8.49k
        case MQTT_MSG_PAYLOAD2:
1253
8.49k
        {
1254
8.49k
            MqttPublishResp resp;
1255
8.49k
            MqttPacketType use_packet_type;
1256
1257
            /* Determine if we received data for this request */
1258
8.49k
            if ((wait_type == MQTT_PACKET_TYPE_ANY ||
1259
8.49k
                 wait_type == packet_type ||
1260
8.49k
                 (MqttIsPubRespPacket(packet_type) &&
1261
5.22k
                  MqttIsPubRespPacket(wait_type))) &&
1262
8.49k
                (wait_packet_id == 0 || wait_packet_id == packet_id))
1263
2.86k
            {
1264
2.86k
                use_packet_obj = packet_obj;
1265
            #ifdef WOLFMQTT_DEBUG_CLIENT
1266
                PRINTF("Using INCOMING packet_obj %p", use_packet_obj);
1267
            #endif
1268
2.86k
                if (packet_type == wait_type ||
1269
2.86k
                        wait_type == MQTT_PACKET_TYPE_ANY) {
1270
                    /* Only stop waiting when matched or waiting for "any" */
1271
2.86k
                    waitMatchFound = 1;
1272
2.86k
                }
1273
2.86k
            }
1274
5.62k
            else {
1275
            #ifdef WOLFMQTT_MULTITHREAD
1276
                rc = wm_SemLock(&client->lockClient);
1277
                if (rc != 0) {
1278
                    break; /* error */
1279
                }
1280
            #endif
1281
1282
                /* use generic packet object */
1283
5.62k
                use_packet_obj = &client->msg;
1284
            #ifdef WOLFMQTT_DEBUG_CLIENT
1285
                PRINTF("Using SHARED packet_obj %p", use_packet_obj);
1286
            #endif
1287
1288
            #ifdef WOLFMQTT_MULTITHREAD
1289
                wm_SemUnlock(&client->lockClient);
1290
            #endif
1291
5.62k
            }
1292
8.49k
            use_packet_type = packet_type;
1293
1294
        #ifdef WOLFMQTT_MULTITHREAD
1295
            /* Check to see if we have a pending response for this packet */
1296
            pendResp = NULL;
1297
            rc = wm_SemLock(&client->lockClient);
1298
            if (rc == 0) {
1299
                if (MqttClient_RespList_Find(client, packet_type, packet_id,
1300
                                                               &pendResp)) {
1301
                    /* we found packet match this incoming read packet */
1302
                    pendResp->packetProcessing = 1;
1303
                    if (pendResp->packet_obj != packet_obj) {
1304
                        use_packet_obj = pendResp->packet_obj;
1305
                        use_packet_type = pendResp->packet_type;
1306
                        /* req from another thread... not a match */
1307
                        waitMatchFound = 0;
1308
                    }
1309
                }
1310
                wm_SemUnlock(&client->lockClient);
1311
            }
1312
            else {
1313
                break; /* error */
1314
            }
1315
        #endif /* WOLFMQTT_MULTITHREAD */
1316
1317
            /* for payload state packet type is always publish */
1318
8.49k
            if (use_packet_type == MQTT_PACKET_TYPE_RESERVED &&
1319
8.49k
                    (mms_stat->read == MQTT_MSG_PAYLOAD ||
1320
1.89k
                     mms_stat->read == MQTT_MSG_PAYLOAD2))
1321
1.89k
            {
1322
1.89k
                use_packet_type = MQTT_PACKET_TYPE_PUBLISH;
1323
1.89k
            }
1324
            /* cache publish packet id and qos for MqttClient_HandlePacket payload */
1325
8.49k
            if (use_packet_type == MQTT_PACKET_TYPE_PUBLISH &&
1326
8.49k
                  mms_stat->read == MQTT_MSG_PAYLOAD && use_packet_obj != NULL)
1327
3.06k
            {
1328
3.06k
                MqttObject* obj = (MqttObject*)use_packet_obj;
1329
3.06k
                obj->publish.qos = packet_qos;
1330
3.06k
                obj->publish.packet_id = packet_id;
1331
3.06k
            }
1332
1333
            /* Perform packet handling for publish callback and QoS */
1334
8.49k
            XMEMSET(&resp, 0, sizeof(resp));
1335
8.49k
            rc = MqttClient_HandlePacket(client, use_packet_type,
1336
8.49k
                use_packet_obj, &resp, timeout_ms);
1337
1338
            /* if using the shared packet object, make sure the original
1339
             * state is correct for publish payload 2 (continued) */
1340
8.49k
            if (use_packet_obj != NULL && use_packet_obj != mms_stat &&
1341
8.49k
                    ((MqttMsgStat*)use_packet_obj)->read == MQTT_MSG_PAYLOAD2) {
1342
285
                mms_stat->read = MQTT_MSG_PAYLOAD2;
1343
285
            }
1344
1345
        #ifdef WOLFMQTT_NONBLOCK
1346
            if (rc == MQTT_CODE_CONTINUE) {
1347
                break;
1348
            }
1349
        #endif
1350
1351
            /* handle success case */
1352
8.49k
            if (rc >= 0) {
1353
6.85k
                rc = MQTT_CODE_SUCCESS;
1354
6.85k
            }
1355
1.64k
            else {
1356
                /* error, break */
1357
1.64k
                break;
1358
1.64k
            }
1359
1360
        #ifdef WOLFMQTT_MULTITHREAD
1361
            if (pendResp) {
1362
                /* Mark pending response entry done */
1363
                if (wm_SemLock(&client->lockClient) == 0) {
1364
                    pendResp->packetDone = 1;
1365
                    pendResp->packet_ret = rc;
1366
                #ifdef WOLFMQTT_DEBUG_CLIENT
1367
                    PRINTF("PendResp Done %p", pendResp);
1368
                #endif
1369
                    pendResp = NULL;
1370
                    wm_SemUnlock(&client->lockClient);
1371
                }
1372
            }
1373
        #endif /* WOLFMQTT_MULTITHREAD */
1374
1375
            /* Determine if we are sending ACK or done */
1376
6.85k
            if (MqttIsPubRespPacket(resp.packet_type)) {
1377
                /* if we get here, then we are sending an ACK */
1378
1.85k
                mms_stat->read = MQTT_MSG_ACK;
1379
1.85k
                mms_stat->ack = MQTT_MSG_WAIT;
1380
1381
                /* setup ACK in shared context */
1382
1.85k
                XMEMCPY(&client->packetAck, &resp, sizeof(MqttPublishResp));
1383
1.85k
            #ifdef WOLFMQTT_V5
1384
1.85k
                client->packetAck.protocol_level = client->protocol_level;
1385
1.85k
            #endif
1386
1.85k
            }
1387
1388
            /* done reading */
1389
6.85k
            MqttReadStop(client, mms_stat);
1390
6.85k
            break;
1391
8.49k
        }
1392
1393
122
        case MQTT_MSG_ACK:
1394
            /* go to write section below */
1395
122
            break;
1396
1397
0
        case MQTT_MSG_AUTH:
1398
0
        default:
1399
0
        {
1400
        #ifdef WOLFMQTT_DEBUG_CLIENT
1401
            PRINTF("MqttClient_WaitType: Invalid read state %d!",
1402
                mms_stat->read);
1403
        #endif
1404
0
            rc = MQTT_CODE_ERROR_STAT;
1405
0
            break;
1406
0
        }
1407
11.4k
    } /* switch (mms_stat->read) */
1408
1409
11.4k
    switch (mms_stat->ack)
1410
11.4k
    {
1411
9.63k
        case MQTT_MSG_BEGIN:
1412
            /* wait for read to set ack */
1413
9.63k
            break;
1414
1415
1.85k
        case MQTT_MSG_WAIT:
1416
1.85k
        {
1417
            /* Flag write active / lock mutex */
1418
1.85k
            if ((rc = MqttWriteStart(client, mms_stat)) != 0) {
1419
0
                break;
1420
0
            }
1421
1.85k
            mms_stat->ack = MQTT_MSG_ACK;
1422
1.85k
        }
1423
1.85k
        FALL_THROUGH;
1424
1425
1.85k
        case MQTT_MSG_ACK:
1426
1.85k
        {
1427
            /* send ack */
1428
1.85k
            rc = MqttEncode_PublishResp(client->tx_buf, client->tx_buf_len,
1429
1.85k
                client->packetAck.packet_type, &client->packetAck);
1430
        #ifdef WOLFMQTT_DEBUG_CLIENT
1431
            PRINTF("MqttEncode_PublishResp: Len %d, Type %s (%d), ID %d",
1432
                rc, MqttPacket_TypeDesc(client->packetAck.packet_type),
1433
                    client->packetAck.packet_type, client->packetAck.packet_id);
1434
        #endif
1435
1.85k
            if (rc < 0) {
1436
0
                MqttWriteStop(client, mms_stat);
1437
0
                break;
1438
0
            }
1439
1440
1.85k
            client->write.len = rc;
1441
            /* Note: static analyzer complains about set, but not used here.
1442
             * Keeping it to ensure no future issues with rc > 0 */
1443
1.85k
            rc = MQTT_CODE_SUCCESS;
1444
1.85k
            (void)rc; /* inhibit clang-analyzer-deadcode.DeadStores */
1445
1446
1.85k
            mms_stat->ack = MQTT_MSG_HEADER;
1447
1.85k
        }
1448
1.85k
        FALL_THROUGH;
1449
1450
1.85k
        case MQTT_MSG_HEADER:
1451
1.85k
        {
1452
1.85k
            int xfer = client->write.len;
1453
1454
            /* Send publish response packet */
1455
1.85k
            rc = MqttPacket_Write(client, client->tx_buf, xfer);
1456
        #ifdef WOLFMQTT_NONBLOCK
1457
            if (rc == MQTT_CODE_CONTINUE) {
1458
                /* keep send mutex locked and return to caller */
1459
                /* must keep send locked */
1460
                return rc;
1461
            }
1462
        #endif
1463
1.85k
            MqttWriteStop(client, mms_stat);
1464
1.85k
            if (rc == xfer) {
1465
1.01k
                rc = MQTT_CODE_SUCCESS; /* success */
1466
1.01k
            }
1467
1468
1.85k
            mms_stat->ack = MQTT_MSG_BEGIN; /* reset write state */
1469
1.85k
            break;
1470
1.85k
        }
1471
1472
0
        case MQTT_MSG_AUTH:
1473
0
        case MQTT_MSG_PAYLOAD:
1474
0
        case MQTT_MSG_PAYLOAD2:
1475
0
        default:
1476
        #ifdef WOLFMQTT_DEBUG_CLIENT
1477
            PRINTF("MqttClient_WaitType: Invalid ack state %d!",
1478
                mms_stat->ack);
1479
        #endif
1480
0
            rc = MQTT_TRACE_ERROR(MQTT_CODE_ERROR_STAT);
1481
0
            break;
1482
11.4k
    } /* switch (mms_stat->ack) */
1483
1484
    /* no data read or ack done, then reset state */
1485
11.4k
    if (mms_stat->read == MQTT_MSG_WAIT) {
1486
2.36k
        mms_stat->read = MQTT_MSG_BEGIN;
1487
2.36k
    }
1488
1489
#ifdef WOLFMQTT_NONBLOCK
1490
    /* if nonblocking and some data has been read, do not release read lock */
1491
    if (rc == MQTT_CODE_CONTINUE && mms_stat->read > MQTT_MSG_WAIT) {
1492
        return rc;
1493
    }
1494
#endif
1495
1496
11.4k
    MqttReadStop(client, mms_stat);
1497
1498
#ifdef WOLFMQTT_NONBLOCK
1499
    #ifdef WOLFMQTT_DEBUG_CLIENT
1500
    #ifdef WOLFMQTT_MULTITHREAD
1501
    if (wm_SemLock(&client->lockClient) == 0)
1502
    #endif
1503
    {
1504
        client->lastRc = rc;
1505
    #ifdef WOLFMQTT_MULTITHREAD
1506
        wm_SemUnlock(&client->lockClient);
1507
    #endif
1508
    }
1509
    #endif /* WOLFMQTT_DEBUG_CLIENT */
1510
    if (rc == MQTT_CODE_CONTINUE) {
1511
        return rc;
1512
    }
1513
#endif
1514
1515
11.4k
    if (rc < 0) {
1516
    #ifdef WOLFMQTT_DEBUG_CLIENT
1517
        if (rc != MQTT_CODE_CONTINUE) {
1518
            PRINTF("MqttClient_WaitType: Failure: %s (%d)",
1519
                MqttClient_ReturnCodeToString(rc), rc);
1520
        }
1521
    #endif
1522
4.49k
        return rc;
1523
4.49k
    }
1524
1525
6.99k
    if (!waitMatchFound) {
1526
        /* if we get here, then the we are still waiting for a packet */
1527
6.06k
        mms_stat->read = MQTT_MSG_BEGIN;
1528
    #ifdef WOLFMQTT_NONBLOCK
1529
        /* for non-blocking return with code continue instead of waiting again
1530
         * if called with packet type and id of 'any' */
1531
        if (wait_type == MQTT_PACKET_TYPE_ANY && wait_packet_id == 0) {
1532
            return MQTT_CODE_CONTINUE;
1533
        }
1534
    #endif
1535
6.06k
        MQTT_TRACE_MSG("Wait Again");
1536
6.06k
        goto wait_again;
1537
6.06k
    }
1538
#ifdef WOLFMQTT_DEBUG_CLIENT
1539
    if (rc != MQTT_CODE_CONTINUE) {
1540
        PRINTF("MqttClient_WaitType: rc %d, state %d-%d-%d",
1541
            rc, mms_stat->read, mms_stat->write, mms_stat->ack);
1542
    }
1543
#endif
1544
1545
1546
927
    return rc;
1547
6.99k
}
1548
1549
1550
/* Public Functions */
1551
int MqttClient_Init(MqttClient *client, MqttNet* net,
1552
    MqttMsgCb msg_cb,
1553
    byte* tx_buf, int tx_buf_len,
1554
    byte* rx_buf, int rx_buf_len,
1555
    int cmd_timeout_ms)
1556
0
{
1557
0
    int rc = MQTT_CODE_SUCCESS;
1558
1559
    /* Check arguments */
1560
0
    if (client == NULL ||
1561
0
        tx_buf == NULL || tx_buf_len <= 0 ||
1562
0
        rx_buf == NULL || rx_buf_len <= 0) {
1563
0
        return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG);
1564
0
    }
1565
1566
    /* Initialize the client structure to zero */
1567
0
    XMEMSET(client, 0, sizeof(MqttClient));
1568
1569
    /* Setup client structure */
1570
0
    client->msg_cb = msg_cb;
1571
0
    client->tx_buf = tx_buf;
1572
0
    client->tx_buf_len = tx_buf_len;
1573
0
    client->rx_buf = rx_buf;
1574
0
    client->rx_buf_len = rx_buf_len;
1575
0
    client->cmd_timeout_ms = cmd_timeout_ms;
1576
0
#ifdef WOLFMQTT_V5
1577
0
    client->max_qos = MQTT_QOS_2;
1578
0
    client->retain_avail = 1;
1579
0
    client->protocol_level = MQTT_CONNECT_PROTOCOL_LEVEL;
1580
0
    rc = MqttProps_Init();
1581
0
#endif
1582
1583
#ifdef WOLFMQTT_MULTITHREAD
1584
    if (rc == 0) {
1585
        rc = wm_SemInit(&client->lockSend);
1586
    }
1587
    if (rc == 0) {
1588
        rc = wm_SemInit(&client->lockRecv);
1589
    }
1590
    if (rc == 0) {
1591
        rc = wm_SemInit(&client->lockClient);
1592
    }
1593
    #ifdef ENABLE_MQTT_CURL
1594
    if (rc == 0) {
1595
        rc = wm_SemInit(&client->lockCURL);
1596
    }
1597
    #endif
1598
#endif
1599
1600
0
    if (rc == 0) {
1601
        /* Init socket */
1602
0
        rc = MqttSocket_Init(client, net);
1603
0
    }
1604
1605
0
    if (rc != 0) {
1606
        /* Cleanup if init failed */
1607
0
        MqttClient_DeInit(client);
1608
0
    }
1609
1610
0
    return rc;
1611
0
}
1612
1613
void MqttClient_DeInit(MqttClient *client)
1614
0
{
1615
0
    if (client != NULL) {
1616
#ifdef WOLFMQTT_MULTITHREAD
1617
        (void)wm_SemFree(&client->lockSend);
1618
        (void)wm_SemFree(&client->lockRecv);
1619
        (void)wm_SemFree(&client->lockClient);
1620
    #ifdef ENABLE_MQTT_CURL
1621
        (void)wm_SemFree(&client->lockCURL);
1622
    #endif
1623
#endif
1624
0
    }
1625
0
#ifdef WOLFMQTT_V5
1626
0
    (void)MqttProps_ShutDown();
1627
0
#endif
1628
0
}
1629
1630
#ifdef WOLFMQTT_DISCONNECT_CB
1631
int MqttClient_SetDisconnectCallback(MqttClient *client,
1632
        MqttDisconnectCb discCb, void* ctx)
1633
0
{
1634
0
    if (client == NULL)
1635
0
        return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG);
1636
1637
0
    client->disconnect_cb = discCb;
1638
0
    client->disconnect_ctx = ctx;
1639
1640
0
    return MQTT_CODE_SUCCESS;
1641
0
}
1642
#endif
1643
1644
#ifdef WOLFMQTT_PROPERTY_CB
1645
int MqttClient_SetPropertyCallback(MqttClient *client, MqttPropertyCb propCb,
1646
    void* ctx)
1647
0
{
1648
0
    if (client == NULL)
1649
0
        return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG);
1650
1651
0
    client->property_cb = propCb;
1652
0
    client->property_ctx = ctx;
1653
1654
0
    return MQTT_CODE_SUCCESS;
1655
0
}
1656
#endif
1657
1658
int MqttClient_Connect(MqttClient *client, MqttConnect *mc_connect)
1659
2.69k
{
1660
2.69k
    int rc;
1661
1662
    /* Validate required arguments */
1663
2.69k
    if (client == NULL || mc_connect == NULL) {
1664
0
        return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG);
1665
0
    }
1666
1667
2.69k
    if (mc_connect->stat.write == MQTT_MSG_BEGIN) {
1668
        /* Flag write active / lock mutex */
1669
2.69k
        if ((rc = MqttWriteStart(client, &mc_connect->stat)) != 0) {
1670
0
            return rc;
1671
0
        }
1672
1673
2.69k
    #ifdef WOLFMQTT_V5
1674
        /* Use specified protocol version if set */
1675
2.69k
        mc_connect->protocol_level = client->protocol_level;
1676
2.69k
    #endif
1677
1678
        /* Encode the connect packet */
1679
2.69k
        rc = MqttEncode_Connect(client->tx_buf, client->tx_buf_len, mc_connect);
1680
    #ifdef WOLFMQTT_DEBUG_CLIENT
1681
        PRINTF("MqttClient_EncodePacket: Len %d, Type %s (%d), ID %d, QoS %d",
1682
            rc, MqttPacket_TypeDesc(MQTT_PACKET_TYPE_CONNECT),
1683
            MQTT_PACKET_TYPE_CONNECT, 0, 0);
1684
    #endif
1685
2.69k
        if (rc <= 0) {
1686
35
            MqttWriteStop(client, &mc_connect->stat);
1687
35
            return rc;
1688
35
        }
1689
2.65k
        client->write.len = rc;
1690
1691
    #ifdef WOLFMQTT_MULTITHREAD
1692
        rc = wm_SemLock(&client->lockClient);
1693
        if (rc == 0) {
1694
            /* inform other threads of expected response */
1695
            rc = MqttClient_RespList_Add(client, MQTT_PACKET_TYPE_CONNECT_ACK,
1696
                    0, &mc_connect->pendResp, &mc_connect->ack);
1697
            wm_SemUnlock(&client->lockClient);
1698
        }
1699
        if (rc != 0) {
1700
            MqttWriteStop(client, &mc_connect->stat);
1701
            return rc; /* Error locking client */
1702
        }
1703
    #endif
1704
1705
2.65k
        mc_connect->stat.write = MQTT_MSG_HEADER;
1706
2.65k
    }
1707
2.65k
    if (mc_connect->stat.write == MQTT_MSG_HEADER) {
1708
2.65k
        int xfer = client->write.len;
1709
1710
        /* Send connect packet */
1711
2.65k
        rc = MqttPacket_Write(client, client->tx_buf, xfer);
1712
    #ifdef WOLFMQTT_NONBLOCK
1713
        if (rc == MQTT_CODE_CONTINUE
1714
        #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK
1715
            && client->write.total > 0
1716
        #endif
1717
        ) {
1718
            /* keep send locked and return early */
1719
            return rc;
1720
        }
1721
    #endif
1722
2.65k
        MqttWriteStop(client, &mc_connect->stat);
1723
2.65k
        if (rc != xfer) {
1724
1.78k
            MqttClient_CancelMessage(client, (MqttObject*)mc_connect);
1725
1.78k
            return rc;
1726
1.78k
        }
1727
1728
874
    #ifdef WOLFMQTT_V5
1729
        /* Enhanced authentication */
1730
874
        if (client->enable_eauth == 1) {
1731
0
            mc_connect->stat.write = MQTT_MSG_AUTH;
1732
0
        }
1733
874
        else
1734
874
    #endif
1735
874
        {
1736
874
            mc_connect->stat.write = MQTT_MSG_WAIT;
1737
874
        }
1738
874
    }
1739
1740
874
#ifdef WOLFMQTT_V5
1741
    /* Enhanced authentication */
1742
874
    if (mc_connect->protocol_level > MQTT_CONNECT_PROTOCOL_LEVEL_4 &&
1743
874
            mc_connect->stat.write == MQTT_MSG_AUTH)
1744
0
    {
1745
0
        MqttAuth auth, *p_auth = &auth;
1746
0
        MqttProp* prop, *conn_prop;
1747
1748
        /* Find the AUTH property in the connect structure */
1749
0
        for (conn_prop = mc_connect->props;
1750
0
             (conn_prop != NULL) && (conn_prop->type != MQTT_PROP_AUTH_METHOD);
1751
0
             conn_prop = conn_prop->next) {
1752
0
        }
1753
0
        if (conn_prop == NULL) {
1754
        #ifdef WOLFMQTT_MULTITHREAD
1755
            if (wm_SemLock(&client->lockClient) == 0) {
1756
                MqttClient_RespList_Remove(client, &mc_connect->pendResp);
1757
                wm_SemUnlock(&client->lockClient);
1758
            }
1759
        #endif
1760
            /* AUTH property was not set in connect structure */
1761
0
            return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG);
1762
0
        }
1763
1764
0
        XMEMSET((void*)p_auth, 0, sizeof(MqttAuth));
1765
1766
        /* Set the authentication reason */
1767
0
        p_auth->reason_code = MQTT_REASON_CONT_AUTH;
1768
1769
        /* Use the same authentication method property from connect */
1770
0
        prop = MqttProps_Add(&p_auth->props);
1771
0
        prop->type = MQTT_PROP_AUTH_METHOD;
1772
0
        prop->data_str.str = conn_prop->data_str.str;
1773
0
        prop->data_str.len = conn_prop->data_str.len;
1774
1775
        /* Send the AUTH packet */
1776
0
        rc = MqttClient_Auth(client, p_auth);
1777
0
        MqttClient_PropsFree(p_auth->props);
1778
    #ifdef WOLFMQTT_NONBLOCK
1779
        if (rc == MQTT_CODE_CONTINUE)
1780
            return rc;
1781
    #endif
1782
0
        if (rc < 0) {
1783
        #ifdef WOLFMQTT_MULTITHREAD
1784
            if (wm_SemLock(&client->lockClient) == 0) {
1785
                MqttClient_RespList_Remove(client, &mc_connect->pendResp);
1786
                wm_SemUnlock(&client->lockClient);
1787
            }
1788
        #endif
1789
0
            return rc;
1790
0
        }
1791
0
        mc_connect->stat.write = MQTT_MSG_WAIT;
1792
0
    }
1793
874
#endif /* WOLFMQTT_V5 */
1794
1795
    /* Wait for connect ack packet */
1796
874
    rc = MqttClient_WaitType(client, &mc_connect->ack,
1797
874
        MQTT_PACKET_TYPE_CONNECT_ACK, 0, client->cmd_timeout_ms);
1798
#if defined(WOLFMQTT_NONBLOCK) || defined(WOLFMQTT_MULTITHREAD)
1799
    if (rc == MQTT_CODE_CONTINUE)
1800
        return rc;
1801
#endif
1802
1803
#ifdef WOLFMQTT_MULTITHREAD
1804
    if (wm_SemLock(&client->lockClient) == 0) {
1805
        MqttClient_RespList_Remove(client, &mc_connect->pendResp);
1806
        wm_SemUnlock(&client->lockClient);
1807
    }
1808
#endif
1809
1810
    /* reset state */
1811
874
    mc_connect->stat.write = MQTT_MSG_BEGIN;
1812
1813
874
    return rc;
1814
874
}
1815
1816
static int MqttClient_Publish_ReadPayload(MqttClient* client,
1817
    MqttPublish* publish, int timeout_ms)
1818
1.85k
{
1819
1.85k
    int rc = MQTT_CODE_SUCCESS;
1820
1.85k
    byte msg_done;
1821
1822
    /* Handle packet callback and read remaining payload */
1823
3.80k
    do {
1824
        /* Determine if message is done */
1825
3.80k
        msg_done = ((publish->buffer_pos + publish->buffer_len) >=
1826
3.80k
                    publish->total_len) ? 1 : 0;
1827
1828
3.80k
        if (publish->buffer_new) {
1829
            /* Issue callback for new message (first time only) */
1830
1.53k
            if (client->msg_cb) {
1831
                /* if using the temp publish message buffer,
1832
                   then populate message context with client context */
1833
1.53k
                if (publish->ctx == NULL && &client->msg.publish == publish) {
1834
1.53k
                    publish->ctx = client->ctx;
1835
1.53k
                }
1836
1.53k
                rc = client->msg_cb(client, publish, publish->buffer_new,
1837
1.53k
                                    msg_done);
1838
1.53k
                if (rc != MQTT_CODE_SUCCESS) {
1839
0
                    return rc;
1840
1.53k
                };
1841
1.53k
            }
1842
1843
            /* Reset topic name since valid on new message only */
1844
1.53k
            publish->topic_name = NULL;
1845
1.53k
            publish->topic_name_len = 0;
1846
1847
1.53k
            publish->buffer_new = 0;
1848
1.53k
        }
1849
1850
        /* Read payload */
1851
3.80k
        if (!msg_done) {
1852
2.27k
            int msg_len;
1853
1854
            /* add last length to position and reset len */
1855
2.27k
            publish->buffer_pos += publish->buffer_len;
1856
2.27k
            publish->buffer_len = 0;
1857
1858
            /* set state to reading payload */
1859
2.27k
            publish->stat.read = MQTT_MSG_PAYLOAD2;
1860
1861
2.27k
            msg_len = (publish->total_len - publish->buffer_pos);
1862
2.27k
            if (msg_len > client->rx_buf_len) {
1863
641
                msg_len = client->rx_buf_len;
1864
641
            }
1865
1866
            /* make sure there is something to read */
1867
2.27k
            if (msg_len > 0) {
1868
2.27k
                rc = MqttSocket_Read(client, client->rx_buf, msg_len,
1869
2.27k
                        timeout_ms);
1870
2.27k
                if (rc < 0) {
1871
297
                    break;
1872
297
                }
1873
1874
                /* Update message */
1875
1.98k
                publish->buffer = client->rx_buf;
1876
1.98k
                publish->buffer_len = rc;
1877
1.98k
                rc = MQTT_CODE_SUCCESS; /* mark success */
1878
1879
1.98k
                msg_done = ((publish->buffer_pos + publish->buffer_len) >=
1880
1.98k
                    publish->total_len) ? 1 : 0;
1881
1882
                /* Issue callback for additional publish payload */
1883
1.98k
                if (client->msg_cb) {
1884
1.98k
                    rc = client->msg_cb(client, publish, publish->buffer_new,
1885
1.98k
                                        msg_done);
1886
1.98k
                    if (rc != MQTT_CODE_SUCCESS) {
1887
0
                        return rc;
1888
1.98k
                    };
1889
1.98k
                }
1890
1.98k
            }
1891
2.27k
        }
1892
3.80k
    } while (!msg_done);
1893
1894
1.85k
    return rc;
1895
1.85k
}
1896
1897
static int MqttClient_Publish_WritePayload(MqttClient *client,
1898
    MqttPublish *publish, MqttPublishCb pubCb)
1899
223
{
1900
223
    int rc = MQTT_CODE_SUCCESS;
1901
1902
223
    if (client == NULL || publish == NULL)
1903
0
        return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG);
1904
1905
223
    if (pubCb) { /* use publish callback to get data */
1906
0
        word32 tmp_len = publish->buffer_len;
1907
1908
0
        do {
1909
            /* use the client->write.len to handle non-blocking re-entry when
1910
             * new publish callback data is needed */
1911
0
            if (client->write.len == 0) {
1912
                /* Use the callback to get payload */
1913
0
                if ((client->write.len = pubCb(publish)) < 0) {
1914
                #ifdef WOLFMQTT_DEBUG_CLIENT
1915
                    PRINTF("Publish callback error %d", client->write.len);
1916
                #endif
1917
0
                    return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_CALLBACK);
1918
0
                }
1919
0
            }
1920
1921
0
            if ((word32)client->write.len < publish->buffer_len) {
1922
                /* Last read */
1923
0
                tmp_len = (word32)client->write.len;
1924
0
            }
1925
1926
            /* Send payload */
1927
0
            do {
1928
0
                if (client->write.len > client->tx_buf_len) {
1929
0
                    client->write.len = client->tx_buf_len;
1930
0
                }
1931
0
                publish->intBuf_len = client->write.len;
1932
0
                XMEMCPY(client->tx_buf, &publish->buffer[publish->intBuf_pos],
1933
0
                    client->write.len);
1934
1935
0
                rc = MqttPacket_Write(client, client->tx_buf,
1936
0
                        client->write.len);
1937
0
                if (rc < 0) {
1938
0
                    return rc;
1939
0
                }
1940
1941
0
                publish->intBuf_pos += publish->intBuf_len;
1942
0
                publish->intBuf_len = 0;
1943
1944
0
            } while (publish->intBuf_pos < tmp_len);
1945
1946
0
            publish->buffer_pos += publish->intBuf_pos;
1947
0
            publish->intBuf_pos = 0;
1948
0
            client->write.len = 0; /* reset current write len */
1949
1950
0
        } while (publish->buffer_pos < publish->total_len);
1951
0
    }
1952
223
    else if (publish->buffer_pos < publish->total_len) {
1953
188
        if (publish->buffer_pos > 0) {
1954
178
            client->write.len = (publish->total_len - publish->buffer_pos);
1955
178
            if (client->write.len > client->tx_buf_len) {
1956
113
                client->write.len = client->tx_buf_len;
1957
113
            }
1958
1959
178
            XMEMCPY(client->tx_buf, &publish->buffer[publish->buffer_pos],
1960
178
                client->write.len);
1961
1962
178
        #ifndef WOLFMQTT_NONBLOCK
1963
178
            publish->intBuf_pos += client->write.len;
1964
178
        #endif
1965
178
        }
1966
1967
        /* Send packet and payload */
1968
    #ifdef WOLFMQTT_NONBLOCK
1969
            rc = MqttPacket_Write(client, client->tx_buf, client->write.len);
1970
            if (rc < 0) {
1971
                return rc;
1972
            }
1973
1974
            /* ONLY if send was successful, update buffer position.
1975
             * Otherwise, MqttPacket_Write() will resume where it left off. */
1976
            publish->buffer_pos += client->write.len;
1977
1978
            /* Check if we are done sending publish message */
1979
            if (publish->buffer_pos < publish->buffer_len) {
1980
            #ifdef WOLFMQTT_DEBUG_CLIENT
1981
                PRINTF("Publish Write: not done (%d remain)",
1982
                    publish->buffer_len - publish->buffer_pos);
1983
            #endif
1984
                return MQTT_CODE_PUB_CONTINUE;
1985
            }
1986
        #ifdef WOLFMQTT_DEBUG_CLIENT
1987
            else {
1988
                PRINTF("Publish Write: done");
1989
            }
1990
        #endif
1991
    #else
1992
650
        do {
1993
650
            rc = MqttPacket_Write(client, client->tx_buf, client->write.len);
1994
650
            if (rc < 0) {
1995
131
                return rc;
1996
131
            }
1997
1998
519
            publish->intBuf_pos += publish->intBuf_len;
1999
519
            publish->intBuf_len = 0;
2000
2001
            /* Check if we are done sending publish message */
2002
519
            if (publish->intBuf_pos >= publish->buffer_len) {
2003
57
                rc = MQTT_CODE_SUCCESS;
2004
57
                break;
2005
57
            }
2006
2007
            /* Build packet payload to send */
2008
462
            client->write.len = (publish->buffer_len - publish->intBuf_pos);
2009
462
            if (client->write.len > client->tx_buf_len) {
2010
397
                client->write.len = client->tx_buf_len;
2011
397
            }
2012
462
            publish->intBuf_len = client->write.len;
2013
462
            XMEMCPY(client->tx_buf, &publish->buffer[publish->intBuf_pos],
2014
462
                client->write.len);
2015
462
        } while (publish->intBuf_pos < publish->buffer_len);
2016
57
    #endif
2017
2018
57
        if (rc >= 0) {
2019
            /* If transferring more chunks */
2020
57
            publish->buffer_pos += publish->intBuf_pos;
2021
57
            if (publish->buffer_pos < publish->total_len) {
2022
            #ifdef WOLFMQTT_DEBUG_CLIENT
2023
                PRINTF("Publish Write: chunk (%d remain)",
2024
                    publish->total_len - publish->buffer_pos);
2025
            #endif
2026
2027
                /* Build next payload to send */
2028
0
                client->write.len = (publish->total_len - publish->buffer_pos);
2029
0
                if (client->write.len > client->tx_buf_len) {
2030
0
                    client->write.len = client->tx_buf_len;
2031
0
                }
2032
0
                rc = MQTT_CODE_PUB_CONTINUE;
2033
0
            }
2034
        #ifdef WOLFMQTT_DEBUG_CLIENT
2035
            else {
2036
                PRINTF("Publish Write: chunked done");
2037
            }
2038
        #endif
2039
57
        }
2040
57
    }
2041
92
    return rc;
2042
223
}
2043
2044
static int MqttPublishMsg(MqttClient *client, MqttPublish *publish,
2045
                          MqttPublishCb pubCb, int writeOnly)
2046
721
{
2047
721
    int rc = MQTT_CODE_SUCCESS;
2048
721
    MqttPacketType resp_type;
2049
2050
    /* Validate required arguments */
2051
721
    if (client == NULL || publish == NULL) {
2052
0
        return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG);
2053
0
    }
2054
2055
721
#ifdef WOLFMQTT_V5
2056
    /* Use specified protocol version if set */
2057
721
    publish->protocol_level = client->protocol_level;
2058
2059
    /* Validate publish request against server properties */
2060
721
    if ((publish->qos > client->max_qos) ||
2061
721
        ((publish->retain == 1) && (client->retain_avail == 0)))
2062
154
    {
2063
154
        return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_SERVER_PROP);
2064
154
    }
2065
567
#endif
2066
2067
567
    switch (publish->stat.write)
2068
567
    {
2069
567
        case MQTT_MSG_BEGIN:
2070
567
        {
2071
            /* Flag write active / lock mutex */
2072
567
            if ((rc = MqttWriteStart(client, &publish->stat)) != 0) {
2073
0
                return rc;
2074
0
            }
2075
2076
            /* Encode the publish packet */
2077
567
            rc = MqttEncode_Publish(client->tx_buf, client->tx_buf_len,
2078
567
                    publish, pubCb ? 1 : 0);
2079
        #ifdef WOLFMQTT_DEBUG_CLIENT
2080
            PRINTF("MqttClient_EncodePacket: Len %d, Type %s (%d), ID %d,"
2081
                    " QoS %d",
2082
                rc, MqttPacket_TypeDesc(MQTT_PACKET_TYPE_PUBLISH),
2083
                MQTT_PACKET_TYPE_PUBLISH, publish->packet_id,
2084
                publish->qos);
2085
        #endif
2086
567
            if (rc <= 0) {
2087
32
                MqttWriteStop(client, &publish->stat);
2088
32
                return rc;
2089
32
            }
2090
535
            client->write.len = rc;
2091
2092
        #ifdef WOLFMQTT_MULTITHREAD
2093
            if (publish->qos > MQTT_QOS_0) {
2094
                resp_type = (publish->qos == MQTT_QOS_1) ?
2095
                        MQTT_PACKET_TYPE_PUBLISH_ACK :
2096
                        MQTT_PACKET_TYPE_PUBLISH_COMP;
2097
2098
                rc = wm_SemLock(&client->lockClient);
2099
                if (rc == 0) {
2100
                    /* inform other threads of expected response */
2101
                    rc = MqttClient_RespList_Add(client, resp_type,
2102
                        publish->packet_id, &publish->pendResp, &publish->resp);
2103
                    wm_SemUnlock(&client->lockClient);
2104
                }
2105
                if (rc != 0) {
2106
                    MqttWriteStop(client, &publish->stat);
2107
                    return rc; /* Error locking client */
2108
                }
2109
            }
2110
        #endif
2111
2112
535
            publish->stat.write = MQTT_MSG_HEADER;
2113
535
        }
2114
535
        FALL_THROUGH;
2115
2116
535
        case MQTT_MSG_HEADER:
2117
535
        {
2118
535
            int xfer = client->write.len;
2119
2120
            /* Send publish packet */
2121
535
            rc = MqttPacket_Write(client, client->tx_buf, xfer);
2122
        #ifdef WOLFMQTT_NONBLOCK
2123
            if (rc == MQTT_CODE_CONTINUE
2124
            #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK
2125
                && client->write.total > 0
2126
            #endif
2127
            ) {
2128
                /* keep send locked and return early */
2129
                return rc;
2130
            }
2131
        #endif
2132
535
            client->write.len = 0; /* reset len, so publish chunk resets */
2133
2134
            /* if failure or no data was written yet */
2135
535
            if (rc != xfer) {
2136
312
                MqttWriteStop(client, &publish->stat);
2137
312
                MqttClient_CancelMessage(client, (MqttObject*)publish);
2138
312
                return rc;
2139
312
            }
2140
2141
            /* advance state */
2142
223
            publish->stat.write = MQTT_MSG_PAYLOAD;
2143
223
        }
2144
223
        FALL_THROUGH;
2145
2146
223
        case MQTT_MSG_PAYLOAD:
2147
223
        {
2148
223
            rc = MqttClient_Publish_WritePayload(client, publish, pubCb);
2149
        #ifdef WOLFMQTT_NONBLOCK
2150
            if (rc == MQTT_CODE_CONTINUE || rc == MQTT_CODE_PUB_CONTINUE)
2151
                return rc;
2152
        #endif
2153
223
            MqttWriteStop(client, &publish->stat);
2154
223
            if (rc < 0) {
2155
131
                MqttClient_CancelMessage(client, (MqttObject*)publish);
2156
131
                break;
2157
131
            }
2158
2159
            /* if not expecting a reply then we are done */
2160
92
            if (publish->qos == MQTT_QOS_0) {
2161
92
                break;
2162
92
            }
2163
0
            publish->stat.write = MQTT_MSG_WAIT;
2164
0
        }
2165
0
        FALL_THROUGH;
2166
2167
0
        case MQTT_MSG_WAIT:
2168
0
        {
2169
            /* Handle QoS */
2170
0
            if (publish->qos > MQTT_QOS_0) {
2171
                /* Determine packet type to wait for */
2172
0
                resp_type = (publish->qos == MQTT_QOS_1) ?
2173
0
                    MQTT_PACKET_TYPE_PUBLISH_ACK :
2174
0
                    MQTT_PACKET_TYPE_PUBLISH_COMP;
2175
2176
            #ifdef WOLFMQTT_MULTITHREAD
2177
                if (writeOnly) {
2178
                    /* another thread will handle response */
2179
                    /* check if response already received from other thread */
2180
                    rc = MqttClient_CheckPendResp(client, resp_type,
2181
                        publish->packet_id);
2182
                #ifndef WOLFMQTT_NONBLOCK
2183
                    if (rc == MQTT_CODE_CONTINUE) {
2184
                        /* mark success, let other thread handle response */
2185
                        rc = MQTT_CODE_SUCCESS;
2186
                    }
2187
                #endif
2188
                }
2189
                else
2190
            #endif
2191
0
                {
2192
0
                    (void)writeOnly; /* not used */
2193
2194
                    /* Wait for publish response packet */
2195
0
                    rc = MqttClient_WaitType(client, &publish->resp, resp_type,
2196
0
                        publish->packet_id, client->cmd_timeout_ms);
2197
0
                }
2198
2199
            #if defined(WOLFMQTT_NONBLOCK) || defined(WOLFMQTT_MULTITHREAD)
2200
                if (rc == MQTT_CODE_CONTINUE)
2201
                    break;
2202
            #endif
2203
            #ifdef WOLFMQTT_MULTITHREAD
2204
                if (wm_SemLock(&client->lockClient) == 0) {
2205
                    MqttClient_RespList_Remove(client, &publish->pendResp);
2206
                    wm_SemUnlock(&client->lockClient);
2207
                }
2208
            #endif
2209
0
            }
2210
0
            break;
2211
0
        }
2212
2213
0
        case MQTT_MSG_ACK:
2214
0
        case MQTT_MSG_AUTH:
2215
0
        case MQTT_MSG_PAYLOAD2:
2216
        #ifdef WOLFMQTT_DEBUG_CLIENT
2217
            PRINTF("MqttClient_Publish: Invalid state %d!",
2218
                publish->stat.write);
2219
        #endif
2220
0
            rc = MQTT_TRACE_ERROR(MQTT_CODE_ERROR_STAT);
2221
0
            break;
2222
567
    } /* switch (publish->stat) */
2223
2224
    /* reset state */
2225
223
    if ((rc != MQTT_CODE_PUB_CONTINUE)
2226
#ifdef WOLFMQTT_NONBLOCK
2227
         && (rc != MQTT_CODE_CONTINUE)
2228
#endif
2229
223
        )
2230
223
    {
2231
223
        publish->stat.write = MQTT_MSG_BEGIN;
2232
223
    }
2233
223
    if (rc > 0) {
2234
0
        rc = MQTT_CODE_SUCCESS;
2235
0
    }
2236
2237
223
    return rc;
2238
567
}
2239
2240
int MqttClient_Publish(MqttClient *client, MqttPublish *publish)
2241
721
{
2242
721
    return MqttPublishMsg(client, publish, NULL, 0);
2243
721
}
2244
2245
int MqttClient_Publish_ex(MqttClient *client, MqttPublish *publish,
2246
    MqttPublishCb pubCb)
2247
0
{
2248
0
    return MqttPublishMsg(client, publish, pubCb, 0);
2249
0
}
2250
2251
#ifdef WOLFMQTT_MULTITHREAD
2252
int MqttClient_Publish_WriteOnly(MqttClient *client, MqttPublish *publish,
2253
    MqttPublishCb pubCb)
2254
{
2255
    return MqttPublishMsg(client, publish, pubCb, 1);
2256
}
2257
#endif
2258
2259
2260
int MqttClient_Subscribe(MqttClient *client, MqttSubscribe *subscribe)
2261
825
{
2262
825
    int rc, i;
2263
825
    MqttTopic* topic;
2264
2265
    /* Validate required arguments */
2266
825
    if (client == NULL || subscribe == NULL) {
2267
0
        return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG);
2268
0
    }
2269
2270
825
#ifdef WOLFMQTT_V5
2271
    /* Use specified protocol version if set */
2272
825
    subscribe->protocol_level = client->protocol_level;
2273
825
#endif
2274
2275
825
    if (subscribe->stat.write == MQTT_MSG_BEGIN) {
2276
        /* Flag write active / lock mutex */
2277
825
        if ((rc = MqttWriteStart(client, &subscribe->stat)) != 0) {
2278
0
            return rc;
2279
0
        }
2280
2281
        /* Encode the subscribe packet */
2282
825
        rc = MqttEncode_Subscribe(client->tx_buf, client->tx_buf_len,
2283
825
                subscribe);
2284
    #ifdef WOLFMQTT_DEBUG_CLIENT
2285
        PRINTF("MqttClient_EncodePacket: Len %d, Type %s (%d), ID %d",
2286
            rc, MqttPacket_TypeDesc(MQTT_PACKET_TYPE_SUBSCRIBE),
2287
            MQTT_PACKET_TYPE_SUBSCRIBE, subscribe->packet_id);
2288
    #endif
2289
825
        if (rc <= 0) {
2290
49
            MqttWriteStop(client, &subscribe->stat);
2291
49
            return rc;
2292
49
        }
2293
776
        client->write.len = rc;
2294
2295
    #ifdef WOLFMQTT_MULTITHREAD
2296
        rc = wm_SemLock(&client->lockClient);
2297
        if (rc == 0) {
2298
            /* inform other threads of expected response */
2299
            rc = MqttClient_RespList_Add(client, MQTT_PACKET_TYPE_SUBSCRIBE_ACK,
2300
                subscribe->packet_id, &subscribe->pendResp, &subscribe->ack);
2301
            wm_SemUnlock(&client->lockClient);
2302
        }
2303
        if (rc != 0) {
2304
            MqttWriteStop(client, &subscribe->stat);
2305
            return rc; /* Error locking client */
2306
        }
2307
    #endif
2308
2309
776
        subscribe->stat.write = MQTT_MSG_HEADER;
2310
776
    }
2311
776
    if (subscribe->stat.write == MQTT_MSG_HEADER) {
2312
776
        int xfer = client->write.len;
2313
2314
        /* Send subscribe packet */
2315
776
        rc = MqttPacket_Write(client, client->tx_buf, xfer);
2316
    #ifdef WOLFMQTT_NONBLOCK
2317
        if (rc == MQTT_CODE_CONTINUE
2318
        #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK
2319
            && client->write.total > 0
2320
        #endif
2321
        ) {
2322
            /* keep send locked and return early */
2323
            return rc;
2324
        }
2325
    #endif
2326
776
        MqttWriteStop(client, &subscribe->stat);
2327
776
        if (rc != xfer) {
2328
561
            MqttClient_CancelMessage(client, (MqttObject*)subscribe);
2329
561
            return rc;
2330
561
        }
2331
2332
215
        subscribe->stat.write = MQTT_MSG_WAIT;
2333
215
    }
2334
2335
    /* Wait for subscribe ack packet */
2336
215
    rc = MqttClient_WaitType(client, &subscribe->ack,
2337
215
        MQTT_PACKET_TYPE_SUBSCRIBE_ACK, subscribe->packet_id,
2338
215
        client->cmd_timeout_ms);
2339
#if defined(WOLFMQTT_NONBLOCK) || defined(WOLFMQTT_MULTITHREAD)
2340
    if (rc == MQTT_CODE_CONTINUE)
2341
        return rc;
2342
#endif
2343
2344
#ifdef WOLFMQTT_MULTITHREAD
2345
    if (wm_SemLock(&client->lockClient) == 0) {
2346
        MqttClient_RespList_Remove(client, &subscribe->pendResp);
2347
        wm_SemUnlock(&client->lockClient);
2348
    }
2349
#endif
2350
2351
    /* Populate return codes */
2352
215
    if (rc == MQTT_CODE_SUCCESS) {
2353
279
        for (i = 0; i < subscribe->topic_count && i < MAX_MQTT_TOPICS; i++) {
2354
220
            topic = &subscribe->topics[i];
2355
220
            topic->return_code = subscribe->ack.return_codes[i];
2356
220
        }
2357
59
    }
2358
2359
    /* reset state */
2360
215
    subscribe->stat.write = MQTT_MSG_BEGIN;
2361
2362
215
    return rc;
2363
776
}
2364
2365
int MqttClient_Unsubscribe(MqttClient *client, MqttUnsubscribe *unsubscribe)
2366
402
{
2367
402
    int rc;
2368
2369
    /* Validate required arguments */
2370
402
    if (client == NULL || unsubscribe == NULL) {
2371
0
        return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG);
2372
0
    }
2373
2374
402
#ifdef WOLFMQTT_V5
2375
    /* Use specified protocol version if set */
2376
402
    unsubscribe->protocol_level = client->protocol_level;
2377
402
#endif
2378
2379
402
    if (unsubscribe->stat.write == MQTT_MSG_BEGIN) {
2380
        /* Flag write active / lock mutex */
2381
402
        if ((rc = MqttWriteStart(client, &unsubscribe->stat)) != 0) {
2382
0
            return rc;
2383
0
        }
2384
2385
        /* Encode the subscribe packet */
2386
402
        rc = MqttEncode_Unsubscribe(client->tx_buf, client->tx_buf_len,
2387
402
            unsubscribe);
2388
    #ifdef WOLFMQTT_DEBUG_CLIENT
2389
        PRINTF("MqttClient_EncodePacket: Len %d, Type %s (%d), ID %d, QoS %d",
2390
            rc, MqttPacket_TypeDesc(MQTT_PACKET_TYPE_UNSUBSCRIBE),
2391
            MQTT_PACKET_TYPE_UNSUBSCRIBE, unsubscribe->packet_id, 0);
2392
    #endif
2393
402
        if (rc <= 0) {
2394
68
            MqttWriteStop(client, &unsubscribe->stat);
2395
68
            return rc;
2396
68
        }
2397
334
        client->write.len = rc;
2398
2399
    #ifdef WOLFMQTT_MULTITHREAD
2400
        rc = wm_SemLock(&client->lockClient);
2401
        if (rc == 0) {
2402
            /* inform other threads of expected response */
2403
            rc = MqttClient_RespList_Add(client,
2404
                MQTT_PACKET_TYPE_UNSUBSCRIBE_ACK, unsubscribe->packet_id,
2405
                &unsubscribe->pendResp, &unsubscribe->ack);
2406
            wm_SemUnlock(&client->lockClient);
2407
        }
2408
        if (rc != 0) {
2409
            MqttWriteStop(client, &unsubscribe->stat);
2410
            return rc;
2411
        }
2412
    #endif
2413
2414
334
        unsubscribe->stat.write = MQTT_MSG_HEADER;
2415
334
    }
2416
334
    if (unsubscribe->stat.write == MQTT_MSG_HEADER) {
2417
334
        int xfer = client->write.len;
2418
2419
        /* Send unsubscribe packet */
2420
334
        rc = MqttPacket_Write(client, client->tx_buf, xfer);
2421
    #ifdef WOLFMQTT_NONBLOCK
2422
        if (rc == MQTT_CODE_CONTINUE
2423
        #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK
2424
            && client->write.total > 0
2425
        #endif
2426
        ) {
2427
            /* keep send locked and return early */
2428
            return rc;
2429
        }
2430
    #endif
2431
334
        MqttWriteStop(client, &unsubscribe->stat);
2432
334
        if (rc != xfer) {
2433
256
            MqttClient_CancelMessage(client, (MqttObject*)unsubscribe);
2434
256
            return rc;
2435
256
        }
2436
2437
78
        unsubscribe->stat.write = MQTT_MSG_WAIT;
2438
78
    }
2439
2440
    /* Wait for unsubscribe ack packet */
2441
78
    rc = MqttClient_WaitType(client, &unsubscribe->ack,
2442
78
        MQTT_PACKET_TYPE_UNSUBSCRIBE_ACK, unsubscribe->packet_id,
2443
78
        client->cmd_timeout_ms);
2444
#if defined(WOLFMQTT_NONBLOCK) || defined(WOLFMQTT_MULTITHREAD)
2445
    if (rc == MQTT_CODE_CONTINUE)
2446
        return rc;
2447
#endif
2448
2449
#ifdef WOLFMQTT_MULTITHREAD
2450
    if (wm_SemLock(&client->lockClient) == 0) {
2451
        MqttClient_RespList_Remove(client, &unsubscribe->pendResp);
2452
        wm_SemUnlock(&client->lockClient);
2453
    }
2454
#endif
2455
2456
78
#ifdef WOLFMQTT_V5
2457
78
    if (unsubscribe->ack.props != NULL) {
2458
        /* Release the allocated properties */
2459
0
        MqttClient_PropsFree(unsubscribe->ack.props);
2460
0
    }
2461
78
#endif
2462
2463
    /* reset state */
2464
78
    unsubscribe->stat.write = MQTT_MSG_BEGIN;
2465
2466
78
    return rc;
2467
334
}
2468
2469
int MqttClient_Ping_ex(MqttClient *client, MqttPing* ping)
2470
973
{
2471
973
    int rc;
2472
2473
    /* Validate required arguments */
2474
973
    if (client == NULL || ping == NULL) {
2475
0
        return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG);
2476
0
    }
2477
2478
973
    if (ping->stat.write == MQTT_MSG_BEGIN) {
2479
        /* Flag write active / lock mutex */
2480
973
        if ((rc = MqttWriteStart(client, &ping->stat)) != 0) {
2481
0
            return rc;
2482
0
        }
2483
2484
        /* Encode the subscribe packet */
2485
973
        rc = MqttEncode_Ping(client->tx_buf, client->tx_buf_len, ping);
2486
    #ifdef WOLFMQTT_DEBUG_CLIENT
2487
        PRINTF("MqttClient_EncodePacket: Len %d, Type %s (%d), ID %d, QoS %d",
2488
            rc, MqttPacket_TypeDesc(MQTT_PACKET_TYPE_PING_REQ),
2489
            MQTT_PACKET_TYPE_PING_REQ, 0, 0);
2490
    #endif
2491
973
        if (rc <= 0) {
2492
0
            MqttWriteStop(client, &ping->stat);
2493
0
            return rc;
2494
0
        }
2495
973
        client->write.len = rc;
2496
2497
    #ifdef WOLFMQTT_MULTITHREAD
2498
        rc = wm_SemLock(&client->lockClient);
2499
        if (rc == 0) {
2500
            /* inform other threads of expected response */
2501
            rc = MqttClient_RespList_Add(client, MQTT_PACKET_TYPE_PING_RESP, 0,
2502
                &ping->pendResp, ping);
2503
            wm_SemUnlock(&client->lockClient);
2504
        }
2505
        if (rc != 0) {
2506
            MqttWriteStop(client, &ping->stat);
2507
            return rc; /* Error locking client */
2508
        }
2509
    #endif
2510
2511
973
        ping->stat.write = MQTT_MSG_HEADER;
2512
973
    }
2513
973
    if (ping->stat.write == MQTT_MSG_HEADER) {
2514
973
        int xfer = client->write.len;
2515
2516
        /* Send ping req packet */
2517
973
        rc = MqttPacket_Write(client, client->tx_buf, xfer);
2518
    #ifdef WOLFMQTT_NONBLOCK
2519
        if (rc == MQTT_CODE_CONTINUE
2520
        #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK
2521
            && client->write.total > 0
2522
        #endif
2523
        ) {
2524
            /* keep send locked and return early */
2525
            return rc;
2526
        }
2527
    #endif
2528
973
        MqttWriteStop(client, &ping->stat);
2529
973
        if (rc != xfer) {
2530
174
            MqttClient_CancelMessage(client, (MqttObject*)ping);
2531
174
            return rc;
2532
174
        }
2533
2534
799
        ping->stat.write = MQTT_MSG_WAIT;
2535
799
    }
2536
2537
    /* Wait for ping resp packet */
2538
799
    rc = MqttClient_WaitType(client, ping, MQTT_PACKET_TYPE_PING_RESP, 0,
2539
799
        client->cmd_timeout_ms);
2540
#if defined(WOLFMQTT_NONBLOCK) || defined(WOLFMQTT_MULTITHREAD)
2541
    if (rc == MQTT_CODE_CONTINUE)
2542
        return rc;
2543
#endif
2544
2545
#ifdef WOLFMQTT_MULTITHREAD
2546
    if (wm_SemLock(&client->lockClient) == 0) {
2547
        MqttClient_RespList_Remove(client, &ping->pendResp);
2548
        wm_SemUnlock(&client->lockClient);
2549
    }
2550
#endif
2551
2552
    /* reset state */
2553
799
    ping->stat.write = MQTT_MSG_BEGIN;
2554
2555
799
    return rc;
2556
973
}
2557
2558
int MqttClient_Ping(MqttClient *client)
2559
0
{
2560
0
    return MqttClient_Ping_ex(client, &client->msg.ping);
2561
0
}
2562
2563
int MqttClient_Disconnect(MqttClient *client)
2564
0
{
2565
0
    return MqttClient_Disconnect_ex(client, NULL);
2566
0
}
2567
2568
int MqttClient_Disconnect_ex(MqttClient *client, MqttDisconnect *p_disconnect)
2569
0
{
2570
0
    int rc, xfer;
2571
0
    MqttDisconnect *disconnect = p_disconnect, lcl_disconnect;
2572
2573
    /* Validate required arguments */
2574
0
    if (client == NULL) {
2575
0
        return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG);
2576
0
    }
2577
0
    if (disconnect == NULL) {
2578
0
        disconnect = &lcl_disconnect;
2579
0
        XMEMSET(disconnect, 0, sizeof(*disconnect));
2580
0
    }
2581
2582
0
    if (disconnect->stat.write == MQTT_MSG_BEGIN) {
2583
0
    #ifdef WOLFMQTT_V5
2584
        /* Use specified protocol version if set */
2585
0
        disconnect->protocol_level = client->protocol_level;
2586
0
    #endif
2587
2588
        /* Flag write active / lock mutex */
2589
0
        if ((rc = MqttWriteStart(client, &disconnect->stat)) != 0) {
2590
0
            return rc;
2591
0
        }
2592
2593
        /* Encode the disconnect packet */
2594
0
        rc = MqttEncode_Disconnect(client->tx_buf, client->tx_buf_len,
2595
0
            disconnect);
2596
    #ifdef WOLFMQTT_DEBUG_CLIENT
2597
        PRINTF("MqttClient_EncodePacket: Len %d, Type %s (%d), ID %d, QoS %d",
2598
            rc, MqttPacket_TypeDesc(MQTT_PACKET_TYPE_DISCONNECT),
2599
            MQTT_PACKET_TYPE_DISCONNECT, 0, 0);
2600
    #endif
2601
0
        if (rc <= 0) {
2602
0
            MqttWriteStop(client, &disconnect->stat);
2603
0
            return rc;
2604
0
        }
2605
0
        client->write.len = rc;
2606
2607
0
        disconnect->stat.write = MQTT_MSG_HEADER;
2608
0
    }
2609
2610
    /* Send disconnect packet */
2611
0
    xfer = client->write.len;
2612
0
    rc = MqttPacket_Write(client, client->tx_buf, xfer);
2613
#ifdef WOLFMQTT_NONBLOCK
2614
    /* if disconnect context avail allow partial write in non-blocking mode */
2615
    if (p_disconnect != NULL && rc == MQTT_CODE_CONTINUE
2616
    #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK
2617
        && client->write.total > 0
2618
    #endif
2619
    ) {
2620
        /* keep send locked and return early */
2621
        return rc;
2622
    }
2623
#endif
2624
0
    MqttWriteStop(client, &disconnect->stat);
2625
0
    if (rc == xfer) {
2626
0
        rc = MQTT_CODE_SUCCESS;
2627
0
    }
2628
2629
#if defined(WOLFMQTT_DISCONNECT_CB) && defined(WOLFMQTT_USE_CB_ON_DISCONNECT)
2630
    /* Trigger disconnect callback - for intentional disconnect
2631
     * This callback may occur on a network failure during an intentional
2632
     * disconnect if the transport/socket is not setup yet. */
2633
    if (client->disconnect_cb
2634
    #ifdef WOLFMQTT_NONBLOCK
2635
        && rc != MQTT_CODE_CONTINUE
2636
    #endif
2637
        ) {
2638
        client->disconnect_cb(client, rc, client->disconnect_ctx);
2639
    }
2640
#endif
2641
2642
    /* No response for MQTT disconnect packet */
2643
2644
    /* reset state */
2645
0
    disconnect->stat.write = MQTT_MSG_BEGIN;
2646
2647
0
    return rc;
2648
0
}
2649
2650
#ifdef WOLFMQTT_V5
2651
int MqttClient_Auth(MqttClient *client, MqttAuth* auth)
2652
0
{
2653
0
    int rc;
2654
2655
    /* Validate required arguments */
2656
0
    if (client == NULL) {
2657
0
        return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG);
2658
0
    }
2659
2660
0
    if (auth->stat.write == MQTT_MSG_BEGIN) {
2661
        /* Flag write active / lock mutex */
2662
0
        if ((rc = MqttWriteStart(client, &auth->stat)) != 0) {
2663
0
            return rc;
2664
0
        }
2665
2666
        /* Encode the authentication packet */
2667
0
        rc = MqttEncode_Auth(client->tx_buf, client->tx_buf_len, auth);
2668
    #ifdef WOLFMQTT_DEBUG_CLIENT
2669
        PRINTF("MqttClient_EncodePacket: Len %d, Type %s (%d), ID %d, QoS %d",
2670
            rc, MqttPacket_TypeDesc(MQTT_PACKET_TYPE_AUTH),
2671
            MQTT_PACKET_TYPE_AUTH, 0, 0);
2672
    #endif
2673
0
        if (rc <= 0) {
2674
0
            MqttWriteStop(client, &auth->stat);
2675
0
            return rc;
2676
0
        }
2677
0
        client->write.len = rc;
2678
2679
    #ifdef WOLFMQTT_MULTITHREAD
2680
        rc = wm_SemLock(&client->lockClient);
2681
        if (rc == 0) {
2682
            /* inform other threads of expected response */
2683
            rc = MqttClient_RespList_Add(client, MQTT_PACKET_TYPE_AUTH, 0,
2684
                &auth->pendResp, auth);
2685
            wm_SemUnlock(&client->lockClient);
2686
        }
2687
        if (rc != 0) {
2688
            MqttWriteStop(client, &auth->stat);
2689
            return rc; /* Error locking client */
2690
        }
2691
    #endif
2692
2693
0
        auth->stat.write = MQTT_MSG_HEADER;
2694
0
    }
2695
0
    if (auth->stat.write == MQTT_MSG_BEGIN) {
2696
0
        int xfer = client->write.len;
2697
2698
        /* Send authentication packet */
2699
0
        rc = MqttPacket_Write(client, client->tx_buf, xfer);
2700
    #ifdef WOLFMQTT_NONBLOCK
2701
        if (rc == MQTT_CODE_CONTINUE
2702
        #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK
2703
            && client->write.total > 0
2704
        #endif
2705
        ) {
2706
            /* keep send locked and return early */
2707
            return rc;
2708
        }
2709
    #endif
2710
0
        MqttWriteStop(client, &auth->stat);
2711
0
        if (rc != xfer) {
2712
0
            MqttClient_CancelMessage(client, (MqttObject*)auth);
2713
0
            return rc;
2714
0
        }
2715
2716
0
        auth->stat.write = MQTT_MSG_WAIT;
2717
0
    }
2718
2719
    /* Wait for auth packet */
2720
0
    rc = MqttClient_WaitType(client, auth, MQTT_PACKET_TYPE_AUTH, 0,
2721
0
        client->cmd_timeout_ms);
2722
#if defined(WOLFMQTT_NONBLOCK) || defined(WOLFMQTT_MULTITHREAD)
2723
    if (rc == MQTT_CODE_CONTINUE)
2724
        return rc;
2725
#endif
2726
2727
#ifdef WOLFMQTT_MULTITHREAD
2728
    if (wm_SemLock(&client->lockClient) == 0) {
2729
        MqttClient_RespList_Remove(client, &auth->pendResp);
2730
        wm_SemUnlock(&client->lockClient);
2731
    }
2732
#endif
2733
2734
    /* reset state */
2735
0
    auth->stat.write = MQTT_MSG_BEGIN;
2736
2737
0
    return rc;
2738
0
}
2739
2740
MqttProp* MqttClient_PropsAdd(MqttProp **head)
2741
0
{
2742
0
    return MqttProps_Add(head);
2743
0
}
2744
2745
int MqttClient_PropsFree(MqttProp *head)
2746
0
{
2747
0
    return MqttProps_Free(head);
2748
0
}
2749
2750
#endif /* WOLFMQTT_V5 */
2751
2752
int MqttClient_WaitMessage_ex(MqttClient *client, MqttObject* msg,
2753
        int timeout_ms)
2754
3.46k
{
2755
3.46k
    return MqttClient_WaitType(client, msg, MQTT_PACKET_TYPE_ANY, 0,
2756
3.46k
        timeout_ms);
2757
3.46k
}
2758
int MqttClient_WaitMessage(MqttClient *client, int timeout_ms)
2759
3.46k
{
2760
3.46k
    if (client == NULL)
2761
0
        return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG);
2762
3.46k
    return MqttClient_WaitMessage_ex(client, &client->msg, timeout_ms);
2763
3.46k
}
2764
2765
#if !defined(WOLFMQTT_MULTITHREAD) && !defined(WOLFMQTT_NONBLOCK)
2766
static
2767
#endif
2768
int MqttClient_CancelMessage(MqttClient *client, MqttObject* msg)
2769
3.21k
{
2770
3.21k
    int rc = MQTT_CODE_SUCCESS;
2771
3.21k
    MqttMsgStat* mms_stat;
2772
#ifdef WOLFMQTT_MULTITHREAD
2773
    MqttPendResp* tmpResp;
2774
#endif
2775
2776
3.21k
    if (client == NULL || msg == NULL) {
2777
0
        return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG);
2778
0
    }
2779
2780
    /* all packet type structures must have MqttMsgStat at top */
2781
3.21k
    mms_stat = (MqttMsgStat*)msg;
2782
2783
#ifdef WOLFMQTT_DEBUG_CLIENT
2784
    PRINTF("Cancel Msg: %p", msg);
2785
#endif
2786
2787
    /* reset states */
2788
3.21k
    mms_stat->write = MQTT_MSG_BEGIN;
2789
3.21k
    mms_stat->read = MQTT_MSG_BEGIN;
2790
2791
#ifdef WOLFMQTT_MULTITHREAD
2792
    /* Remove any pending responses expected */
2793
    rc = wm_SemLock(&client->lockClient);
2794
    if (rc != MQTT_CODE_SUCCESS) {
2795
        return rc;
2796
    }
2797
2798
    for (tmpResp = client->firstPendResp;
2799
         tmpResp != NULL;
2800
         tmpResp = tmpResp->next)
2801
    {
2802
    #ifdef WOLFMQTT_DEBUG_CLIENT
2803
        PRINTF("\tMsg: %p (obj %p), Type %s (%d), ID %d, InProc %d, Done %d",
2804
            tmpResp, tmpResp->packet_obj,
2805
            MqttPacket_TypeDesc(tmpResp->packet_type),
2806
            tmpResp->packet_type, tmpResp->packet_id,
2807
            tmpResp->packetProcessing, tmpResp->packetDone);
2808
    #endif
2809
        if ((size_t)tmpResp->packet_obj == (size_t)msg ||
2810
            (size_t)tmpResp - OFFSETOF(MqttMessage, pendResp) == (size_t)msg) {
2811
        #ifdef WOLFMQTT_DEBUG_CLIENT
2812
            PRINTF("Found Cancel Msg: %p (obj %p), Type %s (%d), ID %d, "
2813
                   "InProc %d, Done %d",
2814
                tmpResp, tmpResp->packet_obj,
2815
                MqttPacket_TypeDesc(tmpResp->packet_type),
2816
                tmpResp->packet_type, tmpResp->packet_id,
2817
                tmpResp->packetProcessing, tmpResp->packetDone);
2818
        #endif
2819
            MqttClient_RespList_Remove(client, tmpResp);
2820
            break;
2821
        }
2822
    }
2823
    wm_SemUnlock(&client->lockClient);
2824
#endif /* WOLFMQTT_MULTITHREAD */
2825
2826
    /* cancel any active flags / locks */
2827
3.21k
    if (mms_stat->isReadActive) {
2828
    #ifdef WOLFMQTT_DEBUG_CLIENT
2829
        PRINTF("Cancel Read Lock");
2830
    #endif
2831
0
        MqttReadStop(client, mms_stat);
2832
0
    }
2833
3.21k
    if (mms_stat->isWriteActive) {
2834
    #ifdef WOLFMQTT_DEBUG_CLIENT
2835
        PRINTF("Cancel Write Lock");
2836
    #endif
2837
0
        MqttWriteStop(client, mms_stat);
2838
0
    }
2839
2840
3.21k
    return rc;
2841
3.21k
}
2842
2843
#ifdef WOLFMQTT_NONBLOCK
2844
static inline int IsMessageActive(MqttObject *msg)
2845
{
2846
    return (msg->stat.read  != MQTT_MSG_BEGIN ||
2847
            msg->stat.write != MQTT_MSG_BEGIN);
2848
}
2849
2850
int MqttClient_IsMessageActive(
2851
    MqttClient *client,
2852
    MqttObject *msg)
2853
{
2854
    int rc;
2855
2856
    /* must supply either client or msg */
2857
    if (client == NULL && msg == NULL) {
2858
        return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG);
2859
    }
2860
2861
    /* if msg is null then client->msg is used */
2862
    if ((client != NULL && &client->msg == msg) || msg == NULL) {
2863
    #ifdef WOLFMQTT_MULTITHREAD
2864
        rc = wm_SemLock(&client->lockClient);
2865
        if (rc == 0)
2866
    #endif
2867
        {
2868
            rc = IsMessageActive(&client->msg);
2869
        #ifdef WOLFMQTT_MULTITHREAD
2870
            wm_SemUnlock(&client->lockClient);
2871
        #endif
2872
        }
2873
    }
2874
    else {
2875
        rc = IsMessageActive(msg);
2876
    }
2877
    return rc;
2878
}
2879
2880
2881
#endif /* WOLFMQTT_NONBLOCK */
2882
2883
2884
int MqttClient_NetConnect(MqttClient *client, const char* host,
2885
    word16 port, int timeout_ms, int use_tls, MqttTlsCb cb)
2886
2.69k
{
2887
2.69k
    return MqttSocket_Connect(client, host, port, timeout_ms, use_tls, cb);
2888
2.69k
}
2889
2890
int MqttClient_NetDisconnect(MqttClient *client)
2891
104
{
2892
#ifdef WOLFMQTT_MULTITHREAD
2893
    MqttPendResp *tmpResp;
2894
    int rc;
2895
#endif
2896
2897
104
    if (client == NULL) {
2898
0
        return MQTT_CODE_ERROR_BAD_ARG;
2899
0
    }
2900
2901
#ifdef WOLFMQTT_MULTITHREAD
2902
    /* Get client lock on to ensure no other threads are active */
2903
    rc = wm_SemLock(&client->lockClient);
2904
    if (rc == 0) {
2905
    #ifdef WOLFMQTT_DEBUG_CLIENT
2906
        PRINTF("Net Disconnect: Removing pending responses");
2907
    #endif
2908
        for (tmpResp = client->firstPendResp;
2909
             tmpResp != NULL;
2910
             tmpResp = tmpResp->next) {
2911
        #ifdef WOLFMQTT_DEBUG_CLIENT
2912
            PRINTF("\tPendResp: %p (obj %p), Type %s (%d), ID %d, InProc %d, Done %d",
2913
                tmpResp, tmpResp->packet_obj,
2914
                MqttPacket_TypeDesc(tmpResp->packet_type),
2915
                tmpResp->packet_type, tmpResp->packet_id,
2916
                tmpResp->packetProcessing, tmpResp->packetDone);
2917
        #endif
2918
            MqttClient_RespList_Remove(client, tmpResp);
2919
        }
2920
        wm_SemUnlock(&client->lockClient);
2921
    }
2922
    else {
2923
        return rc;
2924
    }
2925
#endif
2926
2927
104
    return MqttSocket_Disconnect(client);
2928
104
}
2929
2930
int MqttClient_GetProtocolVersion(MqttClient *client)
2931
0
{
2932
0
#ifdef WOLFMQTT_V5
2933
0
    if (client && client->protocol_level == MQTT_CONNECT_PROTOCOL_LEVEL_5)
2934
0
        return MQTT_CONNECT_PROTOCOL_LEVEL_5;
2935
#else
2936
    (void)client;
2937
#endif
2938
0
    return MQTT_CONNECT_PROTOCOL_LEVEL_4;
2939
0
}
2940
const char* MqttClient_GetProtocolVersionString(MqttClient *client)
2941
0
{
2942
0
    const char* str = NULL;
2943
0
    int ver = MqttClient_GetProtocolVersion(client);
2944
0
    switch (ver) {
2945
0
        case MQTT_CONNECT_PROTOCOL_LEVEL_4:
2946
0
            return "v3.1.1";
2947
0
    #ifdef WOLFMQTT_V5
2948
0
        case MQTT_CONNECT_PROTOCOL_LEVEL_5:
2949
0
            return "v5";
2950
0
    #endif
2951
0
        default:
2952
0
            break;
2953
0
    }
2954
0
    return str;
2955
0
}
2956
2957
#ifndef WOLFMQTT_NO_ERROR_STRINGS
2958
const char* MqttClient_ReturnCodeToString(int return_code)
2959
0
{
2960
0
    switch(return_code) {
2961
0
        case MQTT_CODE_SUCCESS:
2962
0
            return "Success";
2963
0
        case MQTT_CODE_CONTINUE:
2964
0
            return "Continue"; /* would block */
2965
0
        case MQTT_CODE_STDIN_WAKE:
2966
0
            return "STDIN Wake";
2967
0
        case MQTT_CODE_PUB_CONTINUE:
2968
0
            return "Continue calling publish"; /* Chunked publish */
2969
0
        case MQTT_CODE_ERROR_BAD_ARG:
2970
0
            return "Error (Bad argument)";
2971
0
        case MQTT_CODE_ERROR_OUT_OF_BUFFER:
2972
0
            return "Error (Out of buffer)";
2973
0
        case MQTT_CODE_ERROR_MALFORMED_DATA:
2974
0
            return "Error (Malformed Remaining Length)";
2975
0
        case MQTT_CODE_ERROR_PACKET_TYPE:
2976
0
            return "Error (Packet Type Mismatch)";
2977
0
        case MQTT_CODE_ERROR_PACKET_ID:
2978
0
            return "Error (Packet Id Mismatch)";
2979
0
        case MQTT_CODE_ERROR_TLS_CONNECT:
2980
0
            return "Error (TLS Connect)";
2981
0
        case MQTT_CODE_ERROR_TIMEOUT:
2982
0
            return "Error (Timeout)";
2983
0
        case MQTT_CODE_ERROR_NETWORK:
2984
0
            return "Error (Network)";
2985
0
        case MQTT_CODE_ERROR_MEMORY:
2986
0
            return "Error (Memory)";
2987
0
        case MQTT_CODE_ERROR_STAT:
2988
0
            return "Error (State)";
2989
0
        case MQTT_CODE_ERROR_PROPERTY:
2990
0
            return "Error (Property)";
2991
0
        case MQTT_CODE_ERROR_SERVER_PROP:
2992
0
            return "Error (Server Property)";
2993
0
        case MQTT_CODE_ERROR_CALLBACK:
2994
0
            return "Error (Error in Callback)";
2995
0
        case MQTT_CODE_ERROR_SYSTEM:
2996
0
            return "Error (System resource failed)";
2997
0
        case MQTT_CODE_ERROR_NOT_FOUND:
2998
0
            return "Error (Not found)";
2999
#if defined(ENABLE_MQTT_CURL)
3000
        case MQTT_CODE_ERROR_CURL:
3001
            return "Error (libcurl)";
3002
#endif
3003
3004
0
#ifdef WOLFMQTT_V5
3005
        /* MQTT v5 Reason code strings */
3006
0
        case MQTT_REASON_UNSPECIFIED_ERR:
3007
0
            return "Unspecified error";
3008
0
        case MQTT_REASON_MALFORMED_PACKET:
3009
0
            return "Malformed Packet";
3010
0
        case MQTT_REASON_PROTOCOL_ERR:
3011
0
            return "Protocol Error";
3012
0
        case MQTT_REASON_IMPL_SPECIFIC_ERR:
3013
0
            return "Implementation specific error";
3014
0
        case MQTT_REASON_UNSUP_PROTO_VER:
3015
0
            return "Unsupported Protocol Version";
3016
0
        case MQTT_REASON_CLIENT_ID_NOT_VALID:
3017
0
            return "Client Identifier not valid";
3018
0
        case MQTT_REASON_BAD_USER_OR_PASS:
3019
0
            return "Bad User Name or Password";
3020
0
        case MQTT_REASON_NOT_AUTHORIZED:
3021
0
            return "Not authorized";
3022
0
        case MQTT_REASON_SERVER_UNAVAILABLE:
3023
0
            return "Server unavailable";
3024
0
        case MQTT_REASON_SERVER_BUSY:
3025
0
            return "Server busy";
3026
0
        case MQTT_REASON_BANNED:
3027
0
            return "Banned";
3028
0
        case MQTT_REASON_SERVER_SHUTTING_DOWN:
3029
0
            return "Server shutting down";
3030
0
        case MQTT_REASON_BAD_AUTH_METHOD:
3031
0
            return "Bad authentication method";
3032
0
        case MQTT_REASON_KEEP_ALIVE_TIMEOUT:
3033
0
            return "Keep Alive timeout";
3034
0
        case MQTT_REASON_SESSION_TAKEN_OVER:
3035
0
            return "Session taken over";
3036
0
        case MQTT_REASON_TOPIC_FILTER_INVALID:
3037
0
            return "Topic Filter invalid";
3038
0
        case MQTT_REASON_TOPIC_NAME_INVALID:
3039
0
            return "Topic Name invalid";
3040
0
        case MQTT_REASON_PACKET_ID_IN_USE:
3041
0
            return "Packet Identifier in use";
3042
0
        case MQTT_REASON_PACKET_ID_NOT_FOUND:
3043
0
            return "Packet Identifier not found";
3044
0
        case MQTT_REASON_RX_MAX_EXCEEDED:
3045
0
            return "Receive Maximum exceeded";
3046
0
        case MQTT_REASON_TOPIC_ALIAS_INVALID:
3047
0
            return "Topic Alias invalid";
3048
0
        case MQTT_REASON_PACKET_TOO_LARGE:
3049
0
            return "Packet too large";
3050
0
        case MQTT_REASON_MSG_RATE_TOO_HIGH:
3051
0
            return "Message rate too high";
3052
0
        case MQTT_REASON_QUOTA_EXCEEDED:
3053
0
            return "Quota exceeded";
3054
0
        case MQTT_REASON_ADMIN_ACTION:
3055
0
            return "Administrative action";
3056
0
        case MQTT_REASON_PAYLOAD_FORMAT_INVALID:
3057
0
            return "Payload format invalid";
3058
0
        case MQTT_REASON_RETAIN_NOT_SUPPORTED:
3059
0
            return "Retain not supported";
3060
0
        case MQTT_REASON_QOS_NOT_SUPPORTED:
3061
0
            return "QoS not supported";
3062
0
        case MQTT_REASON_USE_ANOTHER_SERVER:
3063
0
            return "Use another server";
3064
0
        case MQTT_REASON_SERVER_MOVED:
3065
0
            return "Server moved";
3066
0
        case MQTT_REASON_SS_NOT_SUPPORTED:
3067
0
            return "Shared Subscriptions not supported";
3068
0
        case MQTT_REASON_CON_RATE_EXCEED:
3069
0
            return "Connection rate exceeded";
3070
0
        case MQTT_REASON_MAX_CON_TIME:
3071
0
            return "Maximum connect time";
3072
0
        case MQTT_REASON_SUB_ID_NOT_SUP:
3073
0
            return "Subscription Identifiers not supported";
3074
0
        case MQTT_REASON_WILDCARD_SUB_NOT_SUP:
3075
0
            return "Wildcard Subscriptions not supported";
3076
0
#endif
3077
0
    }
3078
0
    return "Unknown";
3079
0
}
3080
#endif /* !WOLFMQTT_NO_ERROR_STRINGS */
3081
3082
word32 MqttClient_Flags(MqttClient *client,  word32 mask, word32 flags)
3083
36.7k
{
3084
36.7k
    word32 ret = 0;
3085
36.7k
    if (client != NULL) {
3086
#ifdef WOLFMQTT_MULTITHREAD
3087
        /* Get client lock on to ensure no other threads are active */
3088
        if (wm_SemLock(&client->lockClient) == 0)
3089
#endif
3090
36.7k
        {
3091
36.7k
            client->flags &= ~mask;
3092
36.7k
            client->flags |= flags;
3093
36.7k
            ret = client->flags;
3094
#ifdef WOLFMQTT_MULTITHREAD
3095
            wm_SemUnlock(&client->lockClient);
3096
#endif
3097
36.7k
        }
3098
36.7k
    }
3099
36.7k
    return ret;
3100
36.7k
}