/src/mosquitto/src/subs.c
Line | Count | Source |
1 | | /* |
2 | | Copyright (c) 2010-2021 Roger Light <roger@atchoo.org> |
3 | | |
4 | | All rights reserved. This program and the accompanying materials |
5 | | are made available under the terms of the Eclipse Public License 2.0 |
6 | | and Eclipse Distribution License v1.0 which accompany this distribution. |
7 | | |
8 | | The Eclipse Public License is available at |
9 | | https://www.eclipse.org/legal/epl-2.0/ |
10 | | and the Eclipse Distribution License is available at |
11 | | http://www.eclipse.org/org/documents/edl-v10.php. |
12 | | |
13 | | SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause |
14 | | |
15 | | Contributors: |
16 | | Roger Light - initial implementation and documentation. |
17 | | */ |
18 | | |
19 | | /* A note on matching topic subscriptions. |
20 | | * |
21 | | * Topics can be up to 32767 characters in length. The / character is used as a |
22 | | * hierarchy delimiter. Messages are published to a particular topic. |
23 | | * Clients may subscribe to particular topics directly, but may also use |
24 | | * wildcards in subscriptions. The + and # characters are used as wildcards. |
25 | | * The # wildcard can be used at the end of a subscription only, and is a |
26 | | * wildcard for the level of hierarchy at which it is placed and all subsequent |
27 | | * levels. |
28 | | * The + wildcard may be used at any point within the subscription and is a |
29 | | * wildcard for only the level of hierarchy at which it is placed. |
30 | | * Neither wildcard may be used as part of a substring. |
31 | | * Valid (each of these matches the example topic a/b/c): |
32 | | * a/b/+ |
33 | | * a/+/c |
34 | | * a/# |
35 | | * a/b/# |
36 | | * # |
37 | | * +/b/c |
38 | | * +/+/+ |
39 | | * Invalid: |
40 | | * a/#/c |
41 | | * a+/b/c |
42 | | * Valid but non-matching (none of these matches the example topic a/b/c): |
43 | | * a/b |
44 | | * a/+ |
45 | | * +/b |
46 | | * b/c/a |
47 | | * a/b/d |
48 | | */ |
49 | | |
50 | | #include "config.h" |
51 | | |
52 | | #include <assert.h> |
53 | | #include <stdio.h> |
54 | | #include <string.h> |
55 | | |
56 | | #include "mosquitto_broker_internal.h" |
57 | | #include "mosquitto/mqtt_protocol.h" |
58 | | #include "util_mosq.h" |
59 | | |
60 | | #include "utlist.h" |
61 | | |
/* Forward declaration: sub__add_context() uses this helper before its
 * definition further down the file. */
static struct mosquitto__subhier *sub__add_hier_entry(struct mosquitto__subhier *parent, struct mosquitto__subhier **sibling, const char *topic, uint16_t len);

/* Hash values of the single character keys "+" and "#". They are computed
 * once in sub__init() and used by sub__search() for the wildcard lookups.
 * File-scope statics start out as zero. */
