/src/mosquitto/src/database.c
Line | Count | Source |
1 | | /* |
2 | | Copyright (c) 2009-2021 Roger Light <roger@atchoo.org> |
3 | | |
4 | | All rights reserved. This program and the accompanying materials |
5 | | are made available under the terms of the Eclipse Public License 2.0 |
6 | | and Eclipse Distribution License v1.0 which accompany this distribution. |
7 | | |
8 | | The Eclipse Public License is available at |
9 | | https://www.eclipse.org/legal/epl-2.0/ |
10 | | and the Eclipse Distribution License is available at |
11 | | http://www.eclipse.org/org/documents/edl-v10.php. |
12 | | |
13 | | SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause |
14 | | |
15 | | Contributors: |
16 | | Roger Light - initial implementation and documentation. |
17 | | */ |
18 | | |
19 | | #include "config.h" |
20 | | |
21 | | #include <assert.h> |
22 | | #include <stdio.h> |
23 | | #include <utlist.h> |
24 | | |
25 | | #include "mosquitto_broker_internal.h" |
26 | | #include "send_mosq.h" |
27 | | #include "sys_tree.h" |
28 | | #include "util_mosq.h" |
29 | | |
30 | | |
31 | | /** |
32 | | * Is this context ready to take more in flight messages right now? |
33 | | * @param context the client context of interest |
34 | | * @param qos qos for the packet of interest |
35 | | * @return true if more in flight are allowed. |
36 | | */ |
37 | | bool db__ready_for_flight(struct mosquitto *context, enum mosquitto_msg_direction dir, int qos) |
38 | 0 | { |
39 | 0 | struct mosquitto_msg_data *msgs; |
40 | 0 | bool valid_bytes; |
41 | 0 | bool valid_count; |
42 | |
|
43 | 0 | if(dir == mosq_md_out){ |
44 | 0 | msgs = &context->msgs_out; |
45 | 0 | }else{ |
46 | 0 | msgs = &context->msgs_in; |
47 | 0 | } |
48 | |
|
49 | 0 | if(msgs->inflight_maximum == 0 && db.config->max_inflight_bytes == 0){ |
50 | 0 | return true; |
51 | 0 | } |
52 | | |
53 | 0 | if(qos == 0){ |
54 | | /* Deliver QoS 0 messages unless the queue is already full. |
55 | | * For QoS 0 messages the choice is either "inflight" or dropped. |
56 | | * There is no queueing option, unless the client is offline and |
57 | | * queue_qos0_messages is enabled. |
58 | | */ |
59 | 0 | if(db.config->max_queued_messages == 0 && db.config->max_inflight_bytes == 0){ |
60 | 0 | return true; |
61 | 0 | } |
62 | 0 | valid_bytes = ((msgs->inflight_bytes - (ssize_t)db.config->max_inflight_bytes) < (ssize_t)db.config->max_queued_bytes); |
63 | 0 | if(dir == mosq_md_out){ |
64 | 0 | valid_count = context->out_packet_count < db.config->max_queued_messages; |
65 | 0 | }else{ |
66 | 0 | valid_count = msgs->inflight_count - msgs->inflight_maximum < db.config->max_queued_messages; |
67 | 0 | } |
68 | |
|
69 | 0 | if(db.config->max_queued_messages == 0){ |
70 | 0 | return valid_bytes; |
71 | 0 | } |
72 | 0 | if(db.config->max_queued_bytes == 0){ |
73 | 0 | return valid_count; |
74 | 0 | } |
75 | 0 | }else{ |
76 | 0 | valid_bytes = (ssize_t)msgs->inflight_bytes12 < (ssize_t)db.config->max_inflight_bytes; |
77 | 0 | valid_count = msgs->inflight_quota > 0; |
78 | |
|
79 | 0 | if(msgs->inflight_maximum == 0){ |
80 | 0 | return valid_bytes; |
81 | 0 | } |
82 | 0 | if(db.config->max_inflight_bytes == 0){ |
83 | 0 | return valid_count; |
84 | 0 | } |
85 | 0 | } |
86 | | |
87 | 0 | return valid_bytes && valid_count; |
88 | 0 | } |
89 | | |
90 | | |
91 | | /** |
92 | | * For a given client context, are more messages allowed to be queued? |
93 | | * It is assumed that inflight checks and queue_qos0 checks have already |
94 | | * been made. |
95 | | * @param context client of interest |
96 | | * @param qos destination qos for the packet of interest |
97 | | * @return true if queuing is allowed, false if should be dropped |
98 | | */ |
99 | | bool db__ready_for_queue(struct mosquitto *context, int qos, struct mosquitto_msg_data *msg_data) |
100 | 0 | { |
101 | 0 | int source_count; |
102 | 0 | int adjust_count; |
103 | 0 | long source_bytes; |
104 | 0 | ssize_t adjust_bytes = (ssize_t)db.config->max_inflight_bytes; |
105 | 0 | bool valid_bytes; |
106 | 0 | bool valid_count; |
107 | |
|
108 | 0 | if(db.config->max_queued_messages == 0 && db.config->max_queued_bytes == 0){ |
109 | 0 | return true; |
110 | 0 | } |
111 | | |
112 | 0 | if(qos == 0 && db.config->queue_qos0_messages == false){ |
113 | 0 | return false; /* This case is handled in db__ready_for_flight() */ |
114 | 0 | }else{ |
115 | 0 | source_bytes = (ssize_t)msg_data->queued_bytes12; |
116 | 0 | source_count = msg_data->queued_count12; |
117 | 0 | } |
118 | 0 | adjust_count = msg_data->inflight_maximum; |
119 | | |
120 | | /* nothing in flight for offline clients */ |
121 | 0 | if(!net__is_connected(context)){ |
122 | 0 | adjust_bytes = 0; |
123 | 0 | adjust_count = 0; |
124 | 0 | } |
125 | |
|
126 | 0 | valid_bytes = (source_bytes - (ssize_t)adjust_bytes) < (ssize_t)db.config->max_queued_bytes; |
127 | 0 | valid_count = source_count - adjust_count < db.config->max_queued_messages; |
128 | |
|
129 | 0 | if(db.config->max_queued_bytes == 0){ |
130 | 0 | return valid_count; |
131 | 0 | } |
132 | 0 | if(db.config->max_queued_messages == 0){ |
133 | 0 | return valid_bytes; |
134 | 0 | } |
135 | | |
136 | 0 | return valid_bytes && valid_count; |
137 | 0 | } |
138 | | |
139 | | |
140 | | void db__msg_add_to_inflight_stats(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *client_msg) |
141 | 0 | { |
142 | 0 | msg_data->inflight_count++; |
143 | 0 | msg_data->inflight_bytes += client_msg->base_msg->data.payloadlen; |
144 | 0 | if(client_msg->data.qos != 0){ |
145 | 0 | msg_data->inflight_count12++; |
146 | 0 | msg_data->inflight_bytes12 += client_msg->base_msg->data.payloadlen; |
147 | 0 | } |
148 | 0 | } |
149 | | |
150 | | |
151 | | static void db__msg_remove_from_inflight_stats(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *client_msg) |
152 | 0 | { |
153 | 0 | msg_data->inflight_count--; |
154 | 0 | msg_data->inflight_bytes -= client_msg->base_msg->data.payloadlen; |
155 | 0 | if(client_msg->data.qos != 0){ |
156 | 0 | msg_data->inflight_count12--; |
157 | 0 | msg_data->inflight_bytes12 -= client_msg->base_msg->data.payloadlen; |
158 | 0 | } |
159 | 0 | } |
160 | | |
161 | | |
162 | | void db__msg_add_to_queued_stats(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *client_msg) |
163 | 0 | { |
164 | 0 | msg_data->queued_count++; |
165 | 0 | msg_data->queued_bytes += client_msg->base_msg->data.payloadlen; |
166 | 0 | if(client_msg->data.qos != 0){ |
167 | 0 | msg_data->queued_count12++; |
168 | 0 | msg_data->queued_bytes12 += client_msg->base_msg->data.payloadlen; |
169 | 0 | } |
170 | 0 | } |
171 | | |
172 | | |
173 | | static void db__msg_remove_from_queued_stats(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *client_msg) |
174 | 0 | { |
175 | 0 | msg_data->queued_count--; |
176 | 0 | msg_data->queued_bytes -= client_msg->base_msg->data.payloadlen; |
177 | 0 | if(client_msg->data.qos != 0){ |
178 | 0 | msg_data->queued_count12--; |
179 | 0 | msg_data->queued_bytes12 -= client_msg->base_msg->data.payloadlen; |
180 | 0 | } |
181 | 0 | } |
182 | | |
183 | | |
184 | | int db__open(struct mosquitto__config *config) |
185 | 0 | { |
186 | 0 | if(!config){ |
187 | 0 | return MOSQ_ERR_INVAL; |
188 | 0 | } |
189 | | |
190 | 0 | db.contexts_by_id = NULL; |
191 | 0 | db.contexts_by_sock = NULL; |
192 | 0 | db.contexts_for_free = NULL; |
193 | 0 | #ifdef WITH_BRIDGE |
194 | 0 | db.bridges = NULL; |
195 | 0 | db.bridge_count = 0; |
196 | 0 | #endif |
197 | | |
198 | | /* Initialize the hashtable */ |
199 | 0 | db.clientid_index_hash = NULL; |
200 | |
|
201 | 0 | db.normal_subs = NULL; |
202 | 0 | db.shared_subs = NULL; |
203 | |
|
204 | 0 | sub__init(); |
205 | 0 | retain__init(); |
206 | |
|
207 | 0 | db.config->security_options.unpwd = NULL; |
208 | |
|
209 | 0 | #ifdef WITH_PERSISTENCE |
210 | 0 | if(persist__restore()){ |
211 | 0 | return 1; |
212 | 0 | } |
213 | 0 | #endif |
214 | | |
215 | 0 | return MOSQ_ERR_SUCCESS; |
216 | 0 | } |
217 | | |
218 | | |
219 | | static void subhier_clean(struct mosquitto__subhier **subhier) |
220 | 0 | { |
221 | 0 | struct mosquitto__subhier *peer, *subhier_tmp; |
222 | 0 | struct mosquitto__subleaf *leaf, *nextleaf; |
223 | |
|
224 | 0 | HASH_ITER(hh, *subhier, peer, subhier_tmp){ |
225 | 0 | leaf = peer->subs; |
226 | 0 | while(leaf){ |
227 | 0 | nextleaf = leaf->next; |
228 | 0 | mosquitto_FREE(leaf); |
229 | 0 | leaf = nextleaf; |
230 | 0 | } |
231 | 0 | subhier_clean(&peer->children); |
232 | |
|
233 | 0 | HASH_DELETE(hh, *subhier, peer); |
234 | 0 | mosquitto_FREE(peer); |
235 | 0 | } |
236 | 0 | } |
237 | | |
238 | | |
239 | | int db__close(void) |
240 | 0 | { |
241 | 0 | subhier_clean(&db.normal_subs); |
242 | 0 | subhier_clean(&db.shared_subs); |
243 | 0 | retain__clean(&db.retains); |
244 | 0 | db__msg_store_clean(); |
245 | |
|
246 | 0 | return MOSQ_ERR_SUCCESS; |
247 | 0 | } |
248 | | |
249 | | |
250 | | int db__msg_store_add(struct mosquitto__base_msg *base_msg) |
251 | 0 | { |
252 | 0 | struct mosquitto__base_msg *found; |
253 | 0 | unsigned hashv; |
254 | |
|
255 | 0 | HASH_VALUE(&base_msg->data.store_id, sizeof(base_msg->data.store_id), hashv); |
256 | 0 | HASH_FIND_BYHASHVALUE(hh, db.msg_store, &base_msg->data.store_id, sizeof(base_msg->data.store_id), hashv, found); |
257 | 0 | if(found == NULL){ |
258 | 0 | HASH_ADD_KEYPTR_BYHASHVALUE(hh, db.msg_store, &base_msg->data.store_id, sizeof(base_msg->data.store_id), hashv, base_msg); |
259 | 0 | return MOSQ_ERR_SUCCESS; |
260 | 0 | }else{ |
261 | 0 | return MOSQ_ERR_ALREADY_EXISTS; |
262 | 0 | } |
263 | 0 | } |
264 | | |
265 | | |
266 | | void db__msg_store_free(struct mosquitto__base_msg *base_msg) |
267 | 0 | { |
268 | 0 | mosquitto_FREE(base_msg->data.source_id); |
269 | 0 | mosquitto_FREE(base_msg->data.source_username); |
270 | 0 | if(base_msg->dest_ids){ |
271 | 0 | for(int i=0; i<base_msg->dest_id_count; i++){ |
272 | 0 | mosquitto_FREE(base_msg->dest_ids[i]); |
273 | 0 | } |
274 | 0 | mosquitto_FREE(base_msg->dest_ids); |
275 | 0 | } |
276 | 0 | mosquitto_FREE(base_msg->data.topic); |
277 | 0 | mosquitto_property_free_all(&base_msg->data.properties); |
278 | 0 | mosquitto_FREE(base_msg->data.payload); |
279 | 0 | mosquitto_FREE(base_msg); |
280 | 0 | } |
281 | | |
282 | | |
283 | | void db__msg_store_remove(struct mosquitto__base_msg *base_msg, bool notify) |
284 | 0 | { |
285 | 0 | if(base_msg == NULL){ |
286 | 0 | return; |
287 | 0 | } |
288 | 0 | HASH_DELETE(hh, db.msg_store, base_msg); |
289 | 0 | db.msg_store_count--; |
290 | 0 | db.msg_store_bytes -= base_msg->data.payloadlen; |
291 | 0 | if(notify == true){ |
292 | 0 | plugin_persist__handle_base_msg_delete(base_msg); |
293 | 0 | } |
294 | 0 | db__msg_store_free(base_msg); |
295 | 0 | } |
296 | | |
297 | | |
298 | | void db__msg_store_clean(void) |
299 | 0 | { |
300 | 0 | struct mosquitto__base_msg *base_msg, *base_msg_tmp; |
301 | |
|
302 | 0 | HASH_ITER(hh, db.msg_store, base_msg, base_msg_tmp){ |
303 | 0 | db__msg_store_remove(base_msg, false); |
304 | 0 | } |
305 | 0 | } |
306 | | |
307 | | |
308 | | void db__msg_store_ref_inc(struct mosquitto__base_msg *base_msg) |
309 | 0 | { |
310 | 0 | base_msg->ref_count++; |
311 | 0 | } |
312 | | |
313 | | |
314 | | void db__msg_store_ref_dec(struct mosquitto__base_msg **base_msg) |
315 | 0 | { |
316 | 0 | (*base_msg)->ref_count--; |
317 | 0 | if((*base_msg)->ref_count == 0){ |
318 | 0 | db__msg_store_remove(*base_msg, true); |
319 | 0 | *base_msg = NULL; |
320 | 0 | } |
321 | 0 | } |
322 | | |
323 | | |
324 | | void db__msg_store_compact(void) |
325 | 0 | { |
326 | 0 | struct mosquitto__base_msg *base_msg, *base_msg_tmp; |
327 | |
|
328 | 0 | HASH_ITER(hh, db.msg_store, base_msg, base_msg_tmp){ |
329 | 0 | if(base_msg->ref_count < 1){ |
330 | 0 | db__msg_store_remove(base_msg, true); |
331 | 0 | } |
332 | 0 | } |
333 | 0 | } |
334 | | |
335 | | |
336 | | static void db__message_remove_inflight(struct mosquitto *context, struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *item) |
337 | 0 | { |
338 | 0 | if(!context || !msg_data || !item){ |
339 | 0 | return; |
340 | 0 | } |
341 | | |
342 | 0 | plugin_persist__handle_client_msg_delete(context, item); |
343 | |
|
344 | 0 | DL_DELETE(msg_data->inflight, item); |
345 | 0 | if(item->base_msg){ |
346 | 0 | db__msg_remove_from_inflight_stats(msg_data, item); |
347 | 0 | db__msg_store_ref_dec(&item->base_msg); |
348 | 0 | } |
349 | |
|
350 | 0 | mosquitto_FREE(item); |
351 | 0 | } |
352 | | |
353 | | |
354 | | static void db__message_remove_queued(struct mosquitto *context, struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *item) |
355 | 0 | { |
356 | 0 | if(!context || !msg_data || !item){ |
357 | 0 | return; |
358 | 0 | } |
359 | | |
360 | 0 | plugin_persist__handle_client_msg_delete(context, item); |
361 | |
|
362 | 0 | DL_DELETE(msg_data->queued, item); |
363 | 0 | if(item->base_msg){ |
364 | 0 | db__msg_remove_from_queued_stats(msg_data, item); |
365 | 0 | db__msg_store_ref_dec(&item->base_msg); |
366 | 0 | } |
367 | |
|
368 | 0 | mosquitto_FREE(item); |
369 | 0 | } |
370 | | |
371 | | |
372 | | static void db__fill_inflight_out_from_queue(struct mosquitto *context) |
373 | 0 | { |
374 | 0 | struct mosquitto__client_msg *client_msg, *tmp; |
375 | |
|
376 | 0 | DL_FOREACH_SAFE(context->msgs_out.queued, client_msg, tmp){ |
377 | 0 | if(!db__ready_for_flight(context, mosq_md_out, client_msg->data.qos)){ |
378 | 0 | return; |
379 | 0 | } |
380 | 0 | switch(client_msg->data.qos){ |
381 | 0 | case 0: |
382 | 0 | client_msg->data.state = mosq_ms_publish_qos0; |
383 | 0 | break; |
384 | 0 | case 1: |
385 | 0 | client_msg->data.state = mosq_ms_publish_qos1; |
386 | 0 | break; |
387 | 0 | case 2: |
388 | 0 | client_msg->data.state = mosq_ms_publish_qos2; |
389 | 0 | break; |
390 | 0 | } |
391 | 0 | if(client_msg->base_msg->data.expiry_time && db.now_real_s > client_msg->base_msg->data.expiry_time){ |
392 | 0 | db__message_remove_queued(context, &context->msgs_out, client_msg); |
393 | 0 | continue; |
394 | 0 | } |
395 | 0 | plugin_persist__handle_client_msg_update(context, client_msg); |
396 | 0 | db__message_dequeue_first(context, &context->msgs_out); |
397 | 0 | } |
398 | 0 | } |
399 | | |
400 | | |
401 | | void db__message_dequeue_first(struct mosquitto *context, struct mosquitto_msg_data *msg_data) |
402 | 0 | { |
403 | 0 | struct mosquitto__client_msg *client_msg; |
404 | |
|
405 | 0 | UNUSED(context); |
406 | |
|
407 | 0 | client_msg = msg_data->queued; |
408 | 0 | DL_DELETE(msg_data->queued, client_msg); |
409 | 0 | DL_APPEND(msg_data->inflight, client_msg); |
410 | 0 | if(msg_data->inflight_quota > 0){ |
411 | 0 | msg_data->inflight_quota--; |
412 | 0 | } |
413 | |
|
414 | 0 | db__msg_remove_from_queued_stats(msg_data, client_msg); |
415 | 0 | db__msg_add_to_inflight_stats(msg_data, client_msg); |
416 | 0 | } |
417 | | |
418 | | |
419 | | int db__message_delete_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state expect_state, int qos) |
420 | 0 | { |
421 | 0 | struct mosquitto__client_msg *client_msg, *tmp; |
422 | 0 | bool deleted = false; |
423 | |
|
424 | 0 | if(!context){ |
425 | 0 | return MOSQ_ERR_INVAL; |
426 | 0 | } |
427 | | |
428 | 0 | DL_FOREACH_SAFE(context->msgs_out.inflight, client_msg, tmp){ |
429 | 0 | if(client_msg->data.mid == mid){ |
430 | 0 | if(client_msg->data.qos != qos){ |
431 | 0 | return MOSQ_ERR_PROTOCOL; |
432 | 0 | }else if(qos == 2 && client_msg->data.state != expect_state && expect_state != mosq_ms_any){ |
433 | 0 | return MOSQ_ERR_PROTOCOL; |
434 | 0 | } |
435 | 0 | db__message_remove_inflight(context, &context->msgs_out, client_msg); |
436 | 0 | deleted = true; |
437 | 0 | break; |
438 | 0 | } |
439 | 0 | } |
440 | | |
441 | 0 | if(deleted == false){ |
442 | 0 | DL_FOREACH_SAFE(context->msgs_out.queued, client_msg, tmp){ |
443 | 0 | if(client_msg->data.mid == mid){ |
444 | 0 | if(client_msg->data.qos != qos){ |
445 | 0 | return MOSQ_ERR_PROTOCOL; |
446 | 0 | }else if(qos == 2 && client_msg->data.state != expect_state && expect_state != mosq_ms_any){ |
447 | 0 | return MOSQ_ERR_PROTOCOL; |
448 | 0 | } |
449 | 0 | db__message_remove_queued(context, &context->msgs_out, client_msg); |
450 | 0 | break; |
451 | 0 | } |
452 | 0 | } |
453 | 0 | } |
454 | 0 | db__fill_inflight_out_from_queue(context); |
455 | 0 | #ifdef WITH_PERSISTENCE |
456 | 0 | db.persistence_changes++; |
457 | 0 | #endif |
458 | |
|
459 | 0 | return db__message_write_inflight_out_latest(context); |
460 | 0 | } |
461 | | |
462 | | |
463 | | /* Only for QoS 2 messages */ |
464 | | int db__message_insert_incoming(struct mosquitto *context, uint64_t cmsg_id, struct mosquitto__base_msg *base_msg, bool persist) |
465 | 0 | { |
466 | 0 | struct mosquitto__client_msg *client_msg; |
467 | 0 | struct mosquitto_msg_data *msg_data; |
468 | 0 | enum mosquitto_msg_state state = mosq_ms_invalid; |
469 | 0 | int rc = 0; |
470 | |
|
471 | 0 | assert(base_msg); |
472 | 0 | if(!context){ |
473 | 0 | return MOSQ_ERR_INVAL; |
474 | 0 | } |
475 | 0 | if(!context->id){ |
476 | | /* Protect against unlikely "client is disconnected but not entirely freed" scenario */ |
477 | 0 | return MOSQ_ERR_SUCCESS; |
478 | |
|
479 | 0 | } |
480 | 0 | msg_data = &context->msgs_in; |
481 | |
|
482 | 0 | if(db__ready_for_flight(context, mosq_md_in, base_msg->data.qos)){ |
483 | 0 | state = mosq_ms_wait_for_pubrel; |
484 | 0 | }else if(base_msg->data.qos != 0 && db__ready_for_queue(context, base_msg->data.qos, msg_data)){ |
485 | 0 | state = mosq_ms_queued; |
486 | 0 | rc = 2; |
487 | 0 | }else{ |
488 | | /* Dropping message due to full queue. */ |
489 | 0 | if(context->is_dropping == false){ |
490 | 0 | context->is_dropping = true; |
491 | 0 | log__printf(NULL, MOSQ_LOG_NOTICE, |
492 | 0 | "Outgoing messages are being dropped for client %s.", |
493 | 0 | context->id); |
494 | 0 | } |
495 | 0 | metrics__int_inc(mosq_counter_mqtt_publish_dropped, 1); |
496 | 0 | context->stats.messages_dropped++; |
497 | |
|
498 | 0 | return 2; |
499 | 0 | } |
500 | | |
501 | 0 | assert(state != mosq_ms_invalid); |
502 | |
|
503 | 0 | #ifdef WITH_PERSISTENCE |
504 | 0 | if(state == mosq_ms_queued){ |
505 | 0 | db.persistence_changes++; |
506 | 0 | } |
507 | 0 | #endif |
508 | |
|
509 | 0 | client_msg = mosquitto_malloc(sizeof(struct mosquitto__client_msg)); |
510 | 0 | if(!client_msg){ |
511 | 0 | return MOSQ_ERR_NOMEM; |
512 | 0 | } |
513 | 0 | client_msg->prev = NULL; |
514 | 0 | client_msg->next = NULL; |
515 | 0 | if(cmsg_id){ |
516 | 0 | client_msg->data.cmsg_id = cmsg_id; |
517 | 0 | }else{ |
518 | 0 | client_msg->data.cmsg_id = ++context->last_cmsg_id; |
519 | 0 | } |
520 | 0 | client_msg->base_msg = base_msg; |
521 | 0 | db__msg_store_ref_inc(client_msg->base_msg); |
522 | 0 | client_msg->data.mid = base_msg->data.source_mid; |
523 | 0 | client_msg->data.direction = mosq_md_in; |
524 | 0 | client_msg->data.state = (enum mosquitto_msg_state)state; |
525 | 0 | client_msg->data.dup = false; |
526 | 0 | if(base_msg->data.qos > context->max_qos){ |
527 | 0 | client_msg->data.qos = context->max_qos; |
528 | 0 | }else{ |
529 | 0 | client_msg->data.qos = base_msg->data.qos; |
530 | 0 | } |
531 | 0 | client_msg->data.retain = base_msg->data.retain; |
532 | 0 | client_msg->data.subscription_identifier = 0; |
533 | |
|
534 | 0 | if(state == mosq_ms_queued){ |
535 | 0 | DL_APPEND(msg_data->queued, client_msg); |
536 | 0 | db__msg_add_to_queued_stats(msg_data, client_msg); |
537 | 0 | }else{ |
538 | 0 | DL_APPEND(msg_data->inflight, client_msg); |
539 | 0 | db__msg_add_to_inflight_stats(msg_data, client_msg); |
540 | 0 | } |
541 | |
|
542 | 0 | if(persist && context->is_persisted){ |
543 | 0 | plugin_persist__handle_base_msg_add(client_msg->base_msg); |
544 | 0 | plugin_persist__handle_client_msg_add(context, client_msg); |
545 | 0 | } |
546 | |
|
547 | 0 | if(client_msg->base_msg->data.qos > 0){ |
548 | 0 | util__decrement_receive_quota(context); |
549 | 0 | } |
550 | 0 | return rc; |
551 | 0 | } |
552 | | |
553 | | |
554 | | int db__message_insert_outgoing(struct mosquitto *context, uint64_t cmsg_id, uint16_t mid, uint8_t qos, bool retain, struct mosquitto__base_msg *base_msg, uint32_t subscription_identifier, bool update, bool persist) |
555 | 0 | { |
556 | 0 | struct mosquitto__client_msg *client_msg; |
557 | 0 | struct mosquitto_msg_data *msg_data; |
558 | 0 | enum mosquitto_msg_state state = mosq_ms_invalid; |
559 | 0 | int rc = 0; |
560 | 0 | char **dest_ids; |
561 | |
|
562 | 0 | assert(base_msg); |
563 | 0 | if(!context){ |
564 | 0 | return MOSQ_ERR_INVAL; |
565 | 0 | } |
566 | 0 | if(!context->id){ |
567 | | /* Protect against unlikely "client is disconnected but not entirely freed" scenario */ |
568 | 0 | return MOSQ_ERR_SUCCESS; |
569 | |
|
570 | 0 | } |
571 | 0 | context->stats.messages_sent++; |
572 | |
|
573 | 0 | msg_data = &context->msgs_out; |
574 | | |
575 | | /* Check whether we've already sent this message to this client |
576 | | * for outgoing messages only. |
577 | | * If retain==true then this is a stale retained message and so should be |
578 | | * sent regardless. FIXME - this does mean retained messages will received |
579 | | * multiple times for overlapping subscriptions, although this is only the |
580 | | * case for SUBSCRIPTION with multiple subs in so is a minor concern. |
581 | | */ |
582 | 0 | if(context->protocol != mosq_p_mqtt5 |
583 | 0 | && db.config->allow_duplicate_messages == false |
584 | 0 | && retain == false && base_msg->dest_ids){ |
585 | |
|
586 | 0 | for(int i=0; i<base_msg->dest_id_count; i++){ |
587 | 0 | if(base_msg->dest_ids[i] && !strcmp(base_msg->dest_ids[i], context->id)){ |
588 | | /* We have already sent this message to this client. */ |
589 | 0 | return MOSQ_ERR_SUCCESS; |
590 | 0 | } |
591 | 0 | } |
592 | 0 | } |
593 | 0 | if(!net__is_connected(context)){ |
594 | | /* Client is not connected only queue messages with QoS>0. */ |
595 | 0 | if(qos == 0 && !db.config->queue_qos0_messages){ |
596 | 0 | if(!context->bridge){ |
597 | 0 | return 2; |
598 | 0 | }else{ |
599 | 0 | if(context->bridge->start_type != bst_lazy){ |
600 | 0 | return 2; |
601 | 0 | } |
602 | 0 | } |
603 | 0 | } |
604 | 0 | if(context->bridge && context->bridge->clean_start_local == true){ |
605 | 0 | return 2; |
606 | 0 | } |
607 | 0 | } |
608 | | |
609 | 0 | if(net__is_connected(context)){ |
610 | 0 | if(db__ready_for_flight(context, mosq_md_out, qos)){ |
611 | 0 | switch(qos){ |
612 | 0 | case 0: |
613 | 0 | state = mosq_ms_publish_qos0; |
614 | 0 | break; |
615 | 0 | case 1: |
616 | 0 | state = mosq_ms_publish_qos1; |
617 | 0 | break; |
618 | 0 | case 2: |
619 | 0 | state = mosq_ms_publish_qos2; |
620 | 0 | break; |
621 | 0 | } |
622 | 0 | }else if(qos != 0 && db__ready_for_queue(context, qos, msg_data)){ |
623 | 0 | state = mosq_ms_queued; |
624 | 0 | rc = 2; |
625 | 0 | }else{ |
626 | | /* Dropping message due to full queue. */ |
627 | 0 | if(context->is_dropping == false){ |
628 | 0 | context->is_dropping = true; |
629 | 0 | log__printf(NULL, MOSQ_LOG_NOTICE, |
630 | 0 | "Outgoing messages are being dropped for client %s.", |
631 | 0 | context->id); |
632 | 0 | } |
633 | 0 | metrics__int_inc(mosq_counter_mqtt_publish_dropped, 1); |
634 | 0 | return 2; |
635 | 0 | } |
636 | 0 | }else{ |
637 | 0 | if(db__ready_for_queue(context, qos, msg_data)){ |
638 | 0 | state = mosq_ms_queued; |
639 | 0 | }else{ |
640 | 0 | metrics__int_inc(mosq_counter_mqtt_publish_dropped, 1); |
641 | 0 | if(context->is_dropping == false){ |
642 | 0 | context->is_dropping = true; |
643 | 0 | log__printf(NULL, MOSQ_LOG_NOTICE, |
644 | 0 | "Outgoing messages are being dropped for client %s.", |
645 | 0 | context->id); |
646 | 0 | } |
647 | 0 | return 2; |
648 | 0 | } |
649 | 0 | } |
650 | 0 | assert(state != mosq_ms_invalid); |
651 | |
|
652 | 0 | #ifdef WITH_PERSISTENCE |
653 | 0 | if(state == mosq_ms_queued){ |
654 | 0 | db.persistence_changes++; |
655 | 0 | } |
656 | 0 | #endif |
657 | |
|
658 | 0 | client_msg = mosquitto_malloc(sizeof(struct mosquitto__client_msg)); |
659 | 0 | if(!client_msg){ |
660 | 0 | return MOSQ_ERR_NOMEM; |
661 | 0 | } |
662 | 0 | client_msg->prev = NULL; |
663 | 0 | client_msg->next = NULL; |
664 | 0 | if(cmsg_id){ |
665 | 0 | client_msg->data.cmsg_id = cmsg_id; |
666 | 0 | }else{ |
667 | 0 | client_msg->data.cmsg_id = ++context->last_cmsg_id; |
668 | 0 | } |
669 | 0 | client_msg->base_msg = base_msg; |
670 | 0 | db__msg_store_ref_inc(client_msg->base_msg); |
671 | 0 | client_msg->data.mid = mid; |
672 | 0 | client_msg->data.direction = mosq_md_out; |
673 | 0 | client_msg->data.state = (enum mosquitto_msg_state)state; |
674 | 0 | client_msg->data.dup = false; |
675 | 0 | if(qos > context->max_qos){ |
676 | 0 | client_msg->data.qos = context->max_qos; |
677 | 0 | }else{ |
678 | 0 | client_msg->data.qos = qos; |
679 | 0 | } |
680 | 0 | client_msg->data.retain = retain; |
681 | 0 | client_msg->data.subscription_identifier = subscription_identifier; |
682 | |
|
683 | 0 | if(state == mosq_ms_queued){ |
684 | 0 | DL_APPEND(msg_data->queued, client_msg); |
685 | 0 | db__msg_add_to_queued_stats(msg_data, client_msg); |
686 | 0 | }else{ |
687 | 0 | DL_APPEND(msg_data->inflight, client_msg); |
688 | 0 | db__msg_add_to_inflight_stats(msg_data, client_msg); |
689 | 0 | } |
690 | |
|
691 | 0 | if(persist && context->is_persisted){ |
692 | 0 | plugin_persist__handle_base_msg_add(client_msg->base_msg); |
693 | 0 | plugin_persist__handle_client_msg_add(context, client_msg); |
694 | 0 | } |
695 | |
|
696 | 0 | if(db.config->allow_duplicate_messages == false && retain == false){ |
697 | | /* Record which client ids this message has been sent to so we can avoid duplicates. |
698 | | * Outgoing messages only. |
699 | | * If retain==true then this is a stale retained message and so should be |
700 | | * sent regardless. FIXME - this does mean retained messages will received |
701 | | * multiple times for overlapping subscriptions, although this is only the |
702 | | * case for SUBSCRIPTION with multiple subs in so is a minor concern. |
703 | | */ |
704 | 0 | dest_ids = mosquitto_realloc(base_msg->dest_ids, sizeof(char *)*(size_t)(base_msg->dest_id_count+1)); |
705 | 0 | if(dest_ids){ |
706 | 0 | base_msg->dest_ids = dest_ids; |
707 | 0 | base_msg->dest_id_count++; |
708 | 0 | base_msg->dest_ids[base_msg->dest_id_count-1] = mosquitto_strdup(context->id); |
709 | 0 | if(!base_msg->dest_ids[base_msg->dest_id_count-1]){ |
710 | 0 | return MOSQ_ERR_NOMEM; |
711 | 0 | } |
712 | 0 | }else{ |
713 | 0 | return MOSQ_ERR_NOMEM; |
714 | 0 | } |
715 | 0 | } |
716 | 0 | #ifdef WITH_BRIDGE |
717 | 0 | if(context->bridge && context->bridge->start_type == bst_lazy |
718 | 0 | && !net__is_connected(context) |
719 | 0 | && context->msgs_out.inflight_count + context->msgs_out.queued_count >= context->bridge->threshold){ |
720 | |
|
721 | 0 | context->bridge->lazy_reconnect = true; |
722 | 0 | } |
723 | 0 | #endif |
724 | |
|
725 | 0 | if(client_msg->data.qos > 0 && state != mosq_ms_queued){ |
726 | 0 | util__decrement_send_quota(context); |
727 | 0 | } |
728 | |
|
729 | 0 | if(update){ |
730 | 0 | rc = db__message_write_inflight_out_latest(context); |
731 | 0 | if(rc){ |
732 | 0 | return rc; |
733 | 0 | } |
734 | 0 | rc = db__message_write_queued_out(context); |
735 | 0 | if(rc){ |
736 | 0 | return rc; |
737 | 0 | } |
738 | 0 | } |
739 | | |
740 | 0 | return rc; |
741 | 0 | } |
742 | | |
743 | | |
744 | | static inline int db__message_update_outgoing_state(struct mosquitto *context, struct mosquitto__client_msg *head, |
745 | | uint16_t mid, enum mosquitto_msg_state state, int qos, bool persist) |
746 | 0 | { |
747 | 0 | struct mosquitto__client_msg *client_msg; |
748 | |
|
749 | 0 | DL_FOREACH(head, client_msg){ |
750 | 0 | if(client_msg->data.mid == mid){ |
751 | 0 | if(client_msg->data.qos != qos){ |
752 | 0 | return MOSQ_ERR_PROTOCOL; |
753 | 0 | } |
754 | 0 | client_msg->data.state = (enum mosquitto_msg_state)state; |
755 | 0 | if(persist){ |
756 | 0 | plugin_persist__handle_client_msg_update(context, client_msg); |
757 | 0 | } |
758 | 0 | return MOSQ_ERR_SUCCESS; |
759 | 0 | } |
760 | 0 | } |
761 | 0 | return MOSQ_ERR_NOT_FOUND; |
762 | 0 | } |
763 | | |
764 | | |
765 | | int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state state, int qos, bool persist) |
766 | 0 | { |
767 | 0 | int rc; |
768 | |
|
769 | 0 | rc = db__message_update_outgoing_state(context, context->msgs_out.inflight, mid, state, qos, persist); |
770 | 0 | if(!persist && rc == MOSQ_ERR_NOT_FOUND){ |
771 | 0 | rc = db__message_update_outgoing_state(context, context->msgs_out.queued, mid, state, qos, persist); |
772 | 0 | } |
773 | 0 | return rc; |
774 | 0 | } |
775 | | |
776 | | |
777 | | static void db__messages_delete_list(struct mosquitto__client_msg **head) |
778 | 0 | { |
779 | 0 | struct mosquitto__client_msg *client_msg, *tmp; |
780 | |
|
781 | 0 | DL_FOREACH_SAFE(*head, client_msg, tmp){ |
782 | 0 | DL_DELETE(*head, client_msg); |
783 | 0 | db__msg_store_ref_dec(&client_msg->base_msg); |
784 | 0 | mosquitto_FREE(client_msg); |
785 | 0 | } |
786 | 0 | *head = NULL; |
787 | 0 | } |
788 | | |
789 | | |
790 | | int db__messages_delete_incoming(struct mosquitto *context) |
791 | 0 | { |
792 | 0 | if(!context){ |
793 | 0 | return MOSQ_ERR_INVAL; |
794 | 0 | } |
795 | | |
796 | 0 | db__messages_delete_list(&context->msgs_in.inflight); |
797 | 0 | db__messages_delete_list(&context->msgs_in.queued); |
798 | 0 | context->msgs_in.inflight_bytes = 0; |
799 | 0 | context->msgs_in.inflight_bytes12 = 0; |
800 | 0 | context->msgs_in.inflight_count = 0; |
801 | 0 | context->msgs_in.inflight_count12 = 0; |
802 | 0 | context->msgs_in.queued_bytes = 0; |
803 | 0 | context->msgs_in.queued_bytes12 = 0; |
804 | 0 | context->msgs_in.queued_count = 0; |
805 | 0 | context->msgs_in.queued_count12 = 0; |
806 | |
|
807 | 0 | return MOSQ_ERR_SUCCESS; |
808 | 0 | } |
809 | | |
810 | | |
811 | | int db__messages_delete_outgoing(struct mosquitto *context) |
812 | 0 | { |
813 | 0 | if(!context){ |
814 | 0 | return MOSQ_ERR_INVAL; |
815 | 0 | } |
816 | | |
817 | 0 | db__messages_delete_list(&context->msgs_out.inflight); |
818 | 0 | db__messages_delete_list(&context->msgs_out.queued); |
819 | 0 | context->msgs_out.inflight_bytes = 0; |
820 | 0 | context->msgs_out.inflight_bytes12 = 0; |
821 | 0 | context->msgs_out.inflight_count = 0; |
822 | 0 | context->msgs_out.inflight_count12 = 0; |
823 | 0 | context->msgs_out.queued_bytes = 0; |
824 | 0 | context->msgs_out.queued_bytes12 = 0; |
825 | 0 | context->msgs_out.queued_count = 0; |
826 | 0 | context->msgs_out.queued_count12 = 0; |
827 | |
|
828 | 0 | return MOSQ_ERR_SUCCESS; |
829 | 0 | } |
830 | | |
831 | | |
832 | | int db__messages_delete(struct mosquitto *context, bool force_free) |
833 | 0 | { |
834 | 0 | if(!context){ |
835 | 0 | return MOSQ_ERR_INVAL; |
836 | 0 | } |
837 | | |
838 | 0 | if(force_free || context->clean_start || (context->bridge && context->bridge->clean_start)){ |
839 | 0 | db__messages_delete_incoming(context); |
840 | 0 | } |
841 | |
|
842 | 0 | if(force_free || (context->bridge && context->bridge->clean_start_local) |
843 | 0 | || (context->bridge == NULL && context->clean_start)){ |
844 | |
|
845 | 0 | db__messages_delete_outgoing(context); |
846 | 0 | } |
847 | |
|
848 | 0 | return MOSQ_ERR_SUCCESS; |
849 | 0 | } |
850 | | |
851 | | |
852 | | int db__messages_easy_queue(struct mosquitto *context, const char *topic, uint8_t qos, uint32_t payloadlen, const void *payload, int retain, uint32_t message_expiry_interval, mosquitto_property **properties) |
853 | 0 | { |
854 | 0 | struct mosquitto__base_msg *base_msg; |
855 | 0 | const char *source_id; |
856 | 0 | enum mosquitto_msg_origin origin; |
857 | |
|
858 | 0 | if(!topic){ |
859 | 0 | return MOSQ_ERR_INVAL; |
860 | 0 | } |
861 | | |
862 | 0 | base_msg = mosquitto_calloc(1, sizeof(struct mosquitto__base_msg)); |
863 | 0 | if(base_msg == NULL){ |
864 | 0 | return MOSQ_ERR_NOMEM; |
865 | 0 | } |
866 | | |
867 | 0 | base_msg->data.topic = mosquitto_strdup(topic); |
868 | 0 | if(base_msg->data.topic == NULL){ |
869 | 0 | db__msg_store_free(base_msg); |
870 | 0 | return MOSQ_ERR_INVAL; |
871 | 0 | } |
872 | | |
873 | 0 | base_msg->data.qos = qos; |
874 | 0 | if(db.config->retain_available == false){ |
875 | 0 | base_msg->data.retain = 0; |
876 | 0 | }else{ |
877 | 0 | base_msg->data.retain = retain; |
878 | 0 | } |
879 | |
|
880 | 0 | base_msg->data.payloadlen = payloadlen; |
881 | 0 | if(payloadlen > 0){ |
882 | 0 | base_msg->data.payload = mosquitto_malloc(base_msg->data.payloadlen+1); |
883 | 0 | if(base_msg->data.payload == NULL){ |
884 | 0 | db__msg_store_free(base_msg); |
885 | 0 | return MOSQ_ERR_NOMEM; |
886 | 0 | } |
887 | | /* Ensure payload is always zero terminated, this is the reason for the extra byte above */ |
888 | 0 | ((uint8_t *)base_msg->data.payload)[base_msg->data.payloadlen] = 0; |
889 | 0 | memcpy(base_msg->data.payload, payload, base_msg->data.payloadlen); |
890 | 0 | } |
891 | | |
892 | 0 | if(context && context->id){ |
893 | 0 | source_id = context->id; |
894 | 0 | }else{ |
895 | 0 | source_id = ""; |
896 | 0 | } |
897 | 0 | if(properties){ |
898 | 0 | base_msg->data.properties = *properties; |
899 | 0 | *properties = NULL; |
900 | 0 | } |
901 | |
|
902 | 0 | if(context){ |
903 | 0 | origin = mosq_mo_client; |
904 | 0 | }else{ |
905 | 0 | origin = mosq_mo_broker; |
906 | 0 | } |
907 | 0 | if(db__message_store(context, base_msg, &message_expiry_interval, origin)){ |
908 | 0 | return 1; |
909 | 0 | } |
910 | | |
911 | 0 | return sub__messages_queue(source_id, base_msg->data.topic, base_msg->data.qos, base_msg->data.retain, &base_msg); |
912 | 0 | } |
913 | | |
914 | | |
915 | 0 | #define MOSQ_UUID_EPOCH 1637168273 |
916 | | |
917 | | |
918 | | /* db__new_msg_id() attempts to generate a new unique id on the broker, or a |
919 | | * number of brokers. It uses the 10-bit node ID, which can be set by plugins |
920 | | * to allow different brokers to share the same plugin persistence database |
921 | | * without overlapping one another. |
922 | | * |
923 | | * The message ID is a 64-bit unsigned integer arranged as follows: |
924 | | * |
925 | | * 10-bit ID 31-bit seconds 23-bit fractional seconds |
926 | | * iiiiiiiiiisssssssssssssssssssssssssssssssnnnnnnnnnnnnnnnnnnnnnnn |
927 | | * |
928 | | * 10-bit ID gives a total of 1024 brokers can produce unique values (complete overkill) |
929 | | * 31-bit seconds gives a roll over date of 68 years after MOSQ_UUID_EPOCH - 2089. |
930 | | * This roll over date would affect messages that have been queued waiting |
931 | | * for a client to receive them, or retained messages only. If either of |
932 | | * those remains for 68 years unchanged, then there will potentially be a |
933 | | * collision. Ideally we need to ensure, however, that the message id is |
934 | | * continually increasing for sorting purposes. |
935 | | * 23-bit fractional seconds gives a resolution of 120ns, or 8.4 million |
936 | | * messages per second per broker. |
937 | | */ |
938 | | uint64_t db__new_msg_id(void) |
939 | 0 | { |
940 | | #ifdef WIN32 |
941 | | FILETIME ftime; |
942 | | uint64_t ftime64; |
943 | | #else |
944 | 0 | struct timespec ts; |
945 | 0 | #endif |
946 | 0 | uint64_t id; |
947 | 0 | uint64_t tmp; |
948 | 0 | time_t sec; |
949 | 0 | long nsec; |
950 | |
|
951 | 0 | id = db.node_id_shifted; /* Top 10-bits */ |
952 | |
|
953 | | #ifdef WIN32 |
954 | | GetSystemTimePreciseAsFileTime(&ftime); |
955 | | ftime64 = (((uint64_t)ftime.dwHighDateTime)<<32) + ftime.dwLowDateTime; |
956 | | tmp = ftime64 - 116444736000000000LL; /* Convert offset to unix epoch, still in counts of 100ns */ |
957 | | sec = tmp / 10000000; /* Convert to seconds */ |
958 | | nsec = (long)(tmp - sec)*100; /* Remove seconds, convert to counts of 1ns */ |
959 | | #else |
960 | 0 | clock_gettime(CLOCK_REALTIME, &ts); |
961 | 0 | sec = ts.tv_sec; |
962 | 0 | nsec = ts.tv_nsec; |
963 | 0 | #endif |
964 | 0 | tmp = (sec - MOSQ_UUID_EPOCH) & 0x7FFFFFFF; |
965 | 0 | id = id | (tmp << 23); /* Seconds, 31-bits (68 years) */ |
966 | |
|
967 | 0 | tmp = (nsec & 0x7FFFFF80); /* top 23-bits of the bottom 30 bits (1 billion ns), ~100 ns resolution */ |
968 | 0 | id = id | (tmp >> 7); |
969 | |
|
970 | 0 | if(id <= db.last_db_id){ |
971 | 0 | id = db.last_db_id + 1; |
972 | 0 | } |
973 | 0 | db.last_db_id = id; |
974 | |
|
975 | 0 | return id; |
976 | 0 | } |
977 | | |
978 | | |
979 | | /* This function requires topic to be allocated on the heap. Once called, it owns topic and will free it on error. Likewise payload and properties. */ |
980 | | int db__message_store(const struct mosquitto *source, struct mosquitto__base_msg *base_msg, uint32_t *message_expiry_interval, enum mosquitto_msg_origin origin) |
981 | 0 | { |
982 | 0 | int rc; |
983 | |
|
984 | 0 | assert(base_msg); |
985 | |
|
986 | 0 | if(source && source->id){ |
987 | 0 | base_msg->data.source_id = mosquitto_strdup(source->id); |
988 | 0 | }else{ |
989 | 0 | base_msg->data.source_id = mosquitto_strdup(""); |
990 | 0 | } |
991 | 0 | if(!base_msg->data.source_id){ |
992 | 0 | log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); |
993 | 0 | db__msg_store_free(base_msg); |
994 | 0 | return MOSQ_ERR_NOMEM; |
995 | 0 | } |
996 | | |
997 | 0 | if(source && source->username){ |
998 | 0 | base_msg->data.source_username = mosquitto_strdup(source->username); |
999 | 0 | if(!base_msg->data.source_username){ |
1000 | 0 | db__msg_store_free(base_msg); |
1001 | 0 | return MOSQ_ERR_NOMEM; |
1002 | 0 | } |
1003 | 0 | } |
1004 | 0 | if(source){ |
1005 | 0 | base_msg->source_listener = source->listener; |
1006 | 0 | } |
1007 | 0 | base_msg->origin = origin; |
1008 | 0 | if(message_expiry_interval){ |
1009 | 0 | if(*message_expiry_interval > 0 && *message_expiry_interval != MSG_EXPIRY_INFINITE){ |
1010 | 0 | base_msg->data.expiry_time = db.now_real_s + *message_expiry_interval; |
1011 | 0 | }else{ |
1012 | 0 | base_msg->data.expiry_time = 0; |
1013 | 0 | } |
1014 | 0 | } |
1015 | |
|
1016 | 0 | base_msg->dest_ids = NULL; |
1017 | 0 | base_msg->dest_id_count = 0; |
1018 | 0 | db.msg_store_count++; |
1019 | 0 | db.msg_store_bytes += base_msg->data.payloadlen; |
1020 | |
|
1021 | 0 | if(!base_msg->data.store_id){ |
1022 | 0 | base_msg->data.store_id = db__new_msg_id(); |
1023 | 0 | } |
1024 | |
|
1025 | 0 | rc = db__msg_store_add(base_msg); |
1026 | 0 | if(rc){ |
1027 | 0 | db__msg_store_free(base_msg); |
1028 | 0 | return rc; |
1029 | 0 | } |
1030 | | |
1031 | 0 | return MOSQ_ERR_SUCCESS; |
1032 | 0 | } |
1033 | | |
1034 | | |
1035 | | int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto__client_msg **client_msg) |
1036 | 0 | { |
1037 | 0 | struct mosquitto__client_msg *cmsg; |
1038 | |
|
1039 | 0 | *client_msg = NULL; |
1040 | |
|
1041 | 0 | if(!context){ |
1042 | 0 | return MOSQ_ERR_INVAL; |
1043 | 0 | } |
1044 | | |
1045 | 0 | DL_FOREACH(context->msgs_in.inflight, cmsg){ |
1046 | 0 | if(cmsg->base_msg->data.source_mid == mid){ |
1047 | 0 | *client_msg = cmsg; |
1048 | 0 | return MOSQ_ERR_SUCCESS; |
1049 | 0 | } |
1050 | 0 | } |
1051 | | |
1052 | 0 | DL_FOREACH(context->msgs_in.queued, cmsg){ |
1053 | 0 | if(cmsg->base_msg->data.source_mid == mid){ |
1054 | 0 | *client_msg = cmsg; |
1055 | 0 | return MOSQ_ERR_SUCCESS; |
1056 | 0 | } |
1057 | 0 | } |
1058 | | |
1059 | 0 | return 1; |
1060 | 0 | } |
1061 | | |
1062 | | |
1063 | | /* Called on reconnect to set outgoing messages to a sensible state and force a |
1064 | | * retry, and to set incoming messages to expect an appropriate retry. */ |
1065 | | static int db__message_reconnect_reset_outgoing(struct mosquitto *context) |
1066 | 0 | { |
1067 | 0 | struct mosquitto__client_msg *client_msg, *tmp; |
1068 | |
|
1069 | 0 | context->msgs_out.inflight_bytes = 0; |
1070 | 0 | context->msgs_out.inflight_bytes12 = 0; |
1071 | 0 | context->msgs_out.inflight_count = 0; |
1072 | 0 | context->msgs_out.inflight_count12 = 0; |
1073 | 0 | context->msgs_out.queued_bytes = 0; |
1074 | 0 | context->msgs_out.queued_bytes12 = 0; |
1075 | 0 | context->msgs_out.queued_count = 0; |
1076 | 0 | context->msgs_out.queued_count12 = 0; |
1077 | 0 | context->msgs_out.inflight_quota = context->msgs_out.inflight_maximum; |
1078 | |
|
1079 | 0 | DL_FOREACH_SAFE(context->msgs_out.inflight, client_msg, tmp){ |
1080 | 0 | db__msg_add_to_inflight_stats(&context->msgs_out, client_msg); |
1081 | 0 | if(client_msg->data.qos > 0){ |
1082 | 0 | util__decrement_send_quota(context); |
1083 | 0 | } |
1084 | |
|
1085 | 0 | switch(client_msg->data.qos){ |
1086 | 0 | case 0: |
1087 | 0 | client_msg->data.state = mosq_ms_publish_qos0; |
1088 | 0 | break; |
1089 | 0 | case 1: |
1090 | 0 | client_msg->data.state = mosq_ms_publish_qos1; |
1091 | 0 | break; |
1092 | 0 | case 2: |
1093 | 0 | if(client_msg->data.state == mosq_ms_wait_for_pubcomp){ |
1094 | 0 | client_msg->data.state = mosq_ms_resend_pubrel; |
1095 | 0 | }else{ |
1096 | 0 | client_msg->data.state = mosq_ms_publish_qos2; |
1097 | 0 | } |
1098 | 0 | break; |
1099 | 0 | } |
1100 | 0 | plugin_persist__handle_client_msg_update(context, client_msg); |
1101 | 0 | } |
1102 | | /* Messages received when the client was disconnected are put |
1103 | | * in the mosq_ms_queued state. If we don't change them to the |
1104 | | * appropriate "publish" state, then the queued messages won't |
1105 | | * get sent until the client next receives a message - and they |
1106 | | * will be sent out of order. |
1107 | | */ |
1108 | 0 | DL_FOREACH_SAFE(context->msgs_out.queued, client_msg, tmp){ |
1109 | 0 | db__msg_add_to_queued_stats(&context->msgs_out, client_msg); |
1110 | 0 | } |
1111 | 0 | db__fill_inflight_out_from_queue(context); |
1112 | |
|
1113 | 0 | return MOSQ_ERR_SUCCESS; |
1114 | 0 | } |
1115 | | |
1116 | | |
1117 | | /* Called on reconnect to set incoming messages to expect an appropriate retry. */ |
1118 | | static int db__message_reconnect_reset_incoming(struct mosquitto *context) |
1119 | 0 | { |
1120 | 0 | struct mosquitto__client_msg *client_msg, *tmp; |
1121 | |
|
1122 | 0 | context->msgs_in.inflight_bytes = 0; |
1123 | 0 | context->msgs_in.inflight_bytes12 = 0; |
1124 | 0 | context->msgs_in.inflight_count = 0; |
1125 | 0 | context->msgs_in.inflight_count12 = 0; |
1126 | 0 | context->msgs_in.queued_bytes = 0; |
1127 | 0 | context->msgs_in.queued_bytes12 = 0; |
1128 | 0 | context->msgs_in.queued_count = 0; |
1129 | 0 | context->msgs_in.queued_count12 = 0; |
1130 | 0 | context->msgs_in.inflight_quota = context->msgs_in.inflight_maximum; |
1131 | |
|
1132 | 0 | DL_FOREACH_SAFE(context->msgs_in.inflight, client_msg, tmp){ |
1133 | 0 | db__msg_add_to_inflight_stats(&context->msgs_in, client_msg); |
1134 | 0 | if(client_msg->data.qos > 0){ |
1135 | 0 | util__decrement_receive_quota(context); |
1136 | 0 | } |
1137 | |
|
1138 | 0 | if(client_msg->data.qos != 2){ |
1139 | | /* Anything <QoS 2 can be completely retried by the client at |
1140 | | * no harm. */ |
1141 | 0 | db__message_remove_inflight(context, &context->msgs_in, client_msg); |
1142 | 0 | }else{ |
1143 | | /* Message state can be preserved here because it should match |
1144 | | * whatever the client has got. */ |
1145 | 0 | client_msg->data.dup = 0; |
1146 | 0 | } |
1147 | 0 | } |
1148 | | |
1149 | | /* Messages received when the client was disconnected are put |
1150 | | * in the mosq_ms_queued state. If we don't change them to the |
1151 | | * appropriate "publish" state, then the queued messages won't |
1152 | | * get sent until the client next receives a message - and they |
1153 | | * will be sent out of order. |
1154 | | */ |
1155 | 0 | DL_FOREACH_SAFE(context->msgs_in.queued, client_msg, tmp){ |
1156 | 0 | client_msg->data.dup = 0; |
1157 | 0 | db__msg_add_to_queued_stats(&context->msgs_in, client_msg); |
1158 | 0 | if(db__ready_for_flight(context, mosq_md_in, client_msg->data.qos)){ |
1159 | 0 | switch(client_msg->data.qos){ |
1160 | 0 | case 0: |
1161 | 0 | client_msg->data.state = mosq_ms_publish_qos0; |
1162 | 0 | break; |
1163 | 0 | case 1: |
1164 | 0 | client_msg->data.state = mosq_ms_publish_qos1; |
1165 | 0 | break; |
1166 | 0 | case 2: |
1167 | 0 | client_msg->data.state = mosq_ms_publish_qos2; |
1168 | 0 | break; |
1169 | 0 | } |
1170 | 0 | db__message_dequeue_first(context, &context->msgs_in); |
1171 | 0 | plugin_persist__handle_client_msg_update(context, client_msg); |
1172 | 0 | } |
1173 | 0 | } |
1174 | | |
1175 | 0 | return MOSQ_ERR_SUCCESS; |
1176 | 0 | } |
1177 | | |
1178 | | |
/* Reset the message state of a reconnecting client: outgoing messages first,
 * then incoming messages. The incoming reset is skipped if the outgoing reset
 * reports an error; that error is returned. */
