/src/wolfmqtt/src/mqtt_client.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* mqtt_client.c |
2 | | * |
3 | | * Copyright (C) 2006-2025 wolfSSL Inc. |
4 | | * |
5 | | * This file is part of wolfMQTT. |
6 | | * |
7 | | * wolfMQTT is free software; you can redistribute it and/or modify |
8 | | * it under the terms of the GNU General Public License as published by |
9 | | * the Free Software Foundation; either version 2 of the License, or |
10 | | * (at your option) any later version. |
11 | | * |
12 | | * wolfMQTT is distributed in the hope that it will be useful, |
13 | | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
14 | | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
15 | | * GNU General Public License for more details. |
16 | | * |
17 | | * You should have received a copy of the GNU General Public License |
18 | | * along with this program; if not, write to the Free Software |
19 | | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335, USA |
20 | | */ |
21 | | |
22 | | /* Include the autoconf generated config.h */ |
23 | | #ifdef HAVE_CONFIG_H |
24 | | #include <config.h> |
25 | | #endif |
26 | | |
27 | | #include "wolfmqtt/mqtt_client.h" |
28 | | |
29 | | /* DOCUMENTED BUILD OPTIONS: |
30 | | * |
31 | | * WOLFMQTT_MULTITHREAD: Enables multi-thread support with mutex protection on |
32 | | * client struct, write and read. When a pending response is needed its added |
33 | | * to a linked list and if another thread reads the expected response it is |
34 | | * flagged, so the other thread knows it completed. |
35 | | * |
36 | | * WOLFMQTT_NONBLOCK: Enabled transport support for returning WANT READ/WRITE, |
37 | | * which becomes WOLFMQTT_CODE_CONTINUE. This prevents blocking if the |
38 | | * transport (socket) has no data. |
39 | | * |
40 | | * WOLFMQTT_V5: Enables MQTT v5.0 support |
41 | | * |
42 | | * WOLFMQTT_ALLOW_NODATA_UNLOCK: Used with multi-threading and non-blocking to |
43 | | * allow unlock if no data was sent/received. Note the TLS stack typically |
44 | | * requires an attempt to write to continue with same write, not different. |
45 | | * By default if we attempt a write we keep the mutex locked and return |
46 | | * MQTT_CODE_CONTINUE |
47 | | * |
48 | | * WOLFMQTT_USER_THREADING: Allows custom mutex functions to be defined by the |
49 | | * user. Example: wm_SemInit |
50 | | * |
51 | | * WOLFMQTT_DEBUG_CLIENT: Enables verbose PRINTF for the client code. |
52 | | */ |
53 | | |
54 | | |
55 | | /* Private functions */ |
56 | | |
57 | | /* forward declarations */ |
58 | | static int MqttClient_Publish_ReadPayload(MqttClient* client, |
59 | | MqttPublish* publish, int timeout_ms); |
60 | | #if !defined(WOLFMQTT_MULTITHREAD) && !defined(WOLFMQTT_NONBLOCK) |
61 | | static int MqttClient_CancelMessage(MqttClient *client, MqttObject* msg); |
62 | | #endif |
63 | | |
64 | | |
65 | | #ifdef WOLFMQTT_MULTITHREAD |
66 | | |
67 | | #ifdef WOLFMQTT_USER_THREADING |
68 | | |
69 | | /* User will supply their own semaphore functions. |
70 | | * int wm_SemInit(wm_Sem *s) |
71 | | * int wm_SemFree(wm_Sem *s) |
72 | | * int wm_SemLock(wm_Sem *s) |
73 | | * int wm_SemUnlock(wm_Sem *s) |
74 | | */ |
75 | | |
76 | | #elif defined(__MACH__) |
77 | | |
78 | | /* Apple style dispatch semaphore */ |
79 | | int wm_SemInit(wm_Sem *s) { |
80 | | /* dispatch_release() fails hard, with Trace/BPT trap signal, if the |
81 | | * sem's internal count is less than the value passed in with |
82 | | * dispatch_semaphore_create(). work around this by initializing |
83 | | * with 0, then incrementing it afterwards. |
84 | | */ |
85 | | s->sem = dispatch_semaphore_create(0); |
86 | | if (s->sem == NULL) |
87 | | return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_MEMORY); |
88 | | if (dispatch_semaphore_signal(s->sem) < 0) { |
89 | | dispatch_release(s->sem); |
90 | | return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_SYSTEM); |
91 | | } |
92 | | |
93 | | return 0; |
94 | | } |
95 | | int wm_SemFree(wm_Sem *s) { |
96 | | if ((s == NULL) || |
97 | | (s->sem == NULL)) |
98 | | return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG); |
99 | | dispatch_release(s->sem); |
100 | | s->sem = NULL; |
101 | | return 0; |
102 | | } |
103 | | int wm_SemLock(wm_Sem *s) { |
104 | | dispatch_semaphore_wait(s->sem, DISPATCH_TIME_FOREVER); |
105 | | return 0; |
106 | | } |
107 | | int wm_SemUnlock(wm_Sem *s){ |
108 | | dispatch_semaphore_signal(s->sem); |
109 | | return 0; |
110 | | } |
111 | | #elif defined(WOLFMQTT_POSIX_SEMAPHORES) |
112 | | /* Posix style semaphore */ |
113 | | int wm_SemInit(wm_Sem *s) { |
114 | | #ifndef WOLFMQTT_NO_COND_SIGNAL |
115 | | s->lockCount = 0; |
116 | | pthread_cond_init(&s->cond, NULL); |
117 | | #endif |
118 | | pthread_mutex_init(&s->mutex, NULL); |
119 | | return 0; |
120 | | } |
121 | | int wm_SemFree(wm_Sem *s) { |
122 | | pthread_mutex_destroy(&s->mutex); |
123 | | #ifndef WOLFMQTT_NO_COND_SIGNAL |
124 | | pthread_cond_destroy(&s->cond); |
125 | | #endif |
126 | | return 0; |
127 | | } |
128 | | int wm_SemLock(wm_Sem *s) { |
129 | | pthread_mutex_lock(&s->mutex); |
130 | | #ifndef WOLFMQTT_NO_COND_SIGNAL |
131 | | while (s->lockCount > 0) |
132 | | pthread_cond_wait(&s->cond, &s->mutex); |
133 | | s->lockCount++; |
134 | | pthread_mutex_unlock(&s->mutex); |
135 | | #endif |
136 | | return 0; |
137 | | } |
138 | | int wm_SemUnlock(wm_Sem *s) { |
139 | | #ifndef WOLFMQTT_NO_COND_SIGNAL |
140 | | pthread_mutex_lock(&s->mutex); |
141 | | if (s->lockCount > 0) { |
142 | | s->lockCount--; |
143 | | pthread_cond_signal(&s->cond); |
144 | | } |
145 | | #endif |
146 | | pthread_mutex_unlock(&s->mutex); |
147 | | return 0; |
148 | | } |
149 | | #elif defined(FREERTOS) |
150 | | /* FreeRTOS binary semaphore */ |
151 | | int wm_SemInit(wm_Sem *s) { |
152 | | *s = xSemaphoreCreateBinary(); |
153 | | xSemaphoreGive(*s); |
154 | | return 0; |
155 | | } |
156 | | int wm_SemFree(wm_Sem *s) { |
157 | | vSemaphoreDelete(*s); |
158 | | *s = NULL; |
159 | | return 0; |
160 | | } |
161 | | int wm_SemLock(wm_Sem *s) { |
162 | | xSemaphoreTake(*s, portMAX_DELAY); |
163 | | return 0; |
164 | | } |
165 | | int wm_SemUnlock(wm_Sem *s) { |
166 | | xSemaphoreGive(*s); |
167 | | return 0; |
168 | | } |
169 | | #elif defined(USE_WINDOWS_API) |
170 | | /* Windows semaphore object */ |
171 | | int wm_SemInit(wm_Sem *s) { |
172 | | *s = CreateSemaphoreW( NULL, 1, 1, NULL); |
173 | | return 0; |
174 | | } |
175 | | int wm_SemFree(wm_Sem *s) { |
176 | | CloseHandle(*s); |
177 | | *s = NULL; |
178 | | return 0; |
179 | | } |
180 | | int wm_SemLock(wm_Sem *s) { |
181 | | WaitForSingleObject(*s, INFINITE); |
182 | | return 0; |
183 | | } |
184 | | int wm_SemUnlock(wm_Sem *s) { |
185 | | ReleaseSemaphore(*s, 1, NULL); |
186 | | return 0; |
187 | | } |
188 | | |
189 | | #elif defined(THREADX) |
190 | | /* ThreadX semaphore */ |
191 | | int wm_SemInit(wm_Sem *s) { |
192 | | if (tx_semaphore_create(s, NULL, 1) != TX_SUCCESS) { |
193 | | return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_SYSTEM); |
194 | | } |
195 | | return 0; |
196 | | } |
197 | | int wm_SemFree(wm_Sem *s) { |
198 | | if (tx_semaphore_delete(s) != TX_SUCCESS) { |
199 | | return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_SYSTEM); |
200 | | } |
201 | | return 0; |
202 | | } |
203 | | |
204 | | static UINT semstatus; |
205 | | int wm_SemLock(wm_Sem *s) { |
206 | | semstatus = tx_semaphore_get(s, TX_WAIT_FOREVER); |
207 | | if (semstatus != TX_SUCCESS) { |
208 | | return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_SYSTEM); |
209 | | } |
210 | | return 0; |
211 | | } |
212 | | int wm_SemUnlock(wm_Sem *s) { |
213 | | if (tx_semaphore_put(s) != TX_SUCCESS) { |
214 | | return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_SYSTEM); |
215 | | } |
216 | | return 0; |
217 | | } |
218 | | #endif /* MUTEX */ |
219 | | #endif /* WOLFMQTT_MULTITHREAD */ |
220 | | |
221 | | static int MqttWriteStart(MqttClient* client, MqttMsgStat* stat) |
222 | 7.31k | { |
223 | 7.31k | int rc = MQTT_CODE_SUCCESS; |
224 | | |
225 | 7.31k | #if defined(WOLFMQTT_DEBUG_CLIENT) || !defined(WOLFMQTT_ALLOW_NODATA_UNLOCK) |
226 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
227 | | if (stat->isWriteActive) { |
228 | | MQTT_TRACE_MSG("Warning, send already locked!"); |
229 | | rc = MQTT_CODE_ERROR_SYSTEM; |
230 | | } |
231 | | #endif |
232 | 7.31k | #ifndef WOLFMQTT_ALLOW_NODATA_UNLOCK |
233 | | /* detect if a write is already in progress */ |
234 | | #ifdef WOLFMQTT_MULTITHREAD |
235 | | if (wm_SemLock(&client->lockClient) == 0) |
236 | | #endif |
237 | 7.31k | { |
238 | 7.31k | if (client->write.isActive) { |
239 | 0 | MQTT_TRACE_MSG("Partial write in progress!"); |
240 | 0 | rc = MQTT_CODE_CONTINUE; /* can't write yet */ |
241 | 0 | } |
242 | | #ifdef WOLFMQTT_MULTITHREAD |
243 | | wm_SemUnlock(&client->lockClient); |
244 | | #endif |
245 | 7.31k | } |
246 | 7.31k | #endif /* WOLFMQTT_ALLOW_NODATA_UNLOCK */ |
247 | 7.31k | if (rc != MQTT_CODE_SUCCESS) { |
248 | 0 | return rc; |
249 | 0 | } |
250 | 7.31k | #endif |
251 | | |
252 | | #ifdef WOLFMQTT_MULTITHREAD |
253 | | rc = wm_SemLock(&client->lockSend); |
254 | | #endif |
255 | 7.31k | if (rc == MQTT_CODE_SUCCESS) { |
256 | 7.31k | stat->isWriteActive = 1; |
257 | | |
258 | | #ifdef WOLFMQTT_MULTITHREAD |
259 | | if (wm_SemLock(&client->lockClient) == 0) |
260 | | #endif |
261 | 7.31k | { |
262 | 7.31k | client->write.isActive = 1; |
263 | | #ifdef WOLFMQTT_MULTITHREAD |
264 | | wm_SemUnlock(&client->lockClient); |
265 | | #endif |
266 | 7.31k | } |
267 | | |
268 | 7.31k | MQTT_TRACE_MSG("lockSend"); |
269 | 7.31k | } |
270 | | |
271 | 7.31k | return rc; |
272 | 7.31k | } |
273 | | static void MqttWriteStop(MqttClient* client, MqttMsgStat* stat) |
274 | 7.31k | { |
275 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
276 | | if (!stat->isWriteActive) { |
277 | | MQTT_TRACE_MSG("Warning, send not locked!"); |
278 | | return; |
279 | | } |
280 | | #endif |
281 | | |
282 | | #ifdef WOLFMQTT_MULTITHREAD |
283 | | if (wm_SemLock(&client->lockClient) == 0) |
284 | | #endif |
285 | 7.31k | { |
286 | | /* reset write */ |
287 | 7.31k | XMEMSET(&client->write, 0, sizeof(client->write)); |
288 | | #ifdef WOLFMQTT_MULTITHREAD |
289 | | wm_SemUnlock(&client->lockClient); |
290 | | #endif |
291 | 7.31k | } |
292 | | |
293 | 7.31k | if (stat->isWriteActive) { |
294 | 7.31k | MQTT_TRACE_MSG("unlockSend"); |
295 | 7.31k | stat->isWriteActive = 0; |
296 | | #ifdef WOLFMQTT_MULTITHREAD |
297 | | wm_SemUnlock(&client->lockSend); |
298 | | #endif |
299 | 7.31k | } |
300 | 7.31k | } |
301 | | |
302 | | static int MqttReadStart(MqttClient* client, MqttMsgStat* stat) |
303 | 9.08k | { |
304 | 9.08k | int rc = MQTT_CODE_SUCCESS; |
305 | | |
306 | 9.08k | #if defined(WOLFMQTT_DEBUG_CLIENT) || !defined(WOLFMQTT_ALLOW_NODATA_UNLOCK) |
307 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
308 | | if (stat->isReadActive) { |
309 | | MQTT_TRACE_MSG("Warning, recv already locked!"); |
310 | | rc = MQTT_CODE_ERROR_SYSTEM; |
311 | | } |
312 | | #endif /* WOLFMQTT_DEBUG_CLIENT */ |
313 | 9.08k | #ifndef WOLFMQTT_ALLOW_NODATA_UNLOCK |
314 | | /* detect if a read is already in progress */ |
315 | | #ifdef WOLFMQTT_MULTITHREAD |
316 | | if (wm_SemLock(&client->lockClient) == 0) |
317 | | #endif |
318 | 9.08k | { |
319 | 9.08k | if (client->read.isActive) { |
320 | 0 | MQTT_TRACE_MSG("Partial read in progress!"); |
321 | 0 | rc = MQTT_CODE_CONTINUE; /* can't read yet */ |
322 | 0 | } |
323 | | #ifdef WOLFMQTT_MULTITHREAD |
324 | | wm_SemUnlock(&client->lockClient); |
325 | | #endif |
326 | 9.08k | } |
327 | 9.08k | #endif /* WOLFMQTT_ALLOW_NODATA_UNLOCK */ |
328 | 9.08k | if (rc != MQTT_CODE_SUCCESS) { |
329 | 0 | return rc; |
330 | 0 | } |
331 | 9.08k | #endif /* WOLFMQTT_DEBUG_CLIENT || !WOLFMQTT_ALLOW_NODATA_UNLOCK */ |
332 | | |
333 | | #ifdef WOLFMQTT_MULTITHREAD |
334 | | rc = wm_SemLock(&client->lockRecv); |
335 | | #endif |
336 | 9.08k | if (rc == MQTT_CODE_SUCCESS) { |
337 | 9.08k | stat->isReadActive = 1; |
338 | | |
339 | | #ifdef WOLFMQTT_MULTITHREAD |
340 | | if (wm_SemLock(&client->lockClient) == 0) |
341 | | #endif |
342 | 9.08k | { |
343 | | /* mark read active */ |
344 | 9.08k | client->read.isActive = 1; |
345 | | |
346 | | /* reset the packet state used by MqttPacket_Read */ |
347 | 9.08k | client->packet.stat = MQTT_PK_BEGIN; |
348 | | |
349 | | #ifdef WOLFMQTT_MULTITHREAD |
350 | | wm_SemUnlock(&client->lockClient); |
351 | | #endif |
352 | 9.08k | } |
353 | | |
354 | 9.08k | MQTT_TRACE_MSG("lockRecv"); |
355 | 9.08k | } |
356 | | |
357 | 9.08k | return rc; |
358 | 9.08k | } |
359 | | static void MqttReadStop(MqttClient* client, MqttMsgStat* stat) |
360 | 18.3k | { |
361 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
362 | | if (!stat->isReadActive) { |
363 | | MQTT_TRACE_MSG("Warning, recv not locked!"); |
364 | | return; |
365 | | } |
366 | | #endif |
367 | | |
368 | | #ifdef WOLFMQTT_MULTITHREAD |
369 | | if (wm_SemLock(&client->lockClient) == 0) |
370 | | #endif |
371 | 18.3k | { |
372 | | /* reset read */ |
373 | 18.3k | XMEMSET(&client->read, 0, sizeof(client->read)); |
374 | | #ifdef WOLFMQTT_MULTITHREAD |
375 | | wm_SemUnlock(&client->lockClient); |
376 | | #endif |
377 | 18.3k | } |
378 | | |
379 | 18.3k | if (stat->isReadActive) { |
380 | 9.08k | MQTT_TRACE_MSG("unlockRecv"); |
381 | 9.08k | stat->isReadActive = 0; |
382 | | #ifdef WOLFMQTT_MULTITHREAD |
383 | | wm_SemUnlock(&client->lockRecv); |
384 | | #endif |
385 | 9.08k | } |
386 | 18.3k | } |
387 | | |
388 | | #ifdef WOLFMQTT_MULTITHREAD |
389 | | |
390 | | /* These RespList functions assume caller has locked client->lockClient mutex */ |
391 | | int MqttClient_RespList_Add(MqttClient *client, |
392 | | MqttPacketType packet_type, word16 packet_id, MqttPendResp *newResp, |
393 | | void *packet_obj) |
394 | | { |
395 | | MqttPendResp *tmpResp; |
396 | | |
397 | | if (client == NULL) |
398 | | return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG); |
399 | | |
400 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
401 | | PRINTF("PendResp Add: %p, Type %s (%d), ID %d", |
402 | | newResp, MqttPacket_TypeDesc(packet_type), packet_type, packet_id); |
403 | | #endif |
404 | | |
405 | | /* verify newResp is not already in the list */ |
406 | | for (tmpResp = client->firstPendResp; |
407 | | tmpResp != NULL; |
408 | | tmpResp = tmpResp->next) |
409 | | { |
410 | | if (tmpResp == newResp) { |
411 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
412 | | PRINTF("Pending Response already in list!"); |
413 | | #endif |
414 | | return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG); |
415 | | } |
416 | | } |
417 | | |
418 | | /* Initialize new response */ |
419 | | XMEMSET(newResp, 0, sizeof(MqttPendResp)); |
420 | | newResp->packet_id = packet_id; |
421 | | newResp->packet_type = packet_type; |
422 | | /* opaque pointer to struct based on type */ |
423 | | newResp->packet_obj = packet_obj; |
424 | | |
425 | | if (client->lastPendResp == NULL) { |
426 | | /* This is the only list item */ |
427 | | client->firstPendResp = newResp; |
428 | | client->lastPendResp = newResp; |
429 | | } |
430 | | else { |
431 | | /* Append to end of list */ |
432 | | newResp->prev = client->lastPendResp; |
433 | | client->lastPendResp->next = newResp; |
434 | | client->lastPendResp = newResp; |
435 | | } |
436 | | return MQTT_CODE_SUCCESS; |
437 | | } |
438 | | |
439 | | void MqttClient_RespList_Remove(MqttClient *client, MqttPendResp *rmResp) |
440 | | { |
441 | | MqttPendResp *tmpResp; |
442 | | |
443 | | if (client == NULL) |
444 | | return; |
445 | | |
446 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
447 | | PRINTF("PendResp Remove: %p", rmResp); |
448 | | #endif |
449 | | |
450 | | /* Find the response entry */ |
451 | | for (tmpResp = client->firstPendResp; |
452 | | tmpResp != NULL; |
453 | | tmpResp = tmpResp->next) |
454 | | { |
455 | | if (tmpResp == rmResp) { |
456 | | break; |
457 | | } |
458 | | } |
459 | | if (tmpResp) { |
460 | | /* Fix up the first and last pointers */ |
461 | | if (client->firstPendResp == tmpResp) { |
462 | | client->firstPendResp = tmpResp->next; |
463 | | } |
464 | | if (client->lastPendResp == tmpResp) { |
465 | | client->lastPendResp = tmpResp->prev; |
466 | | } |
467 | | |
468 | | /* Remove the entry from the list */ |
469 | | if (tmpResp->next != NULL) { |
470 | | tmpResp->next->prev = tmpResp->prev; |
471 | | } |
472 | | if (tmpResp->prev != NULL) { |
473 | | tmpResp->prev->next = tmpResp->next; |
474 | | } |
475 | | } |
476 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
477 | | else { |
478 | | PRINTF("\tPendResp not found"); |
479 | | } |
480 | | #endif |
481 | | } |
482 | | |
483 | | /* return codes: 0=not found, 1=found */ |
484 | | int MqttClient_RespList_Find(MqttClient *client, |
485 | | MqttPacketType packet_type, word16 packet_id, MqttPendResp **retResp) |
486 | | { |
487 | | int rc = 0; |
488 | | MqttPendResp *tmpResp; |
489 | | |
490 | | if (client == NULL) |
491 | | return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG); |
492 | | |
493 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
494 | | #ifdef WOLFMQTT_NONBLOCK |
495 | | if (client->lastRc != MQTT_CODE_CONTINUE) |
496 | | #endif |
497 | | { |
498 | | PRINTF("PendResp Find: Type %s (%d), ID %d", |
499 | | MqttPacket_TypeDesc(packet_type), packet_type, packet_id); |
500 | | } |
501 | | #endif |
502 | | |
503 | | if (retResp) |
504 | | *retResp = NULL; /* clear */ |
505 | | |
506 | | /* Find pending response entry */ |
507 | | for (tmpResp = client->firstPendResp; |
508 | | tmpResp != NULL; |
509 | | tmpResp = tmpResp->next) |
510 | | { |
511 | | if (packet_type == tmpResp->packet_type && |
512 | | (packet_id == tmpResp->packet_id)) |
513 | | { |
514 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
515 | | #if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_DEBUG_CLIENT) |
516 | | if (client->lastRc != MQTT_CODE_CONTINUE) |
517 | | #endif |
518 | | { |
519 | | PRINTF("PendResp Found: %p, Type %s (%d), ID %d, InProc %d, Done %d", |
520 | | tmpResp, MqttPacket_TypeDesc(tmpResp->packet_type), |
521 | | tmpResp->packet_type, tmpResp->packet_id, |
522 | | tmpResp->packetProcessing, tmpResp->packetDone); |
523 | | } |
524 | | #endif |
525 | | |
526 | | if (retResp) |
527 | | *retResp = tmpResp; |
528 | | rc = 1; |
529 | | break; |
530 | | } |
531 | | } |
532 | | return rc; |
533 | | } |
534 | | #endif /* WOLFMQTT_MULTITHREAD */ |
535 | | |
536 | | #ifdef WOLFMQTT_V5 |
537 | | static int Handle_Props(MqttClient* client, MqttProp* props, byte use_cb, |
538 | | byte free_props) |
539 | 12.3k | { |
540 | 12.3k | int rc = MQTT_CODE_SUCCESS; |
541 | | |
542 | | /* If no properties, just return */ |
543 | 12.3k | if (props != NULL) { |
544 | 151 | #ifdef WOLFMQTT_PROPERTY_CB |
545 | | /* Check for properties set by the server */ |
546 | 151 | if ((use_cb == 1) && (client->property_cb != NULL)) { |
547 | | /* capture error if returned */ |
548 | 0 | int rc_err = client->property_cb(client, props, |
549 | 0 | client->property_ctx); |
550 | 0 | if (rc_err < 0) { |
551 | 0 | rc = rc_err; |
552 | 0 | } |
553 | 0 | } |
554 | | #else |
555 | | (void)client; |
556 | | (void)use_cb; |
557 | | #endif |
558 | 151 | if (free_props) { |
559 | | /* Free the properties */ |
560 | 151 | MqttProps_Free(props); |
561 | 151 | } |
562 | 151 | } |
563 | 12.3k | return rc; |
564 | 12.3k | } |
565 | | #endif |
566 | | |
567 | | |
568 | | /* Returns length decoded or error (as negative) */ |
569 | | /*! \brief Take a received MQTT packet and try and decode it |
570 | | * \param client MQTT client context |
571 | | * \param rx_buf Incoming buffer data |
572 | | * \param rx_len Incoming buffer length |
573 | | * \param p_decode Opaque pointer to packet structure based on type |
574 | | * \param ppacket_type Decoded packet type |
575 | | * \param ppacket_qos Decoded QoS level |
576 | | * \param ppacket_id Decoded packet id |
577 | | * \param doProps True: Call Handle_Props to free prop struct |
578 | | |
579 | | * \return Returns length decoded or error (as negative) MQTT_CODE_ERROR_* |
580 | | (see enum MqttPacketResponseCodes) |
581 | | */ |
582 | | static int MqttClient_DecodePacket(MqttClient* client, byte* rx_buf, |
583 | | word32 rx_len, void *packet_obj, MqttPacketType* ppacket_type, |
584 | | MqttQoS* ppacket_qos, word16* ppacket_id, int doProps) |
585 | 15.7k | { |
586 | 15.7k | int rc = MQTT_CODE_SUCCESS; |
587 | 15.7k | MqttPacket* header; |
588 | 15.7k | MqttPacketType packet_type; |
589 | 15.7k | MqttQoS packet_qos; |
590 | 15.7k | word16 packet_id = 0; |
591 | | |
592 | | /* must have rx buffer with at least 2 byes for header */ |
593 | 15.7k | if (rx_buf == NULL || rx_len < MQTT_PACKET_HEADER_MIN_SIZE) { |
594 | 0 | return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG); |
595 | 0 | } |
596 | | |
597 | | /* Decode header */ |
598 | 15.7k | header = (MqttPacket*)rx_buf; |
599 | 15.7k | packet_type = (MqttPacketType)MQTT_PACKET_TYPE_GET(header->type_flags); |
600 | 15.7k | if (ppacket_type) { |
601 | 15.7k | *ppacket_type = packet_type; |
602 | 15.7k | } |
603 | 15.7k | packet_qos = (MqttQoS)MQTT_PACKET_FLAGS_GET_QOS(header->type_flags); |
604 | 15.7k | if (ppacket_qos) { |
605 | 15.7k | *ppacket_qos = packet_qos; |
606 | 15.7k | } |
607 | | |
608 | | /* Decode packet specific data (if requested) */ |
609 | 15.7k | if (ppacket_id || packet_obj) { |
610 | 15.7k | switch (packet_type) { |
611 | 1.69k | case MQTT_PACKET_TYPE_CONNECT_ACK: |
612 | 1.69k | { |
613 | 1.69k | MqttConnectAck connect_ack, *p_connect_ack = &connect_ack; |
614 | 1.69k | if (packet_obj) { |
615 | 904 | p_connect_ack = (MqttConnectAck*)packet_obj; |
616 | 904 | } |
617 | 792 | else { |
618 | 792 | XMEMSET(p_connect_ack, 0, sizeof(MqttConnectAck)); |
619 | 792 | } |
620 | 1.69k | #ifdef WOLFMQTT_V5 |
621 | 1.69k | p_connect_ack->protocol_level = client->protocol_level; |
622 | 1.69k | #endif |
623 | 1.69k | rc = MqttDecode_ConnectAck(rx_buf, rx_len, p_connect_ack); |
624 | 1.69k | #ifdef WOLFMQTT_V5 |
625 | 1.69k | if (rc >= 0 && doProps) { |
626 | 1.62k | int tmp = Handle_Props(client, p_connect_ack->props, |
627 | 1.62k | (packet_obj != NULL), 1); |
628 | 1.62k | p_connect_ack->props = NULL; |
629 | 1.62k | if (tmp != MQTT_CODE_SUCCESS) { |
630 | 0 | rc = tmp; |
631 | 0 | } |
632 | 1.62k | } |
633 | 1.69k | #endif |
634 | 1.69k | break; |
635 | 0 | } |
636 | 3.02k | case MQTT_PACKET_TYPE_PUBLISH: |
637 | 3.02k | { |
638 | 3.02k | MqttPublish publish, *p_publish; |
639 | 3.02k | if (packet_obj) { |
640 | 1.66k | p_publish = (MqttPublish*)packet_obj; |
641 | 1.66k | #ifdef WOLFMQTT_V5 |
642 | | /* setting the protocol level will enable parsing of the |
643 | | * properties. The properties are allocated from a list, |
644 | | * so only parse if we are using a return packet object */ |
645 | 1.66k | p_publish->protocol_level = client->protocol_level; |
646 | 1.66k | #endif |
647 | 1.66k | } |
648 | 1.35k | else { |
649 | 1.35k | p_publish = &publish; |
650 | 1.35k | XMEMSET(p_publish, 0, sizeof(MqttPublish)); |
651 | 1.35k | } |
652 | 3.02k | rc = MqttDecode_Publish(rx_buf, rx_len, p_publish); |
653 | 3.02k | if (rc >= 0) { |
654 | 2.83k | packet_id = p_publish->packet_id; |
655 | 2.83k | #ifdef WOLFMQTT_V5 |
656 | 2.83k | if (doProps) { |
657 | | /* Do not free property list here. It will be freed |
658 | | after the message callback. */ |
659 | 2.83k | int tmp = Handle_Props(client, p_publish->props, |
660 | 2.83k | (packet_obj != NULL), 0); |
661 | 2.83k | if (tmp != MQTT_CODE_SUCCESS) { |
662 | 0 | rc = tmp; |
663 | 0 | } |
664 | 2.83k | } |
665 | 2.83k | #endif |
666 | 2.83k | } |
667 | 3.02k | break; |
668 | 0 | } |
669 | 752 | case MQTT_PACKET_TYPE_PUBLISH_ACK: |
670 | 1.44k | case MQTT_PACKET_TYPE_PUBLISH_REC: |
671 | 2.36k | case MQTT_PACKET_TYPE_PUBLISH_REL: |
672 | 3.13k | case MQTT_PACKET_TYPE_PUBLISH_COMP: |
673 | 3.13k | { |
674 | 3.13k | MqttPublishResp publish_resp, *p_publish_resp = &publish_resp; |
675 | 3.13k | if (packet_obj) { |
676 | 1.63k | p_publish_resp = (MqttPublishResp*)packet_obj; |
677 | 1.63k | } |
678 | 1.49k | else { |
679 | 1.49k | XMEMSET(p_publish_resp, 0, sizeof(MqttPublishResp)); |
680 | 1.49k | } |
681 | | |
682 | 3.13k | #ifdef WOLFMQTT_V5 |
683 | 3.13k | p_publish_resp->protocol_level = client->protocol_level; |
684 | 3.13k | #endif |
685 | 3.13k | rc = MqttDecode_PublishResp(rx_buf, rx_len, packet_type, |
686 | 3.13k | p_publish_resp); |
687 | 3.13k | if (rc >= 0) { |
688 | 2.93k | packet_id = p_publish_resp->packet_id; |
689 | 2.93k | #ifdef WOLFMQTT_V5 |
690 | 2.93k | if (doProps) { |
691 | 2.93k | int tmp = Handle_Props(client, p_publish_resp->props, |
692 | 2.93k | (packet_obj != NULL), 1); |
693 | 2.93k | p_publish_resp->props = NULL; |
694 | 2.93k | if (tmp != MQTT_CODE_SUCCESS) { |
695 | 0 | rc = tmp; |
696 | 0 | } |
697 | 2.93k | } |
698 | 2.93k | #endif |
699 | 2.93k | } |
700 | 3.13k | break; |
701 | 2.36k | } |
702 | 1.95k | case MQTT_PACKET_TYPE_SUBSCRIBE_ACK: |
703 | 1.95k | { |
704 | 1.95k | MqttSubscribeAck subscribe_ack, *p_subscribe_ack = &subscribe_ack; |
705 | 1.95k | if (packet_obj) { |
706 | 1.03k | p_subscribe_ack = (MqttSubscribeAck*)packet_obj; |
707 | 1.03k | } |
708 | 916 | else { |
709 | 916 | XMEMSET(p_subscribe_ack, 0, sizeof(MqttSubscribeAck)); |
710 | 916 | } |
711 | 1.95k | #ifdef WOLFMQTT_V5 |
712 | 1.95k | p_subscribe_ack->protocol_level = client->protocol_level; |
713 | 1.95k | #endif |
714 | 1.95k | rc = MqttDecode_SubscribeAck(rx_buf, rx_len, p_subscribe_ack); |
715 | 1.95k | if (rc >= 0) { |
716 | 1.80k | packet_id = p_subscribe_ack->packet_id; |
717 | 1.80k | #ifdef WOLFMQTT_V5 |
718 | 1.80k | if (doProps) { |
719 | 1.80k | int tmp = Handle_Props(client, p_subscribe_ack->props, |
720 | 1.80k | (packet_obj != NULL), 1); |
721 | 1.80k | p_subscribe_ack->props = NULL; |
722 | 1.80k | if (tmp != MQTT_CODE_SUCCESS) { |
723 | 0 | rc = tmp; |
724 | 0 | } |
725 | 1.80k | } |
726 | 1.80k | #endif |
727 | 1.80k | } |
728 | 1.95k | break; |
729 | 2.36k | } |
730 | 1.34k | case MQTT_PACKET_TYPE_UNSUBSCRIBE_ACK: |
731 | 1.34k | { |
732 | 1.34k | MqttUnsubscribeAck unsubscribe_ack, |
733 | 1.34k | *p_unsubscribe_ack = &unsubscribe_ack; |
734 | 1.34k | if (packet_obj) { |
735 | 724 | p_unsubscribe_ack = (MqttUnsubscribeAck*)packet_obj; |
736 | 724 | } |
737 | 625 | else { |
738 | 625 | XMEMSET(p_unsubscribe_ack, 0, sizeof(MqttUnsubscribeAck)); |
739 | 625 | } |
740 | 1.34k | #ifdef WOLFMQTT_V5 |
741 | 1.34k | p_unsubscribe_ack->protocol_level = client->protocol_level; |
742 | 1.34k | #endif |
743 | 1.34k | rc = MqttDecode_UnsubscribeAck(rx_buf, rx_len, p_unsubscribe_ack); |
744 | 1.34k | if (rc >= 0) { |
745 | 1.16k | packet_id = p_unsubscribe_ack->packet_id; |
746 | 1.16k | #ifdef WOLFMQTT_V5 |
747 | 1.16k | if (doProps) { |
748 | 1.16k | int tmp = Handle_Props(client, p_unsubscribe_ack->props, |
749 | 1.16k | (packet_obj != NULL), 1); |
750 | 1.16k | p_unsubscribe_ack->props = NULL; |
751 | 1.16k | if (tmp != MQTT_CODE_SUCCESS) { |
752 | 0 | rc = tmp; |
753 | 0 | } |
754 | 1.16k | } |
755 | 1.16k | #endif |
756 | 1.16k | } |
757 | 1.34k | break; |
758 | 2.36k | } |
759 | 1.35k | case MQTT_PACKET_TYPE_PING_RESP: |
760 | 1.35k | { |
761 | 1.35k | MqttPing ping, *p_ping = &ping; |
762 | 1.35k | if (packet_obj) { |
763 | 714 | p_ping = (MqttPing*)packet_obj; |
764 | 714 | } |
765 | 642 | else { |
766 | 642 | XMEMSET(p_ping, 0, sizeof(MqttPing)); |
767 | 642 | } |
768 | 1.35k | rc = MqttDecode_Ping(rx_buf, rx_len, p_ping); |
769 | 1.35k | break; |
770 | 2.36k | } |
771 | 765 | case MQTT_PACKET_TYPE_AUTH: |
772 | 765 | { |
773 | 765 | #ifdef WOLFMQTT_V5 |
774 | 765 | MqttAuth auth, *p_auth = &auth; |
775 | 765 | if (packet_obj) { |
776 | 403 | p_auth = (MqttAuth*)packet_obj; |
777 | 403 | } |
778 | 362 | else { |
779 | 362 | XMEMSET(p_auth, 0, sizeof(MqttAuth)); |
780 | 362 | } |
781 | 765 | rc = MqttDecode_Auth(rx_buf, rx_len, p_auth); |
782 | 765 | if (rc >= 0 && doProps) { |
783 | 88 | int tmp = Handle_Props(client, p_auth->props, |
784 | 88 | (packet_obj != NULL), 1); |
785 | 88 | p_auth->props = NULL; |
786 | 88 | if (tmp != MQTT_CODE_SUCCESS) { |
787 | 0 | rc = tmp; |
788 | 0 | } |
789 | 88 | } |
790 | | #else |
791 | | rc = MQTT_TRACE_ERROR(MQTT_CODE_ERROR_PACKET_TYPE); |
792 | | #endif /* WOLFMQTT_V5 */ |
793 | 765 | break; |
794 | 2.36k | } |
795 | 2.34k | case MQTT_PACKET_TYPE_DISCONNECT: |
796 | 2.34k | { |
797 | 2.34k | #ifdef WOLFMQTT_V5 |
798 | 2.34k | MqttDisconnect disc, *p_disc = &disc; |
799 | 2.34k | if (packet_obj) { |
800 | 1.24k | p_disc = (MqttDisconnect*)packet_obj; |
801 | 1.24k | } |
802 | 1.09k | else { |
803 | 1.09k | XMEMSET(p_disc, 0, sizeof(MqttDisconnect)); |
804 | 1.09k | } |
805 | 2.34k | rc = MqttDecode_Disconnect(rx_buf, rx_len, p_disc); |
806 | 2.34k | if (rc >= 0 && doProps) { |
807 | 1.86k | int tmp = Handle_Props(client, p_disc->props, |
808 | 1.86k | (packet_obj != NULL), 1); |
809 | 1.86k | p_disc->props = NULL; |
810 | 1.86k | if (tmp != MQTT_CODE_SUCCESS) { |
811 | 0 | rc = tmp; |
812 | 0 | } |
813 | 1.86k | } |
814 | 2.34k | #ifdef WOLFMQTT_DISCONNECT_CB |
815 | | /* Call disconnect callback with reason code */ |
816 | 2.34k | if ((packet_obj != NULL) && client->disconnect_cb) { |
817 | 0 | client->disconnect_cb(client, p_disc->reason_code, |
818 | 0 | client->disconnect_ctx); |
819 | 0 | } |
820 | 2.34k | #endif |
821 | | #else |
822 | | rc = MQTT_TRACE_ERROR(MQTT_CODE_ERROR_PACKET_TYPE); |
823 | | #endif /* WOLFMQTT_V5 */ |
824 | 2.34k | break; |
825 | 2.36k | } |
826 | 2 | case MQTT_PACKET_TYPE_CONNECT: |
827 | 2 | case MQTT_PACKET_TYPE_SUBSCRIBE: |
828 | 21 | case MQTT_PACKET_TYPE_UNSUBSCRIBE: |
829 | 21 | case MQTT_PACKET_TYPE_PING_REQ: |
830 | 21 | case MQTT_PACKET_TYPE_ANY: |
831 | 99 | case MQTT_PACKET_TYPE_RESERVED: |
832 | 99 | default: |
833 | | /* these type are only encoded by client */ |
834 | 99 | rc = MQTT_TRACE_ERROR(MQTT_CODE_ERROR_PACKET_TYPE); |
835 | 99 | break; |
836 | 15.7k | } /* switch (packet_type) */ |
837 | 15.7k | } |
838 | | |
839 | 15.7k | if (ppacket_id) { |
840 | 15.7k | *ppacket_id = packet_id; |
841 | 15.7k | } |
842 | | |
843 | 15.7k | (void)client; |
844 | 15.7k | (void)doProps; |
845 | | |
846 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
847 | | PRINTF("MqttClient_DecodePacket: Rc %d, Len %d, Type %s (%d), ID %d," |
848 | | " QoS %d, doProps %d", |
849 | | rc, rx_len, MqttPacket_TypeDesc(packet_type), packet_type, packet_id, |
850 | | packet_qos, doProps); |
851 | | #endif |
852 | | |
853 | 15.7k | return rc; |
854 | 15.7k | } |
855 | | |
856 | | static int MqttClient_HandlePacket(MqttClient* client, |
857 | | MqttPacketType packet_type, void *packet_obj, MqttPublishResp* resp, |
858 | | int timeout_ms) |
859 | 8.49k | { |
860 | 8.49k | int rc = MQTT_CODE_SUCCESS; |
861 | 8.49k | MqttQoS packet_qos = MQTT_QOS_0; |
862 | 8.49k | word16 packet_id = 0; |
863 | | |
864 | 8.49k | if (client == NULL || packet_obj == NULL) { |
865 | 0 | return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG); |
866 | 0 | } |
867 | | |
868 | | /* make sure the response defaults to no ACK */ |
869 | 8.49k | resp->packet_type = MQTT_PACKET_TYPE_RESERVED; |
870 | | |
871 | 8.49k | switch (packet_type) |
872 | 8.49k | { |
873 | 792 | case MQTT_PACKET_TYPE_CONNECT_ACK: |
874 | 792 | { |
875 | 792 | rc = MqttClient_DecodePacket(client, client->rx_buf, |
876 | 792 | client->packet.buf_len, packet_obj, &packet_type, &packet_qos, |
877 | 792 | &packet_id, 1); |
878 | 792 | break; |
879 | 0 | } |
880 | 3.19k | case MQTT_PACKET_TYPE_PUBLISH: |
881 | 3.19k | { |
882 | 3.19k | MqttPublish* publish = (MqttPublish*)packet_obj; |
883 | 3.19k | if (publish->stat.read != MQTT_MSG_PAYLOAD2) { |
884 | 3.06k | rc = MqttClient_DecodePacket(client, client->rx_buf, |
885 | 3.06k | client->packet.buf_len, packet_obj, &packet_type, |
886 | 3.06k | &packet_qos, &packet_id, 1); |
887 | 3.06k | if (rc <= 0) { |
888 | 1.34k | return rc; |
889 | 1.34k | } |
890 | 3.06k | } |
891 | 135 | else { |
892 | | /* packet ID and QoS were already established */ |
893 | 135 | packet_id = publish->packet_id; |
894 | 135 | packet_qos = publish->qos; |
895 | 135 | } |
896 | | |
897 | 1.85k | rc = MqttClient_Publish_ReadPayload(client, publish, timeout_ms); |
898 | 1.85k | if (rc < 0) { |
899 | 297 | break; |
900 | 297 | } |
901 | | /* Note: Getting here means the Publish Read is done */ |
902 | 1.55k | publish->stat.read = MQTT_MSG_BEGIN; /* reset state */ |
903 | | |
904 | 1.55k | #ifdef WOLFMQTT_V5 |
905 | | /* Free the properties */ |
906 | 1.55k | MqttProps_Free(publish->props); |
907 | 1.55k | publish->props = NULL; |
908 | 1.55k | #endif |
909 | | |
910 | | /* Handle QoS */ |
911 | 1.55k | if (packet_qos == MQTT_QOS_0) { |
912 | | /* we are done, no QoS response */ |
913 | 450 | break; |
914 | 450 | } |
915 | | |
916 | 1.10k | #ifdef WOLFMQTT_V5 |
917 | | /* Copy response code in case changed by callback */ |
918 | 1.10k | resp->reason_code = publish->resp.reason_code; |
919 | 1.10k | #endif |
920 | | /* Populate information needed for ack */ |
921 | 1.10k | resp->packet_type = (packet_qos == MQTT_QOS_1) ? |
922 | 358 | MQTT_PACKET_TYPE_PUBLISH_ACK : |
923 | 1.10k | MQTT_PACKET_TYPE_PUBLISH_REC; |
924 | 1.10k | resp->packet_id = packet_id; |
925 | 1.10k | break; |
926 | 1.55k | } |
927 | 361 | case MQTT_PACKET_TYPE_PUBLISH_ACK: |
928 | 685 | case MQTT_PACKET_TYPE_PUBLISH_REC: |
929 | 1.11k | case MQTT_PACKET_TYPE_PUBLISH_REL: |
930 | 1.45k | case MQTT_PACKET_TYPE_PUBLISH_COMP: |
931 | 1.45k | { |
932 | | #if defined(WOLFMQTT_V5) && defined(WOLFMQTT_DEBUG_CLIENT) |
933 | | MqttPublishResp* publish_resp = (MqttPublishResp*)packet_obj; |
934 | | #endif |
935 | 1.45k | rc = MqttClient_DecodePacket(client, client->rx_buf, |
936 | 1.45k | client->packet.buf_len, packet_obj, &packet_type, |
937 | 1.45k | &packet_qos, &packet_id, 1); |
938 | 1.45k | if (rc <= 0) { |
939 | 0 | return rc; |
940 | 0 | } |
941 | | |
942 | | #if defined(WOLFMQTT_V5) && defined(WOLFMQTT_DEBUG_CLIENT) |
943 | | PRINTF("\tPublish response: reason code %d, Type %s (%d)," |
944 | | " ID %d, QoS %d", |
945 | | publish_resp->reason_code, |
946 | | MqttPacket_TypeDesc(packet_type), |
947 | | packet_type, packet_id, packet_qos); |
948 | | #endif |
949 | | |
950 | | /* Only ACK publish Received or Release QoS levels */ |
951 | 1.45k | if (packet_type != MQTT_PACKET_TYPE_PUBLISH_REC && |
952 | 1.45k | packet_type != MQTT_PACKET_TYPE_PUBLISH_REL) { |
953 | 697 | break; |
954 | 697 | } |
955 | | |
956 | | /* Populate information needed for ack */ |
957 | 756 | resp->packet_type = packet_type+1; /* next ack */ |
958 | 756 | resp->packet_id = packet_id; |
959 | 756 | break; |
960 | 1.45k | } |
961 | 885 | case MQTT_PACKET_TYPE_SUBSCRIBE_ACK: |
962 | 885 | { |
963 | 885 | rc = MqttClient_DecodePacket(client, client->rx_buf, |
964 | 885 | client->packet.buf_len, packet_obj, &packet_type, &packet_qos, |
965 | 885 | &packet_id, 1); |
966 | 885 | break; |
967 | 1.45k | } |
968 | 573 | case MQTT_PACKET_TYPE_UNSUBSCRIBE_ACK: |
969 | 573 | { |
970 | 573 | rc = MqttClient_DecodePacket(client, client->rx_buf, |
971 | 573 | client->packet.buf_len, packet_obj, &packet_type, &packet_qos, |
972 | 573 | &packet_id, 1); |
973 | 573 | break; |
974 | 1.45k | } |
975 | 642 | case MQTT_PACKET_TYPE_PING_RESP: |
976 | 642 | { |
977 | 642 | rc = MqttClient_DecodePacket(client, client->rx_buf, |
978 | 642 | client->packet.buf_len, packet_obj, &packet_type, &packet_qos, |
979 | 642 | &packet_id, 1); |
980 | 642 | break; |
981 | 1.45k | } |
982 | 39 | case MQTT_PACKET_TYPE_AUTH: |
983 | 39 | { |
984 | 39 | #ifdef WOLFMQTT_V5 |
985 | 39 | rc = MqttClient_DecodePacket(client, client->rx_buf, |
986 | 39 | client->packet.buf_len, packet_obj, &packet_type, &packet_qos, |
987 | 39 | &packet_id, 1); |
988 | | #else |
989 | | rc = MQTT_TRACE_ERROR(MQTT_CODE_ERROR_PACKET_TYPE); |
990 | | #endif |
991 | 39 | break; |
992 | 1.45k | } |
993 | | |
994 | 914 | case MQTT_PACKET_TYPE_DISCONNECT: |
995 | 914 | { |
996 | 914 | #ifdef WOLFMQTT_V5 |
997 | 914 | rc = MqttClient_DecodePacket(client, client->rx_buf, |
998 | 914 | client->packet.buf_len, packet_obj, &packet_type, &packet_qos, |
999 | 914 | &packet_id, 1); |
1000 | | #else |
1001 | | rc = MQTT_TRACE_ERROR(MQTT_CODE_ERROR_PACKET_TYPE); |
1002 | | #endif |
1003 | 914 | break; |
1004 | 1.45k | } |
1005 | 0 | case MQTT_PACKET_TYPE_CONNECT: |
1006 | 0 | case MQTT_PACKET_TYPE_SUBSCRIBE: |
1007 | 0 | case MQTT_PACKET_TYPE_UNSUBSCRIBE: |
1008 | 0 | case MQTT_PACKET_TYPE_PING_REQ: |
1009 | 0 | case MQTT_PACKET_TYPE_ANY: |
1010 | 0 | case MQTT_PACKET_TYPE_RESERVED: |
1011 | 0 | default: |
1012 | | /* these types are only sent from client and should not be sent |
1013 | | * by broker */ |
1014 | 0 | rc = MQTT_TRACE_ERROR(MQTT_CODE_ERROR_PACKET_TYPE); |
1015 | 0 | break; |
1016 | 8.49k | } /* switch (packet_type) */ |
1017 | | |
1018 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
1019 | | if (rc < 0) { |
1020 | | PRINTF("MqttClient_HandlePacket: Rc %d, Type %s (%d), QoS %d, ID %d", |
1021 | | rc, MqttPacket_TypeDesc(packet_type), packet_type, packet_qos, |
1022 | | packet_id); |
1023 | | } |
1024 | | #endif |
1025 | | |
1026 | 7.14k | return rc; |
1027 | 8.49k | } |
1028 | | |
1029 | | static inline int MqttIsPubRespPacket(int packet_type) |
1030 | 13.3k | { |
1031 | 13.3k | return (packet_type == MQTT_PACKET_TYPE_PUBLISH_ACK /* Acknowledgment */ || |
1032 | 13.3k | packet_type == MQTT_PACKET_TYPE_PUBLISH_REC /* Received */ || |
1033 | 13.3k | packet_type == MQTT_PACKET_TYPE_PUBLISH_REL /* Release */ || |
1034 | 13.3k | packet_type == MQTT_PACKET_TYPE_PUBLISH_COMP /* Complete */); |
1035 | 13.3k | } |
1036 | | |
1037 | | #ifdef WOLFMQTT_MULTITHREAD |
1038 | | /* this function will return: |
1039 | | * MQTT_CODE_CONTINUE indicating found, but not marked done |
1040 | | * MQTT_CODE_ERROR_NOT_FOUND: Not found |
1041 | | * Any other response is from the the packet_ret |
1042 | | */ |
1043 | | static int MqttClient_CheckPendResp(MqttClient *client, byte wait_type, |
1044 | | word16 wait_packet_id) |
1045 | | { |
1046 | | int rc; |
1047 | | MqttPendResp *pendResp = NULL; |
1048 | | |
1049 | | /* Check to see if packet type and id have already completed */ |
1050 | | rc = wm_SemLock(&client->lockClient); |
1051 | | if (rc == 0) { |
1052 | | if (MqttClient_RespList_Find(client, (MqttPacketType)wait_type, |
1053 | | wait_packet_id, &pendResp)) |
1054 | | { |
1055 | | if ((pendResp != NULL) && (pendResp->packetDone)) { |
1056 | | /* pending response is already done, so return */ |
1057 | | rc = pendResp->packet_ret; |
1058 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
1059 | | PRINTF("PendResp Check Done %p: Rc %d", pendResp, rc); |
1060 | | #endif |
1061 | | MqttClient_RespList_Remove(client, pendResp); |
1062 | | } |
1063 | | else { |
1064 | | /* item not done */ |
1065 | | rc = MQTT_CODE_CONTINUE; |
1066 | | } |
1067 | | } |
1068 | | else { |
1069 | | /* item not found */ |
1070 | | rc = MQTT_CODE_ERROR_NOT_FOUND; |
1071 | | } |
1072 | | wm_SemUnlock(&client->lockClient); |
1073 | | } |
1074 | | return rc; |
1075 | | } |
1076 | | #endif /* WOLFMQTT_MULTITHREAD */ |
1077 | | |
1078 | | /* Helper for clearing the contents of an object buffer based on packet type */ |
1079 | | static void MqttClient_PacketReset(MqttPacketType packet_type, void* packet_obj) |
1080 | 6.59k | { |
1081 | 6.59k | size_t objSz = 0; |
1082 | 6.59k | size_t offset = sizeof(MqttMsgStat); |
1083 | | #ifdef WOLFMQTT_MULTITHREAD |
1084 | | offset += sizeof(MqttPendResp); |
1085 | | #endif |
1086 | 6.59k | switch (packet_type) { |
1087 | 0 | case MQTT_PACKET_TYPE_CONNECT: |
1088 | 0 | objSz = sizeof(MqttConnect); |
1089 | 0 | break; |
1090 | 792 | case MQTT_PACKET_TYPE_CONNECT_ACK: |
1091 | 792 | objSz = sizeof(MqttConnectAck); |
1092 | 792 | break; |
1093 | 1.30k | case MQTT_PACKET_TYPE_PUBLISH: |
1094 | 1.30k | objSz = sizeof(MqttPublish); |
1095 | 1.30k | break; |
1096 | 361 | case MQTT_PACKET_TYPE_PUBLISH_ACK: |
1097 | 685 | case MQTT_PACKET_TYPE_PUBLISH_REC: |
1098 | 1.11k | case MQTT_PACKET_TYPE_PUBLISH_REL: |
1099 | 1.45k | case MQTT_PACKET_TYPE_PUBLISH_COMP: |
1100 | 1.45k | objSz = sizeof(MqttPublishResp); |
1101 | 1.45k | break; |
1102 | 0 | case MQTT_PACKET_TYPE_SUBSCRIBE: |
1103 | 0 | objSz = sizeof(MqttSubscribe); |
1104 | 0 | break; |
1105 | 885 | case MQTT_PACKET_TYPE_SUBSCRIBE_ACK: |
1106 | 885 | objSz = sizeof(MqttSubscribeAck); |
1107 | 885 | break; |
1108 | 0 | case MQTT_PACKET_TYPE_UNSUBSCRIBE: |
1109 | 0 | objSz = sizeof(MqttUnsubscribe); |
1110 | 0 | break; |
1111 | 573 | case MQTT_PACKET_TYPE_UNSUBSCRIBE_ACK: |
1112 | 573 | objSz = sizeof(MqttUnsubscribeAck); |
1113 | 573 | break; |
1114 | 0 | case MQTT_PACKET_TYPE_PING_REQ: |
1115 | 642 | case MQTT_PACKET_TYPE_PING_RESP: |
1116 | 642 | objSz = sizeof(MqttPing); |
1117 | 642 | break; |
1118 | 39 | case MQTT_PACKET_TYPE_AUTH: |
1119 | 39 | #ifdef WOLFMQTT_V5 |
1120 | 39 | objSz = sizeof(MqttAuth); |
1121 | 39 | #endif |
1122 | 39 | break; |
1123 | 914 | case MQTT_PACKET_TYPE_DISCONNECT: |
1124 | 914 | #ifdef WOLFMQTT_V5 |
1125 | 914 | objSz = sizeof(MqttDisconnect); |
1126 | 914 | #endif |
1127 | 914 | break; |
1128 | 0 | case MQTT_PACKET_TYPE_ANY: |
1129 | 0 | case MQTT_PACKET_TYPE_RESERVED: |
1130 | 0 | default: |
1131 | 0 | break; |
1132 | 6.59k | } /* switch (packet_type) */ |
1133 | 6.59k | if (objSz > offset) { |
1134 | 5.95k | XMEMSET((byte*)packet_obj + offset, 0, objSz - offset); |
1135 | 5.95k | } |
1136 | 6.59k | } |
1137 | | |
1138 | | static int MqttClient_WaitType(MqttClient *client, void *packet_obj, |
1139 | | byte wait_type, word16 wait_packet_id, int timeout_ms) |
1140 | 5.42k | { |
1141 | 5.42k | int rc = MQTT_CODE_SUCCESS; |
1142 | 5.42k | word16 packet_id; |
1143 | 5.42k | MqttPacketType packet_type; |
1144 | 5.42k | MqttQoS packet_qos = MQTT_QOS_0; |
1145 | | #ifdef WOLFMQTT_MULTITHREAD |
1146 | | MqttPendResp *pendResp; |
1147 | | #endif |
1148 | 5.42k | MqttMsgStat* mms_stat; |
1149 | 5.42k | int waitMatchFound; |
1150 | 5.42k | void* use_packet_obj = NULL; |
1151 | | |
1152 | 5.42k | if (client == NULL || packet_obj == NULL) { |
1153 | 0 | return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG); |
1154 | 0 | } |
1155 | | |
1156 | | /* all packet type structures must have MqttMsgStat at top */ |
1157 | 5.42k | mms_stat = (MqttMsgStat*)packet_obj; |
1158 | | |
1159 | 11.4k | wait_again: |
1160 | | |
1161 | | /* initialize variables */ |
1162 | 11.4k | packet_id = 0; |
1163 | 11.4k | packet_type = MQTT_PACKET_TYPE_RESERVED; |
1164 | | #ifdef WOLFMQTT_MULTITHREAD |
1165 | | pendResp = NULL; |
1166 | | #endif |
1167 | 11.4k | waitMatchFound = 0; |
1168 | | |
1169 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
1170 | | #ifdef WOLFMQTT_NONBLOCK |
1171 | | if (client->lastRc != MQTT_CODE_CONTINUE) |
1172 | | #endif |
1173 | | { |
1174 | | PRINTF("MqttClient_WaitType: Type %s (%d), ID %d, State %d-%d", |
1175 | | MqttPacket_TypeDesc((MqttPacketType)wait_type), |
1176 | | wait_type, wait_packet_id, mms_stat->read, mms_stat->write); |
1177 | | } |
1178 | | #endif |
1179 | | |
1180 | 11.4k | switch (mms_stat->read) |
1181 | 11.4k | { |
1182 | 9.08k | case MQTT_MSG_BEGIN: |
1183 | 9.08k | { |
1184 | | #ifdef WOLFMQTT_MULTITHREAD |
1185 | | /* Check to see if packet type and id have already completed */ |
1186 | | rc = MqttClient_CheckPendResp(client, wait_type, wait_packet_id); |
1187 | | if (rc != MQTT_CODE_ERROR_NOT_FOUND && rc != MQTT_CODE_CONTINUE) { |
1188 | | return rc; |
1189 | | } |
1190 | | #endif |
1191 | | |
1192 | 9.08k | if ((rc = MqttReadStart(client, mms_stat)) != 0) { |
1193 | 0 | return rc; |
1194 | 0 | } |
1195 | | |
1196 | 9.08k | mms_stat->read = MQTT_MSG_WAIT; |
1197 | 9.08k | } |
1198 | 9.08k | FALL_THROUGH; |
1199 | | |
1200 | 9.08k | case MQTT_MSG_WAIT: |
1201 | 9.47k | case MQTT_MSG_HEADER: |
1202 | 9.47k | { |
1203 | | /* Wait for packet */ |
1204 | 9.47k | rc = MqttPacket_Read(client, client->rx_buf, client->rx_buf_len, |
1205 | 9.47k | timeout_ms); |
1206 | | /* handle failure */ |
1207 | 9.47k | if (rc <= 0) { |
1208 | | #ifdef WOLFMQTT_NONBLOCK |
1209 | | if (rc == MQTT_CODE_CONTINUE && |
1210 | | (client->packet.stat > MQTT_PK_BEGIN || |
1211 | | client->read.total > 0) |
1212 | | ) { |
1213 | | /* advance state, since we received some data */ |
1214 | | mms_stat->read = MQTT_MSG_HEADER; |
1215 | | } |
1216 | | #endif |
1217 | 2.12k | break; |
1218 | 2.12k | } |
1219 | | |
1220 | | /* advance state, since we received some data */ |
1221 | 7.35k | mms_stat->read = MQTT_MSG_HEADER; |
1222 | | |
1223 | | /* capture length read */ |
1224 | 7.35k | client->packet.buf_len = rc; |
1225 | | |
1226 | | /* Decode Packet - get type, qos and id */ |
1227 | 7.35k | rc = MqttClient_DecodePacket(client, client->rx_buf, |
1228 | 7.35k | client->packet.buf_len, NULL, &packet_type, &packet_qos, |
1229 | 7.35k | &packet_id, 1); |
1230 | 7.35k | if (rc < 0) { |
1231 | 758 | break; |
1232 | 758 | } |
1233 | | |
1234 | 6.59k | MqttClient_PacketReset(packet_type, &client->msg); |
1235 | | |
1236 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
1237 | | PRINTF("Read Packet: Len %d, Type %d, ID %d", |
1238 | | client->packet.buf_len, packet_type, packet_id); |
1239 | | #endif |
1240 | | |
1241 | | /* Ping response is special case, no payload */ |
1242 | 6.59k | if (packet_type != MQTT_PACKET_TYPE_PING_RESP) { |
1243 | 5.95k | mms_stat->read = MQTT_MSG_PAYLOAD; |
1244 | 5.95k | } |
1245 | 642 | else { |
1246 | 642 | mms_stat->read = MQTT_MSG_WAIT; |
1247 | 642 | } |
1248 | 6.59k | } |
1249 | 6.59k | FALL_THROUGH; |
1250 | | |
1251 | 8.35k | case MQTT_MSG_PAYLOAD: |
1252 | 8.49k | case MQTT_MSG_PAYLOAD2: |
1253 | 8.49k | { |
1254 | 8.49k | MqttPublishResp resp; |
1255 | 8.49k | MqttPacketType use_packet_type; |
1256 | | |
1257 | | /* Determine if we received data for this request */ |
1258 | 8.49k | if ((wait_type == MQTT_PACKET_TYPE_ANY || |
1259 | 8.49k | wait_type == packet_type || |
1260 | 8.49k | (MqttIsPubRespPacket(packet_type) && |
1261 | 5.22k | MqttIsPubRespPacket(wait_type))) && |
1262 | 8.49k | (wait_packet_id == 0 || wait_packet_id == packet_id)) |
1263 | 2.86k | { |
1264 | 2.86k | use_packet_obj = packet_obj; |
1265 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
1266 | | PRINTF("Using INCOMING packet_obj %p", use_packet_obj); |
1267 | | #endif |
1268 | 2.86k | if (packet_type == wait_type || |
1269 | 2.86k | wait_type == MQTT_PACKET_TYPE_ANY) { |
1270 | | /* Only stop waiting when matched or waiting for "any" */ |
1271 | 2.86k | waitMatchFound = 1; |
1272 | 2.86k | } |
1273 | 2.86k | } |
1274 | 5.62k | else { |
1275 | | #ifdef WOLFMQTT_MULTITHREAD |
1276 | | rc = wm_SemLock(&client->lockClient); |
1277 | | if (rc != 0) { |
1278 | | break; /* error */ |
1279 | | } |
1280 | | #endif |
1281 | | |
1282 | | /* use generic packet object */ |
1283 | 5.62k | use_packet_obj = &client->msg; |
1284 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
1285 | | PRINTF("Using SHARED packet_obj %p", use_packet_obj); |
1286 | | #endif |
1287 | | |
1288 | | #ifdef WOLFMQTT_MULTITHREAD |
1289 | | wm_SemUnlock(&client->lockClient); |
1290 | | #endif |
1291 | 5.62k | } |
1292 | 8.49k | use_packet_type = packet_type; |
1293 | | |
1294 | | #ifdef WOLFMQTT_MULTITHREAD |
1295 | | /* Check to see if we have a pending response for this packet */ |
1296 | | pendResp = NULL; |
1297 | | rc = wm_SemLock(&client->lockClient); |
1298 | | if (rc == 0) { |
1299 | | if (MqttClient_RespList_Find(client, packet_type, packet_id, |
1300 | | &pendResp)) { |
1301 | | /* we found packet match this incoming read packet */ |
1302 | | pendResp->packetProcessing = 1; |
1303 | | if (pendResp->packet_obj != packet_obj) { |
1304 | | use_packet_obj = pendResp->packet_obj; |
1305 | | use_packet_type = pendResp->packet_type; |
1306 | | /* req from another thread... not a match */ |
1307 | | waitMatchFound = 0; |
1308 | | } |
1309 | | } |
1310 | | wm_SemUnlock(&client->lockClient); |
1311 | | } |
1312 | | else { |
1313 | | break; /* error */ |
1314 | | } |
1315 | | #endif /* WOLFMQTT_MULTITHREAD */ |
1316 | | |
1317 | | /* for payload state packet type is always publish */ |
1318 | 8.49k | if (use_packet_type == MQTT_PACKET_TYPE_RESERVED && |
1319 | 8.49k | (mms_stat->read == MQTT_MSG_PAYLOAD || |
1320 | 1.89k | mms_stat->read == MQTT_MSG_PAYLOAD2)) |
1321 | 1.89k | { |
1322 | 1.89k | use_packet_type = MQTT_PACKET_TYPE_PUBLISH; |
1323 | 1.89k | } |
1324 | | /* cache publish packet id and qos for MqttClient_HandlePacket payload */ |
1325 | 8.49k | if (use_packet_type == MQTT_PACKET_TYPE_PUBLISH && |
1326 | 8.49k | mms_stat->read == MQTT_MSG_PAYLOAD && use_packet_obj != NULL) |
1327 | 3.06k | { |
1328 | 3.06k | MqttObject* obj = (MqttObject*)use_packet_obj; |
1329 | 3.06k | obj->publish.qos = packet_qos; |
1330 | 3.06k | obj->publish.packet_id = packet_id; |
1331 | 3.06k | } |
1332 | | |
1333 | | /* Perform packet handling for publish callback and QoS */ |
1334 | 8.49k | XMEMSET(&resp, 0, sizeof(resp)); |
1335 | 8.49k | rc = MqttClient_HandlePacket(client, use_packet_type, |
1336 | 8.49k | use_packet_obj, &resp, timeout_ms); |
1337 | | |
1338 | | /* if using the shared packet object, make sure the original |
1339 | | * state is correct for publish payload 2 (continued) */ |
1340 | 8.49k | if (use_packet_obj != NULL && use_packet_obj != mms_stat && |
1341 | 8.49k | ((MqttMsgStat*)use_packet_obj)->read == MQTT_MSG_PAYLOAD2) { |
1342 | 285 | mms_stat->read = MQTT_MSG_PAYLOAD2; |
1343 | 285 | } |
1344 | | |
1345 | | #ifdef WOLFMQTT_NONBLOCK |
1346 | | if (rc == MQTT_CODE_CONTINUE) { |
1347 | | break; |
1348 | | } |
1349 | | #endif |
1350 | | |
1351 | | /* handle success case */ |
1352 | 8.49k | if (rc >= 0) { |
1353 | 6.85k | rc = MQTT_CODE_SUCCESS; |
1354 | 6.85k | } |
1355 | 1.64k | else { |
1356 | | /* error, break */ |
1357 | 1.64k | break; |
1358 | 1.64k | } |
1359 | | |
1360 | | #ifdef WOLFMQTT_MULTITHREAD |
1361 | | if (pendResp) { |
1362 | | /* Mark pending response entry done */ |
1363 | | if (wm_SemLock(&client->lockClient) == 0) { |
1364 | | pendResp->packetDone = 1; |
1365 | | pendResp->packet_ret = rc; |
1366 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
1367 | | PRINTF("PendResp Done %p", pendResp); |
1368 | | #endif |
1369 | | pendResp = NULL; |
1370 | | wm_SemUnlock(&client->lockClient); |
1371 | | } |
1372 | | } |
1373 | | #endif /* WOLFMQTT_MULTITHREAD */ |
1374 | | |
1375 | | /* Determine if we are sending ACK or done */ |
1376 | 6.85k | if (MqttIsPubRespPacket(resp.packet_type)) { |
1377 | | /* if we get here, then we are sending an ACK */ |
1378 | 1.85k | mms_stat->read = MQTT_MSG_ACK; |
1379 | 1.85k | mms_stat->ack = MQTT_MSG_WAIT; |
1380 | | |
1381 | | /* setup ACK in shared context */ |
1382 | 1.85k | XMEMCPY(&client->packetAck, &resp, sizeof(MqttPublishResp)); |
1383 | 1.85k | #ifdef WOLFMQTT_V5 |
1384 | 1.85k | client->packetAck.protocol_level = client->protocol_level; |
1385 | 1.85k | #endif |
1386 | 1.85k | } |
1387 | | |
1388 | | /* done reading */ |
1389 | 6.85k | MqttReadStop(client, mms_stat); |
1390 | 6.85k | break; |
1391 | 8.49k | } |
1392 | | |
1393 | 122 | case MQTT_MSG_ACK: |
1394 | | /* go to write section below */ |
1395 | 122 | break; |
1396 | | |
1397 | 0 | case MQTT_MSG_AUTH: |
1398 | 0 | default: |
1399 | 0 | { |
1400 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
1401 | | PRINTF("MqttClient_WaitType: Invalid read state %d!", |
1402 | | mms_stat->read); |
1403 | | #endif |
1404 | 0 | rc = MQTT_CODE_ERROR_STAT; |
1405 | 0 | break; |
1406 | 0 | } |
1407 | 11.4k | } /* switch (mms_stat->read) */ |
1408 | | |
1409 | 11.4k | switch (mms_stat->ack) |
1410 | 11.4k | { |
1411 | 9.63k | case MQTT_MSG_BEGIN: |
1412 | | /* wait for read to set ack */ |
1413 | 9.63k | break; |
1414 | | |
1415 | 1.85k | case MQTT_MSG_WAIT: |
1416 | 1.85k | { |
1417 | | /* Flag write active / lock mutex */ |
1418 | 1.85k | if ((rc = MqttWriteStart(client, mms_stat)) != 0) { |
1419 | 0 | break; |
1420 | 0 | } |
1421 | 1.85k | mms_stat->ack = MQTT_MSG_ACK; |
1422 | 1.85k | } |
1423 | 1.85k | FALL_THROUGH; |
1424 | | |
1425 | 1.85k | case MQTT_MSG_ACK: |
1426 | 1.85k | { |
1427 | | /* send ack */ |
1428 | 1.85k | rc = MqttEncode_PublishResp(client->tx_buf, client->tx_buf_len, |
1429 | 1.85k | client->packetAck.packet_type, &client->packetAck); |
1430 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
1431 | | PRINTF("MqttEncode_PublishResp: Len %d, Type %s (%d), ID %d", |
1432 | | rc, MqttPacket_TypeDesc(client->packetAck.packet_type), |
1433 | | client->packetAck.packet_type, client->packetAck.packet_id); |
1434 | | #endif |
1435 | 1.85k | if (rc < 0) { |
1436 | 0 | MqttWriteStop(client, mms_stat); |
1437 | 0 | break; |
1438 | 0 | } |
1439 | | |
1440 | 1.85k | client->write.len = rc; |
1441 | | /* Note: static analyzer complains about set, but not used here. |
1442 | | * Keeping it to ensure no future issues with rc > 0 */ |
1443 | 1.85k | rc = MQTT_CODE_SUCCESS; |
1444 | 1.85k | (void)rc; /* inhibit clang-analyzer-deadcode.DeadStores */ |
1445 | | |
1446 | 1.85k | mms_stat->ack = MQTT_MSG_HEADER; |
1447 | 1.85k | } |
1448 | 1.85k | FALL_THROUGH; |
1449 | | |
1450 | 1.85k | case MQTT_MSG_HEADER: |
1451 | 1.85k | { |
1452 | 1.85k | int xfer = client->write.len; |
1453 | | |
1454 | | /* Send publish response packet */ |
1455 | 1.85k | rc = MqttPacket_Write(client, client->tx_buf, xfer); |
1456 | | #ifdef WOLFMQTT_NONBLOCK |
1457 | | if (rc == MQTT_CODE_CONTINUE) { |
1458 | | /* keep send mutex locked and return to caller */ |
1459 | | /* must keep send locked */ |
1460 | | return rc; |
1461 | | } |
1462 | | #endif |
1463 | 1.85k | MqttWriteStop(client, mms_stat); |
1464 | 1.85k | if (rc == xfer) { |
1465 | 1.01k | rc = MQTT_CODE_SUCCESS; /* success */ |
1466 | 1.01k | } |
1467 | | |
1468 | 1.85k | mms_stat->ack = MQTT_MSG_BEGIN; /* reset write state */ |
1469 | 1.85k | break; |
1470 | 1.85k | } |
1471 | | |
1472 | 0 | case MQTT_MSG_AUTH: |
1473 | 0 | case MQTT_MSG_PAYLOAD: |
1474 | 0 | case MQTT_MSG_PAYLOAD2: |
1475 | 0 | default: |
1476 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
1477 | | PRINTF("MqttClient_WaitType: Invalid ack state %d!", |
1478 | | mms_stat->ack); |
1479 | | #endif |
1480 | 0 | rc = MQTT_TRACE_ERROR(MQTT_CODE_ERROR_STAT); |
1481 | 0 | break; |
1482 | 11.4k | } /* switch (mms_stat->ack) */ |
1483 | | |
1484 | | /* no data read or ack done, then reset state */ |
1485 | 11.4k | if (mms_stat->read == MQTT_MSG_WAIT) { |
1486 | 2.36k | mms_stat->read = MQTT_MSG_BEGIN; |
1487 | 2.36k | } |
1488 | | |
1489 | | #ifdef WOLFMQTT_NONBLOCK |
1490 | | /* if nonblocking and some data has been read, do not release read lock */ |
1491 | | if (rc == MQTT_CODE_CONTINUE && mms_stat->read > MQTT_MSG_WAIT) { |
1492 | | return rc; |
1493 | | } |
1494 | | #endif |
1495 | | |
1496 | 11.4k | MqttReadStop(client, mms_stat); |
1497 | | |
1498 | | #ifdef WOLFMQTT_NONBLOCK |
1499 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
1500 | | #ifdef WOLFMQTT_MULTITHREAD |
1501 | | if (wm_SemLock(&client->lockClient) == 0) |
1502 | | #endif |
1503 | | { |
1504 | | client->lastRc = rc; |
1505 | | #ifdef WOLFMQTT_MULTITHREAD |
1506 | | wm_SemUnlock(&client->lockClient); |
1507 | | #endif |
1508 | | } |
1509 | | #endif /* WOLFMQTT_DEBUG_CLIENT */ |
1510 | | if (rc == MQTT_CODE_CONTINUE) { |
1511 | | return rc; |
1512 | | } |
1513 | | #endif |
1514 | | |
1515 | 11.4k | if (rc < 0) { |
1516 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
1517 | | if (rc != MQTT_CODE_CONTINUE) { |
1518 | | PRINTF("MqttClient_WaitType: Failure: %s (%d)", |
1519 | | MqttClient_ReturnCodeToString(rc), rc); |
1520 | | } |
1521 | | #endif |
1522 | 4.49k | return rc; |
1523 | 4.49k | } |
1524 | | |
1525 | 6.99k | if (!waitMatchFound) { |
1526 | | /* if we get here, then the we are still waiting for a packet */ |
1527 | 6.06k | mms_stat->read = MQTT_MSG_BEGIN; |
1528 | | #ifdef WOLFMQTT_NONBLOCK |
1529 | | /* for non-blocking return with code continue instead of waiting again |
1530 | | * if called with packet type and id of 'any' */ |
1531 | | if (wait_type == MQTT_PACKET_TYPE_ANY && wait_packet_id == 0) { |
1532 | | return MQTT_CODE_CONTINUE; |
1533 | | } |
1534 | | #endif |
1535 | 6.06k | MQTT_TRACE_MSG("Wait Again"); |
1536 | 6.06k | goto wait_again; |
1537 | 6.06k | } |
1538 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
1539 | | if (rc != MQTT_CODE_CONTINUE) { |
1540 | | PRINTF("MqttClient_WaitType: rc %d, state %d-%d-%d", |
1541 | | rc, mms_stat->read, mms_stat->write, mms_stat->ack); |
1542 | | } |
1543 | | #endif |
1544 | | |
1545 | | |
1546 | 927 | return rc; |
1547 | 6.99k | } |
1548 | | |
1549 | | |
1550 | | /* Public Functions */ |
1551 | | int MqttClient_Init(MqttClient *client, MqttNet* net, |
1552 | | MqttMsgCb msg_cb, |
1553 | | byte* tx_buf, int tx_buf_len, |
1554 | | byte* rx_buf, int rx_buf_len, |
1555 | | int cmd_timeout_ms) |
1556 | 0 | { |
1557 | 0 | int rc = MQTT_CODE_SUCCESS; |
1558 | | |
1559 | | /* Check arguments */ |
1560 | 0 | if (client == NULL || |
1561 | 0 | tx_buf == NULL || tx_buf_len <= 0 || |
1562 | 0 | rx_buf == NULL || rx_buf_len <= 0) { |
1563 | 0 | return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG); |
1564 | 0 | } |
1565 | | |
1566 | | /* Initialize the client structure to zero */ |
1567 | 0 | XMEMSET(client, 0, sizeof(MqttClient)); |
1568 | | |
1569 | | /* Setup client structure */ |
1570 | 0 | client->msg_cb = msg_cb; |
1571 | 0 | client->tx_buf = tx_buf; |
1572 | 0 | client->tx_buf_len = tx_buf_len; |
1573 | 0 | client->rx_buf = rx_buf; |
1574 | 0 | client->rx_buf_len = rx_buf_len; |
1575 | 0 | client->cmd_timeout_ms = cmd_timeout_ms; |
1576 | 0 | #ifdef WOLFMQTT_V5 |
1577 | 0 | client->max_qos = MQTT_QOS_2; |
1578 | 0 | client->retain_avail = 1; |
1579 | 0 | client->protocol_level = MQTT_CONNECT_PROTOCOL_LEVEL; |
1580 | 0 | rc = MqttProps_Init(); |
1581 | 0 | #endif |
1582 | |
|
1583 | | #ifdef WOLFMQTT_MULTITHREAD |
1584 | | if (rc == 0) { |
1585 | | rc = wm_SemInit(&client->lockSend); |
1586 | | } |
1587 | | if (rc == 0) { |
1588 | | rc = wm_SemInit(&client->lockRecv); |
1589 | | } |
1590 | | if (rc == 0) { |
1591 | | rc = wm_SemInit(&client->lockClient); |
1592 | | } |
1593 | | #ifdef ENABLE_MQTT_CURL |
1594 | | if (rc == 0) { |
1595 | | rc = wm_SemInit(&client->lockCURL); |
1596 | | } |
1597 | | #endif |
1598 | | #endif |
1599 | |
|
1600 | 0 | if (rc == 0) { |
1601 | | /* Init socket */ |
1602 | 0 | rc = MqttSocket_Init(client, net); |
1603 | 0 | } |
1604 | |
|
1605 | 0 | if (rc != 0) { |
1606 | | /* Cleanup if init failed */ |
1607 | 0 | MqttClient_DeInit(client); |
1608 | 0 | } |
1609 | |
|
1610 | 0 | return rc; |
1611 | 0 | } |
1612 | | |
1613 | | void MqttClient_DeInit(MqttClient *client) |
1614 | 0 | { |
1615 | 0 | if (client != NULL) { |
1616 | | #ifdef WOLFMQTT_MULTITHREAD |
1617 | | (void)wm_SemFree(&client->lockSend); |
1618 | | (void)wm_SemFree(&client->lockRecv); |
1619 | | (void)wm_SemFree(&client->lockClient); |
1620 | | #ifdef ENABLE_MQTT_CURL |
1621 | | (void)wm_SemFree(&client->lockCURL); |
1622 | | #endif |
1623 | | #endif |
1624 | 0 | } |
1625 | 0 | #ifdef WOLFMQTT_V5 |
1626 | 0 | (void)MqttProps_ShutDown(); |
1627 | 0 | #endif |
1628 | 0 | } |
1629 | | |
1630 | | #ifdef WOLFMQTT_DISCONNECT_CB |
1631 | | int MqttClient_SetDisconnectCallback(MqttClient *client, |
1632 | | MqttDisconnectCb discCb, void* ctx) |
1633 | 0 | { |
1634 | 0 | if (client == NULL) |
1635 | 0 | return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG); |
1636 | | |
1637 | 0 | client->disconnect_cb = discCb; |
1638 | 0 | client->disconnect_ctx = ctx; |
1639 | |
|
1640 | 0 | return MQTT_CODE_SUCCESS; |
1641 | 0 | } |
1642 | | #endif |
1643 | | |
1644 | | #ifdef WOLFMQTT_PROPERTY_CB |
1645 | | int MqttClient_SetPropertyCallback(MqttClient *client, MqttPropertyCb propCb, |
1646 | | void* ctx) |
1647 | 0 | { |
1648 | 0 | if (client == NULL) |
1649 | 0 | return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG); |
1650 | | |
1651 | 0 | client->property_cb = propCb; |
1652 | 0 | client->property_ctx = ctx; |
1653 | |
|
1654 | 0 | return MQTT_CODE_SUCCESS; |
1655 | 0 | } |
1656 | | #endif |
1657 | | |
1658 | | int MqttClient_Connect(MqttClient *client, MqttConnect *mc_connect) |
1659 | 2.69k | { |
1660 | 2.69k | int rc; |
1661 | | |
1662 | | /* Validate required arguments */ |
1663 | 2.69k | if (client == NULL || mc_connect == NULL) { |
1664 | 0 | return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG); |
1665 | 0 | } |
1666 | | |
1667 | 2.69k | if (mc_connect->stat.write == MQTT_MSG_BEGIN) { |
1668 | | /* Flag write active / lock mutex */ |
1669 | 2.69k | if ((rc = MqttWriteStart(client, &mc_connect->stat)) != 0) { |
1670 | 0 | return rc; |
1671 | 0 | } |
1672 | | |
1673 | 2.69k | #ifdef WOLFMQTT_V5 |
1674 | | /* Use specified protocol version if set */ |
1675 | 2.69k | mc_connect->protocol_level = client->protocol_level; |
1676 | 2.69k | #endif |
1677 | | |
1678 | | /* Encode the connect packet */ |
1679 | 2.69k | rc = MqttEncode_Connect(client->tx_buf, client->tx_buf_len, mc_connect); |
1680 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
1681 | | PRINTF("MqttClient_EncodePacket: Len %d, Type %s (%d), ID %d, QoS %d", |
1682 | | rc, MqttPacket_TypeDesc(MQTT_PACKET_TYPE_CONNECT), |
1683 | | MQTT_PACKET_TYPE_CONNECT, 0, 0); |
1684 | | #endif |
1685 | 2.69k | if (rc <= 0) { |
1686 | 35 | MqttWriteStop(client, &mc_connect->stat); |
1687 | 35 | return rc; |
1688 | 35 | } |
1689 | 2.65k | client->write.len = rc; |
1690 | | |
1691 | | #ifdef WOLFMQTT_MULTITHREAD |
1692 | | rc = wm_SemLock(&client->lockClient); |
1693 | | if (rc == 0) { |
1694 | | /* inform other threads of expected response */ |
1695 | | rc = MqttClient_RespList_Add(client, MQTT_PACKET_TYPE_CONNECT_ACK, |
1696 | | 0, &mc_connect->pendResp, &mc_connect->ack); |
1697 | | wm_SemUnlock(&client->lockClient); |
1698 | | } |
1699 | | if (rc != 0) { |
1700 | | MqttWriteStop(client, &mc_connect->stat); |
1701 | | return rc; /* Error locking client */ |
1702 | | } |
1703 | | #endif |
1704 | | |
1705 | 2.65k | mc_connect->stat.write = MQTT_MSG_HEADER; |
1706 | 2.65k | } |
1707 | 2.65k | if (mc_connect->stat.write == MQTT_MSG_HEADER) { |
1708 | 2.65k | int xfer = client->write.len; |
1709 | | |
1710 | | /* Send connect packet */ |
1711 | 2.65k | rc = MqttPacket_Write(client, client->tx_buf, xfer); |
1712 | | #ifdef WOLFMQTT_NONBLOCK |
1713 | | if (rc == MQTT_CODE_CONTINUE |
1714 | | #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK |
1715 | | && client->write.total > 0 |
1716 | | #endif |
1717 | | ) { |
1718 | | /* keep send locked and return early */ |
1719 | | return rc; |
1720 | | } |
1721 | | #endif |
1722 | 2.65k | MqttWriteStop(client, &mc_connect->stat); |
1723 | 2.65k | if (rc != xfer) { |
1724 | 1.78k | MqttClient_CancelMessage(client, (MqttObject*)mc_connect); |
1725 | 1.78k | return rc; |
1726 | 1.78k | } |
1727 | | |
1728 | 874 | #ifdef WOLFMQTT_V5 |
1729 | | /* Enhanced authentication */ |
1730 | 874 | if (client->enable_eauth == 1) { |
1731 | 0 | mc_connect->stat.write = MQTT_MSG_AUTH; |
1732 | 0 | } |
1733 | 874 | else |
1734 | 874 | #endif |
1735 | 874 | { |
1736 | 874 | mc_connect->stat.write = MQTT_MSG_WAIT; |
1737 | 874 | } |
1738 | 874 | } |
1739 | | |
1740 | 874 | #ifdef WOLFMQTT_V5 |
1741 | | /* Enhanced authentication */ |
1742 | 874 | if (mc_connect->protocol_level > MQTT_CONNECT_PROTOCOL_LEVEL_4 && |
1743 | 874 | mc_connect->stat.write == MQTT_MSG_AUTH) |
1744 | 0 | { |
1745 | 0 | MqttAuth auth, *p_auth = &auth; |
1746 | 0 | MqttProp* prop, *conn_prop; |
1747 | | |
1748 | | /* Find the AUTH property in the connect structure */ |
1749 | 0 | for (conn_prop = mc_connect->props; |
1750 | 0 | (conn_prop != NULL) && (conn_prop->type != MQTT_PROP_AUTH_METHOD); |
1751 | 0 | conn_prop = conn_prop->next) { |
1752 | 0 | } |
1753 | 0 | if (conn_prop == NULL) { |
1754 | | #ifdef WOLFMQTT_MULTITHREAD |
1755 | | if (wm_SemLock(&client->lockClient) == 0) { |
1756 | | MqttClient_RespList_Remove(client, &mc_connect->pendResp); |
1757 | | wm_SemUnlock(&client->lockClient); |
1758 | | } |
1759 | | #endif |
1760 | | /* AUTH property was not set in connect structure */ |
1761 | 0 | return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG); |
1762 | 0 | } |
1763 | | |
1764 | 0 | XMEMSET((void*)p_auth, 0, sizeof(MqttAuth)); |
1765 | | |
1766 | | /* Set the authentication reason */ |
1767 | 0 | p_auth->reason_code = MQTT_REASON_CONT_AUTH; |
1768 | | |
1769 | | /* Use the same authentication method property from connect */ |
1770 | 0 | prop = MqttProps_Add(&p_auth->props); |
1771 | 0 | prop->type = MQTT_PROP_AUTH_METHOD; |
1772 | 0 | prop->data_str.str = conn_prop->data_str.str; |
1773 | 0 | prop->data_str.len = conn_prop->data_str.len; |
1774 | | |
1775 | | /* Send the AUTH packet */ |
1776 | 0 | rc = MqttClient_Auth(client, p_auth); |
1777 | 0 | MqttClient_PropsFree(p_auth->props); |
1778 | | #ifdef WOLFMQTT_NONBLOCK |
1779 | | if (rc == MQTT_CODE_CONTINUE) |
1780 | | return rc; |
1781 | | #endif |
1782 | 0 | if (rc < 0) { |
1783 | | #ifdef WOLFMQTT_MULTITHREAD |
1784 | | if (wm_SemLock(&client->lockClient) == 0) { |
1785 | | MqttClient_RespList_Remove(client, &mc_connect->pendResp); |
1786 | | wm_SemUnlock(&client->lockClient); |
1787 | | } |
1788 | | #endif |
1789 | 0 | return rc; |
1790 | 0 | } |
1791 | 0 | mc_connect->stat.write = MQTT_MSG_WAIT; |
1792 | 0 | } |
1793 | 874 | #endif /* WOLFMQTT_V5 */ |
1794 | | |
1795 | | /* Wait for connect ack packet */ |
1796 | 874 | rc = MqttClient_WaitType(client, &mc_connect->ack, |
1797 | 874 | MQTT_PACKET_TYPE_CONNECT_ACK, 0, client->cmd_timeout_ms); |
1798 | | #if defined(WOLFMQTT_NONBLOCK) || defined(WOLFMQTT_MULTITHREAD) |
1799 | | if (rc == MQTT_CODE_CONTINUE) |
1800 | | return rc; |
1801 | | #endif |
1802 | | |
1803 | | #ifdef WOLFMQTT_MULTITHREAD |
1804 | | if (wm_SemLock(&client->lockClient) == 0) { |
1805 | | MqttClient_RespList_Remove(client, &mc_connect->pendResp); |
1806 | | wm_SemUnlock(&client->lockClient); |
1807 | | } |
1808 | | #endif |
1809 | | |
1810 | | /* reset state */ |
1811 | 874 | mc_connect->stat.write = MQTT_MSG_BEGIN; |
1812 | | |
1813 | 874 | return rc; |
1814 | 874 | } |
1815 | | |
1816 | | static int MqttClient_Publish_ReadPayload(MqttClient* client, |
1817 | | MqttPublish* publish, int timeout_ms) |
1818 | 1.85k | { |
1819 | 1.85k | int rc = MQTT_CODE_SUCCESS; |
1820 | 1.85k | byte msg_done; |
1821 | | |
1822 | | /* Handle packet callback and read remaining payload */ |
1823 | 3.80k | do { |
1824 | | /* Determine if message is done */ |
1825 | 3.80k | msg_done = ((publish->buffer_pos + publish->buffer_len) >= |
1826 | 3.80k | publish->total_len) ? 1 : 0; |
1827 | | |
1828 | 3.80k | if (publish->buffer_new) { |
1829 | | /* Issue callback for new message (first time only) */ |
1830 | 1.53k | if (client->msg_cb) { |
1831 | | /* if using the temp publish message buffer, |
1832 | | then populate message context with client context */ |
1833 | 1.53k | if (publish->ctx == NULL && &client->msg.publish == publish) { |
1834 | 1.53k | publish->ctx = client->ctx; |
1835 | 1.53k | } |
1836 | 1.53k | rc = client->msg_cb(client, publish, publish->buffer_new, |
1837 | 1.53k | msg_done); |
1838 | 1.53k | if (rc != MQTT_CODE_SUCCESS) { |
1839 | 0 | return rc; |
1840 | 1.53k | }; |
1841 | 1.53k | } |
1842 | | |
1843 | | /* Reset topic name since valid on new message only */ |
1844 | 1.53k | publish->topic_name = NULL; |
1845 | 1.53k | publish->topic_name_len = 0; |
1846 | | |
1847 | 1.53k | publish->buffer_new = 0; |
1848 | 1.53k | } |
1849 | | |
1850 | | /* Read payload */ |
1851 | 3.80k | if (!msg_done) { |
1852 | 2.27k | int msg_len; |
1853 | | |
1854 | | /* add last length to position and reset len */ |
1855 | 2.27k | publish->buffer_pos += publish->buffer_len; |
1856 | 2.27k | publish->buffer_len = 0; |
1857 | | |
1858 | | /* set state to reading payload */ |
1859 | 2.27k | publish->stat.read = MQTT_MSG_PAYLOAD2; |
1860 | | |
1861 | 2.27k | msg_len = (publish->total_len - publish->buffer_pos); |
1862 | 2.27k | if (msg_len > client->rx_buf_len) { |
1863 | 641 | msg_len = client->rx_buf_len; |
1864 | 641 | } |
1865 | | |
1866 | | /* make sure there is something to read */ |
1867 | 2.27k | if (msg_len > 0) { |
1868 | 2.27k | rc = MqttSocket_Read(client, client->rx_buf, msg_len, |
1869 | 2.27k | timeout_ms); |
1870 | 2.27k | if (rc < 0) { |
1871 | 297 | break; |
1872 | 297 | } |
1873 | | |
1874 | | /* Update message */ |
1875 | 1.98k | publish->buffer = client->rx_buf; |
1876 | 1.98k | publish->buffer_len = rc; |
1877 | 1.98k | rc = MQTT_CODE_SUCCESS; /* mark success */ |
1878 | | |
1879 | 1.98k | msg_done = ((publish->buffer_pos + publish->buffer_len) >= |
1880 | 1.98k | publish->total_len) ? 1 : 0; |
1881 | | |
1882 | | /* Issue callback for additional publish payload */ |
1883 | 1.98k | if (client->msg_cb) { |
1884 | 1.98k | rc = client->msg_cb(client, publish, publish->buffer_new, |
1885 | 1.98k | msg_done); |
1886 | 1.98k | if (rc != MQTT_CODE_SUCCESS) { |
1887 | 0 | return rc; |
1888 | 1.98k | }; |
1889 | 1.98k | } |
1890 | 1.98k | } |
1891 | 2.27k | } |
1892 | 3.80k | } while (!msg_done); |
1893 | | |
1894 | 1.85k | return rc; |
1895 | 1.85k | } |
1896 | | |
1897 | | static int MqttClient_Publish_WritePayload(MqttClient *client, |
1898 | | MqttPublish *publish, MqttPublishCb pubCb) |
1899 | 223 | { |
1900 | 223 | int rc = MQTT_CODE_SUCCESS; |
1901 | | |
1902 | 223 | if (client == NULL || publish == NULL) |
1903 | 0 | return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG); |
1904 | | |
1905 | 223 | if (pubCb) { /* use publish callback to get data */ |
1906 | 0 | word32 tmp_len = publish->buffer_len; |
1907 | |
|
1908 | 0 | do { |
1909 | | /* use the client->write.len to handle non-blocking re-entry when |
1910 | | * new publish callback data is needed */ |
1911 | 0 | if (client->write.len == 0) { |
1912 | | /* Use the callback to get payload */ |
1913 | 0 | if ((client->write.len = pubCb(publish)) < 0) { |
1914 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
1915 | | PRINTF("Publish callback error %d", client->write.len); |
1916 | | #endif |
1917 | 0 | return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_CALLBACK); |
1918 | 0 | } |
1919 | 0 | } |
1920 | | |
1921 | 0 | if ((word32)client->write.len < publish->buffer_len) { |
1922 | | /* Last read */ |
1923 | 0 | tmp_len = (word32)client->write.len; |
1924 | 0 | } |
1925 | | |
1926 | | /* Send payload */ |
1927 | 0 | do { |
1928 | 0 | if (client->write.len > client->tx_buf_len) { |
1929 | 0 | client->write.len = client->tx_buf_len; |
1930 | 0 | } |
1931 | 0 | publish->intBuf_len = client->write.len; |
1932 | 0 | XMEMCPY(client->tx_buf, &publish->buffer[publish->intBuf_pos], |
1933 | 0 | client->write.len); |
1934 | |
|
1935 | 0 | rc = MqttPacket_Write(client, client->tx_buf, |
1936 | 0 | client->write.len); |
1937 | 0 | if (rc < 0) { |
1938 | 0 | return rc; |
1939 | 0 | } |
1940 | | |
1941 | 0 | publish->intBuf_pos += publish->intBuf_len; |
1942 | 0 | publish->intBuf_len = 0; |
1943 | |
|
1944 | 0 | } while (publish->intBuf_pos < tmp_len); |
1945 | | |
1946 | 0 | publish->buffer_pos += publish->intBuf_pos; |
1947 | 0 | publish->intBuf_pos = 0; |
1948 | 0 | client->write.len = 0; /* reset current write len */ |
1949 | |
|
1950 | 0 | } while (publish->buffer_pos < publish->total_len); |
1951 | 0 | } |
1952 | 223 | else if (publish->buffer_pos < publish->total_len) { |
1953 | 188 | if (publish->buffer_pos > 0) { |
1954 | 178 | client->write.len = (publish->total_len - publish->buffer_pos); |
1955 | 178 | if (client->write.len > client->tx_buf_len) { |
1956 | 113 | client->write.len = client->tx_buf_len; |
1957 | 113 | } |
1958 | | |
1959 | 178 | XMEMCPY(client->tx_buf, &publish->buffer[publish->buffer_pos], |
1960 | 178 | client->write.len); |
1961 | | |
1962 | 178 | #ifndef WOLFMQTT_NONBLOCK |
1963 | 178 | publish->intBuf_pos += client->write.len; |
1964 | 178 | #endif |
1965 | 178 | } |
1966 | | |
1967 | | /* Send packet and payload */ |
1968 | | #ifdef WOLFMQTT_NONBLOCK |
1969 | | rc = MqttPacket_Write(client, client->tx_buf, client->write.len); |
1970 | | if (rc < 0) { |
1971 | | return rc; |
1972 | | } |
1973 | | |
1974 | | /* ONLY if send was successful, update buffer position. |
1975 | | * Otherwise, MqttPacket_Write() will resume where it left off. */ |
1976 | | publish->buffer_pos += client->write.len; |
1977 | | |
1978 | | /* Check if we are done sending publish message */ |
1979 | | if (publish->buffer_pos < publish->buffer_len) { |
1980 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
1981 | | PRINTF("Publish Write: not done (%d remain)", |
1982 | | publish->buffer_len - publish->buffer_pos); |
1983 | | #endif |
1984 | | return MQTT_CODE_PUB_CONTINUE; |
1985 | | } |
1986 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
1987 | | else { |
1988 | | PRINTF("Publish Write: done"); |
1989 | | } |
1990 | | #endif |
1991 | | #else |
1992 | 650 | do { |
1993 | 650 | rc = MqttPacket_Write(client, client->tx_buf, client->write.len); |
1994 | 650 | if (rc < 0) { |
1995 | 131 | return rc; |
1996 | 131 | } |
1997 | | |
1998 | 519 | publish->intBuf_pos += publish->intBuf_len; |
1999 | 519 | publish->intBuf_len = 0; |
2000 | | |
2001 | | /* Check if we are done sending publish message */ |
2002 | 519 | if (publish->intBuf_pos >= publish->buffer_len) { |
2003 | 57 | rc = MQTT_CODE_SUCCESS; |
2004 | 57 | break; |
2005 | 57 | } |
2006 | | |
2007 | | /* Build packet payload to send */ |
2008 | 462 | client->write.len = (publish->buffer_len - publish->intBuf_pos); |
2009 | 462 | if (client->write.len > client->tx_buf_len) { |
2010 | 397 | client->write.len = client->tx_buf_len; |
2011 | 397 | } |
2012 | 462 | publish->intBuf_len = client->write.len; |
2013 | 462 | XMEMCPY(client->tx_buf, &publish->buffer[publish->intBuf_pos], |
2014 | 462 | client->write.len); |
2015 | 462 | } while (publish->intBuf_pos < publish->buffer_len); |
2016 | 57 | #endif |
2017 | | |
2018 | 57 | if (rc >= 0) { |
2019 | | /* If transferring more chunks */ |
2020 | 57 | publish->buffer_pos += publish->intBuf_pos; |
2021 | 57 | if (publish->buffer_pos < publish->total_len) { |
2022 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
2023 | | PRINTF("Publish Write: chunk (%d remain)", |
2024 | | publish->total_len - publish->buffer_pos); |
2025 | | #endif |
2026 | | |
2027 | | /* Build next payload to send */ |
2028 | 0 | client->write.len = (publish->total_len - publish->buffer_pos); |
2029 | 0 | if (client->write.len > client->tx_buf_len) { |
2030 | 0 | client->write.len = client->tx_buf_len; |
2031 | 0 | } |
2032 | 0 | rc = MQTT_CODE_PUB_CONTINUE; |
2033 | 0 | } |
2034 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
2035 | | else { |
2036 | | PRINTF("Publish Write: chunked done"); |
2037 | | } |
2038 | | #endif |
2039 | 57 | } |
2040 | 57 | } |
2041 | 92 | return rc; |
2042 | 223 | } |
2043 | | |
2044 | | static int MqttPublishMsg(MqttClient *client, MqttPublish *publish, |
2045 | | MqttPublishCb pubCb, int writeOnly) |
2046 | 721 | { |
2047 | 721 | int rc = MQTT_CODE_SUCCESS; |
2048 | 721 | MqttPacketType resp_type; |
2049 | | |
2050 | | /* Validate required arguments */ |
2051 | 721 | if (client == NULL || publish == NULL) { |
2052 | 0 | return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG); |
2053 | 0 | } |
2054 | | |
2055 | 721 | #ifdef WOLFMQTT_V5 |
2056 | | /* Use specified protocol version if set */ |
2057 | 721 | publish->protocol_level = client->protocol_level; |
2058 | | |
2059 | | /* Validate publish request against server properties */ |
2060 | 721 | if ((publish->qos > client->max_qos) || |
2061 | 721 | ((publish->retain == 1) && (client->retain_avail == 0))) |
2062 | 154 | { |
2063 | 154 | return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_SERVER_PROP); |
2064 | 154 | } |
2065 | 567 | #endif |
2066 | | |
2067 | 567 | switch (publish->stat.write) |
2068 | 567 | { |
2069 | 567 | case MQTT_MSG_BEGIN: |
2070 | 567 | { |
2071 | | /* Flag write active / lock mutex */ |
2072 | 567 | if ((rc = MqttWriteStart(client, &publish->stat)) != 0) { |
2073 | 0 | return rc; |
2074 | 0 | } |
2075 | | |
2076 | | /* Encode the publish packet */ |
2077 | 567 | rc = MqttEncode_Publish(client->tx_buf, client->tx_buf_len, |
2078 | 567 | publish, pubCb ? 1 : 0); |
2079 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
2080 | | PRINTF("MqttClient_EncodePacket: Len %d, Type %s (%d), ID %d," |
2081 | | " QoS %d", |
2082 | | rc, MqttPacket_TypeDesc(MQTT_PACKET_TYPE_PUBLISH), |
2083 | | MQTT_PACKET_TYPE_PUBLISH, publish->packet_id, |
2084 | | publish->qos); |
2085 | | #endif |
2086 | 567 | if (rc <= 0) { |
2087 | 32 | MqttWriteStop(client, &publish->stat); |
2088 | 32 | return rc; |
2089 | 32 | } |
2090 | 535 | client->write.len = rc; |
2091 | | |
2092 | | #ifdef WOLFMQTT_MULTITHREAD |
2093 | | if (publish->qos > MQTT_QOS_0) { |
2094 | | resp_type = (publish->qos == MQTT_QOS_1) ? |
2095 | | MQTT_PACKET_TYPE_PUBLISH_ACK : |
2096 | | MQTT_PACKET_TYPE_PUBLISH_COMP; |
2097 | | |
2098 | | rc = wm_SemLock(&client->lockClient); |
2099 | | if (rc == 0) { |
2100 | | /* inform other threads of expected response */ |
2101 | | rc = MqttClient_RespList_Add(client, resp_type, |
2102 | | publish->packet_id, &publish->pendResp, &publish->resp); |
2103 | | wm_SemUnlock(&client->lockClient); |
2104 | | } |
2105 | | if (rc != 0) { |
2106 | | MqttWriteStop(client, &publish->stat); |
2107 | | return rc; /* Error locking client */ |
2108 | | } |
2109 | | } |
2110 | | #endif |
2111 | | |
2112 | 535 | publish->stat.write = MQTT_MSG_HEADER; |
2113 | 535 | } |
2114 | 535 | FALL_THROUGH; |
2115 | | |
2116 | 535 | case MQTT_MSG_HEADER: |
2117 | 535 | { |
2118 | 535 | int xfer = client->write.len; |
2119 | | |
2120 | | /* Send publish packet */ |
2121 | 535 | rc = MqttPacket_Write(client, client->tx_buf, xfer); |
2122 | | #ifdef WOLFMQTT_NONBLOCK |
2123 | | if (rc == MQTT_CODE_CONTINUE |
2124 | | #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK |
2125 | | && client->write.total > 0 |
2126 | | #endif |
2127 | | ) { |
2128 | | /* keep send locked and return early */ |
2129 | | return rc; |
2130 | | } |
2131 | | #endif |
2132 | 535 | client->write.len = 0; /* reset len, so publish chunk resets */ |
2133 | | |
2134 | | /* if failure or no data was written yet */ |
2135 | 535 | if (rc != xfer) { |
2136 | 312 | MqttWriteStop(client, &publish->stat); |
2137 | 312 | MqttClient_CancelMessage(client, (MqttObject*)publish); |
2138 | 312 | return rc; |
2139 | 312 | } |
2140 | | |
2141 | | /* advance state */ |
2142 | 223 | publish->stat.write = MQTT_MSG_PAYLOAD; |
2143 | 223 | } |
2144 | 223 | FALL_THROUGH; |
2145 | | |
2146 | 223 | case MQTT_MSG_PAYLOAD: |
2147 | 223 | { |
2148 | 223 | rc = MqttClient_Publish_WritePayload(client, publish, pubCb); |
2149 | | #ifdef WOLFMQTT_NONBLOCK |
2150 | | if (rc == MQTT_CODE_CONTINUE || rc == MQTT_CODE_PUB_CONTINUE) |
2151 | | return rc; |
2152 | | #endif |
2153 | 223 | MqttWriteStop(client, &publish->stat); |
2154 | 223 | if (rc < 0) { |
2155 | 131 | MqttClient_CancelMessage(client, (MqttObject*)publish); |
2156 | 131 | break; |
2157 | 131 | } |
2158 | | |
2159 | | /* if not expecting a reply then we are done */ |
2160 | 92 | if (publish->qos == MQTT_QOS_0) { |
2161 | 92 | break; |
2162 | 92 | } |
2163 | 0 | publish->stat.write = MQTT_MSG_WAIT; |
2164 | 0 | } |
2165 | 0 | FALL_THROUGH; |
2166 | |
|
2167 | 0 | case MQTT_MSG_WAIT: |
2168 | 0 | { |
2169 | | /* Handle QoS */ |
2170 | 0 | if (publish->qos > MQTT_QOS_0) { |
2171 | | /* Determine packet type to wait for */ |
2172 | 0 | resp_type = (publish->qos == MQTT_QOS_1) ? |
2173 | 0 | MQTT_PACKET_TYPE_PUBLISH_ACK : |
2174 | 0 | MQTT_PACKET_TYPE_PUBLISH_COMP; |
2175 | |
|
2176 | | #ifdef WOLFMQTT_MULTITHREAD |
2177 | | if (writeOnly) { |
2178 | | /* another thread will handle response */ |
2179 | | /* check if response already received from other thread */ |
2180 | | rc = MqttClient_CheckPendResp(client, resp_type, |
2181 | | publish->packet_id); |
2182 | | #ifndef WOLFMQTT_NONBLOCK |
2183 | | if (rc == MQTT_CODE_CONTINUE) { |
2184 | | /* mark success, let other thread handle response */ |
2185 | | rc = MQTT_CODE_SUCCESS; |
2186 | | } |
2187 | | #endif |
2188 | | } |
2189 | | else |
2190 | | #endif |
2191 | 0 | { |
2192 | 0 | (void)writeOnly; /* not used */ |
2193 | | |
2194 | | /* Wait for publish response packet */ |
2195 | 0 | rc = MqttClient_WaitType(client, &publish->resp, resp_type, |
2196 | 0 | publish->packet_id, client->cmd_timeout_ms); |
2197 | 0 | } |
2198 | |
|
2199 | | #if defined(WOLFMQTT_NONBLOCK) || defined(WOLFMQTT_MULTITHREAD) |
2200 | | if (rc == MQTT_CODE_CONTINUE) |
2201 | | break; |
2202 | | #endif |
2203 | | #ifdef WOLFMQTT_MULTITHREAD |
2204 | | if (wm_SemLock(&client->lockClient) == 0) { |
2205 | | MqttClient_RespList_Remove(client, &publish->pendResp); |
2206 | | wm_SemUnlock(&client->lockClient); |
2207 | | } |
2208 | | #endif |
2209 | 0 | } |
2210 | 0 | break; |
2211 | 0 | } |
2212 | | |
2213 | 0 | case MQTT_MSG_ACK: |
2214 | 0 | case MQTT_MSG_AUTH: |
2215 | 0 | case MQTT_MSG_PAYLOAD2: |
2216 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
2217 | | PRINTF("MqttClient_Publish: Invalid state %d!", |
2218 | | publish->stat.write); |
2219 | | #endif |
2220 | 0 | rc = MQTT_TRACE_ERROR(MQTT_CODE_ERROR_STAT); |
2221 | 0 | break; |
2222 | 567 | } /* switch (publish->stat) */ |
2223 | | |
2224 | | /* reset state */ |
2225 | 223 | if ((rc != MQTT_CODE_PUB_CONTINUE) |
2226 | | #ifdef WOLFMQTT_NONBLOCK |
2227 | | && (rc != MQTT_CODE_CONTINUE) |
2228 | | #endif |
2229 | 223 | ) |
2230 | 223 | { |
2231 | 223 | publish->stat.write = MQTT_MSG_BEGIN; |
2232 | 223 | } |
2233 | 223 | if (rc > 0) { |
2234 | 0 | rc = MQTT_CODE_SUCCESS; |
2235 | 0 | } |
2236 | | |
2237 | 223 | return rc; |
2238 | 567 | } |
2239 | | |
2240 | | int MqttClient_Publish(MqttClient *client, MqttPublish *publish) |
2241 | 721 | { |
2242 | 721 | return MqttPublishMsg(client, publish, NULL, 0); |
2243 | 721 | } |
2244 | | |
2245 | | int MqttClient_Publish_ex(MqttClient *client, MqttPublish *publish, |
2246 | | MqttPublishCb pubCb) |
2247 | 0 | { |
2248 | 0 | return MqttPublishMsg(client, publish, pubCb, 0); |
2249 | 0 | } |
2250 | | |
2251 | | #ifdef WOLFMQTT_MULTITHREAD |
2252 | | int MqttClient_Publish_WriteOnly(MqttClient *client, MqttPublish *publish, |
2253 | | MqttPublishCb pubCb) |
2254 | | { |
2255 | | return MqttPublishMsg(client, publish, pubCb, 1); |
2256 | | } |
2257 | | #endif |
2258 | | |
2259 | | |
2260 | | int MqttClient_Subscribe(MqttClient *client, MqttSubscribe *subscribe) |
2261 | 825 | { |
2262 | 825 | int rc, i; |
2263 | 825 | MqttTopic* topic; |
2264 | | |
2265 | | /* Validate required arguments */ |
2266 | 825 | if (client == NULL || subscribe == NULL) { |
2267 | 0 | return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG); |
2268 | 0 | } |
2269 | | |
2270 | 825 | #ifdef WOLFMQTT_V5 |
2271 | | /* Use specified protocol version if set */ |
2272 | 825 | subscribe->protocol_level = client->protocol_level; |
2273 | 825 | #endif |
2274 | | |
2275 | 825 | if (subscribe->stat.write == MQTT_MSG_BEGIN) { |
2276 | | /* Flag write active / lock mutex */ |
2277 | 825 | if ((rc = MqttWriteStart(client, &subscribe->stat)) != 0) { |
2278 | 0 | return rc; |
2279 | 0 | } |
2280 | | |
2281 | | /* Encode the subscribe packet */ |
2282 | 825 | rc = MqttEncode_Subscribe(client->tx_buf, client->tx_buf_len, |
2283 | 825 | subscribe); |
2284 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
2285 | | PRINTF("MqttClient_EncodePacket: Len %d, Type %s (%d), ID %d", |
2286 | | rc, MqttPacket_TypeDesc(MQTT_PACKET_TYPE_SUBSCRIBE), |
2287 | | MQTT_PACKET_TYPE_SUBSCRIBE, subscribe->packet_id); |
2288 | | #endif |
2289 | 825 | if (rc <= 0) { |
2290 | 49 | MqttWriteStop(client, &subscribe->stat); |
2291 | 49 | return rc; |
2292 | 49 | } |
2293 | 776 | client->write.len = rc; |
2294 | | |
2295 | | #ifdef WOLFMQTT_MULTITHREAD |
2296 | | rc = wm_SemLock(&client->lockClient); |
2297 | | if (rc == 0) { |
2298 | | /* inform other threads of expected response */ |
2299 | | rc = MqttClient_RespList_Add(client, MQTT_PACKET_TYPE_SUBSCRIBE_ACK, |
2300 | | subscribe->packet_id, &subscribe->pendResp, &subscribe->ack); |
2301 | | wm_SemUnlock(&client->lockClient); |
2302 | | } |
2303 | | if (rc != 0) { |
2304 | | MqttWriteStop(client, &subscribe->stat); |
2305 | | return rc; /* Error locking client */ |
2306 | | } |
2307 | | #endif |
2308 | | |
2309 | 776 | subscribe->stat.write = MQTT_MSG_HEADER; |
2310 | 776 | } |
2311 | 776 | if (subscribe->stat.write == MQTT_MSG_HEADER) { |
2312 | 776 | int xfer = client->write.len; |
2313 | | |
2314 | | /* Send subscribe packet */ |
2315 | 776 | rc = MqttPacket_Write(client, client->tx_buf, xfer); |
2316 | | #ifdef WOLFMQTT_NONBLOCK |
2317 | | if (rc == MQTT_CODE_CONTINUE |
2318 | | #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK |
2319 | | && client->write.total > 0 |
2320 | | #endif |
2321 | | ) { |
2322 | | /* keep send locked and return early */ |
2323 | | return rc; |
2324 | | } |
2325 | | #endif |
2326 | 776 | MqttWriteStop(client, &subscribe->stat); |
2327 | 776 | if (rc != xfer) { |
2328 | 561 | MqttClient_CancelMessage(client, (MqttObject*)subscribe); |
2329 | 561 | return rc; |
2330 | 561 | } |
2331 | | |
2332 | 215 | subscribe->stat.write = MQTT_MSG_WAIT; |
2333 | 215 | } |
2334 | | |
2335 | | /* Wait for subscribe ack packet */ |
2336 | 215 | rc = MqttClient_WaitType(client, &subscribe->ack, |
2337 | 215 | MQTT_PACKET_TYPE_SUBSCRIBE_ACK, subscribe->packet_id, |
2338 | 215 | client->cmd_timeout_ms); |
2339 | | #if defined(WOLFMQTT_NONBLOCK) || defined(WOLFMQTT_MULTITHREAD) |
2340 | | if (rc == MQTT_CODE_CONTINUE) |
2341 | | return rc; |
2342 | | #endif |
2343 | | |
2344 | | #ifdef WOLFMQTT_MULTITHREAD |
2345 | | if (wm_SemLock(&client->lockClient) == 0) { |
2346 | | MqttClient_RespList_Remove(client, &subscribe->pendResp); |
2347 | | wm_SemUnlock(&client->lockClient); |
2348 | | } |
2349 | | #endif |
2350 | | |
2351 | | /* Populate return codes */ |
2352 | 215 | if (rc == MQTT_CODE_SUCCESS) { |
2353 | 279 | for (i = 0; i < subscribe->topic_count && i < MAX_MQTT_TOPICS; i++) { |
2354 | 220 | topic = &subscribe->topics[i]; |
2355 | 220 | topic->return_code = subscribe->ack.return_codes[i]; |
2356 | 220 | } |
2357 | 59 | } |
2358 | | |
2359 | | /* reset state */ |
2360 | 215 | subscribe->stat.write = MQTT_MSG_BEGIN; |
2361 | | |
2362 | 215 | return rc; |
2363 | 776 | } |
2364 | | |
2365 | | int MqttClient_Unsubscribe(MqttClient *client, MqttUnsubscribe *unsubscribe) |
2366 | 402 | { |
2367 | 402 | int rc; |
2368 | | |
2369 | | /* Validate required arguments */ |
2370 | 402 | if (client == NULL || unsubscribe == NULL) { |
2371 | 0 | return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG); |
2372 | 0 | } |
2373 | | |
2374 | 402 | #ifdef WOLFMQTT_V5 |
2375 | | /* Use specified protocol version if set */ |
2376 | 402 | unsubscribe->protocol_level = client->protocol_level; |
2377 | 402 | #endif |
2378 | | |
2379 | 402 | if (unsubscribe->stat.write == MQTT_MSG_BEGIN) { |
2380 | | /* Flag write active / lock mutex */ |
2381 | 402 | if ((rc = MqttWriteStart(client, &unsubscribe->stat)) != 0) { |
2382 | 0 | return rc; |
2383 | 0 | } |
2384 | | |
2385 | | /* Encode the subscribe packet */ |
2386 | 402 | rc = MqttEncode_Unsubscribe(client->tx_buf, client->tx_buf_len, |
2387 | 402 | unsubscribe); |
2388 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
2389 | | PRINTF("MqttClient_EncodePacket: Len %d, Type %s (%d), ID %d, QoS %d", |
2390 | | rc, MqttPacket_TypeDesc(MQTT_PACKET_TYPE_UNSUBSCRIBE), |
2391 | | MQTT_PACKET_TYPE_UNSUBSCRIBE, unsubscribe->packet_id, 0); |
2392 | | #endif |
2393 | 402 | if (rc <= 0) { |
2394 | 68 | MqttWriteStop(client, &unsubscribe->stat); |
2395 | 68 | return rc; |
2396 | 68 | } |
2397 | 334 | client->write.len = rc; |
2398 | | |
2399 | | #ifdef WOLFMQTT_MULTITHREAD |
2400 | | rc = wm_SemLock(&client->lockClient); |
2401 | | if (rc == 0) { |
2402 | | /* inform other threads of expected response */ |
2403 | | rc = MqttClient_RespList_Add(client, |
2404 | | MQTT_PACKET_TYPE_UNSUBSCRIBE_ACK, unsubscribe->packet_id, |
2405 | | &unsubscribe->pendResp, &unsubscribe->ack); |
2406 | | wm_SemUnlock(&client->lockClient); |
2407 | | } |
2408 | | if (rc != 0) { |
2409 | | MqttWriteStop(client, &unsubscribe->stat); |
2410 | | return rc; |
2411 | | } |
2412 | | #endif |
2413 | | |
2414 | 334 | unsubscribe->stat.write = MQTT_MSG_HEADER; |
2415 | 334 | } |
2416 | 334 | if (unsubscribe->stat.write == MQTT_MSG_HEADER) { |
2417 | 334 | int xfer = client->write.len; |
2418 | | |
2419 | | /* Send unsubscribe packet */ |
2420 | 334 | rc = MqttPacket_Write(client, client->tx_buf, xfer); |
2421 | | #ifdef WOLFMQTT_NONBLOCK |
2422 | | if (rc == MQTT_CODE_CONTINUE |
2423 | | #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK |
2424 | | && client->write.total > 0 |
2425 | | #endif |
2426 | | ) { |
2427 | | /* keep send locked and return early */ |
2428 | | return rc; |
2429 | | } |
2430 | | #endif |
2431 | 334 | MqttWriteStop(client, &unsubscribe->stat); |
2432 | 334 | if (rc != xfer) { |
2433 | 256 | MqttClient_CancelMessage(client, (MqttObject*)unsubscribe); |
2434 | 256 | return rc; |
2435 | 256 | } |
2436 | | |
2437 | 78 | unsubscribe->stat.write = MQTT_MSG_WAIT; |
2438 | 78 | } |
2439 | | |
2440 | | /* Wait for unsubscribe ack packet */ |
2441 | 78 | rc = MqttClient_WaitType(client, &unsubscribe->ack, |
2442 | 78 | MQTT_PACKET_TYPE_UNSUBSCRIBE_ACK, unsubscribe->packet_id, |
2443 | 78 | client->cmd_timeout_ms); |
2444 | | #if defined(WOLFMQTT_NONBLOCK) || defined(WOLFMQTT_MULTITHREAD) |
2445 | | if (rc == MQTT_CODE_CONTINUE) |
2446 | | return rc; |
2447 | | #endif |
2448 | | |
2449 | | #ifdef WOLFMQTT_MULTITHREAD |
2450 | | if (wm_SemLock(&client->lockClient) == 0) { |
2451 | | MqttClient_RespList_Remove(client, &unsubscribe->pendResp); |
2452 | | wm_SemUnlock(&client->lockClient); |
2453 | | } |
2454 | | #endif |
2455 | | |
2456 | 78 | #ifdef WOLFMQTT_V5 |
2457 | 78 | if (unsubscribe->ack.props != NULL) { |
2458 | | /* Release the allocated properties */ |
2459 | 0 | MqttClient_PropsFree(unsubscribe->ack.props); |
2460 | 0 | } |
2461 | 78 | #endif |
2462 | | |
2463 | | /* reset state */ |
2464 | 78 | unsubscribe->stat.write = MQTT_MSG_BEGIN; |
2465 | | |
2466 | 78 | return rc; |
2467 | 334 | } |
2468 | | |
2469 | | int MqttClient_Ping_ex(MqttClient *client, MqttPing* ping) |
2470 | 973 | { |
2471 | 973 | int rc; |
2472 | | |
2473 | | /* Validate required arguments */ |
2474 | 973 | if (client == NULL || ping == NULL) { |
2475 | 0 | return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG); |
2476 | 0 | } |
2477 | | |
2478 | 973 | if (ping->stat.write == MQTT_MSG_BEGIN) { |
2479 | | /* Flag write active / lock mutex */ |
2480 | 973 | if ((rc = MqttWriteStart(client, &ping->stat)) != 0) { |
2481 | 0 | return rc; |
2482 | 0 | } |
2483 | | |
2484 | | /* Encode the subscribe packet */ |
2485 | 973 | rc = MqttEncode_Ping(client->tx_buf, client->tx_buf_len, ping); |
2486 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
2487 | | PRINTF("MqttClient_EncodePacket: Len %d, Type %s (%d), ID %d, QoS %d", |
2488 | | rc, MqttPacket_TypeDesc(MQTT_PACKET_TYPE_PING_REQ), |
2489 | | MQTT_PACKET_TYPE_PING_REQ, 0, 0); |
2490 | | #endif |
2491 | 973 | if (rc <= 0) { |
2492 | 0 | MqttWriteStop(client, &ping->stat); |
2493 | 0 | return rc; |
2494 | 0 | } |
2495 | 973 | client->write.len = rc; |
2496 | | |
2497 | | #ifdef WOLFMQTT_MULTITHREAD |
2498 | | rc = wm_SemLock(&client->lockClient); |
2499 | | if (rc == 0) { |
2500 | | /* inform other threads of expected response */ |
2501 | | rc = MqttClient_RespList_Add(client, MQTT_PACKET_TYPE_PING_RESP, 0, |
2502 | | &ping->pendResp, ping); |
2503 | | wm_SemUnlock(&client->lockClient); |
2504 | | } |
2505 | | if (rc != 0) { |
2506 | | MqttWriteStop(client, &ping->stat); |
2507 | | return rc; /* Error locking client */ |
2508 | | } |
2509 | | #endif |
2510 | | |
2511 | 973 | ping->stat.write = MQTT_MSG_HEADER; |
2512 | 973 | } |
2513 | 973 | if (ping->stat.write == MQTT_MSG_HEADER) { |
2514 | 973 | int xfer = client->write.len; |
2515 | | |
2516 | | /* Send ping req packet */ |
2517 | 973 | rc = MqttPacket_Write(client, client->tx_buf, xfer); |
2518 | | #ifdef WOLFMQTT_NONBLOCK |
2519 | | if (rc == MQTT_CODE_CONTINUE |
2520 | | #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK |
2521 | | && client->write.total > 0 |
2522 | | #endif |
2523 | | ) { |
2524 | | /* keep send locked and return early */ |
2525 | | return rc; |
2526 | | } |
2527 | | #endif |
2528 | 973 | MqttWriteStop(client, &ping->stat); |
2529 | 973 | if (rc != xfer) { |
2530 | 174 | MqttClient_CancelMessage(client, (MqttObject*)ping); |
2531 | 174 | return rc; |
2532 | 174 | } |
2533 | | |
2534 | 799 | ping->stat.write = MQTT_MSG_WAIT; |
2535 | 799 | } |
2536 | | |
2537 | | /* Wait for ping resp packet */ |
2538 | 799 | rc = MqttClient_WaitType(client, ping, MQTT_PACKET_TYPE_PING_RESP, 0, |
2539 | 799 | client->cmd_timeout_ms); |
2540 | | #if defined(WOLFMQTT_NONBLOCK) || defined(WOLFMQTT_MULTITHREAD) |
2541 | | if (rc == MQTT_CODE_CONTINUE) |
2542 | | return rc; |
2543 | | #endif |
2544 | | |
2545 | | #ifdef WOLFMQTT_MULTITHREAD |
2546 | | if (wm_SemLock(&client->lockClient) == 0) { |
2547 | | MqttClient_RespList_Remove(client, &ping->pendResp); |
2548 | | wm_SemUnlock(&client->lockClient); |
2549 | | } |
2550 | | #endif |
2551 | | |
2552 | | /* reset state */ |
2553 | 799 | ping->stat.write = MQTT_MSG_BEGIN; |
2554 | | |
2555 | 799 | return rc; |
2556 | 973 | } |
2557 | | |
2558 | | int MqttClient_Ping(MqttClient *client) |
2559 | 0 | { |
2560 | 0 | return MqttClient_Ping_ex(client, &client->msg.ping); |
2561 | 0 | } |
2562 | | |
2563 | | int MqttClient_Disconnect(MqttClient *client) |
2564 | 0 | { |
2565 | 0 | return MqttClient_Disconnect_ex(client, NULL); |
2566 | 0 | } |
2567 | | |
2568 | | int MqttClient_Disconnect_ex(MqttClient *client, MqttDisconnect *p_disconnect) |
2569 | 0 | { |
2570 | 0 | int rc, xfer; |
2571 | 0 | MqttDisconnect *disconnect = p_disconnect, lcl_disconnect; |
2572 | | |
2573 | | /* Validate required arguments */ |
2574 | 0 | if (client == NULL) { |
2575 | 0 | return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG); |
2576 | 0 | } |
2577 | 0 | if (disconnect == NULL) { |
2578 | 0 | disconnect = &lcl_disconnect; |
2579 | 0 | XMEMSET(disconnect, 0, sizeof(*disconnect)); |
2580 | 0 | } |
2581 | |
|
2582 | 0 | if (disconnect->stat.write == MQTT_MSG_BEGIN) { |
2583 | 0 | #ifdef WOLFMQTT_V5 |
2584 | | /* Use specified protocol version if set */ |
2585 | 0 | disconnect->protocol_level = client->protocol_level; |
2586 | 0 | #endif |
2587 | | |
2588 | | /* Flag write active / lock mutex */ |
2589 | 0 | if ((rc = MqttWriteStart(client, &disconnect->stat)) != 0) { |
2590 | 0 | return rc; |
2591 | 0 | } |
2592 | | |
2593 | | /* Encode the disconnect packet */ |
2594 | 0 | rc = MqttEncode_Disconnect(client->tx_buf, client->tx_buf_len, |
2595 | 0 | disconnect); |
2596 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
2597 | | PRINTF("MqttClient_EncodePacket: Len %d, Type %s (%d), ID %d, QoS %d", |
2598 | | rc, MqttPacket_TypeDesc(MQTT_PACKET_TYPE_DISCONNECT), |
2599 | | MQTT_PACKET_TYPE_DISCONNECT, 0, 0); |
2600 | | #endif |
2601 | 0 | if (rc <= 0) { |
2602 | 0 | MqttWriteStop(client, &disconnect->stat); |
2603 | 0 | return rc; |
2604 | 0 | } |
2605 | 0 | client->write.len = rc; |
2606 | |
|
2607 | 0 | disconnect->stat.write = MQTT_MSG_HEADER; |
2608 | 0 | } |
2609 | | |
2610 | | /* Send disconnect packet */ |
2611 | 0 | xfer = client->write.len; |
2612 | 0 | rc = MqttPacket_Write(client, client->tx_buf, xfer); |
2613 | | #ifdef WOLFMQTT_NONBLOCK |
2614 | | /* if disconnect context avail allow partial write in non-blocking mode */ |
2615 | | if (p_disconnect != NULL && rc == MQTT_CODE_CONTINUE |
2616 | | #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK |
2617 | | && client->write.total > 0 |
2618 | | #endif |
2619 | | ) { |
2620 | | /* keep send locked and return early */ |
2621 | | return rc; |
2622 | | } |
2623 | | #endif |
2624 | 0 | MqttWriteStop(client, &disconnect->stat); |
2625 | 0 | if (rc == xfer) { |
2626 | 0 | rc = MQTT_CODE_SUCCESS; |
2627 | 0 | } |
2628 | |
|
2629 | | #if defined(WOLFMQTT_DISCONNECT_CB) && defined(WOLFMQTT_USE_CB_ON_DISCONNECT) |
2630 | | /* Trigger disconnect callback - for intentional disconnect |
2631 | | * This callback may occur on a network failure during an intentional |
2632 | | * disconnect if the transport/socket is not setup yet. */ |
2633 | | if (client->disconnect_cb |
2634 | | #ifdef WOLFMQTT_NONBLOCK |
2635 | | && rc != MQTT_CODE_CONTINUE |
2636 | | #endif |
2637 | | ) { |
2638 | | client->disconnect_cb(client, rc, client->disconnect_ctx); |
2639 | | } |
2640 | | #endif |
2641 | | |
2642 | | /* No response for MQTT disconnect packet */ |
2643 | | |
2644 | | /* reset state */ |
2645 | 0 | disconnect->stat.write = MQTT_MSG_BEGIN; |
2646 | |
|
2647 | 0 | return rc; |
2648 | 0 | } |
2649 | | |
2650 | | #ifdef WOLFMQTT_V5 |
2651 | | int MqttClient_Auth(MqttClient *client, MqttAuth* auth) |
2652 | 0 | { |
2653 | 0 | int rc; |
2654 | | |
2655 | | /* Validate required arguments */ |
2656 | 0 | if (client == NULL) { |
2657 | 0 | return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG); |
2658 | 0 | } |
2659 | | |
2660 | 0 | if (auth->stat.write == MQTT_MSG_BEGIN) { |
2661 | | /* Flag write active / lock mutex */ |
2662 | 0 | if ((rc = MqttWriteStart(client, &auth->stat)) != 0) { |
2663 | 0 | return rc; |
2664 | 0 | } |
2665 | | |
2666 | | /* Encode the authentication packet */ |
2667 | 0 | rc = MqttEncode_Auth(client->tx_buf, client->tx_buf_len, auth); |
2668 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
2669 | | PRINTF("MqttClient_EncodePacket: Len %d, Type %s (%d), ID %d, QoS %d", |
2670 | | rc, MqttPacket_TypeDesc(MQTT_PACKET_TYPE_AUTH), |
2671 | | MQTT_PACKET_TYPE_AUTH, 0, 0); |
2672 | | #endif |
2673 | 0 | if (rc <= 0) { |
2674 | 0 | MqttWriteStop(client, &auth->stat); |
2675 | 0 | return rc; |
2676 | 0 | } |
2677 | 0 | client->write.len = rc; |
2678 | |
|
2679 | | #ifdef WOLFMQTT_MULTITHREAD |
2680 | | rc = wm_SemLock(&client->lockClient); |
2681 | | if (rc == 0) { |
2682 | | /* inform other threads of expected response */ |
2683 | | rc = MqttClient_RespList_Add(client, MQTT_PACKET_TYPE_AUTH, 0, |
2684 | | &auth->pendResp, auth); |
2685 | | wm_SemUnlock(&client->lockClient); |
2686 | | } |
2687 | | if (rc != 0) { |
2688 | | MqttWriteStop(client, &auth->stat); |
2689 | | return rc; /* Error locking client */ |
2690 | | } |
2691 | | #endif |
2692 | |
|
2693 | 0 | auth->stat.write = MQTT_MSG_HEADER; |
2694 | 0 | } |
2695 | 0 | if (auth->stat.write == MQTT_MSG_BEGIN) { |
2696 | 0 | int xfer = client->write.len; |
2697 | | |
2698 | | /* Send authentication packet */ |
2699 | 0 | rc = MqttPacket_Write(client, client->tx_buf, xfer); |
2700 | | #ifdef WOLFMQTT_NONBLOCK |
2701 | | if (rc == MQTT_CODE_CONTINUE |
2702 | | #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK |
2703 | | && client->write.total > 0 |
2704 | | #endif |
2705 | | ) { |
2706 | | /* keep send locked and return early */ |
2707 | | return rc; |
2708 | | } |
2709 | | #endif |
2710 | 0 | MqttWriteStop(client, &auth->stat); |
2711 | 0 | if (rc != xfer) { |
2712 | 0 | MqttClient_CancelMessage(client, (MqttObject*)auth); |
2713 | 0 | return rc; |
2714 | 0 | } |
2715 | | |
2716 | 0 | auth->stat.write = MQTT_MSG_WAIT; |
2717 | 0 | } |
2718 | | |
2719 | | /* Wait for auth packet */ |
2720 | 0 | rc = MqttClient_WaitType(client, auth, MQTT_PACKET_TYPE_AUTH, 0, |
2721 | 0 | client->cmd_timeout_ms); |
2722 | | #if defined(WOLFMQTT_NONBLOCK) || defined(WOLFMQTT_MULTITHREAD) |
2723 | | if (rc == MQTT_CODE_CONTINUE) |
2724 | | return rc; |
2725 | | #endif |
2726 | |
|
2727 | | #ifdef WOLFMQTT_MULTITHREAD |
2728 | | if (wm_SemLock(&client->lockClient) == 0) { |
2729 | | MqttClient_RespList_Remove(client, &auth->pendResp); |
2730 | | wm_SemUnlock(&client->lockClient); |
2731 | | } |
2732 | | #endif |
2733 | | |
2734 | | /* reset state */ |
2735 | 0 | auth->stat.write = MQTT_MSG_BEGIN; |
2736 | |
|
2737 | 0 | return rc; |
2738 | 0 | } |
2739 | | |
2740 | | MqttProp* MqttClient_PropsAdd(MqttProp **head) |
2741 | 0 | { |
2742 | 0 | return MqttProps_Add(head); |
2743 | 0 | } |
2744 | | |
2745 | | int MqttClient_PropsFree(MqttProp *head) |
2746 | 0 | { |
2747 | 0 | return MqttProps_Free(head); |
2748 | 0 | } |
2749 | | |
2750 | | #endif /* WOLFMQTT_V5 */ |
2751 | | |
2752 | | int MqttClient_WaitMessage_ex(MqttClient *client, MqttObject* msg, |
2753 | | int timeout_ms) |
2754 | 3.46k | { |
2755 | 3.46k | return MqttClient_WaitType(client, msg, MQTT_PACKET_TYPE_ANY, 0, |
2756 | 3.46k | timeout_ms); |
2757 | 3.46k | } |
2758 | | int MqttClient_WaitMessage(MqttClient *client, int timeout_ms) |
2759 | 3.46k | { |
2760 | 3.46k | if (client == NULL) |
2761 | 0 | return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG); |
2762 | 3.46k | return MqttClient_WaitMessage_ex(client, &client->msg, timeout_ms); |
2763 | 3.46k | } |
2764 | | |
2765 | | #if !defined(WOLFMQTT_MULTITHREAD) && !defined(WOLFMQTT_NONBLOCK) |
2766 | | static |
2767 | | #endif |
2768 | | int MqttClient_CancelMessage(MqttClient *client, MqttObject* msg) |
2769 | 3.21k | { |
2770 | 3.21k | int rc = MQTT_CODE_SUCCESS; |
2771 | 3.21k | MqttMsgStat* mms_stat; |
2772 | | #ifdef WOLFMQTT_MULTITHREAD |
2773 | | MqttPendResp* tmpResp; |
2774 | | #endif |
2775 | | |
2776 | 3.21k | if (client == NULL || msg == NULL) { |
2777 | 0 | return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG); |
2778 | 0 | } |
2779 | | |
2780 | | /* all packet type structures must have MqttMsgStat at top */ |
2781 | 3.21k | mms_stat = (MqttMsgStat*)msg; |
2782 | | |
2783 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
2784 | | PRINTF("Cancel Msg: %p", msg); |
2785 | | #endif |
2786 | | |
2787 | | /* reset states */ |
2788 | 3.21k | mms_stat->write = MQTT_MSG_BEGIN; |
2789 | 3.21k | mms_stat->read = MQTT_MSG_BEGIN; |
2790 | | |
2791 | | #ifdef WOLFMQTT_MULTITHREAD |
2792 | | /* Remove any pending responses expected */ |
2793 | | rc = wm_SemLock(&client->lockClient); |
2794 | | if (rc != MQTT_CODE_SUCCESS) { |
2795 | | return rc; |
2796 | | } |
2797 | | |
2798 | | for (tmpResp = client->firstPendResp; |
2799 | | tmpResp != NULL; |
2800 | | tmpResp = tmpResp->next) |
2801 | | { |
2802 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
2803 | | PRINTF("\tMsg: %p (obj %p), Type %s (%d), ID %d, InProc %d, Done %d", |
2804 | | tmpResp, tmpResp->packet_obj, |
2805 | | MqttPacket_TypeDesc(tmpResp->packet_type), |
2806 | | tmpResp->packet_type, tmpResp->packet_id, |
2807 | | tmpResp->packetProcessing, tmpResp->packetDone); |
2808 | | #endif |
2809 | | if ((size_t)tmpResp->packet_obj == (size_t)msg || |
2810 | | (size_t)tmpResp - OFFSETOF(MqttMessage, pendResp) == (size_t)msg) { |
2811 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
2812 | | PRINTF("Found Cancel Msg: %p (obj %p), Type %s (%d), ID %d, " |
2813 | | "InProc %d, Done %d", |
2814 | | tmpResp, tmpResp->packet_obj, |
2815 | | MqttPacket_TypeDesc(tmpResp->packet_type), |
2816 | | tmpResp->packet_type, tmpResp->packet_id, |
2817 | | tmpResp->packetProcessing, tmpResp->packetDone); |
2818 | | #endif |
2819 | | MqttClient_RespList_Remove(client, tmpResp); |
2820 | | break; |
2821 | | } |
2822 | | } |
2823 | | wm_SemUnlock(&client->lockClient); |
2824 | | #endif /* WOLFMQTT_MULTITHREAD */ |
2825 | | |
2826 | | /* cancel any active flags / locks */ |
2827 | 3.21k | if (mms_stat->isReadActive) { |
2828 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
2829 | | PRINTF("Cancel Read Lock"); |
2830 | | #endif |
2831 | 0 | MqttReadStop(client, mms_stat); |
2832 | 0 | } |
2833 | 3.21k | if (mms_stat->isWriteActive) { |
2834 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
2835 | | PRINTF("Cancel Write Lock"); |
2836 | | #endif |
2837 | 0 | MqttWriteStop(client, mms_stat); |
2838 | 0 | } |
2839 | | |
2840 | 3.21k | return rc; |
2841 | 3.21k | } |
2842 | | |
2843 | | #ifdef WOLFMQTT_NONBLOCK |
2844 | | static inline int IsMessageActive(MqttObject *msg) |
2845 | | { |
2846 | | return (msg->stat.read != MQTT_MSG_BEGIN || |
2847 | | msg->stat.write != MQTT_MSG_BEGIN); |
2848 | | } |
2849 | | |
2850 | | int MqttClient_IsMessageActive( |
2851 | | MqttClient *client, |
2852 | | MqttObject *msg) |
2853 | | { |
2854 | | int rc; |
2855 | | |
2856 | | /* must supply either client or msg */ |
2857 | | if (client == NULL && msg == NULL) { |
2858 | | return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG); |
2859 | | } |
2860 | | |
2861 | | /* if msg is null then client->msg is used */ |
2862 | | if ((client != NULL && &client->msg == msg) || msg == NULL) { |
2863 | | #ifdef WOLFMQTT_MULTITHREAD |
2864 | | rc = wm_SemLock(&client->lockClient); |
2865 | | if (rc == 0) |
2866 | | #endif |
2867 | | { |
2868 | | rc = IsMessageActive(&client->msg); |
2869 | | #ifdef WOLFMQTT_MULTITHREAD |
2870 | | wm_SemUnlock(&client->lockClient); |
2871 | | #endif |
2872 | | } |
2873 | | } |
2874 | | else { |
2875 | | rc = IsMessageActive(msg); |
2876 | | } |
2877 | | return rc; |
2878 | | } |
2879 | | |
2880 | | |
2881 | | #endif /* WOLFMQTT_NONBLOCK */ |
2882 | | |
2883 | | |
2884 | | int MqttClient_NetConnect(MqttClient *client, const char* host, |
2885 | | word16 port, int timeout_ms, int use_tls, MqttTlsCb cb) |
2886 | 2.69k | { |
2887 | 2.69k | return MqttSocket_Connect(client, host, port, timeout_ms, use_tls, cb); |
2888 | 2.69k | } |
2889 | | |
2890 | | int MqttClient_NetDisconnect(MqttClient *client) |
2891 | 104 | { |
2892 | | #ifdef WOLFMQTT_MULTITHREAD |
2893 | | MqttPendResp *tmpResp; |
2894 | | int rc; |
2895 | | #endif |
2896 | | |
2897 | 104 | if (client == NULL) { |
2898 | 0 | return MQTT_CODE_ERROR_BAD_ARG; |
2899 | 0 | } |
2900 | | |
2901 | | #ifdef WOLFMQTT_MULTITHREAD |
2902 | | /* Get client lock on to ensure no other threads are active */ |
2903 | | rc = wm_SemLock(&client->lockClient); |
2904 | | if (rc == 0) { |
2905 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
2906 | | PRINTF("Net Disconnect: Removing pending responses"); |
2907 | | #endif |
2908 | | for (tmpResp = client->firstPendResp; |
2909 | | tmpResp != NULL; |
2910 | | tmpResp = tmpResp->next) { |
2911 | | #ifdef WOLFMQTT_DEBUG_CLIENT |
2912 | | PRINTF("\tPendResp: %p (obj %p), Type %s (%d), ID %d, InProc %d, Done %d", |
2913 | | tmpResp, tmpResp->packet_obj, |
2914 | | MqttPacket_TypeDesc(tmpResp->packet_type), |
2915 | | tmpResp->packet_type, tmpResp->packet_id, |
2916 | | tmpResp->packetProcessing, tmpResp->packetDone); |
2917 | | #endif |
2918 | | MqttClient_RespList_Remove(client, tmpResp); |
2919 | | } |
2920 | | wm_SemUnlock(&client->lockClient); |
2921 | | } |
2922 | | else { |
2923 | | return rc; |
2924 | | } |
2925 | | #endif |
2926 | | |
2927 | 104 | return MqttSocket_Disconnect(client); |
2928 | 104 | } |
2929 | | |
2930 | | int MqttClient_GetProtocolVersion(MqttClient *client) |
2931 | 0 | { |
2932 | 0 | #ifdef WOLFMQTT_V5 |
2933 | 0 | if (client && client->protocol_level == MQTT_CONNECT_PROTOCOL_LEVEL_5) |
2934 | 0 | return MQTT_CONNECT_PROTOCOL_LEVEL_5; |
2935 | | #else |
2936 | | (void)client; |
2937 | | #endif |
2938 | 0 | return MQTT_CONNECT_PROTOCOL_LEVEL_4; |
2939 | 0 | } |
2940 | | const char* MqttClient_GetProtocolVersionString(MqttClient *client) |
2941 | 0 | { |
2942 | 0 | const char* str = NULL; |
2943 | 0 | int ver = MqttClient_GetProtocolVersion(client); |
2944 | 0 | switch (ver) { |
2945 | 0 | case MQTT_CONNECT_PROTOCOL_LEVEL_4: |
2946 | 0 | return "v3.1.1"; |
2947 | 0 | #ifdef WOLFMQTT_V5 |
2948 | 0 | case MQTT_CONNECT_PROTOCOL_LEVEL_5: |
2949 | 0 | return "v5"; |
2950 | 0 | #endif |
2951 | 0 | default: |
2952 | 0 | break; |
2953 | 0 | } |
2954 | 0 | return str; |
2955 | 0 | } |
2956 | | |
2957 | | #ifndef WOLFMQTT_NO_ERROR_STRINGS |
2958 | | const char* MqttClient_ReturnCodeToString(int return_code) |
2959 | 0 | { |
2960 | 0 | switch(return_code) { |
2961 | 0 | case MQTT_CODE_SUCCESS: |
2962 | 0 | return "Success"; |
2963 | 0 | case MQTT_CODE_CONTINUE: |
2964 | 0 | return "Continue"; /* would block */ |
2965 | 0 | case MQTT_CODE_STDIN_WAKE: |
2966 | 0 | return "STDIN Wake"; |
2967 | 0 | case MQTT_CODE_PUB_CONTINUE: |
2968 | 0 | return "Continue calling publish"; /* Chunked publish */ |
2969 | 0 | case MQTT_CODE_ERROR_BAD_ARG: |
2970 | 0 | return "Error (Bad argument)"; |
2971 | 0 | case MQTT_CODE_ERROR_OUT_OF_BUFFER: |
2972 | 0 | return "Error (Out of buffer)"; |
2973 | 0 | case MQTT_CODE_ERROR_MALFORMED_DATA: |
2974 | 0 | return "Error (Malformed Remaining Length)"; |
2975 | 0 | case MQTT_CODE_ERROR_PACKET_TYPE: |
2976 | 0 | return "Error (Packet Type Mismatch)"; |
2977 | 0 | case MQTT_CODE_ERROR_PACKET_ID: |
2978 | 0 | return "Error (Packet Id Mismatch)"; |
2979 | 0 | case MQTT_CODE_ERROR_TLS_CONNECT: |
2980 | 0 | return "Error (TLS Connect)"; |
2981 | 0 | case MQTT_CODE_ERROR_TIMEOUT: |
2982 | 0 | return "Error (Timeout)"; |
2983 | 0 | case MQTT_CODE_ERROR_NETWORK: |
2984 | 0 | return "Error (Network)"; |
2985 | 0 | case MQTT_CODE_ERROR_MEMORY: |
2986 | 0 | return "Error (Memory)"; |
2987 | 0 | case MQTT_CODE_ERROR_STAT: |
2988 | 0 | return "Error (State)"; |
2989 | 0 | case MQTT_CODE_ERROR_PROPERTY: |
2990 | 0 | return "Error (Property)"; |
2991 | 0 | case MQTT_CODE_ERROR_SERVER_PROP: |
2992 | 0 | return "Error (Server Property)"; |
2993 | 0 | case MQTT_CODE_ERROR_CALLBACK: |
2994 | 0 | return "Error (Error in Callback)"; |
2995 | 0 | case MQTT_CODE_ERROR_SYSTEM: |
2996 | 0 | return "Error (System resource failed)"; |
2997 | 0 | case MQTT_CODE_ERROR_NOT_FOUND: |
2998 | 0 | return "Error (Not found)"; |
2999 | | #if defined(ENABLE_MQTT_CURL) |
3000 | | case MQTT_CODE_ERROR_CURL: |
3001 | | return "Error (libcurl)"; |
3002 | | #endif |
3003 | | |
3004 | 0 | #ifdef WOLFMQTT_V5 |
3005 | | /* MQTT v5 Reason code strings */ |
3006 | 0 | case MQTT_REASON_UNSPECIFIED_ERR: |
3007 | 0 | return "Unspecified error"; |
3008 | 0 | case MQTT_REASON_MALFORMED_PACKET: |
3009 | 0 | return "Malformed Packet"; |
3010 | 0 | case MQTT_REASON_PROTOCOL_ERR: |
3011 | 0 | return "Protocol Error"; |
3012 | 0 | case MQTT_REASON_IMPL_SPECIFIC_ERR: |
3013 | 0 | return "Implementation specific error"; |
3014 | 0 | case MQTT_REASON_UNSUP_PROTO_VER: |
3015 | 0 | return "Unsupported Protocol Version"; |
3016 | 0 | case MQTT_REASON_CLIENT_ID_NOT_VALID: |
3017 | 0 | return "Client Identifier not valid"; |
3018 | 0 | case MQTT_REASON_BAD_USER_OR_PASS: |
3019 | 0 | return "Bad User Name or Password"; |
3020 | 0 | case MQTT_REASON_NOT_AUTHORIZED: |
3021 | 0 | return "Not authorized"; |
3022 | 0 | case MQTT_REASON_SERVER_UNAVAILABLE: |
3023 | 0 | return "Server unavailable"; |
3024 | 0 | case MQTT_REASON_SERVER_BUSY: |
3025 | 0 | return "Server busy"; |
3026 | 0 | case MQTT_REASON_BANNED: |
3027 | 0 | return "Banned"; |
3028 | 0 | case MQTT_REASON_SERVER_SHUTTING_DOWN: |
3029 | 0 | return "Server shutting down"; |
3030 | 0 | case MQTT_REASON_BAD_AUTH_METHOD: |
3031 | 0 | return "Bad authentication method"; |
3032 | 0 | case MQTT_REASON_KEEP_ALIVE_TIMEOUT: |
3033 | 0 | return "Keep Alive timeout"; |
3034 | 0 | case MQTT_REASON_SESSION_TAKEN_OVER: |
3035 | 0 | return "Session taken over"; |
3036 | 0 | case MQTT_REASON_TOPIC_FILTER_INVALID: |
3037 | 0 | return "Topic Filter invalid"; |
3038 | 0 | case MQTT_REASON_TOPIC_NAME_INVALID: |
3039 | 0 | return "Topic Name invalid"; |
3040 | 0 | case MQTT_REASON_PACKET_ID_IN_USE: |
3041 | 0 | return "Packet Identifier in use"; |
3042 | 0 | case MQTT_REASON_PACKET_ID_NOT_FOUND: |
3043 | 0 | return "Packet Identifier not found"; |
3044 | 0 | case MQTT_REASON_RX_MAX_EXCEEDED: |
3045 | 0 | return "Receive Maximum exceeded"; |
3046 | 0 | case MQTT_REASON_TOPIC_ALIAS_INVALID: |
3047 | 0 | return "Topic Alias invalid"; |
3048 | 0 | case MQTT_REASON_PACKET_TOO_LARGE: |
3049 | 0 | return "Packet too large"; |
3050 | 0 | case MQTT_REASON_MSG_RATE_TOO_HIGH: |
3051 | 0 | return "Message rate too high"; |
3052 | 0 | case MQTT_REASON_QUOTA_EXCEEDED: |
3053 | 0 | return "Quota exceeded"; |
3054 | 0 | case MQTT_REASON_ADMIN_ACTION: |
3055 | 0 | return "Administrative action"; |
3056 | 0 | case MQTT_REASON_PAYLOAD_FORMAT_INVALID: |
3057 | 0 | return "Payload format invalid"; |
3058 | 0 | case MQTT_REASON_RETAIN_NOT_SUPPORTED: |
3059 | 0 | return "Retain not supported"; |
3060 | 0 | case MQTT_REASON_QOS_NOT_SUPPORTED: |
3061 | 0 | return "QoS not supported"; |
3062 | 0 | case MQTT_REASON_USE_ANOTHER_SERVER: |
3063 | 0 | return "Use another server"; |
3064 | 0 | case MQTT_REASON_SERVER_MOVED: |
3065 | 0 | return "Server moved"; |
3066 | 0 | case MQTT_REASON_SS_NOT_SUPPORTED: |
3067 | 0 | return "Shared Subscriptions not supported"; |
3068 | 0 | case MQTT_REASON_CON_RATE_EXCEED: |
3069 | 0 | return "Connection rate exceeded"; |
3070 | 0 | case MQTT_REASON_MAX_CON_TIME: |
3071 | 0 | return "Maximum connect time"; |
3072 | 0 | case MQTT_REASON_SUB_ID_NOT_SUP: |
3073 | 0 | return "Subscription Identifiers not supported"; |
3074 | 0 | case MQTT_REASON_WILDCARD_SUB_NOT_SUP: |
3075 | 0 | return "Wildcard Subscriptions not supported"; |
3076 | 0 | #endif |
3077 | 0 | } |
3078 | 0 | return "Unknown"; |
3079 | 0 | } |
3080 | | #endif /* !WOLFMQTT_NO_ERROR_STRINGS */ |
3081 | | |
3082 | | word32 MqttClient_Flags(MqttClient *client, word32 mask, word32 flags) |
3083 | 36.7k | { |
3084 | 36.7k | word32 ret = 0; |
3085 | 36.7k | if (client != NULL) { |
3086 | | #ifdef WOLFMQTT_MULTITHREAD |
3087 | | /* Get client lock on to ensure no other threads are active */ |
3088 | | if (wm_SemLock(&client->lockClient) == 0) |
3089 | | #endif |
3090 | 36.7k | { |
3091 | 36.7k | client->flags &= ~mask; |
3092 | 36.7k | client->flags |= flags; |
3093 | 36.7k | ret = client->flags; |
3094 | | #ifdef WOLFMQTT_MULTITHREAD |
3095 | | wm_SemUnlock(&client->lockClient); |
3096 | | #endif |
3097 | 36.7k | } |
3098 | 36.7k | } |
3099 | 36.7k | return ret; |
3100 | 36.7k | } |