/src/mosquitto/src/subs.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | Copyright (c) 2010-2021 Roger Light <roger@atchoo.org> |
3 | | |
4 | | All rights reserved. This program and the accompanying materials |
5 | | are made available under the terms of the Eclipse Public License 2.0 |
6 | | and Eclipse Distribution License v1.0 which accompany this distribution. |
7 | | |
8 | | The Eclipse Public License is available at |
9 | | https://www.eclipse.org/legal/epl-2.0/ |
10 | | and the Eclipse Distribution License is available at |
11 | | http://www.eclipse.org/org/documents/edl-v10.php. |
12 | | |
13 | | SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause |
14 | | |
15 | | Contributors: |
16 | | Roger Light - initial implementation and documentation. |
17 | | */ |
18 | | |
19 | | /* A note on matching topic subscriptions. |
20 | | * |
21 | | * Topics can be up to 32767 characters in length. The / character is used as a |
22 | | * hierarchy delimiter. Messages are published to a particular topic. |
23 | | * Clients may subscribe to particular topics directly, but may also use |
24 | | * wildcards in subscriptions. The + and # characters are used as wildcards. |
25 | | * The # wildcard can be used at the end of a subscription only, and is a |
26 | | * wildcard for the level of hierarchy at which it is placed and all subsequent |
27 | | * levels. |
28 | | * The + wildcard may be used at any point within the subscription and is a |
29 | | * wildcard for only the level of hierarchy at which it is placed. |
30 | | * Neither wildcard may be used as part of a substring. |
31 | | * Valid, and matching the example topic a/b/c: |
32 | | * a/b/+ |
33 | | * a/+/c |
34 | | * a/# |
35 | | * a/b/# |
36 | | * # |
37 | | * +/b/c |
38 | | * +/+/+ |
39 | | * Invalid: |
40 | | * a/#/c |
41 | | * a+/b/c |
42 | | * Valid but not matching the example topic a/b/c: |
43 | | * a/b |
44 | | * a/+ |
45 | | * +/b |
46 | | * b/c/a |
47 | | * a/b/d |
48 | | */ |
49 | | |
50 | | #include "config.h" |
51 | | |
52 | | #include <assert.h> |
53 | | #include <stdio.h> |
54 | | #include <string.h> |
55 | | |
56 | | #include "mosquitto_broker_internal.h" |
57 | | #include "mosquitto/mqtt_protocol.h" |
58 | | #include "util_mosq.h" |
59 | | |
60 | | #include "utlist.h" |
61 | | |
/* Forward declaration: sub__add_hier_entry() is defined further down this
 * file but is called by functions that appear before its definition. */
static struct mosquitto__subhier *sub__add_hier_entry(struct mosquitto__subhier *parent, struct mosquitto__subhier **sibling, const char *topic, uint16_t len);

/* Cached hash values for the single-character keys "+" and "#". They are
 * computed once in sub__init() and reused by sub__search() for its wildcard
 * lookups. */
