Coverage Report

Created: 2026-02-26 06:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/FreeRDP/winpr/libwinpr/utils/collections/MessageQueue.c
Line
Count
Source
1
/**
2
 * WinPR: Windows Portable Runtime
3
 * Message Queue
4
 *
5
 * Copyright 2012 Marc-Andre Moreau <marcandre.moreau@gmail.com>
6
 *
7
 * Licensed under the Apache License, Version 2.0 (the "License");
8
 * you may not use this file except in compliance with the License.
9
 * You may obtain a copy of the License at
10
 *
11
 *     http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 * Unless required by applicable law or agreed to in writing, software
14
 * distributed under the License is distributed on an "AS IS" BASIS,
15
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 * See the License for the specific language governing permissions and
17
 * limitations under the License.
18
 */
19
20
#include <winpr/config.h>
21
22
#include <winpr/crt.h>
23
#include <winpr/sysinfo.h>
24
#include <winpr/assert.h>
25
26
#include <winpr/collections.h>
27
28
struct s_wMessageQueue
29
{
30
  size_t head;
31
  size_t tail;
32
  size_t size;
33
  size_t capacity;
34
  BOOL closed;
35
  wMessage* array;
36
  CRITICAL_SECTION lock;
37
  HANDLE event;
38
39
  wObject object;
40
};
41
42
/**
43
 * Message Queue inspired from Windows:
44
 * http://msdn.microsoft.com/en-us/library/ms632590/
45
 */
46
47
/**
48
 * Properties
49
 */
50
51
wObject* MessageQueue_Object(wMessageQueue* queue)
52
8.02k
{
53
8.02k
  WINPR_ASSERT(queue);
54
8.02k
  return &queue->object;
55
8.02k
}
56
57
/**
58
 * Gets an event which is set when the queue is non-empty
59
 */
60
61
HANDLE MessageQueue_Event(wMessageQueue* queue)
62
0
{
63
0
  WINPR_ASSERT(queue);
64
0
  return queue->event;
65
0
}
66
67
/**
68
 * Gets the queue size
69
 */
70
71
size_t MessageQueue_Size(wMessageQueue* queue)
72
0
{
73
0
  WINPR_ASSERT(queue);
74
0
  EnterCriticalSection(&queue->lock);
75
0
  const size_t ret = queue->size;
76
0
  LeaveCriticalSection(&queue->lock);
77
0
  return ret;
78
0
}
79
80
size_t MessageQueue_Capacity(wMessageQueue* queue)
81
0
{
82
0
  WINPR_ASSERT(queue);
83
0
  EnterCriticalSection(&queue->lock);
84
0
  const size_t ret = queue->capacity;
85
0
  LeaveCriticalSection(&queue->lock);
86
0
  return ret;
87
0
}
88
89
/**
90
 * Methods
91
 */
