Coverage Report

Created: 2025-07-18 06:54

/src/S2OPC/src/Common/helpers/sopc_async_queue.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Licensed to Systerel under one or more contributor license
3
 * agreements. See the NOTICE file distributed with this work
4
 * for additional information regarding copyright ownership.
5
 * Systerel licenses this file to you under the Apache
6
 * License, Version 2.0 (the "License"); you may not use this
7
 * file except in compliance with the License. You may obtain
8
 * a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19
20
#include <inttypes.h>
21
#include <stdbool.h>
22
23
#include "sopc_async_queue.h"
24
#include "sopc_common_constants.h"
25
#include "sopc_logger.h"
26
#include "sopc_mem_alloc.h"
27
#include "sopc_mutexes.h"
28
#include "sopc_singly_linked_list.h"
29
30
struct SOPC_AsyncQueue
31
{
32
    const char* debugQueueName;
33
    SOPC_SLinkedList* queueList;
34
    uint32_t maxListLengthForWarning;
35
    SOPC_Condition queueCond;
36
    SOPC_Mutex queueMutex;
37
    uint32_t waitingThreads;
38
};
39
40
/*
 * Allocates and initializes a new asynchronous queue.
 *
 * queue      Output location receiving the new queue. Must not be NULL.
 *            On any failure *queue is set to NULL.
 * queueName  Name used in log traces. The pointer is stored as is (not copied),
 *            so it must remain valid for as long as the queue is used.
 *
 * Returns SOPC_STATUS_OK on success.
 * Returns SOPC_STATUS_INVALID_PARAMETERS if queue is NULL; the same status is
 * also reported when the queue structure cannot be allocated (historical behavior).
 * Returns SOPC_STATUS_NOK if the underlying list cannot be created, or the
 * status returned by the condition / mutex initialization when one of them fails.
 */
SOPC_ReturnStatus SOPC_AsyncQueue_Init(SOPC_AsyncQueue** queue, const char* queueName)
{
    if (NULL == queue)
    {
        return SOPC_STATUS_INVALID_PARAMETERS;
    }

    SOPC_AsyncQueue* newQueue = SOPC_Malloc(sizeof(*newQueue));
    *queue = newQueue;
    if (NULL == newQueue)
    {
        /* Status value kept unchanged for callers relying on it */
        return SOPC_STATUS_INVALID_PARAMETERS;
    }

    newQueue->debugQueueName = queueName;
    newQueue->waitingThreads = 0;
    if (SOPC_MAX_NB_ELEMENTS_ASYNC_QUEUE_WARNING_ONLY)
    {
        /* Warning-only mode: list created with size argument 0 (presumably "no limit" - TODO confirm),
         * the configured maximum is only used to trigger a warning trace on enqueue. */
        newQueue->queueList = SOPC_SLinkedList_Create(0);
        newQueue->maxListLengthForWarning = SOPC_MAX_NB_ELEMENTS_ASYNC_QUEUE;
    }
    else
    {
        newQueue->queueList = SOPC_SLinkedList_Create(SOPC_MAX_NB_ELEMENTS_ASYNC_QUEUE);
        /* The structure comes from SOPC_Malloc and is not zeroed: explicitly disable the
         * warning threshold, otherwise enqueue would read an indeterminate value. */
        newQueue->maxListLengthForWarning = 0;
    }

    SOPC_ReturnStatus status = (NULL != newQueue->queueList) ? SOPC_STATUS_OK : SOPC_STATUS_NOK;
    bool condInitialized = false;

    if (SOPC_STATUS_OK == status)
    {
        status = SOPC_Condition_Init(&newQueue->queueCond);
        condInitialized = (SOPC_STATUS_OK == status);
    }
    if (SOPC_STATUS_OK == status)
    {
        status = SOPC_Mutex_Initialization(&newQueue->queueMutex);
    }

    if (SOPC_STATUS_OK != status)
    {
        /* Release only what was successfully acquired before the failure */
        if (NULL != newQueue->queueList)
        {
            SOPC_SLinkedList_Delete(newQueue->queueList);
            newQueue->queueList = NULL;
        }
        if (condInitialized)
        {
            SOPC_Condition_Clear(&newQueue->queueCond);
        }
        SOPC_Free(newQueue);
        *queue = NULL;
    }
    return status;
}
92
93
/*
 * Returns the number of elements currently stored in the queue.
 *
 * The length is read while holding the queue mutex.
 * Returns 0 when queue is NULL or when the mutex cannot be locked.
 */
