/src/open5gs/lib/core/ogs-queue.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* Licensed to the Apache Software Foundation (ASF) under one or more |
2 | | * contributor license agreements. See the NOTICE file distributed with |
3 | | * this work for additional information regarding copyright ownership. |
4 | | * The ASF licenses this file to You under the Apache License, Version 2.0 |
5 | | * (the "License"); you may not use this file except in compliance with |
6 | | * the License. You may obtain a copy of the License at |
7 | | * |
8 | | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | | * |
10 | | * Unless required by applicable law or agreed to in writing, software |
11 | | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | | * See the License for the specific language governing permissions and |
14 | | * limitations under the License. |
15 | | */ |
16 | | |
17 | | /* |
18 | | * Copyright (C) 2019-2020 by Sukchan Lee <acetcom@gmail.com> |
19 | | * |
20 | | * This file is part of Open5GS. |
21 | | * |
22 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
23 | | * you may not use this file except in compliance with the License. |
24 | | * You may obtain a copy of the License at |
25 | | * |
26 | | * http://www.apache.org/licenses/LICENSE-2.0 |
27 | | * |
28 | | * Unless required by applicable law or agreed to in writing, software |
29 | | * distributed under the License is distributed on an "AS IS" BASIS, |
30 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
31 | | * See the License for the specific language governing permissions and |
32 | | * limitations under the License. |
33 | | */ |
34 | | |
35 | | #include "ogs-core.h" |
36 | | |
37 | | #undef OGS_LOG_DOMAIN |
38 | 0 | #define OGS_LOG_DOMAIN __ogs_event_domain |
39 | | |
40 | | typedef struct ogs_queue_s { |
41 | | void **data; |
42 | | unsigned int nelts; /**< # elements */ |
43 | | unsigned int in; /**< next empty location */ |
44 | | unsigned int out; /**< next filled location */ |
45 | | unsigned int bounds;/**< max size of queue */ |
46 | | unsigned int full_waiters; |
47 | | unsigned int empty_waiters; |
48 | | ogs_thread_mutex_t one_big_mutex; |
49 | | ogs_thread_cond_t not_empty; |
50 | | ogs_thread_cond_t not_full; |
51 | | int terminated; |
52 | | } ogs_queue_t; |
53 | | |
/**
 * True when the queue holds as many elements as its capacity allows.
 * Performs no locking: the caller is expected to be inside the
 * critical section already. Note that 'queue' is evaluated twice.
 */
#define ogs_queue_full(queue) ((queue)->bounds == (queue)->nelts)
59 | | |
/**
 * True when the queue currently holds no elements.
 * Performs no locking: the caller is expected to be inside the
 * critical section already.
 */