92
93
BOOL MessageQueue_Wait(wMessageQueue* queue)
94
0
{
95
0
  BOOL status = FALSE;
96
97
0
  WINPR_ASSERT(queue);
98
0
  if (WaitForSingleObject(queue->event, INFINITE) == WAIT_OBJECT_0)
99
0
    status = TRUE;
100
101
0
  return status;
102
0
}
103
104
static BOOL MessageQueue_EnsureCapacity(wMessageQueue* queue, size_t count)
105
36.8k
{
106
36.8k
  const size_t increment = 128;
107
36.8k
  WINPR_ASSERT(queue);
108
109
36.8k
  const size_t required = queue->size + count;
110
  // check for overflow
111
36.8k
  if ((required < queue->size) || (required < count) ||
112
36.8k
      (required > (SIZE_MAX - increment) / sizeof(wMessage)))
113
0
    return FALSE;
114
115
36.8k
  if (required > queue->capacity)
116
36.8k
  {
117
36.8k
    const size_t old_capacity = queue->capacity;
118
36.8k
    const size_t new_capacity = required + increment;
119
120
36.8k
    wMessage* new_arr = (wMessage*)realloc(queue->array, sizeof(wMessage) * new_capacity);
121
36.8k
    if (!new_arr)
122
0
      return FALSE;
123
36.8k
    queue->array = new_arr;
124
36.8k
    queue->capacity = new_capacity;
125
36.8k
    ZeroMemory(&(queue->array[old_capacity]), (new_capacity - old_capacity) * sizeof(wMessage));
126
127
    /* rearrange wrapped entries:
128
     * fill up the newly available space and move tail
129
     * back by the amount of elements that have been moved to the newly
130
     * allocated space.
131
     */
132
36.8k
    if (queue->tail <= queue->head)
133
36.8k
    {
134
36.8k
      size_t tocopy = queue->tail;
135
36.8k
      size_t slots = new_capacity - old_capacity;
136
36.8k
      const size_t batch = (tocopy < slots) ? tocopy : slots;
137
36.8k
      CopyMemory(&(queue->array[old_capacity]), queue->array, batch * sizeof(wMessage));
138
139
      /* Tail is decremented. if the whole thing is appended
140
       * just move the existing tail by old_capacity */
141
36.8k
      if (tocopy < slots)
142
36.8k
      {
143
36.8k
        ZeroMemory(queue->array, batch * sizeof(wMessage));
144
36.8k
        queue->tail += old_capacity;
145
36.8k
      }
146
0
      else
147
0
      {
148
0
        const size_t remain = queue->tail - batch;
149
0
        const size_t movesize = remain * sizeof(wMessage);
150
0
        memmove_s(queue->array, queue->tail * sizeof(wMessage), &queue->array[batch],
151
0
                  movesize);
152
153
0
        const size_t zerooffset = remain;
154
0
        const size_t zerosize = (queue->tail - remain) * sizeof(wMessage);
155
0
        ZeroMemory(&queue->array[zerooffset], zerosize);
156
0
        queue->tail -= batch;
157
0
      }
158
36.8k
    }
159
36.8k
  }
160
161
36.8k
  return TRUE;
162
36.8k
}
163
164
BOOL MessageQueue_Dispatch(wMessageQueue* queue, const wMessage* message)
165
0
{
166
0
  wMessage* dst = NULL;
167
0
  BOOL ret = FALSE;
168
0
  WINPR_ASSERT(queue);
169
170
0
  if (!message)
171
0
    return FALSE;
172
173
0
  WINPR_ASSERT(queue);
174
0
  EnterCriticalSection(&queue->lock);
175
176
0
  if (queue->closed)
177
0
    goto out;
178
179
0
  if (!MessageQueue_EnsureCapacity(queue, 1))
180
0
    goto out;
181
182
0
  dst = &(queue->array[queue->tail]);
183
0
  *dst = *message;
184
0
  dst->time = GetTickCount64();
185
186
0
  queue->tail = (queue->tail + 1) % queue->capacity;
187
0
  queue->size++;
188
189
0
  if (queue->size > 0)
190
0
    (void)SetEvent(queue->event);
191
192
0
  if (message->id == WMQ_QUIT)
193
0
    queue->closed = TRUE;
194
195
0
  ret = TRUE;
196
0
out:
197
0
  LeaveCriticalSection(&queue->lock);
198
0
  return ret;
199
0
}
200
201
BOOL MessageQueue_Post(wMessageQueue* queue, void* context, UINT32 type, void* wParam, void* lParam)
202
0
{
203
0
  wMessage message = WINPR_C_ARRAY_INIT;
204
205
0
  message.context = context;
206
0
  message.id = type;
207
0
  message.wParam = wParam;
208
0
  message.lParam = lParam;
209
0
  message.Free = NULL;
210
211
0
  return MessageQueue_Dispatch(queue, &message);
212
0
}
213
214
BOOL MessageQueue_PostQuit(wMessageQueue* queue, int nExitCode)
215
0
{
216
0
  return MessageQueue_Post(queue, NULL, WMQ_QUIT, (void*)(size_t)nExitCode, NULL);
217
0
}
218
219
int MessageQueue_Get(wMessageQueue* queue, wMessage* message)
220
0
{
221
0
  int status = -1;
222
223
0
  if (!MessageQueue_Wait(queue))
224
0
    return status;
225
226
0
  EnterCriticalSection(&queue->lock);
227
228
0
  if (queue->size > 0)
229
0
  {
230
0
    CopyMemory(message, &(queue->array[queue->head]), sizeof(wMessage));
231
0
    ZeroMemory(&(queue->array[queue->head]), sizeof(wMessage));
232
0
    queue->head = (queue->head + 1) % queue->capacity;
233
0
    queue->size--;
234
235
0
    if (queue->size < 1)
236
0
      (void)ResetEvent(queue->event);
237
238
0
    status = (message->id != WMQ_QUIT) ? 1 : 0;
239
0
  }
240
241
0
  LeaveCriticalSection(&queue->lock);
242
243
0
  return status;
244
0
}
245
246
int MessageQueue_Peek(wMessageQueue* queue, wMessage* message, BOOL remove)
247
0
{
248
0
  int status = 0;
249
250
0
  WINPR_ASSERT(queue);
251
0
  EnterCriticalSection(&queue->lock);
252
253
0
  if (queue->size > 0)
254
0
  {
255
0
    CopyMemory(message, &(queue->array[queue->head]), sizeof(wMessage));
256
0
    status = 1;
257
258
0
    if (remove)
259
0
    {
260
0
      ZeroMemory(&(queue->array[queue->head]), sizeof(wMessage));
261
0
      queue->head = (queue->head + 1) % queue->capacity;
262
0
      queue->size--;
263
264
0
      if (queue->size < 1)
265
0
        (void)ResetEvent(queue->event);
266
0
    }
267
0
  }
268
269
0
  LeaveCriticalSection(&queue->lock);
270
271
0
  return status;
272
0
}
273
274
/**
275
 * Construction, Destruction
276
 */
