Coverage Report

Created: 2025-07-23 07:05

/src/open5gs/lib/core/ogs-queue.c
Line
Count
Source (jump to first uncovered line)
1
/* Licensed to the Apache Software Foundation (ASF) under one or more
2
 * contributor license agreements.  See the NOTICE file distributed with
3
 * this work for additional information regarding copyright ownership.
4
 * The ASF licenses this file to You under the Apache License, Version 2.0
5
 * (the "License"); you may not use this file except in compliance with
6
 * the License.  You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
17
/*
18
 * Copyright (C) 2019-2020 by Sukchan Lee <acetcom@gmail.com>
19
 *
20
 * This file is part of Open5GS.
21
 *
22
 * Licensed under the Apache License, Version 2.0 (the "License");
23
 * you may not use this file except in compliance with the License.
24
 * You may obtain a copy of the License at
25
 *
26
 *   http://www.apache.org/licenses/LICENSE-2.0
27
 *
28
 * Unless required by applicable law or agreed to in writing, software
29
 * distributed under the License is distributed on an "AS IS" BASIS,
30
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
31
 * See the License for the specific language governing permissions and
32
 * limitations under the License.
33
 */
34
35
#include "ogs-core.h"
36
37
#undef OGS_LOG_DOMAIN
38
0
#define OGS_LOG_DOMAIN __ogs_event_domain
39
40
typedef struct ogs_queue_s {
41
    void              **data;
42
    unsigned int        nelts; /**< # elements */
43
    unsigned int        in;    /**< next empty location */
44
    unsigned int        out;   /**< next filled location */
45
    unsigned int        bounds;/**< max size of queue */
46
    unsigned int        full_waiters;
47
    unsigned int        empty_waiters;
48
    ogs_thread_mutex_t  one_big_mutex;
49
    ogs_thread_cond_t   not_empty;
50
    ogs_thread_cond_t   not_full;
51
    int                 terminated;
52
} ogs_queue_t;
53
54
/**
 * Detects when the ogs_queue_t is full. This utility function is expected
 * to be called from within critical sections, and is not threadsafe.
 */
58
0
/* The argument is expanded twice: pass a side-effect-free pointer. */
#define ogs_queue_full(q) ((q)->bounds == (q)->nelts)
59
60
/**
 * Detects when the ogs_queue_t is empty. This utility function is expected
 * to be called from within critical sections, and is not threadsafe.
 */
64
0
/* Evaluates to 1 when the element count is zero, 0 otherwise. */
#define ogs_queue_empty(q) (0 == (q)->nelts)
65
66
/**
67
 * Callback routine that is called to destroy this
68
 * ogs_queue_t when its pool is destroyed.
69
 */
70
ogs_queue_t *ogs_queue_create(unsigned int capacity)
71
0
{
72
0
    ogs_queue_t *queue = ogs_calloc(1, sizeof *queue);
73
0
    if (!queue) {
74
0
        ogs_error("ogs_calloc() failed");
75
0
        return NULL;
76
0
    }
77
0
    ogs_assert(queue);
78
79
0
    ogs_thread_mutex_init(&queue->one_big_mutex);
80
0
    ogs_thread_cond_init(&queue->not_empty);
81
0
    ogs_thread_cond_init(&queue->not_full);
82
83
0
    queue->data = ogs_calloc(1, capacity * sizeof(void*));
84
0
    if (!queue->data) {
85
0
        ogs_error("ogs_calloc[capacity:%d, sizeof(void*):%d] failed",
86
0
                (int)capacity, (int)sizeof(void*));
87
0
        return NULL;
88
0
    }
89
0
    queue->bounds = capacity;
90
0
    queue->nelts = 0;
91
0
    queue->in = 0;
92
0
    queue->out = 0;
93
0
    queue->terminated = 0;
94
0
    queue->full_waiters = 0;
95
0
    queue->empty_waiters = 0;
96
97
0
    return queue;
98
0
}
99
100
/**
 * Releases a queue: frees the slot array, destroys both condition
 * variables and the mutex, then frees the queue structure itself.
 * The pointers still stored in the queue are not touched.
 *
 * @param queue queue to release; asserted to be non-NULL
 */