static unsigned int hashv_plus;
static unsigned int hashv_hash;
66 | | |
67 | | |
68 | | static int subs__send(struct mosquitto__subleaf *leaf, const char *topic, uint8_t qos, int retain, struct mosquitto__base_msg *stored) |
69 | 0 | { |
70 | 0 | bool client_retain; |
71 | 0 | uint16_t mid; |
72 | 0 | uint8_t client_qos, msg_qos; |
73 | 0 | int rc2; |
74 | | |
75 | | /* Check for ACL topic access. */ |
76 | 0 | rc2 = mosquitto_acl_check(leaf->context, topic, stored->data.payloadlen, stored->data.payload, stored->data.qos, stored->data.retain, MOSQ_ACL_READ); |
77 | 0 | if(rc2 == MOSQ_ERR_ACL_DENIED){ |
78 | 0 | return MOSQ_ERR_SUCCESS; |
79 | 0 | }else if(rc2 == MOSQ_ERR_SUCCESS){ |
80 | 0 | client_qos = MQTT_SUB_OPT_GET_QOS(leaf->subscription_options); |
81 | |
|
82 | 0 | if(db.config->upgrade_outgoing_qos){ |
83 | 0 | msg_qos = client_qos; |
84 | 0 | }else{ |
85 | 0 | if(qos > client_qos){ |
86 | 0 | msg_qos = client_qos; |
87 | 0 | }else{ |
88 | 0 | msg_qos = qos; |
89 | 0 | } |
90 | 0 | } |
91 | 0 | if(msg_qos){ |
92 | 0 | mid = mosquitto__mid_generate(leaf->context); |
93 | 0 | }else{ |
94 | 0 | mid = 0; |
95 | 0 | } |
96 | 0 | if(MQTT_SUB_OPT_GET_RETAIN_AS_PUBLISHED(leaf->subscription_options)){ |
97 | 0 | client_retain = retain; |
98 | 0 | }else{ |
99 | 0 | client_retain = false; |
100 | 0 | } |
101 | 0 | if(db__message_insert_outgoing(leaf->context, 0, mid, msg_qos, client_retain, stored, leaf->identifier, true, true) == 1){ |
102 | 0 | return 1; |
103 | 0 | } |
104 | 0 | }else{ |
105 | 0 | return 1; /* Application error */ |
106 | 0 | } |
107 | 0 | return 0; |
108 | 0 | } |
109 | | |
110 | | |
111 | | static int subs__shared_process(struct mosquitto__subhier *hier, const char *topic, uint8_t qos, int retain, struct mosquitto__base_msg *stored) |
112 | 0 | { |
113 | 0 | int rc = 0, rc2; |
114 | 0 | struct mosquitto__subshared *shared, *shared_tmp; |
115 | 0 | struct mosquitto__subleaf *leaf; |
116 | |
|
117 | 0 | HASH_ITER(hh, hier->shared, shared, shared_tmp){ |
118 | 0 | leaf = shared->subs; |
119 | 0 | rc2 = subs__send(leaf, topic, qos, retain, stored); |
120 | | /* Remove current from the top, add back to the bottom */ |
121 | 0 | DL_DELETE(shared->subs, leaf); |
122 | 0 | DL_APPEND(shared->subs, leaf); |
123 | |
|
124 | 0 | if(rc2){ |
125 | 0 | rc = 1; |
126 | 0 | } |
127 | 0 | } |
128 | |
|
129 | 0 | return rc; |
130 | 0 | } |
131 | | |
132 | | |
133 | | static int subs__process(struct mosquitto__subhier *hier, const char *source_id, const char *topic, uint8_t qos, int retain, struct mosquitto__base_msg *stored) |
134 | 0 | { |
135 | 0 | int rc = 0; |
136 | 0 | int rc2; |
137 | 0 | struct mosquitto__subleaf *leaf; |
138 | |
|
139 | 0 | rc = subs__shared_process(hier, topic, qos, retain, stored); |
140 | |
|
141 | 0 | leaf = hier->subs; |
142 | 0 | while(source_id && leaf){ |
143 | 0 | if(!leaf->context->id || (MQTT_SUB_OPT_GET_NO_LOCAL(leaf->subscription_options) && !strcmp(leaf->context->id, source_id))){ |
144 | 0 | leaf = leaf->next; |
145 | 0 | continue; |
146 | 0 | } |
147 | 0 | rc2 = subs__send(leaf, topic, qos, retain, stored); |
148 | 0 | if(rc2){ |
149 | 0 | rc = 1; |
150 | 0 | } |
151 | 0 | leaf = leaf->next; |
152 | 0 | } |
153 | 0 | if(hier->subs || hier->shared){ |
154 | 0 | return rc; |
155 | 0 | }else{ |
156 | 0 | return MOSQ_ERR_NO_SUBSCRIBERS; |
157 | 0 | } |
158 | 0 | } |
159 | | |
160 | | |
161 | | static int sub__add_leaf(struct mosquitto *context, const struct mosquitto_subscription *sub, struct mosquitto__subleaf **head, struct mosquitto__subleaf **newleaf) |
162 | 0 | { |
163 | 0 | struct mosquitto__subleaf *leaf; |
164 | |
|
165 | 0 | *newleaf = NULL; |
166 | 0 | leaf = *head; |
167 | |
|
168 | 0 | while(leaf){ |
169 | 0 | if(leaf->context && leaf->context->id && !strcmp(leaf->context->id, context->id)){ |
170 | | /* Client making a second subscription to same topic. Only |
171 | | * need to update QoS. Return MOSQ_ERR_SUB_EXISTS to |
172 | | * indicate this to the calling function. */ |
173 | 0 | leaf->identifier = sub->identifier; |
174 | 0 | leaf->subscription_options = sub->options; |
175 | 0 | return MOSQ_ERR_SUB_EXISTS; |
176 | 0 | } |
177 | 0 | leaf = leaf->next; |
178 | 0 | } |
179 | 0 | leaf = mosquitto_calloc(1, sizeof(struct mosquitto__subleaf) + strlen(sub->topic_filter) + 1); |
180 | 0 | if(!leaf){ |
181 | 0 | return MOSQ_ERR_NOMEM; |
182 | 0 | } |
183 | 0 | leaf->context = context; |
184 | 0 | leaf->identifier = sub->identifier; |
185 | 0 | leaf->subscription_options = sub->options; |
186 | 0 | strcpy(leaf->topic_filter, sub->topic_filter); |
187 | |
|
188 | 0 | DL_APPEND(*head, leaf); |
189 | 0 | *newleaf = leaf; |
190 | |
|
191 | 0 | return MOSQ_ERR_SUCCESS; |
192 | 0 | } |
193 | | |
194 | | |
195 | | static void sub__remove_shared_leaf(struct mosquitto__subhier *subhier, struct mosquitto__subshared *shared, struct mosquitto__subleaf *leaf) |
196 | 0 | { |
197 | 0 | DL_DELETE(shared->subs, leaf); |
198 | 0 | if(shared->subs == NULL){ |
199 | 0 | HASH_DELETE(hh, subhier->shared, shared); |
200 | 0 | mosquitto_FREE(shared); |
201 | 0 | } |
202 | 0 | } |
203 | | |
204 | | |
205 | | static int sub__add_shared(struct mosquitto *context, const struct mosquitto_subscription *sub, struct mosquitto__subhier *subhier, const char *sharename) |
206 | 0 | { |
207 | 0 | struct mosquitto__subleaf *newleaf; |
208 | 0 | struct mosquitto__subshared *shared = NULL; |
209 | 0 | struct mosquitto__subleaf **subs; |
210 | 0 | size_t slen; |
211 | 0 | int rc; |
212 | 0 | unsigned hashv; |
213 | |
|
214 | 0 | slen = strlen(sharename); |
215 | |
|
216 | 0 | HASH_VALUE(sharename, slen, hashv); |
217 | |
|
218 | 0 | HASH_FIND_BYHASHVALUE(hh, subhier->shared, sharename, slen, hashv, shared); |
219 | 0 | if(shared == NULL){ |
220 | 0 | shared = mosquitto_calloc(1, sizeof(struct mosquitto__subshared) + slen + 1); |
221 | 0 | if(!shared){ |
222 | 0 | return MOSQ_ERR_NOMEM; |
223 | 0 | } |
224 | 0 | strncpy(shared->name, sharename, slen+1); |
225 | |
|
226 | 0 | HASH_ADD_BYHASHVALUE(hh, subhier->shared, name, slen, hashv, shared); |
227 | 0 | } |
228 | | |
229 | 0 | rc = sub__add_leaf(context, sub, &shared->subs, &newleaf); |
230 | 0 | if(rc > 0){ |
231 | 0 | if(shared->subs == NULL){ |
232 | 0 | HASH_DELETE(hh, subhier->shared, shared); |
233 | 0 | mosquitto_FREE(shared); |
234 | 0 | } |
235 | 0 | return rc; |
236 | 0 | } |
237 | | |
238 | 0 | if(rc != MOSQ_ERR_SUB_EXISTS){ |
239 | 0 | newleaf->hier = subhier; |
240 | 0 | newleaf->shared = shared; |
241 | |
|
242 | 0 | bool assigned = false; |
243 | 0 | for(int i=0; i<context->subs_capacity; i++){ |
244 | 0 | if(!context->subs[i]){ |
245 | 0 | context->subs[i] = newleaf; |
246 | 0 | context->subs_count++; |
247 | 0 | assigned = true; |
248 | 0 | break; |
249 | 0 | } |
250 | 0 | } |
251 | 0 | if(assigned == false){ |
252 | 0 | subs = mosquitto_realloc(context->subs, sizeof(struct mosquitto__subleaf *)*(size_t)(context->subs_capacity + 1)); |
253 | 0 | if(!subs){ |
254 | 0 | sub__remove_shared_leaf(subhier, shared, newleaf); |
255 | 0 | mosquitto_FREE(newleaf); |
256 | 0 | return MOSQ_ERR_NOMEM; |
257 | 0 | } |
258 | 0 | context->subs = subs; |
259 | 0 | context->subs_capacity++; |
260 | 0 | context->subs_count++; |
261 | 0 | context->subs[context->subs_capacity-1] = newleaf; |
262 | 0 | } |
263 | 0 | #ifdef WITH_SYS_TREE |
264 | 0 | db.shared_subscription_count++; |
265 | 0 | #endif |
266 | 0 | } |
267 | | |
268 | 0 | if(context->protocol == mosq_p_mqtt31 || context->protocol == mosq_p_mqtt5){ |
269 | 0 | return rc; |
270 | 0 | }else{ |
271 | | /* mqttv311/mqttv5 requires retained messages are resent on |
272 | | * resubscribe. */ |
273 | 0 | return MOSQ_ERR_SUCCESS; |
274 | 0 | } |
275 | 0 | } |
276 | | |
277 | | |
278 | | static int sub__add_normal(struct mosquitto *context, const struct mosquitto_subscription *sub, struct mosquitto__subhier *subhier) |
279 | 0 | { |
280 | 0 | struct mosquitto__subleaf *newleaf = NULL; |
281 | 0 | struct mosquitto__subleaf **subs; |
282 | 0 | int rc; |
283 | |
|
284 | 0 | rc = sub__add_leaf(context, sub, &subhier->subs, &newleaf); |
285 | 0 | if(rc > 0){ |
286 | 0 | return rc; |
287 | 0 | } |
288 | | |
289 | 0 | if(rc != MOSQ_ERR_SUB_EXISTS){ |
290 | 0 | newleaf->hier = subhier; |
291 | 0 | newleaf->shared = NULL; |
292 | |
|
293 | 0 | bool assigned = false; |
294 | 0 | for(int i=0; i<context->subs_capacity; i++){ |
295 | 0 | if(!context->subs[i]){ |
296 | 0 | context->subs[i] = newleaf; |
297 | 0 | context->subs_count++; |
298 | 0 | assigned = true; |
299 | 0 | break; |
300 | 0 | } |
301 | 0 | } |
302 | 0 | if(assigned == false){ |
303 | 0 | subs = mosquitto_realloc(context->subs, sizeof(struct mosquitto__subleaf *)*(size_t)(context->subs_capacity + 1)); |
304 | 0 | if(!subs){ |
305 | 0 | DL_DELETE(subhier->subs, newleaf); |
306 | 0 | mosquitto_FREE(newleaf); |
307 | 0 | return MOSQ_ERR_NOMEM; |
308 | 0 | } |
309 | 0 | context->subs = subs; |
310 | 0 | context->subs_capacity++; |
311 | 0 | context->subs_count++; |
312 | 0 | context->subs[context->subs_capacity-1] = newleaf; |
313 | 0 | } |
314 | 0 | #ifdef WITH_SYS_TREE |
315 | 0 | db.subscription_count++; |
316 | 0 | #endif |
317 | 0 | } |
318 | | |
319 | 0 | if(context->protocol == mosq_p_mqtt31 || context->protocol == mosq_p_mqtt5){ |
320 | 0 | return rc; |
321 | 0 | }else{ |
322 | | /* mqttv311/mqttv5 requires retained messages are resent on |
323 | | * resubscribe. */ |
324 | 0 | return MOSQ_ERR_SUCCESS; |
325 | 0 | } |
326 | 0 | } |
327 | | |
328 | | |
329 | | static int sub__add_context(struct mosquitto *context, const struct mosquitto_subscription *sub, struct mosquitto__subhier *subhier, char *const *const topics, const char *sharename) |
330 | 0 | { |
331 | 0 | struct mosquitto__subhier *branch; |
332 | 0 | int topic_index = 0; |
333 | 0 | size_t topiclen; |
334 | | |
335 | | /* Find leaf node */ |
336 | 0 | while(topics && topics[topic_index] != NULL){ |
337 | 0 | topiclen = strlen(topics[topic_index]); |
338 | 0 | if(topiclen > UINT16_MAX){ |
339 | 0 | return MOSQ_ERR_INVAL; |
340 | 0 | } |
341 | 0 | HASH_FIND(hh, subhier->children, topics[topic_index], topiclen, branch); |
342 | 0 | if(!branch){ |
343 | | /* Not found */ |
344 | 0 | branch = sub__add_hier_entry(subhier, &subhier->children, topics[topic_index], (uint16_t)topiclen); |
345 | 0 | if(!branch){ |
346 | 0 | return MOSQ_ERR_NOMEM; |
347 | 0 | } |
348 | 0 | } |
349 | 0 | subhier = branch; |
350 | 0 | topic_index++; |
351 | 0 | } |
352 | | |
353 | | /* Add add our context */ |
354 | 0 | if(context && context->id){ |
355 | 0 | if(sharename){ |
356 | 0 | return sub__add_shared(context, sub, subhier, sharename); |
357 | 0 | }else{ |
358 | 0 | return sub__add_normal(context, sub, subhier); |
359 | 0 | } |
360 | 0 | }else{ |
361 | 0 | return MOSQ_ERR_SUCCESS; |
362 | 0 | } |
363 | 0 | } |
364 | | |
365 | | |
366 | | static int sub__remove_normal(struct mosquitto *context, struct mosquitto__subhier *subhier, uint8_t *reason) |
367 | 0 | { |
368 | 0 | struct mosquitto__subleaf *leaf; |
369 | |
|
370 | 0 | leaf = subhier->subs; |
371 | 0 | while(leaf){ |
372 | 0 | if(leaf->context==context){ |
373 | 0 | #ifdef WITH_SYS_TREE |
374 | 0 | db.subscription_count--; |
375 | 0 | #endif |
376 | 0 | DL_DELETE(subhier->subs, leaf); |
377 | | |
378 | | /* Remove the reference to the sub that the client is keeping. |
379 | | * It would be nice to be able to use the reference directly, |
380 | | * but that would involve keeping a copy of the topic string in |
381 | | * each subleaf. Might be worth considering though. */ |
382 | 0 | for(int i=0; i<context->subs_capacity; i++){ |
383 | 0 | if(context->subs[i] && context->subs[i]->hier == subhier){ |
384 | 0 | context->subs_count--; |
385 | 0 | mosquitto_free(context->subs[i]); |
386 | 0 | context->subs[i] = NULL; |
387 | 0 | break; |
388 | 0 | } |
389 | 0 | } |
390 | 0 | *reason = 0; |
391 | 0 | return MOSQ_ERR_SUCCESS; |
392 | 0 | } |
393 | 0 | leaf = leaf->next; |
394 | 0 | } |
395 | 0 | return MOSQ_ERR_NO_SUBSCRIBERS; |
396 | 0 | } |
397 | | |
398 | | |
399 | | static int sub__remove_shared(struct mosquitto *context, struct mosquitto__subhier *subhier, uint8_t *reason, const char *sharename) |
400 | 0 | { |
401 | 0 | struct mosquitto__subshared *shared; |
402 | |
|
403 | 0 | HASH_FIND(hh, subhier->shared, sharename, strlen(sharename), shared); |
404 | 0 | if(shared){ |
405 | 0 | struct mosquitto__subleaf *leaf = shared->subs; |
406 | 0 | while(leaf){ |
407 | 0 | if(leaf->context==context){ |
408 | 0 | #ifdef WITH_SYS_TREE |
409 | 0 | db.shared_subscription_count--; |
410 | 0 | #endif |
411 | 0 | DL_DELETE(shared->subs, leaf); |
412 | | |
413 | | /* Remove the reference to the sub that the client is keeping. |
414 | | * It would be nice to be able to use the reference directly, |
415 | | * but that would involve keeping a copy of the topic string in |
416 | | * each subleaf. Might be worth considering though. */ |
417 | 0 | for(int i=0; i<context->subs_capacity; i++){ |
418 | 0 | if(context->subs[i] |
419 | 0 | && context->subs[i]->hier == subhier |
420 | 0 | && context->subs[i]->shared == shared){ |
421 | |
|
422 | 0 | mosquitto_free(context->subs[i]); |
423 | 0 | context->subs[i] = NULL; |
424 | 0 | context->subs_count--; |
425 | 0 | break; |
426 | 0 | } |
427 | 0 | } |
428 | |
|
429 | 0 | if(shared->subs == NULL){ |
430 | 0 | HASH_DELETE(hh, subhier->shared, shared); |
431 | 0 | mosquitto_FREE(shared); |
432 | 0 | } |
433 | |
|
434 | 0 | *reason = 0; |
435 | 0 | return MOSQ_ERR_SUCCESS; |
436 | 0 | } |
437 | 0 | leaf = leaf->next; |
438 | 0 | } |
439 | 0 | return MOSQ_ERR_NO_SUBSCRIBERS; |
440 | 0 | }else{ |
441 | 0 | return MOSQ_ERR_NO_SUBSCRIBERS; |
442 | 0 | } |
443 | 0 | } |
444 | | |
445 | | |
446 | | static int sub__remove_recurse(struct mosquitto *context, struct mosquitto__subhier *subhier, char **topics, uint8_t *reason, const char *sharename) |
447 | 0 | { |
448 | 0 | struct mosquitto__subhier *branch; |
449 | |
|
450 | 0 | if(topics == NULL || topics[0] == NULL){ |
451 | 0 | if(sharename){ |
452 | 0 | return sub__remove_shared(context, subhier, reason, sharename); |
453 | 0 | }else{ |
454 | 0 | return sub__remove_normal(context, subhier, reason); |
455 | 0 | } |
456 | 0 | } |
457 | | |
458 | 0 | HASH_FIND(hh, subhier->children, topics[0], strlen(topics[0]), branch); |
459 | 0 | if(branch){ |
460 | 0 | sub__remove_recurse(context, branch, &(topics[1]), reason, sharename); |
461 | 0 | if(!branch->children && !branch->subs && !branch->shared){ |
462 | 0 | HASH_DELETE(hh, subhier->children, branch); |
463 | 0 | mosquitto_FREE(branch); |
464 | 0 | } |
465 | 0 | } |
466 | 0 | return MOSQ_ERR_SUCCESS; |
467 | 0 | } |
468 | | |
469 | | |
470 | | static int sub__search(struct mosquitto__subhier *subhier, char **split_topics, const char *source_id, const char *topic, uint8_t qos, int retain, struct mosquitto__base_msg *stored) |
471 | 0 | { |
472 | | /* FIXME - need to take into account source_id if the client is a bridge */ |
473 | 0 | struct mosquitto__subhier *branch; |
474 | 0 | int rc; |
475 | 0 | bool have_subscribers = false; |
476 | |
|
477 | 0 | if(split_topics && split_topics[0]){ |
478 | | /* Check for literal match */ |
479 | 0 | HASH_FIND(hh, subhier->children, split_topics[0], strlen(split_topics[0]), branch); |
480 | |
|
481 | 0 | if(branch){ |
482 | 0 | rc = sub__search(branch, &(split_topics[1]), source_id, topic, qos, retain, stored); |
483 | 0 | if(rc == MOSQ_ERR_SUCCESS){ |
484 | 0 | have_subscribers = true; |
485 | 0 | }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){ |
486 | 0 | return rc; |
487 | 0 | } |
488 | 0 | if(split_topics[1] == NULL){ /* End of list */ |
489 | 0 | rc = subs__process(branch, source_id, topic, qos, retain, stored); |
490 | 0 | if(rc == MOSQ_ERR_SUCCESS){ |
491 | 0 | have_subscribers = true; |
492 | 0 | }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){ |
493 | 0 | return rc; |
494 | 0 | } |
495 | 0 | } |
496 | 0 | } |
497 | | |
498 | | /* Check for + match */ |
499 | 0 | HASH_FIND_BYHASHVALUE(hh, subhier->children, "+", 1, hashv_plus, branch); |
500 | |
|
501 | 0 | if(branch){ |
502 | 0 | rc = sub__search(branch, &(split_topics[1]), source_id, topic, qos, retain, stored); |
503 | 0 | if(rc == MOSQ_ERR_SUCCESS){ |
504 | 0 | have_subscribers = true; |
505 | 0 | }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){ |
506 | 0 | return rc; |
507 | 0 | } |
508 | 0 | if(split_topics[1] == NULL){ /* End of list */ |
509 | 0 | rc = subs__process(branch, source_id, topic, qos, retain, stored); |
510 | 0 | if(rc == MOSQ_ERR_SUCCESS){ |
511 | 0 | have_subscribers = true; |
512 | 0 | }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){ |
513 | 0 | return rc; |
514 | 0 | } |
515 | 0 | } |
516 | 0 | } |
517 | 0 | } |
518 | | |
519 | | /* Check for # match */ |
520 | 0 | HASH_FIND_BYHASHVALUE(hh, subhier->children, "#", 1, hashv_hash, branch); |
521 | 0 | if(branch && !branch->children){ |
522 | | /* The topic matches due to a # wildcard - process the |
523 | | * subscriptions but *don't* return. Although this branch has ended |
524 | | * there may still be other subscriptions to deal with. |
525 | | */ |
526 | 0 | rc = subs__process(branch, source_id, topic, qos, retain, stored); |
527 | 0 | if(rc == MOSQ_ERR_SUCCESS){ |
528 | 0 | have_subscribers = true; |
529 | 0 | }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){ |
530 | 0 | return rc; |
531 | 0 | } |
532 | 0 | } |
533 | | |
534 | 0 | if(have_subscribers){ |
535 | 0 | return MOSQ_ERR_SUCCESS; |
536 | 0 | }else{ |
537 | 0 | return MOSQ_ERR_NO_SUBSCRIBERS; |
538 | 0 | } |
539 | 0 | } |
540 | | |
541 | | |
542 | | static struct mosquitto__subhier *sub__add_hier_entry(struct mosquitto__subhier *parent, struct mosquitto__subhier **sibling, const char *topic, uint16_t len) |
543 | 0 | { |
544 | 0 | struct mosquitto__subhier *child; |
545 | |
|
546 | 0 | assert(sibling); |
547 | |
|
548 | 0 | child = mosquitto_calloc(1, sizeof(struct mosquitto__subhier) + len + 1); |
549 | 0 | if(!child){ |
550 | 0 | log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); |
551 | 0 | return NULL; |
552 | 0 | } |
553 | 0 | child->parent = parent; |
554 | 0 | child->topic_len = len; |
555 | 0 | if(len > 0){ |
556 | 0 | strncpy(child->topic, topic, len); |
557 | 0 | } |
558 | |
|
559 | 0 | HASH_ADD(hh, *sibling, topic, child->topic_len, child); |
560 | | |
561 | 0 | return child; |
562 | 0 | } |
563 | | |
564 | | |
565 | | int sub__add(struct mosquitto *context, const struct mosquitto_subscription *sub) |
566 | 0 | { |
567 | 0 | int rc = 0; |
568 | 0 | struct mosquitto__subhier *subhier; |
569 | 0 | const char *sharename = NULL; |
570 | 0 | char *local_sub; |
571 | 0 | char **topics; |
572 | 0 | size_t topiclen; |
573 | |
|
574 | 0 | assert(sub); |
575 | 0 | assert(sub->topic_filter); |
576 | |
|
577 | 0 | rc = sub__topic_tokenise(sub->topic_filter, &local_sub, &topics, &sharename); |
578 | 0 | if(rc){ |
579 | 0 | return rc; |
580 | 0 | } |
581 | | |
582 | 0 | topiclen = strlen(topics[0]); |
583 | 0 | if(topiclen > UINT16_MAX){ |
584 | 0 | mosquitto_FREE(local_sub); |
585 | 0 | mosquitto_FREE(topics); |
586 | 0 | return MOSQ_ERR_INVAL; |
587 | 0 | } |
588 | | |
589 | 0 | if(sharename){ |
590 | 0 | HASH_FIND(hh, db.shared_subs, topics[0], topiclen, subhier); |
591 | 0 | if(!subhier){ |
592 | 0 | subhier = sub__add_hier_entry(NULL, &db.shared_subs, topics[0], (uint16_t)topiclen); |
593 | 0 | if(!subhier){ |
594 | 0 | mosquitto_FREE(local_sub); |
595 | 0 | mosquitto_FREE(topics); |
596 | 0 | log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); |
597 | 0 | return MOSQ_ERR_NOMEM; |
598 | 0 | } |
599 | 0 | } |
600 | 0 | }else{ |
601 | 0 | HASH_FIND(hh, db.normal_subs, topics[0], topiclen, subhier); |
602 | 0 | if(!subhier){ |
603 | 0 | subhier = sub__add_hier_entry(NULL, &db.normal_subs, topics[0], (uint16_t)topiclen); |
604 | 0 | if(!subhier){ |
605 | 0 | mosquitto_FREE(local_sub); |
606 | 0 | mosquitto_FREE(topics); |
607 | 0 | log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); |
608 | 0 | return MOSQ_ERR_NOMEM; |
609 | 0 | } |
610 | 0 | } |
611 | 0 | } |
612 | 0 | rc = sub__add_context(context, sub, subhier, topics, sharename); |
613 | |
|
614 | 0 | mosquitto_FREE(local_sub); |
615 | 0 | mosquitto_FREE(topics); |
616 | |
|
617 | 0 | return rc; |
618 | 0 | } |
619 | | |
620 | | |
621 | | int sub__remove(struct mosquitto *context, const char *sub, uint8_t *reason) |
622 | 0 | { |
623 | 0 | int rc = 0; |
624 | 0 | struct mosquitto__subhier *subhier; |
625 | 0 | const char *sharename = NULL; |
626 | 0 | char *local_sub = NULL; |
627 | 0 | char **topics = NULL; |
628 | |
|
629 | 0 | assert(sub); |
630 | |
|
631 | 0 | rc = sub__topic_tokenise(sub, &local_sub, &topics, &sharename); |
632 | 0 | if(rc){ |
633 | 0 | return rc; |
634 | 0 | } |
635 | | |
636 | 0 | if(sharename){ |
637 | 0 | HASH_FIND(hh, db.shared_subs, topics[0], strlen(topics[0]), subhier); |
638 | 0 | }else{ |
639 | 0 | HASH_FIND(hh, db.normal_subs, topics[0], strlen(topics[0]), subhier); |
640 | 0 | } |
641 | 0 | if(subhier){ |
642 | 0 | *reason = MQTT_RC_NO_SUBSCRIPTION_EXISTED; |
643 | 0 | rc = sub__remove_recurse(context, subhier, topics, reason, sharename); |
644 | 0 | } |
645 | |
|
646 | 0 | mosquitto_FREE(local_sub); |
647 | 0 | mosquitto_FREE(topics); |
648 | |
|
649 | 0 | return rc; |
650 | 0 | } |
651 | | |
652 | | |
653 | | int sub__messages_queue(const char *source_id, const char *topic, uint8_t qos, int retain, struct mosquitto__base_msg **stored) |
654 | 0 | { |
655 | 0 | int rc = MOSQ_ERR_SUCCESS, rc2; |
656 | 0 | int rc_normal = MOSQ_ERR_NO_SUBSCRIBERS, rc_shared = MOSQ_ERR_NO_SUBSCRIBERS; |
657 | 0 | struct mosquitto__subhier *subhier; |
658 | 0 | char **split_topics = NULL; |
659 | 0 | char *local_topic = NULL; |
660 | 0 | unsigned hashv; |
661 | 0 | size_t topiclen; |
662 | |
|
663 | 0 | assert(topic); |
664 | |
|
665 | 0 | if(sub__topic_tokenise(topic, &local_topic, &split_topics, NULL)){ |
666 | 0 | return 1; |
667 | 0 | } |
668 | | |
669 | | /* Protect this message until we have sent it to all |
670 | | clients - this is required because websockets client calls |
671 | | db__message_write(), which could remove the message if ref_count==0. |
672 | | */ |
673 | 0 | db__msg_store_ref_inc(*stored); |
674 | |
|
675 | 0 | topiclen = strlen(split_topics[0]); |
676 | 0 | HASH_VALUE(split_topics[0], topiclen, hashv); |
677 | 0 | HASH_FIND_BYHASHVALUE(hh, db.normal_subs, split_topics[0], topiclen, hashv, subhier); |
678 | 0 | if(subhier){ |
679 | 0 | rc_normal = sub__search(subhier, split_topics, source_id, topic, qos, retain, *stored); |
680 | 0 | if(rc_normal > 0){ |
681 | 0 | rc = rc_normal; |
682 | 0 | goto end; |
683 | 0 | } |
684 | 0 | } |
685 | | |
686 | 0 | HASH_FIND_BYHASHVALUE(hh, db.shared_subs, split_topics[0], topiclen, hashv, subhier); |
687 | 0 | if(subhier){ |
688 | 0 | rc_shared = sub__search(subhier, split_topics, source_id, topic, qos, retain, *stored); |
689 | 0 | if(rc_shared > 0){ |
690 | 0 | rc = rc_shared; |
691 | 0 | goto end; |
692 | 0 | } |
693 | 0 | } |
694 | | |
695 | 0 | if(rc_normal == MOSQ_ERR_NO_SUBSCRIBERS && rc_shared == MOSQ_ERR_NO_SUBSCRIBERS){ |
696 | 0 | rc = MOSQ_ERR_NO_SUBSCRIBERS; |
697 | 0 | } |
698 | |
|
699 | 0 | if(retain){ |
700 | 0 | rc2 = retain__store(topic, *stored, split_topics, true); |
701 | 0 | if(rc2){ |
702 | 0 | rc = rc2; |
703 | 0 | } |
704 | 0 | } |
705 | |
|
706 | 0 | end: |
707 | 0 | mosquitto_FREE(split_topics); |
708 | 0 | mosquitto_FREE(local_topic); |
709 | | /* Remove our reference and free if needed. */ |
710 | 0 | db__msg_store_ref_dec(stored); |
711 | |
|
712 | 0 | return rc; |
713 | 0 | } |
714 | | |
715 | | |
716 | | /* Remove a subhier element, and return its parent if that needs freeing as well. */ |
717 | | static struct mosquitto__subhier *tmp_remove_subs(struct mosquitto__subhier *sub) |
718 | 0 | { |
719 | 0 | struct mosquitto__subhier *parent; |
720 | |
|
721 | 0 | if(!sub || !sub->parent){ |
722 | 0 | return NULL; |
723 | 0 | } |
724 | | |
725 | 0 | if(sub->children || sub->subs){ |
726 | 0 | return NULL; |
727 | 0 | } |
728 | | |
729 | 0 | parent = sub->parent; |
730 | 0 | HASH_DELETE(hh, parent->children, sub); |
731 | 0 | mosquitto_FREE(sub); |
732 | |
|
733 | 0 | if(parent->subs == NULL |
734 | 0 | && parent->children == NULL |
735 | 0 | && parent->shared == NULL |
736 | 0 | && parent->parent){ |
737 | |
|
738 | 0 | return parent; |
739 | 0 | }else{ |
740 | 0 | return NULL; |
741 | 0 | } |
742 | 0 | } |
743 | | |
744 | | |
745 | | /* Remove all subscriptions for a client. |
746 | | */ |
747 | | int sub__clean_session(struct mosquitto *context) |
748 | 1.60k | { |
749 | 1.60k | for(int i=0; i<context->subs_capacity; i++){ |
750 | 0 | if(context->subs[i] == NULL || context->subs[i]->hier == NULL){ |
751 | 0 | continue; |
752 | 0 | } |
753 | | |
754 | 0 | struct mosquitto__subhier *hier = context->subs[i]->hier; |
755 | 0 | struct mosquitto__subleaf *leaf; |
756 | |
|
757 | 0 | plugin_persist__handle_subscription_delete(context, context->subs[i]->topic_filter); |
758 | 0 | if(context->subs[i]->shared){ |
759 | 0 | leaf = context->subs[i]->shared->subs; |
760 | 0 | while(leaf){ |
761 | 0 | if(leaf->context==context){ |
762 | 0 | #ifdef WITH_SYS_TREE |
763 | 0 | db.shared_subscription_count--; |
764 | 0 | #endif |
765 | 0 | sub__remove_shared_leaf(context->subs[i]->hier, context->subs[i]->shared, leaf); |
766 | 0 | break; |
767 | 0 | } |
768 | 0 | leaf = leaf->next; |
769 | 0 | } |
770 | 0 | }else{ |
771 | 0 | leaf = hier->subs; |
772 | 0 | while(leaf){ |
773 | 0 | if(leaf->context==context){ |
774 | 0 | #ifdef WITH_SYS_TREE |
775 | 0 | db.subscription_count--; |
776 | 0 | #endif |
777 | 0 | DL_DELETE(hier->subs, leaf); |
778 | 0 | break; |
779 | 0 | } |
780 | 0 | leaf = leaf->next; |
781 | 0 | } |
782 | 0 | } |
783 | 0 | mosquitto_FREE(context->subs[i]); |
784 | |
|
785 | 0 | if(hier->subs == NULL |
786 | 0 | && hier->children == NULL |
787 | 0 | && hier->shared == NULL |
788 | 0 | && hier->parent){ |
789 | |
|
790 | 0 | do{ |
791 | 0 | hier = tmp_remove_subs(hier); |
792 | 0 | }while(hier); |
793 | 0 | } |
794 | 0 | } |
795 | 1.60k | mosquitto_FREE(context->subs); |
796 | 1.60k | context->subs_capacity = 0; |
797 | 1.60k | context->subs_count = 0; |
798 | | |
799 | 1.60k | return MOSQ_ERR_SUCCESS; |
800 | 1.60k | } |
801 | | |
802 | | |
803 | | void sub__tree_print(struct mosquitto__subhier *root, int level) |
804 | 0 | { |
805 | 0 | int i; |
806 | 0 | struct mosquitto__subhier *branch, *branch_tmp; |
807 | 0 | struct mosquitto__subleaf *leaf; |
808 | |
|
809 | 0 | HASH_ITER(hh, root, branch, branch_tmp){ |
810 | 0 | if(level > -1){ |
811 | 0 | for(i=0; i<(level+2)*2; i++){ |
812 | 0 | printf(" "); |
813 | 0 | } |
814 | 0 | printf("%s", branch->topic); |
815 | 0 | leaf = branch->subs; |
816 | 0 | while(leaf){ |
817 | 0 | if(leaf->context){ |
818 | 0 | printf(" (%s, %d)", leaf->context->id, MQTT_SUB_OPT_GET_QOS(leaf->subscription_options)); |
819 | 0 | }else{ |
820 | 0 | printf(" (%s, %d)", "", MQTT_SUB_OPT_GET_QOS(leaf->subscription_options)); |
821 | 0 | } |
822 | 0 | leaf = leaf->next; |
823 | 0 | } |
824 | 0 | printf("\n"); |
825 | 0 | } |
826 | |
|
827 | 0 | sub__tree_print(branch->children, level+1); |
828 | 0 | } |
829 | 0 | } |
830 | | |
831 | | |
832 | | int sub__init(void) |
833 | 0 | { |
834 | 0 | HASH_VALUE("+", 1, hashv_plus); |
835 | 0 | HASH_VALUE("#", 1, hashv_hash); |
836 | |
|
837 | 0 | if(sub__add_hier_entry(NULL, &db.shared_subs, "", 0) == NULL |
838 | 0 | || sub__add_hier_entry(NULL, &db.normal_subs, "", 0) == NULL |
839 | 0 | || sub__add_hier_entry(NULL, &db.normal_subs, "$SYS", (uint16_t)strlen("$SYS")) == NULL |
840 | 0 | ){ |
841 | |
|
842 | 0 | return MOSQ_ERR_NOMEM; |
843 | 0 | }else{ |
844 | 0 | return MOSQ_ERR_SUCCESS; |
845 | 0 | } |
846 | 0 | } |