#define ogs_queue_empty(queue) (0 == (queue)->nelts)
65 | | |
66 | | /** |
67 | | * Callback routine that is called to destroy this |
68 | | * ogs_queue_t when its pool is destroyed. |
69 | | */ |
70 | | ogs_queue_t *ogs_queue_create(unsigned int capacity) |
71 | 0 | { |
72 | 0 | ogs_queue_t *queue = ogs_calloc(1, sizeof *queue); |
73 | 0 | if (!queue) { |
74 | 0 | ogs_error("ogs_calloc() failed"); |
75 | 0 | return NULL; |
76 | 0 | } |
77 | 0 | ogs_assert(queue); |
78 | | |
79 | 0 | ogs_thread_mutex_init(&queue->one_big_mutex); |
80 | 0 | ogs_thread_cond_init(&queue->not_empty); |
81 | 0 | ogs_thread_cond_init(&queue->not_full); |
82 | |
|
83 | 0 | queue->data = ogs_calloc(1, capacity * sizeof(void*)); |
84 | 0 | if (!queue->data) { |
85 | 0 | ogs_error("ogs_calloc[capacity:%d, sizeof(void*):%d] failed", |
86 | 0 | (int)capacity, (int)sizeof(void*)); |
87 | 0 | return NULL; |
88 | 0 | } |
89 | 0 | queue->bounds = capacity; |
90 | 0 | queue->nelts = 0; |
91 | 0 | queue->in = 0; |
92 | 0 | queue->out = 0; |
93 | 0 | queue->terminated = 0; |
94 | 0 | queue->full_waiters = 0; |
95 | 0 | queue->empty_waiters = 0; |
96 | |
|
97 | 0 | return queue; |
98 | 0 | } |
99 | | |
100 | | void ogs_queue_destroy(ogs_queue_t *queue) |
101 | 0 | { |
102 | 0 | ogs_assert(queue); |
103 | | |
104 | 0 | ogs_free(queue->data); |
105 | |
|
106 | 0 | ogs_thread_cond_destroy(&queue->not_empty); |
107 | 0 | ogs_thread_cond_destroy(&queue->not_full); |
108 | 0 | ogs_thread_mutex_destroy(&queue->one_big_mutex); |
109 | |
|
110 | 0 | ogs_free(queue); |
111 | 0 | } |
112 | | |
113 | | static int queue_push(ogs_queue_t *queue, void *data, ogs_time_t timeout) |
114 | 0 | { |
115 | 0 | int rv; |
116 | |
|
117 | 0 | if (queue->terminated) { |
118 | 0 | return OGS_DONE; /* no more elements ever again */ |
119 | 0 | } |
120 | | |
121 | 0 | ogs_thread_mutex_lock(&queue->one_big_mutex); |
122 | |
|
123 | 0 | if (ogs_queue_full(queue)) { |
124 | 0 | if (!timeout) { |
125 | 0 | ogs_thread_mutex_unlock(&queue->one_big_mutex); |
126 | 0 | return OGS_RETRY; |
127 | 0 | } |
128 | 0 | if (!queue->terminated) { |
129 | 0 | queue->full_waiters++; |
130 | 0 | if (timeout > 0) { |
131 | 0 | rv = ogs_thread_cond_timedwait(&queue->not_full, |
132 | 0 | &queue->one_big_mutex, |
133 | 0 | timeout); |
134 | 0 | } |
135 | 0 | else { |
136 | 0 | rv = ogs_thread_cond_wait(&queue->not_full, |
137 | 0 | &queue->one_big_mutex); |
138 | 0 | } |
139 | 0 | queue->full_waiters--; |
140 | 0 | if (rv != OGS_OK) { |
141 | 0 | ogs_thread_mutex_unlock(&queue->one_big_mutex); |
142 | 0 | return rv; |
143 | 0 | } |
144 | 0 | } |
145 | | /* If we wake up and it's still empty, then we were interrupted */ |
146 | 0 | if (ogs_queue_full(queue)) { |
147 | 0 | ogs_warn("queue full (intr)"); |
148 | 0 | ogs_thread_mutex_unlock(&queue->one_big_mutex); |
149 | 0 | if (queue->terminated) { |
150 | 0 | return OGS_DONE; /* no more elements ever again */ |
151 | 0 | } |
152 | 0 | else { |
153 | 0 | return OGS_ERROR; |
154 | 0 | } |
155 | 0 | } |
156 | 0 | } |
157 | | |
158 | 0 | queue->data[queue->in] = data; |
159 | 0 | queue->in++; |
160 | 0 | if (queue->in >= queue->bounds) |
161 | 0 | queue->in -= queue->bounds; |
162 | 0 | queue->nelts++; |
163 | |
|
164 | 0 | if (queue->empty_waiters) { |
165 | 0 | ogs_trace("signal !empty"); |
166 | 0 | ogs_thread_cond_signal(&queue->not_empty); |
167 | 0 | } |
168 | |
|
169 | 0 | ogs_thread_mutex_unlock(&queue->one_big_mutex); |
170 | 0 | return OGS_OK; |
171 | 0 | } |
172 | | |
173 | | int ogs_queue_push(ogs_queue_t *queue, void *data) |
174 | 0 | { |
175 | 0 | return queue_push(queue, data, OGS_INFINITE_TIME); |
176 | 0 | } |
177 | | |
178 | | /** |
179 | | * Push new data onto the queue. If the queue is full, return OGS_RETRY. If |
180 | | * the push operation completes successfully, it signals other threads |
181 | | * waiting in ogs_queue_pop() that they may continue consuming sockets. |
182 | | */ |
183 | | int ogs_queue_trypush(ogs_queue_t *queue, void *data) |
184 | 0 | { |
185 | 0 | return queue_push(queue, data, 0); |
186 | 0 | } |
187 | | |
188 | | int ogs_queue_timedpush(ogs_queue_t *queue, void *data, ogs_time_t timeout) |
189 | 0 | { |
190 | 0 | return queue_push(queue, data, timeout); |
191 | 0 | } |
192 | | |
193 | | /** |
194 | | * not thread safe |
195 | | */ |
196 | 0 | unsigned int ogs_queue_size(ogs_queue_t *queue) { |
197 | 0 | return queue->nelts; |
198 | 0 | } |
199 | | |
200 | | /** |
201 | | * Retrieves the next item from the queue. If there are no |
202 | | * items available, it will either return OGS_RETRY (timeout = 0), |
203 | | * or block until one becomes available (infinitely with timeout < 0, |
204 | | * otherwise until the given timeout expires). Once retrieved, the |
205 | | * item is placed into the address specified by 'data'. |
206 | | */ |
207 | | static int queue_pop(ogs_queue_t *queue, void **data, ogs_time_t timeout) |
208 | 0 | { |
209 | 0 | int rv; |
210 | |
|
211 | 0 | if (queue->terminated) { |
212 | 0 | return OGS_DONE; /* no more elements ever again */ |
213 | 0 | } |
214 | | |
215 | 0 | ogs_thread_mutex_lock(&queue->one_big_mutex); |
216 | | |
217 | | /* Keep waiting until we wake up and find that the queue is not empty. */ |
218 | 0 | if (ogs_queue_empty(queue)) { |
219 | 0 | if (!timeout) { |
220 | 0 | ogs_thread_mutex_unlock(&queue->one_big_mutex); |
221 | 0 | return OGS_RETRY; |
222 | 0 | } |
223 | 0 | if (!queue->terminated) { |
224 | 0 | queue->empty_waiters++; |
225 | 0 | if (timeout > 0) { |
226 | 0 | rv = ogs_thread_cond_timedwait(&queue->not_empty, |
227 | 0 | &queue->one_big_mutex, |
228 | 0 | timeout); |
229 | 0 | } |
230 | 0 | else { |
231 | 0 | rv = ogs_thread_cond_wait(&queue->not_empty, |
232 | 0 | &queue->one_big_mutex); |
233 | 0 | } |
234 | 0 | queue->empty_waiters--; |
235 | 0 | if (rv != OGS_OK) { |
236 | 0 | ogs_thread_mutex_unlock(&queue->one_big_mutex); |
237 | 0 | return rv; |
238 | 0 | } |
239 | 0 | } |
240 | | /* If we wake up and it's still empty, then we were interrupted */ |
241 | 0 | if (ogs_queue_empty(queue)) { |
242 | 0 | ogs_warn("queue empty (intr)"); |
243 | 0 | ogs_thread_mutex_unlock(&queue->one_big_mutex); |
244 | 0 | if (queue->terminated) { |
245 | 0 | return OGS_DONE; /* no more elements ever again */ |
246 | 0 | } else { |
247 | 0 | return OGS_ERROR; |
248 | 0 | } |
249 | 0 | } |
250 | 0 | } |
251 | | |
252 | 0 | *data = queue->data[queue->out]; |
253 | 0 | queue->nelts--; |
254 | |
|
255 | 0 | queue->out++; |
256 | 0 | if (queue->out >= queue->bounds) |
257 | 0 | queue->out -= queue->bounds; |
258 | 0 | if (queue->full_waiters) { |
259 | 0 | ogs_trace("signal !full"); |
260 | 0 | ogs_thread_cond_signal(&queue->not_full); |
261 | 0 | } |
262 | |
|
263 | 0 | ogs_thread_mutex_unlock(&queue->one_big_mutex); |
264 | 0 | return OGS_OK; |
265 | 0 | } |
266 | | |
267 | | int ogs_queue_pop(ogs_queue_t *queue, void **data) |
268 | 0 | { |
269 | 0 | return queue_pop(queue, data, OGS_INFINITE_TIME); |
270 | 0 | } |
271 | | |
272 | | int ogs_queue_trypop(ogs_queue_t *queue, void **data) |
273 | 0 | { |
274 | 0 | return queue_pop(queue, data, 0); |
275 | 0 | } |
276 | | |
277 | | int ogs_queue_timedpop(ogs_queue_t *queue, void **data, ogs_time_t timeout) |
278 | 0 | { |
279 | 0 | return queue_pop(queue, data, timeout); |
280 | 0 | } |
281 | | |
282 | | int ogs_queue_interrupt_all(ogs_queue_t *queue) |
283 | 0 | { |
284 | 0 | ogs_debug("interrupt all"); |
285 | 0 | ogs_thread_mutex_lock(&queue->one_big_mutex); |
286 | |
|
287 | 0 | ogs_thread_cond_broadcast(&queue->not_empty); |
288 | 0 | ogs_thread_cond_broadcast(&queue->not_full); |
289 | |
|
290 | 0 | ogs_thread_mutex_unlock(&queue->one_big_mutex); |
291 | |
|
292 | 0 | return OGS_OK; |
293 | 0 | } |
294 | | |
295 | | int ogs_queue_term(ogs_queue_t *queue) |
296 | 0 | { |
297 | 0 | ogs_thread_mutex_lock(&queue->one_big_mutex); |
298 | | |
299 | | /* we must hold one_big_mutex when setting this... otherwise, |
300 | | * we could end up setting it and waking everybody up just after a |
301 | | * would-be popper checks it but right before they block |
302 | | */ |
303 | 0 | queue->terminated = 1; |
304 | 0 | ogs_thread_mutex_unlock(&queue->one_big_mutex); |
305 | |
|
306 | 0 | return ogs_queue_interrupt_all(queue); |
307 | 0 | } |
308 | | |