void ogs_queue_destroy(ogs_queue_t *queue)
{
    ogs_assert(queue);

    /* slot array first */
    ogs_free(queue->data);

    /* then the synchronization objects */
    ogs_thread_cond_destroy(&queue->not_empty);
    ogs_thread_cond_destroy(&queue->not_full);
    ogs_thread_mutex_destroy(&queue->one_big_mutex);

    /* finally the container */
    ogs_free(queue);
}
112
113
/**
 * Common implementation behind the push variants.
 *
 * @param queue   queue to append to
 * @param data    pointer stored in the next free slot
 * @param timeout 0: never block; > 0: wait on not_full with this timeout;
 *                any other value: wait on not_full without a timeout
 * @return OGS_OK when the element was stored;
 *         OGS_DONE when the queue has been terminated;
 *         OGS_RETRY when the queue is full and timeout is 0;
 *         OGS_ERROR when the wait returned but the queue is still full
 *         and not terminated;
 *         otherwise the non-OGS_OK value returned by the wait call.
 *
 * At most one wait is performed; a wake-up that finds the queue still full
 * is reported to the caller instead of being retried.
 */
static int queue_push(ogs_queue_t *queue, void *data, ogs_time_t timeout)
{
    int rv;

    ogs_thread_mutex_lock(&queue->one_big_mutex);

    /*
     * ogs_queue_term() writes `terminated` with one_big_mutex held, so the
     * flag is only read while holding the same mutex.
     */
    if (queue->terminated) {
        ogs_thread_mutex_unlock(&queue->one_big_mutex);
        return OGS_DONE; /* no more elements ever again */
    }

    if (ogs_queue_full(queue)) {
        if (!timeout) {
            ogs_thread_mutex_unlock(&queue->one_big_mutex);
            return OGS_RETRY;
        }

        /* Not terminated (checked above under the same lock): wait once. */
        queue->full_waiters++;
        if (timeout > 0) {
            rv = ogs_thread_cond_timedwait(&queue->not_full,
                                           &queue->one_big_mutex,
                                           timeout);
        } else {
            rv = ogs_thread_cond_wait(&queue->not_full,
                                      &queue->one_big_mutex);
        }
        queue->full_waiters--;
        if (rv != OGS_OK) {
            ogs_thread_mutex_unlock(&queue->one_big_mutex);
            return rv;
        }

        /* If we wake up and it's still full, then we were interrupted */
        if (ogs_queue_full(queue)) {
            /* sample the flag before the mutex is released */
            int terminated = queue->terminated;

            ogs_warn("queue full (intr)");
            ogs_thread_mutex_unlock(&queue->one_big_mutex);
            if (terminated) {
                return OGS_DONE; /* no more elements ever again */
            }
            return OGS_ERROR;
        }
    }

    /* Store the element and advance the write index around the ring. */
    queue->data[queue->in] = data;
    queue->in++;
    if (queue->in >= queue->bounds)
        queue->in -= queue->bounds;
    queue->nelts++;

    if (queue->empty_waiters) {
        ogs_trace("signal !empty");
        ogs_thread_cond_signal(&queue->not_empty);
    }

    ogs_thread_mutex_unlock(&queue->one_big_mutex);
    return OGS_OK;
}
172
173
/**
 * Pushes @a data onto the queue by calling queue_push() with
 * OGS_INFINITE_TIME as the timeout.
 *
 * @return whatever queue_push() returns
 */
int ogs_queue_push(ogs_queue_t *queue, void *data)
{
    int rv;

    rv = queue_push(queue, data, OGS_INFINITE_TIME);

    return rv;
}
177
178
/**
179
 * Push new data onto the queue. If the queue is full, return OGS_RETRY. If
180
 * the push operation completes successfully, it signals other threads
181
 * waiting in ogs_queue_pop() that they may continue consuming sockets.
182
 */
183
int ogs_queue_trypush(ogs_queue_t *queue, void *data)
184
0
{
185
0
    return queue_push(queue, data, 0);
186
0
}
187
188
/**
 * Pushes @a data onto the queue, forwarding @a timeout unchanged to
 * queue_push().
 *
 * @return whatever queue_push() returns
 */
int ogs_queue_timedpush(ogs_queue_t *queue, void *data, ogs_time_t timeout)
{
    int rv;

    rv = queue_push(queue, data, timeout);

    return rv;
}
192
193
/**
194
 * not thread safe
195
 */
196
0
unsigned int ogs_queue_size(ogs_queue_t *queue) {
197
0
    return queue->nelts;
198
0
}
199
200
/**
201
 * Retrieves the next item from the queue. If there are no
202
 * items available, it will either return OGS_RETRY (timeout = 0),
203
 * or block until one becomes available (infinitely with timeout < 0,
204
 * otherwise until the given timeout expires). Once retrieved, the
205
 * item is placed into the address specified by 'data'.
206
 */