uint32_t SOPC_AsyncQueue_GetSize(SOPC_AsyncQueue* queue)
{
    uint32_t length = 0;

    if (NULL != queue && SOPC_STATUS_OK == SOPC_Mutex_Lock(&queue->queueMutex))
    {
        length = SOPC_SLinkedList_GetLength(queue->queueList);
        SOPC_Mutex_Unlock(&queue->queueMutex);
    }

    return length;
}
108
109
/*
 * Inserts an element into the queue while holding the queue mutex.
 *
 * queue     Queue to insert into. Must not be NULL.
 * element   Element to insert. Must not be NULL.
 * firstOut  When true the element is prepended to the list (dequeue pops the list head,
 *           so it is the next one returned); when false it is appended.
 *
 * After a successful insertion, all waiting threads are signaled if any thread is
 * registered as waiting, and a warning is traced when the warning threshold is
 * enabled, exceeded, and the length is a multiple of (max / 10) + 1.
 *
 * Returns SOPC_STATUS_INVALID_PARAMETERS if queue or element is NULL, the lock
 * status if the mutex cannot be locked, SOPC_STATUS_NOK if the list did not
 * return the inserted element, SOPC_STATUS_OK otherwise.
 */
static SOPC_ReturnStatus SOPC_AsyncQueue_BlockingEnqueueFirstOrLast(SOPC_AsyncQueue* queue,
                                                                    void* element,
                                                                    bool firstOut)
{
    if (NULL == queue || NULL == element)
    {
        return SOPC_STATUS_INVALID_PARAMETERS;
    }

    SOPC_ReturnStatus status = SOPC_Mutex_Lock(&queue->queueMutex);
    if (SOPC_STATUS_OK != status)
    {
        return status;
    }

    /* The list insertion functions are expected to hand back the stored value on success */
    void* inserted = firstOut ? (void*) SOPC_SLinkedList_Prepend(queue->queueList, 0, (uintptr_t) element)
                              : (void*) SOPC_SLinkedList_Append(queue->queueList, 0, (uintptr_t) element);

    if (element != inserted)
    {
        SOPC_Logger_TraceError(SOPC_LOG_MODULE_COMMON, "Unable to Enqueue on queue %s", queue->debugQueueName);
        status = SOPC_STATUS_NOK;
    }
    else
    {
        if (queue->waitingThreads > 0)
        {
            SOPC_Condition_SignalAll(&queue->queueCond);
        }

        uint32_t queueLength = SOPC_SLinkedList_GetLength(queue->queueList);
        bool thresholdExceeded =
            queue->maxListLengthForWarning != 0 && queueLength > queue->maxListLengthForWarning;

        /* Only trace periodically once over the threshold, to avoid one warning per enqueue */
        if (thresholdExceeded && queueLength % ((SOPC_MAX_NB_ELEMENTS_ASYNC_QUEUE / 10) + 1) == 0)
        {
            SOPC_Logger_TraceWarning(SOPC_LOG_MODULE_COMMON,
                                     "Maximum length of queue '%s' exceeded: %" PRIu32 " (>%" PRIu32 ")",
                                     queue->debugQueueName, queueLength, queue->maxListLengthForWarning);
        }
    }

    SOPC_Mutex_Unlock(&queue->queueMutex);
    return status;
}
157
158
/*
 * Enqueues an element at the head of the queue, so that it is the next element
 * handed out by a dequeue.
 *
 * Returns the status of SOPC_AsyncQueue_BlockingEnqueueFirstOrLast called with firstOut set.
 */
SOPC_ReturnStatus SOPC_AsyncQueue_BlockingEnqueueFirstOut(SOPC_AsyncQueue* queue, void* element)
{
    const bool insertAtHead = true;
    return SOPC_AsyncQueue_BlockingEnqueueFirstOrLast(queue, element, insertAtHead);
}
162
163
/*
 * Enqueues an element at the tail of the queue.
 *
 * Returns the status of SOPC_AsyncQueue_BlockingEnqueueFirstOrLast called with firstOut cleared.
 */