static unsigned int hashv_plus = 0;
static unsigned int hashv_hash = 0;
66 | | |
67 | | static int subs__send(struct mosquitto__subleaf *leaf, const char *topic, uint8_t qos, int retain, struct mosquitto__base_msg *stored) |
68 | 0 | { |
69 | 0 | bool client_retain; |
70 | 0 | uint16_t mid; |
71 | 0 | uint8_t client_qos, msg_qos; |
72 | 0 | int rc2; |
73 | | |
74 | | /* Check for ACL topic access. */ |
75 | 0 | rc2 = mosquitto_acl_check(leaf->context, topic, stored->data.payloadlen, stored->data.payload, stored->data.qos, stored->data.retain, MOSQ_ACL_READ); |
76 | 0 | if(rc2 == MOSQ_ERR_ACL_DENIED){ |
77 | 0 | return MOSQ_ERR_SUCCESS; |
78 | 0 | }else if(rc2 == MOSQ_ERR_SUCCESS){ |
79 | 0 | client_qos = MQTT_SUB_OPT_GET_QOS(leaf->subscription_options); |
80 | |
|
81 | 0 | if(db.config->upgrade_outgoing_qos){ |
82 | 0 | msg_qos = client_qos; |
83 | 0 | }else{ |
84 | 0 | if(qos > client_qos){ |
85 | 0 | msg_qos = client_qos; |
86 | 0 | }else{ |
87 | 0 | msg_qos = qos; |
88 | 0 | } |
89 | 0 | } |
90 | 0 | if(msg_qos){ |
91 | 0 | mid = mosquitto__mid_generate(leaf->context); |
92 | 0 | }else{ |
93 | 0 | mid = 0; |
94 | 0 | } |
95 | 0 | if(MQTT_SUB_OPT_GET_RETAIN_AS_PUBLISHED(leaf->subscription_options)){ |
96 | 0 | client_retain = retain; |
97 | 0 | }else{ |
98 | 0 | client_retain = false; |
99 | 0 | } |
100 | 0 | if(db__message_insert_outgoing(leaf->context, 0, mid, msg_qos, client_retain, stored, leaf->identifier, true, true) == 1){ |
101 | 0 | return 1; |
102 | 0 | } |
103 | 0 | }else{ |
104 | 0 | return 1; /* Application error */ |
105 | 0 | } |
106 | 0 | return 0; |
107 | 0 | } |
108 | | |
109 | | |
110 | | static int subs__shared_process(struct mosquitto__subhier *hier, const char *topic, uint8_t qos, int retain, struct mosquitto__base_msg *stored) |
111 | 0 | { |
112 | 0 | int rc = 0, rc2; |
113 | 0 | struct mosquitto__subshared *shared, *shared_tmp; |
114 | 0 | struct mosquitto__subleaf *leaf; |
115 | |
|
116 | 0 | HASH_ITER(hh, hier->shared, shared, shared_tmp){ |
117 | 0 | leaf = shared->subs; |
118 | 0 | rc2 = subs__send(leaf, topic, qos, retain, stored); |
119 | | /* Remove current from the top, add back to the bottom */ |
120 | 0 | DL_DELETE(shared->subs, leaf); |
121 | 0 | DL_APPEND(shared->subs, leaf); |
122 | |
|
123 | 0 | if(rc2) rc = 1; |
124 | 0 | } |
125 | | |
126 | 0 | return rc; |
127 | 0 | } |
128 | | |
129 | | static int subs__process(struct mosquitto__subhier *hier, const char *source_id, const char *topic, uint8_t qos, int retain, struct mosquitto__base_msg *stored) |
130 | 0 | { |
131 | 0 | int rc = 0; |
132 | 0 | int rc2; |
133 | 0 | struct mosquitto__subleaf *leaf; |
134 | |
|
135 | 0 | rc = subs__shared_process(hier, topic, qos, retain, stored); |
136 | |
|
137 | 0 | leaf = hier->subs; |
138 | 0 | while(source_id && leaf){ |
139 | 0 | if(!leaf->context->id || (MQTT_SUB_OPT_GET_NO_LOCAL(leaf->subscription_options) && !strcmp(leaf->context->id, source_id))){ |
140 | 0 | leaf = leaf->next; |
141 | 0 | continue; |
142 | 0 | } |
143 | 0 | rc2 = subs__send(leaf, topic, qos, retain, stored); |
144 | 0 | if(rc2){ |
145 | 0 | rc = 1; |
146 | 0 | } |
147 | 0 | leaf = leaf->next; |
148 | 0 | } |
149 | 0 | if(hier->subs || hier->shared){ |
150 | 0 | return rc; |
151 | 0 | }else{ |
152 | 0 | return MOSQ_ERR_NO_SUBSCRIBERS; |
153 | 0 | } |
154 | 0 | } |
155 | | |
156 | | |
157 | | static int sub__add_leaf(struct mosquitto *context, const struct mosquitto_subscription *sub, struct mosquitto__subleaf **head, struct mosquitto__subleaf **newleaf) |
158 | 0 | { |
159 | 0 | struct mosquitto__subleaf *leaf; |
160 | |
|
161 | 0 | *newleaf = NULL; |
162 | 0 | leaf = *head; |
163 | |
|
164 | 0 | while(leaf){ |
165 | 0 | if(leaf->context && leaf->context->id && !strcmp(leaf->context->id, context->id)){ |
166 | | /* Client making a second subscription to same topic. Only |
167 | | * need to update QoS. Return MOSQ_ERR_SUB_EXISTS to |
168 | | * indicate this to the calling function. */ |
169 | 0 | leaf->identifier = sub->identifier; |
170 | 0 | leaf->subscription_options = sub->options; |
171 | 0 | return MOSQ_ERR_SUB_EXISTS; |
172 | 0 | } |
173 | 0 | leaf = leaf->next; |
174 | 0 | } |
175 | 0 | leaf = mosquitto_calloc(1, sizeof(struct mosquitto__subleaf) + strlen(sub->topic_filter) + 1); |
176 | 0 | if(!leaf) return MOSQ_ERR_NOMEM; |
177 | 0 | leaf->context = context; |
178 | 0 | leaf->identifier = sub->identifier; |
179 | 0 | leaf->subscription_options = sub->options; |
180 | 0 | strcpy(leaf->topic_filter, sub->topic_filter); |
181 | |
|
182 | 0 | DL_APPEND(*head, leaf); |
183 | 0 | *newleaf = leaf; |
184 | |
|
185 | 0 | return MOSQ_ERR_SUCCESS; |
186 | 0 | } |
187 | | |
188 | | |
189 | | static void sub__remove_shared_leaf(struct mosquitto__subhier *subhier, struct mosquitto__subshared *shared, struct mosquitto__subleaf *leaf) |
190 | 0 | { |
191 | 0 | DL_DELETE(shared->subs, leaf); |
192 | 0 | if(shared->subs == NULL){ |
193 | 0 | HASH_DELETE(hh, subhier->shared, shared); |
194 | 0 | mosquitto_FREE(shared); |
195 | 0 | } |
196 | 0 | } |
197 | | |
198 | | |
199 | | static int sub__add_shared(struct mosquitto *context, const struct mosquitto_subscription *sub, struct mosquitto__subhier *subhier, const char *sharename) |
200 | 0 | { |
201 | 0 | struct mosquitto__subleaf *newleaf; |
202 | 0 | struct mosquitto__subshared *shared = NULL; |
203 | 0 | struct mosquitto__subleaf **subs; |
204 | 0 | size_t slen; |
205 | 0 | int rc; |
206 | 0 | unsigned hashv; |
207 | |
|
208 | 0 | slen = strlen(sharename); |
209 | |
|
210 | 0 | HASH_VALUE(sharename, slen, hashv); |
211 | |
|
212 | 0 | HASH_FIND_BYHASHVALUE(hh, subhier->shared, sharename, slen, hashv, shared); |
213 | 0 | if(shared == NULL){ |
214 | 0 | shared = mosquitto_calloc(1, sizeof(struct mosquitto__subshared) + slen + 1); |
215 | 0 | if(!shared){ |
216 | 0 | return MOSQ_ERR_NOMEM; |
217 | 0 | } |
218 | 0 | strncpy(shared->name, sharename, slen+1); |
219 | |
|
220 | 0 | HASH_ADD_BYHASHVALUE(hh, subhier->shared, name, slen, hashv, shared); |
221 | 0 | } |
222 | | |
223 | 0 | rc = sub__add_leaf(context, sub, &shared->subs, &newleaf); |
224 | 0 | if(rc > 0){ |
225 | 0 | if(shared->subs == NULL){ |
226 | 0 | HASH_DELETE(hh, subhier->shared, shared); |
227 | 0 | mosquitto_FREE(shared); |
228 | 0 | } |
229 | 0 | return rc; |
230 | 0 | } |
231 | | |
232 | 0 | if(rc != MOSQ_ERR_SUB_EXISTS){ |
233 | 0 | newleaf->hier = subhier; |
234 | 0 | newleaf->shared = shared; |
235 | |
|
236 | 0 | bool assigned = false; |
237 | 0 | for(int i=0; i<context->subs_capacity; i++){ |
238 | 0 | if(!context->subs[i]){ |
239 | 0 | context->subs[i] = newleaf; |
240 | 0 | context->subs_count++; |
241 | 0 | assigned = true; |
242 | 0 | break; |
243 | 0 | } |
244 | 0 | } |
245 | 0 | if(assigned == false){ |
246 | 0 | subs = mosquitto_realloc(context->subs, sizeof(struct mosquitto__subleaf *)*(size_t)(context->subs_capacity + 1)); |
247 | 0 | if(!subs){ |
248 | 0 | sub__remove_shared_leaf(subhier, shared, newleaf); |
249 | 0 | mosquitto_FREE(newleaf); |
250 | 0 | return MOSQ_ERR_NOMEM; |
251 | 0 | } |
252 | 0 | context->subs = subs; |
253 | 0 | context->subs_capacity++; |
254 | 0 | context->subs_count++; |
255 | 0 | context->subs[context->subs_capacity-1] = newleaf; |
256 | 0 | } |
257 | 0 | #ifdef WITH_SYS_TREE |
258 | 0 | db.shared_subscription_count++; |
259 | 0 | #endif |
260 | 0 | } |
261 | | |
262 | 0 | if(context->protocol == mosq_p_mqtt31 || context->protocol == mosq_p_mqtt5){ |
263 | 0 | return rc; |
264 | 0 | }else{ |
265 | | /* mqttv311/mqttv5 requires retained messages are resent on |
266 | | * resubscribe. */ |
267 | 0 | return MOSQ_ERR_SUCCESS; |
268 | 0 | } |
269 | 0 | } |
270 | | |
271 | | |
272 | | static int sub__add_normal(struct mosquitto *context, const struct mosquitto_subscription *sub, struct mosquitto__subhier *subhier) |
273 | 0 | { |
274 | 0 | struct mosquitto__subleaf *newleaf = NULL; |
275 | 0 | struct mosquitto__subleaf **subs; |
276 | 0 | int rc; |
277 | |
|
278 | 0 | rc = sub__add_leaf(context, sub, &subhier->subs, &newleaf); |
279 | 0 | if(rc > 0){ |
280 | 0 | return rc; |
281 | 0 | } |
282 | | |
283 | 0 | if(rc != MOSQ_ERR_SUB_EXISTS){ |
284 | 0 | newleaf->hier = subhier; |
285 | 0 | newleaf->shared = NULL; |
286 | |
|
287 | 0 | bool assigned = false; |
288 | 0 | for(int i=0; i<context->subs_capacity; i++){ |
289 | 0 | if(!context->subs[i]){ |
290 | 0 | context->subs[i] = newleaf; |
291 | 0 | context->subs_count++; |
292 | 0 | assigned = true; |
293 | 0 | break; |
294 | 0 | } |
295 | 0 | } |
296 | 0 | if(assigned == false){ |
297 | 0 | subs = mosquitto_realloc(context->subs, sizeof(struct mosquitto__subleaf *)*(size_t)(context->subs_capacity + 1)); |
298 | 0 | if(!subs){ |
299 | 0 | DL_DELETE(subhier->subs, newleaf); |
300 | 0 | mosquitto_FREE(newleaf); |
301 | 0 | return MOSQ_ERR_NOMEM; |
302 | 0 | } |
303 | 0 | context->subs = subs; |
304 | 0 | context->subs_capacity++; |
305 | 0 | context->subs_count++; |
306 | 0 | context->subs[context->subs_capacity-1] = newleaf; |
307 | 0 | } |
308 | 0 | #ifdef WITH_SYS_TREE |
309 | 0 | db.subscription_count++; |
310 | 0 | #endif |
311 | 0 | } |
312 | | |
313 | 0 | if(context->protocol == mosq_p_mqtt31 || context->protocol == mosq_p_mqtt5){ |
314 | 0 | return rc; |
315 | 0 | }else{ |
316 | | /* mqttv311/mqttv5 requires retained messages are resent on |
317 | | * resubscribe. */ |
318 | 0 | return MOSQ_ERR_SUCCESS; |
319 | 0 | } |
320 | 0 | } |
321 | | |
322 | | |
323 | | static int sub__add_context(struct mosquitto *context, const struct mosquitto_subscription *sub, struct mosquitto__subhier *subhier, char *const *const topics, const char *sharename) |
324 | 0 | { |
325 | 0 | struct mosquitto__subhier *branch; |
326 | 0 | int topic_index = 0; |
327 | 0 | size_t topiclen; |
328 | | |
329 | | /* Find leaf node */ |
330 | 0 | while(topics && topics[topic_index] != NULL){ |
331 | 0 | topiclen = strlen(topics[topic_index]); |
332 | 0 | if(topiclen > UINT16_MAX){ |
333 | 0 | return MOSQ_ERR_INVAL; |
334 | 0 | } |
335 | 0 | HASH_FIND(hh, subhier->children, topics[topic_index], topiclen, branch); |
336 | 0 | if(!branch){ |
337 | | /* Not found */ |
338 | 0 | branch = sub__add_hier_entry(subhier, &subhier->children, topics[topic_index], (uint16_t)topiclen); |
339 | 0 | if(!branch) return MOSQ_ERR_NOMEM; |
340 | 0 | } |
341 | 0 | subhier = branch; |
342 | 0 | topic_index++; |
343 | 0 | } |
344 | | |
345 | | /* Add add our context */ |
346 | 0 | if(context && context->id){ |
347 | 0 | if(sharename){ |
348 | 0 | return sub__add_shared(context, sub, subhier, sharename); |
349 | 0 | }else{ |
350 | 0 | return sub__add_normal(context, sub, subhier); |
351 | 0 | } |
352 | 0 | }else{ |
353 | 0 | return MOSQ_ERR_SUCCESS; |
354 | 0 | } |
355 | 0 | } |
356 | | |
357 | | |
358 | | static int sub__remove_normal(struct mosquitto *context, struct mosquitto__subhier *subhier, uint8_t *reason) |
359 | 0 | { |
360 | 0 | struct mosquitto__subleaf *leaf; |
361 | |
|
362 | 0 | leaf = subhier->subs; |
363 | 0 | while(leaf){ |
364 | 0 | if(leaf->context==context){ |
365 | 0 | #ifdef WITH_SYS_TREE |
366 | 0 | db.subscription_count--; |
367 | 0 | #endif |
368 | 0 | DL_DELETE(subhier->subs, leaf); |
369 | | |
370 | | /* Remove the reference to the sub that the client is keeping. |
371 | | * It would be nice to be able to use the reference directly, |
372 | | * but that would involve keeping a copy of the topic string in |
373 | | * each subleaf. Might be worth considering though. */ |
374 | 0 | for(int i=0; i<context->subs_capacity; i++){ |
375 | 0 | if(context->subs[i] && context->subs[i]->hier == subhier){ |
376 | 0 | context->subs_count--; |
377 | 0 | mosquitto_free(context->subs[i]); |
378 | 0 | context->subs[i] = NULL; |
379 | 0 | break; |
380 | 0 | } |
381 | 0 | } |
382 | 0 | *reason = 0; |
383 | 0 | return MOSQ_ERR_SUCCESS; |
384 | 0 | } |
385 | 0 | leaf = leaf->next; |
386 | 0 | } |
387 | 0 | return MOSQ_ERR_NO_SUBSCRIBERS; |
388 | 0 | } |
389 | | |
390 | | |
391 | | static int sub__remove_shared(struct mosquitto *context, struct mosquitto__subhier *subhier, uint8_t *reason, const char *sharename) |
392 | 0 | { |
393 | 0 | struct mosquitto__subshared *shared; |
394 | |
|
395 | 0 | HASH_FIND(hh, subhier->shared, sharename, strlen(sharename), shared); |
396 | 0 | if(shared){ |
397 | 0 | struct mosquitto__subleaf *leaf = shared->subs; |
398 | 0 | while(leaf){ |
399 | 0 | if(leaf->context==context){ |
400 | 0 | #ifdef WITH_SYS_TREE |
401 | 0 | db.shared_subscription_count--; |
402 | 0 | #endif |
403 | 0 | DL_DELETE(shared->subs, leaf); |
404 | | |
405 | | /* Remove the reference to the sub that the client is keeping. |
406 | | * It would be nice to be able to use the reference directly, |
407 | | * but that would involve keeping a copy of the topic string in |
408 | | * each subleaf. Might be worth considering though. */ |
409 | 0 | for(int i=0; i<context->subs_capacity; i++){ |
410 | 0 | if(context->subs[i] |
411 | 0 | && context->subs[i]->hier == subhier |
412 | 0 | && context->subs[i]->shared == shared){ |
413 | |
|
414 | 0 | mosquitto_free(context->subs[i]); |
415 | 0 | context->subs[i] = NULL; |
416 | 0 | context->subs_count--; |
417 | 0 | break; |
418 | 0 | } |
419 | 0 | } |
420 | |
|
421 | 0 | if(shared->subs == NULL){ |
422 | 0 | HASH_DELETE(hh, subhier->shared, shared); |
423 | 0 | mosquitto_FREE(shared); |
424 | 0 | } |
425 | |
|
426 | 0 | *reason = 0; |
427 | 0 | return MOSQ_ERR_SUCCESS; |
428 | 0 | } |
429 | 0 | leaf = leaf->next; |
430 | 0 | } |
431 | 0 | return MOSQ_ERR_NO_SUBSCRIBERS; |
432 | 0 | }else{ |
433 | 0 | return MOSQ_ERR_NO_SUBSCRIBERS; |
434 | 0 | } |
435 | 0 | } |
436 | | |
437 | | |
438 | | static int sub__remove_recurse(struct mosquitto *context, struct mosquitto__subhier *subhier, char **topics, uint8_t *reason, const char *sharename) |
439 | 0 | { |
440 | 0 | struct mosquitto__subhier *branch; |
441 | |
|
442 | 0 | if(topics == NULL || topics[0] == NULL){ |
443 | 0 | if(sharename){ |
444 | 0 | return sub__remove_shared(context, subhier, reason, sharename); |
445 | 0 | }else{ |
446 | 0 | return sub__remove_normal(context, subhier, reason); |
447 | 0 | } |
448 | 0 | } |
449 | | |
450 | 0 | HASH_FIND(hh, subhier->children, topics[0], strlen(topics[0]), branch); |
451 | 0 | if(branch){ |
452 | 0 | sub__remove_recurse(context, branch, &(topics[1]), reason, sharename); |
453 | 0 | if(!branch->children && !branch->subs && !branch->shared){ |
454 | 0 | HASH_DELETE(hh, subhier->children, branch); |
455 | 0 | mosquitto_FREE(branch); |
456 | 0 | } |
457 | 0 | } |
458 | 0 | return MOSQ_ERR_SUCCESS; |
459 | 0 | } |
460 | | |
461 | | |
462 | | static int sub__search(struct mosquitto__subhier *subhier, char **split_topics, const char *source_id, const char *topic, uint8_t qos, int retain, struct mosquitto__base_msg *stored) |
463 | 0 | { |
464 | | /* FIXME - need to take into account source_id if the client is a bridge */ |
465 | 0 | struct mosquitto__subhier *branch; |
466 | 0 | int rc; |
467 | 0 | bool have_subscribers = false; |
468 | |
|
469 | 0 | if(split_topics && split_topics[0]){ |
470 | | /* Check for literal match */ |
471 | 0 | HASH_FIND(hh, subhier->children, split_topics[0], strlen(split_topics[0]), branch); |
472 | |
|
473 | 0 | if(branch){ |
474 | 0 | rc = sub__search(branch, &(split_topics[1]), source_id, topic, qos, retain, stored); |
475 | 0 | if(rc == MOSQ_ERR_SUCCESS){ |
476 | 0 | have_subscribers = true; |
477 | 0 | }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){ |
478 | 0 | return rc; |
479 | 0 | } |
480 | 0 | if(split_topics[1] == NULL){ /* End of list */ |
481 | 0 | rc = subs__process(branch, source_id, topic, qos, retain, stored); |
482 | 0 | if(rc == MOSQ_ERR_SUCCESS){ |
483 | 0 | have_subscribers = true; |
484 | 0 | }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){ |
485 | 0 | return rc; |
486 | 0 | } |
487 | 0 | } |
488 | 0 | } |
489 | | |
490 | | /* Check for + match */ |
491 | 0 | HASH_FIND_BYHASHVALUE(hh, subhier->children, "+", 1, hashv_plus, branch); |
492 | |
|
493 | 0 | if(branch){ |
494 | 0 | rc = sub__search(branch, &(split_topics[1]), source_id, topic, qos, retain, stored); |
495 | 0 | if(rc == MOSQ_ERR_SUCCESS){ |
496 | 0 | have_subscribers = true; |
497 | 0 | }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){ |
498 | 0 | return rc; |
499 | 0 | } |
500 | 0 | if(split_topics[1] == NULL){ /* End of list */ |
501 | 0 | rc = subs__process(branch, source_id, topic, qos, retain, stored); |
502 | 0 | if(rc == MOSQ_ERR_SUCCESS){ |
503 | 0 | have_subscribers = true; |
504 | 0 | }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){ |
505 | 0 | return rc; |
506 | 0 | } |
507 | 0 | } |
508 | 0 | } |
509 | 0 | } |
510 | | |
511 | | /* Check for # match */ |
512 | 0 | HASH_FIND_BYHASHVALUE(hh, subhier->children, "#", 1, hashv_hash, branch); |
513 | 0 | if(branch && !branch->children){ |
514 | | /* The topic matches due to a # wildcard - process the |
515 | | * subscriptions but *don't* return. Although this branch has ended |
516 | | * there may still be other subscriptions to deal with. |
517 | | */ |
518 | 0 | rc = subs__process(branch, source_id, topic, qos, retain, stored); |
519 | 0 | if(rc == MOSQ_ERR_SUCCESS){ |
520 | 0 | have_subscribers = true; |
521 | 0 | }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){ |
522 | 0 | return rc; |
523 | 0 | } |
524 | 0 | } |
525 | | |
526 | 0 | if(have_subscribers){ |
527 | 0 | return MOSQ_ERR_SUCCESS; |
528 | 0 | }else{ |
529 | 0 | return MOSQ_ERR_NO_SUBSCRIBERS; |
530 | 0 | } |
531 | 0 | } |
532 | | |
533 | | |
534 | | static struct mosquitto__subhier *sub__add_hier_entry(struct mosquitto__subhier *parent, struct mosquitto__subhier **sibling, const char *topic, uint16_t len) |
535 | 0 | { |
536 | 0 | struct mosquitto__subhier *child; |
537 | |
|
538 | 0 | assert(sibling); |
539 | | |
540 | 0 | child = mosquitto_calloc(1, sizeof(struct mosquitto__subhier) + len + 1); |
541 | 0 | if(!child){ |
542 | 0 | log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); |
543 | 0 | return NULL; |
544 | 0 | } |
545 | 0 | child->parent = parent; |
546 | 0 | child->topic_len = len; |
547 | 0 | if(len > 0){ |
548 | 0 | strncpy(child->topic, topic, len); |
549 | 0 | } |
550 | |
|
551 | 0 | HASH_ADD(hh, *sibling, topic, child->topic_len, child); |
552 | | |
553 | 0 | return child; |
554 | 0 | } |
555 | | |
556 | | |
557 | | int sub__add(struct mosquitto *context, const struct mosquitto_subscription *sub) |
558 | 0 | { |
559 | 0 | int rc = 0; |
560 | 0 | struct mosquitto__subhier *subhier; |
561 | 0 | const char *sharename = NULL; |
562 | 0 | char *local_sub; |
563 | 0 | char **topics; |
564 | 0 | size_t topiclen; |
565 | |
|
566 | 0 | assert(sub); |
567 | 0 | assert(sub->topic_filter); |
568 | | |
569 | 0 | rc = sub__topic_tokenise(sub->topic_filter, &local_sub, &topics, &sharename); |
570 | 0 | if(rc) return rc; |
571 | | |
572 | 0 | topiclen = strlen(topics[0]); |
573 | 0 | if(topiclen > UINT16_MAX){ |
574 | 0 | mosquitto_FREE(local_sub); |
575 | 0 | mosquitto_FREE(topics); |
576 | 0 | return MOSQ_ERR_INVAL; |
577 | 0 | } |
578 | | |
579 | 0 | if(sharename){ |
580 | 0 | HASH_FIND(hh, db.shared_subs, topics[0], topiclen, subhier); |
581 | 0 | if(!subhier){ |
582 | 0 | subhier = sub__add_hier_entry(NULL, &db.shared_subs, topics[0], (uint16_t)topiclen); |
583 | 0 | if(!subhier){ |
584 | 0 | mosquitto_FREE(local_sub); |
585 | 0 | mosquitto_FREE(topics); |
586 | 0 | log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); |
587 | 0 | return MOSQ_ERR_NOMEM; |
588 | 0 | } |
589 | 0 | } |
590 | 0 | }else{ |
591 | 0 | HASH_FIND(hh, db.normal_subs, topics[0], topiclen, subhier); |
592 | 0 | if(!subhier){ |
593 | 0 | subhier = sub__add_hier_entry(NULL, &db.normal_subs, topics[0], (uint16_t)topiclen); |
594 | 0 | if(!subhier){ |
595 | 0 | mosquitto_FREE(local_sub); |
596 | 0 | mosquitto_FREE(topics); |
597 | 0 | log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); |
598 | 0 | return MOSQ_ERR_NOMEM; |
599 | 0 | } |
600 | 0 | } |
601 | 0 | } |
602 | 0 | rc = sub__add_context(context, sub, subhier, topics, sharename); |
603 | |
|
604 | 0 | mosquitto_FREE(local_sub); |
605 | 0 | mosquitto_FREE(topics); |
606 | |
|
607 | 0 | return rc; |
608 | 0 | } |
609 | | |
610 | | int sub__remove(struct mosquitto *context, const char *sub, uint8_t *reason) |
611 | 0 | { |
612 | 0 | int rc = 0; |
613 | 0 | struct mosquitto__subhier *subhier; |
614 | 0 | const char *sharename = NULL; |
615 | 0 | char *local_sub = NULL; |
616 | 0 | char **topics = NULL; |
617 | |
|
618 | 0 | assert(sub); |
619 | | |
620 | 0 | rc = sub__topic_tokenise(sub, &local_sub, &topics, &sharename); |
621 | 0 | if(rc) return rc; |
622 | | |
623 | 0 | if(sharename){ |
624 | 0 | HASH_FIND(hh, db.shared_subs, topics[0], strlen(topics[0]), subhier); |
625 | 0 | }else{ |
626 | 0 | HASH_FIND(hh, db.normal_subs, topics[0], strlen(topics[0]), subhier); |
627 | 0 | } |
628 | 0 | if(subhier){ |
629 | 0 | *reason = MQTT_RC_NO_SUBSCRIPTION_EXISTED; |
630 | 0 | rc = sub__remove_recurse(context, subhier, topics, reason, sharename); |
631 | 0 | } |
632 | |
|
633 | 0 | mosquitto_FREE(local_sub); |
634 | 0 | mosquitto_FREE(topics); |
635 | |
|
636 | 0 | return rc; |
637 | 0 | } |
638 | | |
639 | | int sub__messages_queue(const char *source_id, const char *topic, uint8_t qos, int retain, struct mosquitto__base_msg **stored) |
640 | 0 | { |
641 | 0 | int rc = MOSQ_ERR_SUCCESS, rc2; |
642 | 0 | int rc_normal = MOSQ_ERR_NO_SUBSCRIBERS, rc_shared = MOSQ_ERR_NO_SUBSCRIBERS; |
643 | 0 | struct mosquitto__subhier *subhier; |
644 | 0 | char **split_topics = NULL; |
645 | 0 | char *local_topic = NULL; |
646 | 0 | unsigned hashv; |
647 | 0 | size_t topiclen; |
648 | |
|
649 | 0 | assert(topic); |
650 | | |
651 | 0 | if(sub__topic_tokenise(topic, &local_topic, &split_topics, NULL)) return 1; |
652 | | |
653 | | /* Protect this message until we have sent it to all |
654 | | clients - this is required because websockets client calls |
655 | | db__message_write(), which could remove the message if ref_count==0. |
656 | | */ |
657 | 0 | db__msg_store_ref_inc(*stored); |
658 | |
|
659 | 0 | topiclen = strlen(split_topics[0]); |
660 | 0 | HASH_VALUE(split_topics[0], topiclen, hashv); |
661 | 0 | HASH_FIND_BYHASHVALUE(hh, db.normal_subs, split_topics[0], topiclen, hashv, subhier); |
662 | 0 | if(subhier){ |
663 | 0 | rc_normal = sub__search(subhier, split_topics, source_id, topic, qos, retain, *stored); |
664 | 0 | if(rc_normal > 0){ |
665 | 0 | rc = rc_normal; |
666 | 0 | goto end; |
667 | 0 | } |
668 | 0 | } |
669 | | |
670 | 0 | HASH_FIND_BYHASHVALUE(hh, db.shared_subs, split_topics[0], topiclen, hashv, subhier); |
671 | 0 | if(subhier){ |
672 | 0 | rc_shared = sub__search(subhier, split_topics, source_id, topic, qos, retain, *stored); |
673 | 0 | if(rc_shared > 0){ |
674 | 0 | rc = rc_shared; |
675 | 0 | goto end; |
676 | 0 | } |
677 | 0 | } |
678 | | |
679 | 0 | if(rc_normal == MOSQ_ERR_NO_SUBSCRIBERS && rc_shared == MOSQ_ERR_NO_SUBSCRIBERS){ |
680 | 0 | rc = MOSQ_ERR_NO_SUBSCRIBERS; |
681 | 0 | } |
682 | |
|
683 | 0 | if(retain){ |
684 | 0 | rc2 = retain__store(topic, *stored, split_topics, true); |
685 | 0 | if(rc2) rc = rc2; |
686 | 0 | } |
687 | |
|
688 | 0 | end: |
689 | 0 | mosquitto_FREE(split_topics); |
690 | 0 | mosquitto_FREE(local_topic); |
691 | | /* Remove our reference and free if needed. */ |
692 | 0 | db__msg_store_ref_dec(stored); |
693 | |
|
694 | 0 | return rc; |
695 | 0 | } |
696 | | |
697 | | |
698 | | /* Remove a subhier element, and return its parent if that needs freeing as well. */ |
699 | | static struct mosquitto__subhier *tmp_remove_subs(struct mosquitto__subhier *sub) |
700 | 0 | { |
701 | 0 | struct mosquitto__subhier *parent; |
702 | |
|
703 | 0 | if(!sub || !sub->parent){ |
704 | 0 | return NULL; |
705 | 0 | } |
706 | | |
707 | 0 | if(sub->children || sub->subs){ |
708 | 0 | return NULL; |
709 | 0 | } |
710 | | |
711 | 0 | parent = sub->parent; |
712 | 0 | HASH_DELETE(hh, parent->children, sub); |
713 | 0 | mosquitto_FREE(sub); |
714 | |
|
715 | 0 | if(parent->subs == NULL |
716 | 0 | && parent->children == NULL |
717 | 0 | && parent->shared == NULL |
718 | 0 | && parent->parent){ |
719 | |
|
720 | 0 | return parent; |
721 | 0 | }else{ |
722 | 0 | return NULL; |
723 | 0 | } |
724 | 0 | } |
725 | | |
726 | | |
727 | | /* Remove all subscriptions for a client. |
728 | | */ |
729 | | int sub__clean_session(struct mosquitto *context) |
730 | 1.55k | { |
731 | 1.55k | for(int i=0; i<context->subs_capacity; i++){ |
732 | 0 | if(context->subs[i] == NULL || context->subs[i]->hier == NULL){ |
733 | 0 | continue; |
734 | 0 | } |
735 | | |
736 | 0 | struct mosquitto__subhier *hier = context->subs[i]->hier; |
737 | 0 | struct mosquitto__subleaf *leaf; |
738 | |
|
739 | 0 | plugin_persist__handle_subscription_delete(context, context->subs[i]->topic_filter); |
740 | 0 | if(context->subs[i]->shared){ |
741 | 0 | leaf = context->subs[i]->shared->subs; |
742 | 0 | while(leaf){ |
743 | 0 | if(leaf->context==context){ |
744 | 0 | #ifdef WITH_SYS_TREE |
745 | 0 | db.shared_subscription_count--; |
746 | 0 | #endif |
747 | 0 | sub__remove_shared_leaf(context->subs[i]->hier, context->subs[i]->shared, leaf); |
748 | 0 | break; |
749 | 0 | } |
750 | 0 | leaf = leaf->next; |
751 | 0 | } |
752 | 0 | }else{ |
753 | 0 | leaf = hier->subs; |
754 | 0 | while(leaf){ |
755 | 0 | if(leaf->context==context){ |
756 | 0 | #ifdef WITH_SYS_TREE |
757 | 0 | db.subscription_count--; |
758 | 0 | #endif |
759 | 0 | DL_DELETE(hier->subs, leaf); |
760 | 0 | break; |
761 | 0 | } |
762 | 0 | leaf = leaf->next; |
763 | 0 | } |
764 | 0 | } |
765 | 0 | mosquitto_FREE(context->subs[i]); |
766 | |
|
767 | 0 | if(hier->subs == NULL |
768 | 0 | && hier->children == NULL |
769 | 0 | && hier->shared == NULL |
770 | 0 | && hier->parent){ |
771 | |
|
772 | 0 | do{ |
773 | 0 | hier = tmp_remove_subs(hier); |
774 | 0 | }while(hier); |
775 | 0 | } |
776 | 0 | } |
777 | 1.55k | mosquitto_FREE(context->subs); |
778 | 1.55k | context->subs_capacity = 0; |
779 | 1.55k | context->subs_count = 0; |
780 | | |
781 | 1.55k | return MOSQ_ERR_SUCCESS; |
782 | 1.55k | } |
783 | | |
784 | | void sub__tree_print(struct mosquitto__subhier *root, int level) |
785 | 0 | { |
786 | 0 | int i; |
787 | 0 | struct mosquitto__subhier *branch, *branch_tmp; |
788 | 0 | struct mosquitto__subleaf *leaf; |
789 | |
|
790 | 0 | HASH_ITER(hh, root, branch, branch_tmp){ |
791 | 0 | if(level > -1){ |
792 | 0 | for(i=0; i<(level+2)*2; i++){ |
793 | 0 | printf(" "); |
794 | 0 | } |
795 | 0 | printf("%s", branch->topic); |
796 | 0 | leaf = branch->subs; |
797 | 0 | while(leaf){ |
798 | 0 | if(leaf->context){ |
799 | 0 | printf(" (%s, %d)", leaf->context->id, MQTT_SUB_OPT_GET_QOS(leaf->subscription_options)); |
800 | 0 | }else{ |
801 | 0 | printf(" (%s, %d)", "", MQTT_SUB_OPT_GET_QOS(leaf->subscription_options)); |
802 | 0 | } |
803 | 0 | leaf = leaf->next; |
804 | 0 | } |
805 | 0 | printf("\n"); |
806 | 0 | } |
807 | |
|
808 | 0 | sub__tree_print(branch->children, level+1); |
809 | 0 | } |
810 | 0 | } |
811 | | |
812 | | int sub__init(void) |
813 | 0 | { |
814 | 0 | HASH_VALUE("+", 1, hashv_plus); |
815 | 0 | HASH_VALUE("#", 1, hashv_hash); |
816 | |
|
817 | 0 | if(sub__add_hier_entry(NULL, &db.shared_subs, "", 0) == NULL |
818 | 0 | || sub__add_hier_entry(NULL, &db.normal_subs, "", 0) == NULL |
819 | 0 | || sub__add_hier_entry(NULL, &db.normal_subs, "$SYS", (uint16_t)strlen("$SYS")) == NULL |
820 | 0 | ){ |
821 | |
|
822 | 0 | return MOSQ_ERR_NOMEM; |
823 | 0 | }else{ |
824 | 0 | return MOSQ_ERR_SUCCESS; |
825 | 0 | } |
826 | 0 | } |