207
static int queue_pop(ogs_queue_t *queue, void **data, ogs_time_t timeout)
208
0
{
209
0
    int rv;
210
211
0
    if (queue->terminated) {
212
0
        return OGS_DONE; /* no more elements ever again */
213
0
    }
214
215
0
    ogs_thread_mutex_lock(&queue->one_big_mutex);
216
217
    /* Keep waiting until we wake up and find that the queue is not empty. */
218
0
    if (ogs_queue_empty(queue)) {
219
0
        if (!timeout) {
220
0
            ogs_thread_mutex_unlock(&queue->one_big_mutex);
221
0
            return OGS_RETRY;
222
0
        }
223
0
        if (!queue->terminated) {
224
0
            queue->empty_waiters++;
225
0
            if (timeout > 0) {
226
0
                rv = ogs_thread_cond_timedwait(&queue->not_empty,
227
0
                                               &queue->one_big_mutex,
228
0
                                               timeout);
229
0
            }
230
0
            else {
231
0
                rv = ogs_thread_cond_wait(&queue->not_empty,
232
0
                                          &queue->one_big_mutex);
233
0
            }
234
0
            queue->empty_waiters--;
235
0
            if (rv != OGS_OK) {
236
0
                ogs_thread_mutex_unlock(&queue->one_big_mutex);
237
0
                return rv;
238
0
            }
239
0
        }
240
        /* If we wake up and it's still empty, then we were interrupted */
241
0
        if (ogs_queue_empty(queue)) {
242
0
            ogs_warn("queue empty (intr)");
243
0
            ogs_thread_mutex_unlock(&queue->one_big_mutex);
244
0
            if (queue->terminated) {
245
0
                return OGS_DONE; /* no more elements ever again */
246
0
            } else {
247
0
                return OGS_ERROR;
248
0
            }
249
0
        }
250
0
    } 
251
252
0
    *data = queue->data[queue->out];
253
0
    queue->nelts--;
254
255
0
    queue->out++;
256
0
    if (queue->out >= queue->bounds)
257
0
        queue->out -= queue->bounds;
258
0
    if (queue->full_waiters) {
259
0
        ogs_trace("signal !full");
260
0
        ogs_thread_cond_signal(&queue->not_full);
261
0
    }
262
263
0
    ogs_thread_mutex_unlock(&queue->one_big_mutex);
264
0
    return OGS_OK;
265
0
}
266
267
/**
 * Pops the next element into *data by calling queue_pop() with
 * OGS_INFINITE_TIME as the timeout.
 *
 * @return whatever queue_pop() returns
 */
int ogs_queue_pop(ogs_queue_t *queue, void **data)
{
    int rv;

    rv = queue_pop(queue, data, OGS_INFINITE_TIME);

    return rv;
}
271
272
/**
 * Non-blocking pop: calls queue_pop() with a timeout of 0, so an empty
 * queue yields OGS_RETRY instead of a wait.
 *
 * @return whatever queue_pop() returns
 */
int ogs_queue_trypop(ogs_queue_t *queue, void **data)
{
    int rv;

    rv = queue_pop(queue, data, 0);

    return rv;
}
276
277
/**
 * Pops the next element into *data, forwarding @a timeout unchanged to
 * queue_pop().
 *
 * @return whatever queue_pop() returns
 */
int ogs_queue_timedpop(ogs_queue_t *queue, void **data, ogs_time_t timeout)
{
    int rv;

    rv = queue_pop(queue, data, timeout);

    return rv;
}
281
282
/**
 * Wakes every thread waiting on the queue by broadcasting both condition
 * variables while holding the queue mutex. Queue contents and the
 * terminated flag are left unchanged.
 *
 * @return always OGS_OK
 */
int ogs_queue_interrupt_all(ogs_queue_t *queue)
{
    ogs_debug("interrupt all");

    ogs_thread_mutex_lock(&queue->one_big_mutex);
    {
        /* poppers first, then pushers */
        ogs_thread_cond_broadcast(&queue->not_empty);
        ogs_thread_cond_broadcast(&queue->not_full);
    }
    ogs_thread_mutex_unlock(&queue->one_big_mutex);

    return OGS_OK;
}
294
295
/**
 * Marks the queue as terminated and then wakes all waiters through
 * ogs_queue_interrupt_all().
 *
 * @return the value returned by ogs_queue_interrupt_all()
 */
int ogs_queue_term(ogs_queue_t *queue)
{
    int rv;

    /*
     * The flag must be set with one_big_mutex held. Otherwise it could be
     * set, and everybody woken up, just after a would-be popper has
     * checked it but right before that popper blocks.
     */
    ogs_thread_mutex_lock(&queue->one_big_mutex);
    queue->terminated = 1;
    ogs_thread_mutex_unlock(&queue->one_big_mutex);

    rv = ogs_queue_interrupt_all(queue);

    return rv;
}
308