SOPC_ReturnStatus SOPC_AsyncQueue_BlockingEnqueue(SOPC_AsyncQueue* queue, void* element)
{
    const bool insertAtHead = false;
    return SOPC_AsyncQueue_BlockingEnqueueFirstOrLast(queue, element, insertAtHead);
}
167
168
/*
 * Pops the head element of the queue, optionally waiting until one is available.
 *
 * queue       Queue to read from. Must not be NULL.
 * isBlocking  When true and the queue is empty, the caller waits on the queue
 *             condition until an element can be popped. When false, an empty
 *             queue is reported immediately.
 * element     Output location receiving the popped element. Must not be NULL.
 *             Set to NULL whenever no element is returned.
 *
 * Returns SOPC_STATUS_INVALID_PARAMETERS if queue or element is NULL,
 * the lock status if the queue mutex cannot be locked (the list is then left
 * untouched), SOPC_STATUS_WOULD_BLOCK if the queue is empty in non-blocking
 * mode, SOPC_STATUS_OK when an element was popped.
 */
static SOPC_ReturnStatus SOPC_AsyncQueue_Dequeue(SOPC_AsyncQueue* queue, bool isBlocking, void** element)
{
    if (NULL == queue || NULL == element)
    {
        return SOPC_STATUS_INVALID_PARAMETERS;
    }

    *element = NULL;

    /* Never touch the list nor unlock the mutex if the lock was not acquired */
    SOPC_ReturnStatus status = SOPC_Mutex_Lock(&queue->queueMutex);
    if (SOPC_STATUS_OK != status)
    {
        return status;
    }

    *element = (void*) SOPC_SLinkedList_PopHead(queue->queueList);
    if (NULL == *element)
    {
        if (!isBlocking)
        {
            status = SOPC_STATUS_WOULD_BLOCK;
        }
        else
        {
            /* Register as waiter so that enqueue knows it has to signal the condition */
            queue->waitingThreads++;
            do
            {
                /* Loop guards against wake-ups where another consumer already took the element.
                 * NOTE(review): the wait result is not checked; if the wait can fail without
                 * blocking this loop would spin - confirm against SOPC_Mutex_UnlockAndWaitCond contract. */
                SOPC_Mutex_UnlockAndWaitCond(&queue->queueCond, &queue->queueMutex);
                *element = (void*) SOPC_SLinkedList_PopHead(queue->queueList);
            } while (NULL == *element);
            queue->waitingThreads--;
        }
    }

    SOPC_Mutex_Unlock(&queue->queueMutex);
    return status;
}
202
203
/*
 * Dequeues the head element, waiting until one is available if the queue is empty.
 *
 * Returns the status of SOPC_AsyncQueue_Dequeue called in blocking mode.
 */
SOPC_ReturnStatus SOPC_AsyncQueue_BlockingDequeue(SOPC_AsyncQueue* queue, void** element)
{
    const bool waitForElement = true;
    return SOPC_AsyncQueue_Dequeue(queue, waitForElement, element);
}
207
208
/*
 * Dequeues the head element without waiting: an empty queue is reported to the
 * caller instead of blocking.
 *
 * Returns the status of SOPC_AsyncQueue_Dequeue called in non-blocking mode.
 */
SOPC_ReturnStatus SOPC_AsyncQueue_NonBlockingDequeue(SOPC_AsyncQueue* queue, void** element)
{
    const bool waitForElement = false;
    return SOPC_AsyncQueue_Dequeue(queue, waitForElement, element);
}
212
213
/*
 * Releases a queue and resets the caller's pointer to NULL.
 *
 * When the queue and its list exist, SOPC_SLinkedList_EltGenericFree is applied
 * to the elements still stored in the list, then the list is deleted and the
 * mutex and condition are cleared. The queue structure itself is then freed.
 *
 * Does nothing when queue is NULL. The mutex is not taken here.
 */
void SOPC_AsyncQueue_Free(SOPC_AsyncQueue** queue)
{
    if (NULL == queue)
    {
        return;
    }

    SOPC_AsyncQueue* toFree = *queue;
    if (NULL != toFree && NULL != toFree->queueList)
    {
        SOPC_SLinkedList_Apply(toFree->queueList, SOPC_SLinkedList_EltGenericFree);
        SOPC_SLinkedList_Delete(toFree->queueList);
        SOPC_Mutex_Clear(&toFree->queueMutex);
        SOPC_Condition_Clear(&toFree->queueCond);
    }

    SOPC_Free(toFree);
    *queue = NULL;
}