/src/mosquitto/src/keepalive.c
Line | Count | Source |
1 | | /* |
2 | | Copyright (c) 2009-2021 Roger Light <roger@atchoo.org> |
3 | | |
4 | | All rights reserved. This program and the accompanying materials |
5 | | are made available under the terms of the Eclipse Public License 2.0 |
6 | | and Eclipse Distribution License v1.0 which accompany this distribution. |
7 | | |
8 | | The Eclipse Public License is available at |
9 | | https://www.eclipse.org/legal/epl-2.0/ |
10 | | and the Eclipse Distribution License is available at |
11 | | http://www.eclipse.org/org/documents/edl-v10.php. |
12 | | |
13 | | SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause |
14 | | |
15 | | Contributors: |
16 | | Roger Light - initial implementation and documentation. |
17 | | */ |
18 | | |
19 | | #include "config.h" |
20 | | #include <time.h> |
21 | | #include "mosquitto_broker_internal.h" |
22 | | #include <utlist.h> |
23 | | |
24 | | |
25 | | /* This contains code for checking whether clients have exceeded their keepalive timeouts. |
26 | | * There are two versions. |
27 | | * |
28 | | * The old version can be used by compiling with `make WITH_OLD_KEEPALIVE=yes`. |
29 | | * It will scan the entire list of connected clients every 5 seconds to see if |
30 | | * they have expired. Hence it scales with O(n) and with e.g. 60000 clients can |
31 | | * have a measurable effect on CPU usage in the low single digit percent range. |
32 | | * |
33 | | * The new version scales with O(1). It uses a ring buffer that contains |
34 | | * max_keepalive*1.5+1 entries. The current time in integer seconds, modulus |
35 | | * the number of entries, points to the head of the ring buffer. Any clients |
36 | | * will appear after this point at the position indexed by the time at which |
37 | | * they will expire if they do not send another message, assuming they do not |
38 | | * have keepalive==0 - in which case they are not part of this check. So a |
39 | | * client that connects with keepalive=60 will be added at `now + 60*1.5`. |
40 | | * |
41 | | * A client is added to an entry with a doubly linked list. When the client |
42 | | * sends a new message, it is removed from the old position and added to the |
43 | | * new. |
44 | | * |
45 | | * As time moves on, if the linked list at the current entry is not empty, all |
46 | | * of the clients are expired. |
47 | | * |
48 | | * The ring buffer size is determined by max_keepalive. At the default, it is |
49 | | * 65535*1.5+1=98303 entries long. On a 64-bit machine that is 786424 bytes. |
50 | | * If this is too big a burden and you do not need many clients connected, then |
51 | | * the old check is sufficient. You can reduce the number of entries by setting |
52 | | * a lower max_keepalive value. A value as low as 600 still gives a 10 minute |
53 | | * keepalive and reduces the memory for the ring buffer to 7208 bytes. |
54 | | * |
55 | | * *NOTE* It is likely that the old check routine will be removed in the |
56 | | * future, and max_keepalive set to a sensible default value. If this is a |
57 | | * problem for you please get in touch. |
58 | | */ |
59 | | |
60 | | static time_t last_keepalive_check = 0; |
61 | | #ifndef WITH_OLD_KEEPALIVE |
62 | | static int keepalive_list_max = 0; |
63 | | static struct mosquitto **keepalive_list = NULL; |
64 | | #endif |
65 | | |
66 | | #ifndef WITH_OLD_KEEPALIVE |
67 | | |
68 | | |
69 | | static int calc_index(struct mosquitto *context) |
70 | 0 | { |
71 | 0 | return (int)(context->last_msg_in + context->keepalive*3/2) % keepalive_list_max; |
72 | 0 | } |
73 | | #endif |
74 | | |
75 | | |
76 | | int keepalive__init(void) |
77 | 0 | { |
78 | 0 | #ifndef WITH_OLD_KEEPALIVE |
79 | 0 | struct mosquitto *context, *ctxt_tmp; |
80 | |
|
81 | 0 | if(db.config->max_keepalive <= 0){ |
82 | 0 | keepalive_list_max = (UINT16_MAX * 3)/2 + 1; |
83 | 0 | }else{ |
84 | 0 | keepalive_list_max = (db.config->max_keepalive * 3)/2 + 1; |
85 | 0 | } |
86 | 0 | keepalive_list = mosquitto_calloc((size_t)keepalive_list_max, sizeof(struct mosquitto *)); |
87 | 0 | if(keepalive_list == NULL){ |
88 | 0 | log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); |
89 | 0 | keepalive_list_max = 0; |
90 | 0 | return MOSQ_ERR_NOMEM; |
91 | 0 | } |
92 | | |
93 | | /* Add existing clients - should only be applicable on MOSQ_EVT_RELOAD */ |
94 | 0 | HASH_ITER(hh_sock, db.contexts_by_sock, context, ctxt_tmp){ |
95 | 0 | if(net__is_connected(context) && !context->bridge && context->keepalive > 0){ |
96 | 0 | keepalive__add(context); |
97 | 0 | } |
98 | 0 | } |
99 | 0 | #endif |
100 | 0 | last_keepalive_check = db.now_s; |
101 | 0 | return MOSQ_ERR_SUCCESS; |
102 | 0 | } |
103 | | |
104 | | |
105 | | void keepalive__cleanup(void) |
106 | 0 | { |
107 | 0 | #ifndef WITH_OLD_KEEPALIVE |
108 | 0 | for(int idx=0; idx<keepalive_list_max; idx++){ |
109 | 0 | struct mosquitto *context, *ctxt_tmp; |
110 | 0 | DL_FOREACH_SAFE2(keepalive_list[idx], context, ctxt_tmp, keepalive_next){ |
111 | 0 | DL_DELETE2(keepalive_list[idx], context, keepalive_prev, keepalive_next); |
112 | 0 | } |
113 | 0 | } |
114 | 0 | mosquitto_free(keepalive_list); |
115 | 0 | keepalive_list = NULL; |
116 | 0 | keepalive_list_max = 0; |
117 | 0 | #endif |
118 | 0 | } |
119 | | |
120 | | |
121 | | int keepalive__add(struct mosquitto *context) |
122 | 0 | { |
123 | 0 | #ifndef WITH_OLD_KEEPALIVE |
124 | 0 | if(context->keepalive <= 0 || !net__is_connected(context)){ |
125 | 0 | return MOSQ_ERR_SUCCESS; |
126 | 0 | } |
127 | 0 | #ifdef WITH_BRIDGE |
128 | 0 | if(context->bridge){ |
129 | 0 | return MOSQ_ERR_SUCCESS; |
130 | 0 | } |
131 | 0 | #endif |
132 | | |
133 | 0 | DL_APPEND2(keepalive_list[calc_index(context)], context, keepalive_prev, keepalive_next); |
134 | 0 | context->keepalive_add_time = db.now_s; |
135 | | #else |
136 | | UNUSED(context); |
137 | | #endif |
138 | 0 | return MOSQ_ERR_SUCCESS; |
139 | 0 | } |
140 | | |
141 | | |
142 | | #ifndef WITH_OLD_KEEPALIVE |
143 | | |
144 | | |
145 | | void keepalive__check(void) |
146 | 0 | { |
147 | 0 | struct mosquitto *context, *ctxt_tmp; |
148 | 0 | time_t timeout; |
149 | |
|
150 | 0 | if(db.contexts_by_sock){ |
151 | | /* Check the next 5 seconds for upcoming expiries */ |
152 | | /* FIXME - find the actual next entry without having to iterate over |
153 | | * the whole list */ |
154 | 0 | timeout = 5; |
155 | 0 | for(time_t i=5; i>0; i--){ |
156 | 0 | if(keepalive_list[(db.now_s + i) % keepalive_list_max]){ |
157 | 0 | timeout = i; |
158 | 0 | } |
159 | 0 | } |
160 | 0 | loop__update_next_event(timeout*1000); |
161 | 0 | } |
162 | 0 | for(time_t i=last_keepalive_check; i<db.now_s; i++){ |
163 | 0 | int idx = (int)(i % keepalive_list_max); |
164 | 0 | if(keepalive_list[idx]){ |
165 | 0 | DL_FOREACH_SAFE2(keepalive_list[idx], context, ctxt_tmp, keepalive_next){ |
166 | | /* keepalive_add_time lets us account for the client adding itself to the keepalive |
167 | | * list when its last_msg_in value is greater than the last_keepalive_check. |
168 | | * Without this, the client would be expired if it has keepalive == max_keepalive. |
169 | | */ |
170 | 0 | if(context->keepalive_add_time <= last_keepalive_check && net__is_connected(context)){ |
171 | | /* Client has exceeded keepalive*1.5 */ |
172 | 0 | do_disconnect(context, MOSQ_ERR_KEEPALIVE); |
173 | 0 | } |
174 | 0 | } |
175 | 0 | } |
176 | 0 | } |
177 | |
|
178 | 0 | last_keepalive_check = db.now_s; |
179 | 0 | } |
180 | | #else |
181 | | |
182 | | |
183 | | void keepalive__check(void) |
184 | | { |
185 | | struct mosquitto *context, *ctxt_tmp; |
186 | | time_t timeout; |
187 | | |
188 | | if(db.contexts_by_sock){ |
189 | | timeout = (last_keepalive_check + 5 - db.now_s); |
190 | | if(timeout <= 0){ |
191 | | timeout = 5; |
192 | | } |
193 | | loop__update_next_event(timeout*1000); |
194 | | } |
195 | | if(last_keepalive_check + 5 <= db.now_s){ |
196 | | last_keepalive_check = db.now_s; |
197 | | |
198 | | HASH_ITER(hh_sock, db.contexts_by_sock, context, ctxt_tmp){ |
199 | | if(net__is_connected(context)){ |
200 | | /* Local bridges never time out in this fashion. */ |
201 | | if(!(context->keepalive) |
202 | | || context->bridge |
203 | | || db.now_s - context->last_msg_in <= (time_t)(context->keepalive)*3/2){ |
204 | | |
205 | | }else{ |
206 | | /* Client has exceeded keepalive*1.5 */ |
207 | | do_disconnect(context, MOSQ_ERR_KEEPALIVE); |
208 | | } |
209 | | } |
210 | | } |
211 | | } |
212 | | } |
213 | | #endif |
214 | | |
215 | | |
216 | | int keepalive__remove(struct mosquitto *context) |
217 | 1.29k | { |
218 | 1.29k | #ifndef WITH_OLD_KEEPALIVE |
219 | 1.29k | int idx; |
220 | | |
221 | 1.29k | if(context->keepalive <= 0 || context->keepalive_prev == NULL){ |
222 | 1.29k | return MOSQ_ERR_SUCCESS; |
223 | 1.29k | } |
224 | | |
225 | 0 | idx = calc_index(context); |
226 | 0 | if(keepalive_list[idx]){ |
227 | 0 | DL_DELETE2(keepalive_list[idx], context, keepalive_prev, keepalive_next); |
228 | 0 | context->keepalive_next = NULL; |
229 | 0 | context->keepalive_prev = NULL; |
230 | 0 | } |
231 | | #else |
232 | | UNUSED(context); |
233 | | #endif |
234 | 0 | return MOSQ_ERR_SUCCESS; |
235 | 0 | } |
236 | | |
237 | | |
238 | | int keepalive__update(struct mosquitto *context) |
239 | 0 | { |
240 | 0 | #ifndef WITH_OLD_KEEPALIVE |
241 | 0 | keepalive__remove(context); |
242 | | /* coverity[missing_lock] - broker is single threaded, so no lock required */ |
243 | 0 | context->last_msg_in = db.now_s; |
244 | 0 | keepalive__add(context); |
245 | | #else |
246 | | UNUSED(context); |
247 | | #endif |
248 | 0 | return MOSQ_ERR_SUCCESS; |
249 | 0 | } |