int db__message_reconnect_reset(struct mosquitto *context)
{
	int rc = db__message_reconnect_reset_outgoing(context);

	if(rc != 0){
		return rc;
	}

	rc = db__message_reconnect_reset_incoming(context);
	return rc;
}
1189 | | |
1190 | | |
1191 | | int db__message_remove_incoming(struct mosquitto *context, uint16_t mid) |
1192 | 0 | { |
1193 | 0 | struct mosquitto__client_msg *client_msg, *tmp; |
1194 | |
|
1195 | 0 | if(!context){ |
1196 | 0 | return MOSQ_ERR_INVAL; |
1197 | 0 | } |
1198 | | |
1199 | 0 | DL_FOREACH_SAFE(context->msgs_in.inflight, client_msg, tmp){ |
1200 | 0 | if(client_msg->data.mid == mid){ |
1201 | 0 | if(client_msg->base_msg->data.qos != 2){ |
1202 | 0 | return MOSQ_ERR_PROTOCOL; |
1203 | 0 | } |
1204 | 0 | db__message_remove_inflight(context, &context->msgs_in, client_msg); |
1205 | 0 | return MOSQ_ERR_SUCCESS; |
1206 | 0 | } |
1207 | 0 | } |
1208 | | |
1209 | 0 | return MOSQ_ERR_NOT_FOUND; |
1210 | 0 | } |
1211 | | |
1212 | | |
1213 | | int db__message_release_incoming(struct mosquitto *context, uint16_t mid) |
1214 | 0 | { |
1215 | 0 | struct mosquitto__client_msg *client_msg, *tmp; |
1216 | 0 | int retain; |
1217 | 0 | char *topic; |
1218 | 0 | char *source_id; |
1219 | 0 | bool deleted = false; |
1220 | 0 | int rc; |
1221 | |
|
1222 | 0 | if(!context){ |
1223 | 0 | return MOSQ_ERR_INVAL; |
1224 | 0 | } |
1225 | | |
1226 | 0 | DL_FOREACH_SAFE(context->msgs_in.inflight, client_msg, tmp){ |
1227 | 0 | if(client_msg->data.mid == mid){ |
1228 | 0 | if(client_msg->base_msg->data.qos != 2){ |
1229 | 0 | return MOSQ_ERR_PROTOCOL; |
1230 | 0 | } |
1231 | 0 | topic = client_msg->base_msg->data.topic; |
1232 | 0 | retain = client_msg->data.retain; |
1233 | 0 | source_id = client_msg->base_msg->data.source_id; |
1234 | | |
1235 | | /* topic==NULL should be a QoS 2 message that was |
1236 | | * denied/dropped and is being processed so the client doesn't |
1237 | | * keep resending it. That means we don't send it to other |
1238 | | * clients. */ |
1239 | 0 | if(topic == NULL){ |
1240 | 0 | db__message_remove_inflight(context, &context->msgs_in, client_msg); |
1241 | 0 | deleted = true; |
1242 | 0 | }else{ |
1243 | 0 | rc = sub__messages_queue(source_id, topic, 2, retain, &client_msg->base_msg); |
1244 | 0 | if(rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_NO_SUBSCRIBERS){ |
1245 | 0 | db__message_remove_inflight(context, &context->msgs_in, client_msg); |
1246 | 0 | deleted = true; |
1247 | 0 | }else{ |
1248 | 0 | return 1; |
1249 | 0 | } |
1250 | 0 | } |
1251 | 0 | } |
1252 | 0 | } |
1253 | | |
1254 | 0 | DL_FOREACH_SAFE(context->msgs_in.queued, client_msg, tmp){ |
1255 | 0 | if(db__ready_for_flight(context, mosq_md_in, client_msg->data.qos)){ |
1256 | 0 | break; |
1257 | 0 | } |
1258 | | |
1259 | 0 | if(client_msg->data.qos == 2){ |
1260 | 0 | send__pubrec(context, client_msg->data.mid, 0, NULL); |
1261 | 0 | client_msg->data.state = mosq_ms_wait_for_pubrel; |
1262 | 0 | db__message_dequeue_first(context, &context->msgs_in); |
1263 | 0 | plugin_persist__handle_client_msg_update(context, client_msg); |
1264 | 0 | } |
1265 | 0 | } |
1266 | 0 | if(deleted){ |
1267 | 0 | return MOSQ_ERR_SUCCESS; |
1268 | 0 | }else{ |
1269 | 0 | return MOSQ_ERR_NOT_FOUND; |
1270 | 0 | } |
1271 | 0 | } |
1272 | | |
1273 | | |
1274 | | void db__expire_all_messages(struct mosquitto *context) |
1275 | 0 | { |
1276 | 0 | struct mosquitto__client_msg *client_msg, *tmp; |
1277 | |
|
1278 | 0 | DL_FOREACH_SAFE(context->msgs_out.inflight, client_msg, tmp){ |
1279 | 0 | if(client_msg->base_msg->data.expiry_time && db.now_real_s > client_msg->base_msg->data.expiry_time){ |
1280 | 0 | if(client_msg->data.qos > 0){ |
1281 | 0 | util__increment_send_quota(context); |
1282 | 0 | } |
1283 | 0 | db__message_remove_inflight(context, &context->msgs_out, client_msg); |
1284 | 0 | } |
1285 | 0 | } |
1286 | 0 | db__fill_inflight_out_from_queue(context); |
1287 | 0 | DL_FOREACH_SAFE(context->msgs_out.queued, client_msg, tmp){ |
1288 | 0 | if(client_msg->base_msg->data.expiry_time && db.now_real_s > client_msg->base_msg->data.expiry_time){ |
1289 | 0 | db__message_remove_queued(context, &context->msgs_out, client_msg); |
1290 | 0 | } |
1291 | 0 | } |
1292 | 0 | DL_FOREACH_SAFE(context->msgs_in.inflight, client_msg, tmp){ |
1293 | 0 | if(client_msg->base_msg->data.expiry_time && db.now_real_s > client_msg->base_msg->data.expiry_time){ |
1294 | 0 | if(client_msg->data.qos > 0){ |
1295 | 0 | util__increment_receive_quota(context); |
1296 | 0 | } |
1297 | 0 | db__message_remove_inflight(context, &context->msgs_in, client_msg); |
1298 | 0 | } |
1299 | 0 | } |
1300 | 0 | DL_FOREACH_SAFE(context->msgs_in.queued, client_msg, tmp){ |
1301 | 0 | if(client_msg->base_msg->data.expiry_time && db.now_real_s > client_msg->base_msg->data.expiry_time){ |
1302 | 0 | db__message_remove_queued(context, &context->msgs_in, client_msg); |
1303 | 0 | } |
1304 | 0 | } |
1305 | 0 | } |
1306 | | |
1307 | | |
1308 | | static void db__client_messages_check_acl(struct mosquitto *context, struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg **head, |
1309 | | void (*decrement_stats_fn)(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *client_msg)) |
1310 | 0 | { |
1311 | 0 | struct mosquitto__client_msg *client_msg, *tmp; |
1312 | 0 | struct mosquitto__base_msg *base_msg; |
1313 | 0 | int access; |
1314 | |
|
1315 | 0 | DL_FOREACH_SAFE((*head), client_msg, tmp){ |
1316 | 0 | base_msg = client_msg->base_msg; |
1317 | 0 | if(client_msg->data.direction == mosq_md_out){ |
1318 | 0 | access = MOSQ_ACL_READ; |
1319 | 0 | }else{ |
1320 | 0 | access = MOSQ_ACL_WRITE; |
1321 | 0 | } |
1322 | 0 | if(mosquitto_acl_check(context, base_msg->data.topic, |
1323 | 0 | base_msg->data.payloadlen, base_msg->data.payload, |
1324 | 0 | base_msg->data.qos, base_msg->data.retain, access) != MOSQ_ERR_SUCCESS){ |
1325 | |
|
1326 | 0 | DL_DELETE((*head), client_msg); |
1327 | 0 | decrement_stats_fn(msg_data, client_msg); |
1328 | 0 | plugin_persist__handle_client_msg_delete(context, client_msg); |
1329 | 0 | db__msg_store_ref_dec(&client_msg->base_msg); |
1330 | 0 | mosquitto_FREE(client_msg); |
1331 | 0 | } |
1332 | 0 | } |
1333 | 0 | } |
1334 | | |
1335 | | |
1336 | | void db__check_acl_of_all_messages(struct mosquitto *context) |
1337 | 0 | { |
1338 | 0 | db__client_messages_check_acl(context, &context->msgs_in, &context->msgs_in.inflight, &db__msg_remove_from_inflight_stats); |
1339 | 0 | db__client_messages_check_acl(context, &context->msgs_in, &context->msgs_in.queued, &db__msg_remove_from_queued_stats); |
1340 | 0 | db__client_messages_check_acl(context, &context->msgs_out, &context->msgs_out.inflight, &db__msg_remove_from_inflight_stats); |
1341 | 0 | db__client_messages_check_acl(context, &context->msgs_out, &context->msgs_out.queued, &db__msg_remove_from_queued_stats); |
1342 | 0 | } |
1343 | | |
1344 | | |
1345 | | static int db__message_write_inflight_out_single(struct mosquitto *context, struct mosquitto__client_msg *client_msg) |
1346 | 0 | { |
1347 | 0 | struct mosquitto__base_msg *base_msg; |
1348 | 0 | mosquitto_property *base_msg_props = NULL; |
1349 | 0 | int rc; |
1350 | 0 | uint16_t mid; |
1351 | 0 | int retries; |
1352 | 0 | int retain; |
1353 | 0 | const char *topic; |
1354 | 0 | uint8_t qos; |
1355 | 0 | uint32_t payloadlen; |
1356 | 0 | const void *payload; |
1357 | 0 | uint32_t expiry_interval; |
1358 | 0 | uint32_t subscription_id; |
1359 | |
|
1360 | 0 | base_msg = client_msg->base_msg; |
1361 | |
|
1362 | 0 | expiry_interval = 0; |
1363 | 0 | if(base_msg->data.expiry_time){ |
1364 | 0 | if(db.now_real_s > base_msg->data.expiry_time){ |
1365 | | /* Message is expired, must not send. */ |
1366 | 0 | if(client_msg->data.direction == mosq_md_out && client_msg->data.qos > 0){ |
1367 | 0 | util__increment_send_quota(context); |
1368 | 0 | } |
1369 | 0 | db__message_remove_inflight(context, &context->msgs_out, client_msg); |
1370 | 0 | db__fill_inflight_out_from_queue(context); |
1371 | 0 | return MOSQ_ERR_SUCCESS; |
1372 | 0 | }else{ |
1373 | 0 | expiry_interval = (uint32_t)(base_msg->data.expiry_time - db.now_real_s); |
1374 | 0 | } |
1375 | 0 | } |
1376 | 0 | mid = client_msg->data.mid; |
1377 | 0 | retries = client_msg->data.dup; |
1378 | 0 | retain = client_msg->data.retain; |
1379 | 0 | topic = base_msg->data.topic; |
1380 | 0 | qos = (uint8_t)client_msg->data.qos; |
1381 | 0 | payloadlen = base_msg->data.payloadlen; |
1382 | 0 | payload = base_msg->data.payload; |
1383 | 0 | subscription_id = client_msg->data.subscription_identifier; |
1384 | 0 | base_msg_props = base_msg->data.properties; |
1385 | |
|
1386 | 0 | switch(client_msg->data.state){ |
1387 | 0 | case mosq_ms_publish_qos0: |
1388 | 0 | rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, subscription_id, base_msg_props, expiry_interval); |
1389 | 0 | if(rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_OVERSIZE_PACKET){ |
1390 | 0 | db__message_remove_inflight(context, &context->msgs_out, client_msg); |
1391 | 0 | }else{ |
1392 | 0 | return rc; |
1393 | 0 | } |
1394 | 0 | break; |
1395 | | |
1396 | 0 | case mosq_ms_publish_qos1: |
1397 | 0 | rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, subscription_id, base_msg_props, expiry_interval); |
1398 | 0 | if(rc == MOSQ_ERR_SUCCESS){ |
1399 | 0 | client_msg->data.dup = 1; /* Any retry attempts are a duplicate. */ |
1400 | 0 | client_msg->data.state = mosq_ms_wait_for_puback; |
1401 | 0 | plugin_persist__handle_client_msg_update(context, client_msg); |
1402 | 0 | }else if(rc == MOSQ_ERR_OVERSIZE_PACKET){ |
1403 | 0 | db__message_remove_inflight(context, &context->msgs_out, client_msg); |
1404 | 0 | }else{ |
1405 | 0 | return rc; |
1406 | 0 | } |
1407 | 0 | break; |
1408 | | |
1409 | 0 | case mosq_ms_publish_qos2: |
1410 | 0 | rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, subscription_id, base_msg_props, expiry_interval); |
1411 | 0 | if(rc == MOSQ_ERR_SUCCESS){ |
1412 | 0 | client_msg->data.dup = 1; /* Any retry attempts are a duplicate. */ |
1413 | 0 | client_msg->data.state = mosq_ms_wait_for_pubrec; |
1414 | 0 | plugin_persist__handle_client_msg_update(context, client_msg); |
1415 | 0 | }else if(rc == MOSQ_ERR_OVERSIZE_PACKET){ |
1416 | 0 | db__message_remove_inflight(context, &context->msgs_out, client_msg); |
1417 | 0 | }else{ |
1418 | 0 | return rc; |
1419 | 0 | } |
1420 | 0 | break; |
1421 | | |
1422 | 0 | case mosq_ms_resend_pubrel: |
1423 | 0 | rc = send__pubrel(context, mid, NULL); |
1424 | 0 | if(!rc){ |
1425 | 0 | client_msg->data.state = mosq_ms_wait_for_pubcomp; |
1426 | 0 | plugin_persist__handle_client_msg_update(context, client_msg); |
1427 | 0 | }else{ |
1428 | 0 | return rc; |
1429 | 0 | } |
1430 | 0 | break; |
1431 | | |
1432 | 0 | case mosq_ms_invalid: |
1433 | 0 | case mosq_ms_send_pubrec: |
1434 | 0 | case mosq_ms_resend_pubcomp: |
1435 | 0 | case mosq_ms_wait_for_puback: |
1436 | 0 | case mosq_ms_wait_for_pubrec: |
1437 | 0 | case mosq_ms_wait_for_pubrel: |
1438 | 0 | case mosq_ms_wait_for_pubcomp: |
1439 | 0 | case mosq_ms_queued: |
1440 | 0 | break; |
1441 | 0 | } |
1442 | 0 | return MOSQ_ERR_SUCCESS; |
1443 | 0 | } |
1444 | | |
1445 | | |
1446 | | int db__message_write_inflight_out_all(struct mosquitto *context) |
1447 | 0 | { |
1448 | 0 | struct mosquitto__client_msg *client_msg, *tmp; |
1449 | 0 | int rc; |
1450 | |
|
1451 | 0 | if(context->state != mosq_cs_active || !net__is_connected(context)){ |
1452 | 0 | return MOSQ_ERR_SUCCESS; |
1453 | 0 | } |
1454 | | |
1455 | 0 | DL_FOREACH_SAFE(context->msgs_out.inflight, client_msg, tmp){ |
1456 | 0 | rc = db__message_write_inflight_out_single(context, client_msg); |
1457 | 0 | if(rc){ |
1458 | 0 | return rc; |
1459 | 0 | } |
1460 | 0 | } |
1461 | 0 | return MOSQ_ERR_SUCCESS; |
1462 | 0 | } |
1463 | | |
1464 | | |
1465 | | int db__message_write_inflight_out_latest(struct mosquitto *context) |
1466 | 0 | { |
1467 | 0 | struct mosquitto__client_msg *client_msg, *next; |
1468 | 0 | int rc; |
1469 | |
|
1470 | 0 | if(context->state != mosq_cs_active |
1471 | 0 | || !net__is_connected(context) |
1472 | 0 | || context->msgs_out.inflight == NULL){ |
1473 | |
|
1474 | 0 | return MOSQ_ERR_SUCCESS; |
1475 | 0 | } |
1476 | | |
1477 | 0 | if(context->msgs_out.inflight->prev == context->msgs_out.inflight){ |
1478 | | /* Only one message */ |
1479 | 0 | return db__message_write_inflight_out_single(context, context->msgs_out.inflight); |
1480 | 0 | } |
1481 | | |
1482 | | /* Start at the end of the list and work backwards looking for the first |
1483 | | * message in a non-publish state */ |
1484 | 0 | client_msg = context->msgs_out.inflight->prev; |
1485 | 0 | while(client_msg != context->msgs_out.inflight && |
1486 | 0 | (client_msg->data.state == mosq_ms_publish_qos0 |
1487 | 0 | || client_msg->data.state == mosq_ms_publish_qos1 |
1488 | 0 | || client_msg->data.state == mosq_ms_publish_qos2)){ |
1489 | |
|
1490 | 0 | client_msg = client_msg->prev; |
1491 | 0 | } |
1492 | | |
1493 | | /* Tail is now either the head of the list, if that message is waiting for |
1494 | | * publish, or the oldest message not waiting for a publish. In the latter |
1495 | | * case, any pending publishes should be next after this message. */ |
1496 | 0 | if(client_msg != context->msgs_out.inflight){ |
1497 | 0 | client_msg = client_msg->next; |
1498 | 0 | } |
1499 | |
|
1500 | 0 | while(client_msg){ |
1501 | 0 | next = client_msg->next; |
1502 | 0 | rc = db__message_write_inflight_out_single(context, client_msg); |
1503 | 0 | if(rc){ |
1504 | 0 | return rc; |
1505 | 0 | } |
1506 | 0 | client_msg = next; |
1507 | 0 | } |
1508 | 0 | return MOSQ_ERR_SUCCESS; |
1509 | 0 | } |
1510 | | |
1511 | | |
1512 | | int db__message_write_queued_in(struct mosquitto *context) |
1513 | 0 | { |
1514 | 0 | struct mosquitto__client_msg *client_msg, *tmp; |
1515 | 0 | int rc; |
1516 | |
|
1517 | 0 | if(context->state != mosq_cs_active){ |
1518 | 0 | return MOSQ_ERR_SUCCESS; |
1519 | 0 | } |
1520 | | |
1521 | 0 | DL_FOREACH_SAFE(context->msgs_in.queued, client_msg, tmp){ |
1522 | 0 | if(context->msgs_in.inflight_maximum != 0 && context->msgs_in.inflight_quota == 0){ |
1523 | 0 | break; |
1524 | 0 | } |
1525 | | |
1526 | 0 | if(client_msg->data.qos == 2){ |
1527 | 0 | client_msg->data.state = mosq_ms_send_pubrec; |
1528 | 0 | db__message_dequeue_first(context, &context->msgs_in); |
1529 | 0 | rc = send__pubrec(context, client_msg->data.mid, 0, NULL); |
1530 | 0 | if(!rc){ |
1531 | 0 | client_msg->data.state = mosq_ms_wait_for_pubrel; |
1532 | 0 | plugin_persist__handle_client_msg_update(context, client_msg); |
1533 | 0 | }else{ |
1534 | 0 | plugin_persist__handle_client_msg_update(context, client_msg); |
1535 | 0 | return rc; |
1536 | 0 | } |
1537 | 0 | } |
1538 | 0 | } |
1539 | 0 | return MOSQ_ERR_SUCCESS; |
1540 | 0 | } |
1541 | | |
1542 | | |
1543 | | int db__message_write_queued_out(struct mosquitto *context) |
1544 | 0 | { |
1545 | 0 | if(context->state != mosq_cs_active){ |
1546 | 0 | return MOSQ_ERR_SUCCESS; |
1547 | 0 | } |
1548 | | |
1549 | 0 | db__fill_inflight_out_from_queue(context); |
1550 | |
|
1551 | 0 | return MOSQ_ERR_SUCCESS; |
1552 | 0 | } |