277
278
wMessageQueue* MessageQueue_New(const wObject* callback)
279
36.8k
{
280
36.8k
  wMessageQueue* queue = NULL;
281
282
36.8k
  queue = (wMessageQueue*)calloc(1, sizeof(wMessageQueue));
283
36.8k
  if (!queue)
284
0
    return NULL;
285
286
36.8k
  if (!InitializeCriticalSectionAndSpinCount(&queue->lock, 4000))
287
0
    goto fail;
288
289
36.8k
  if (!MessageQueue_EnsureCapacity(queue, 32))
290
0
    goto fail;
291
292
36.8k
  queue->event = CreateEvent(NULL, TRUE, FALSE, NULL);
293
36.8k
  if (!queue->event)
294
0
    goto fail;
295
296
36.8k
  if (callback)
297
28.8k
    queue->object = *callback;
298
299
36.8k
  return queue;
300
301
0
fail:
302
0
  WINPR_PRAGMA_DIAG_PUSH
303
0
  WINPR_PRAGMA_DIAG_IGNORED_MISMATCHED_DEALLOC
304
0
  MessageQueue_Free(queue);
305
0
  WINPR_PRAGMA_DIAG_POP
306
0
  return NULL;
307
36.8k
}
308
309
void MessageQueue_Free(wMessageQueue* queue)
310
36.8k
{
311
36.8k
  if (!queue)
312
0
    return;
313
314
36.8k
  if (queue->event)
315
36.8k
    MessageQueue_Clear(queue);
316
317
36.8k
  (void)CloseHandle(queue->event);
318
36.8k
  DeleteCriticalSection(&queue->lock);
319
320
36.8k
  free(queue->array);
321
36.8k
  free(queue);
322
36.8k
}
323
324
int MessageQueue_Clear(wMessageQueue* queue)
325
36.8k
{
326
36.8k
  int status = 0;
327
328
36.8k
  WINPR_ASSERT(queue);
329
36.8k
  WINPR_ASSERT(queue->event);
330
331
36.8k
  EnterCriticalSection(&queue->lock);
332
333
36.8k
  while (queue->size > 0)
334
0
  {
335
0
    wMessage* msg = &(queue->array[queue->head]);
336
337
    /* Free resources of message. */
338
0
    if (queue->object.fnObjectUninit)
339
0
      queue->object.fnObjectUninit(msg);
340
0
    if (queue->object.fnObjectFree)
341
0
      queue->object.fnObjectFree(msg);
342
343
0
    ZeroMemory(msg, sizeof(wMessage));
344
345
0
    queue->head = (queue->head + 1) % queue->capacity;
346
0
    queue->size--;
347
0
  }
348
36.8k
  (void)ResetEvent(queue->event);
349
36.8k
  queue->closed = FALSE;
350
351
36.8k
  LeaveCriticalSection(&queue->lock);
352
353
36.8k
  return status;
354
36.8k
}