/src/FreeRDP/winpr/libwinpr/utils/collections/MessageQueue.c
Line | Count | Source |
1 | | /** |
2 | | * WinPR: Windows Portable Runtime |
3 | | * Message Queue |
4 | | * |
5 | | * Copyright 2012 Marc-Andre Moreau <marcandre.moreau@gmail.com> |
6 | | * |
7 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
8 | | * you may not use this file except in compliance with the License. |
9 | | * You may obtain a copy of the License at |
10 | | * |
11 | | * http://www.apache.org/licenses/LICENSE-2.0 |
12 | | * |
13 | | * Unless required by applicable law or agreed to in writing, software |
14 | | * distributed under the License is distributed on an "AS IS" BASIS, |
15 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
16 | | * See the License for the specific language governing permissions and |
17 | | * limitations under the License. |
18 | | */ |
19 | | |
20 | | #include <winpr/config.h> |
21 | | |
22 | | #include <winpr/crt.h> |
23 | | #include <winpr/sysinfo.h> |
24 | | #include <winpr/assert.h> |
25 | | |
26 | | #include <winpr/collections.h> |
27 | | |
28 | | struct s_wMessageQueue |
29 | | { |
30 | | size_t head; |
31 | | size_t tail; |
32 | | size_t size; |
33 | | size_t capacity; |
34 | | BOOL closed; |
35 | | wMessage* array; |
36 | | CRITICAL_SECTION lock; |
37 | | HANDLE event; |
38 | | |
39 | | wObject object; |
40 | | }; |
41 | | |
42 | | /** |
43 | | * Message Queue inspired from Windows: |
44 | | * http://msdn.microsoft.com/en-us/library/ms632590/ |
45 | | */ |
46 | | |
47 | | /** |
48 | | * Properties |
49 | | */ |
50 | | |
51 | | wObject* MessageQueue_Object(wMessageQueue* queue) |
52 | 8.11k | { |
53 | 8.11k | WINPR_ASSERT(queue); |
54 | 8.11k | return &queue->object; |
55 | 8.11k | } |
56 | | |
57 | | /** |
58 | | * Gets an event which is set when the queue is non-empty |
59 | | */ |
60 | | |
61 | | HANDLE MessageQueue_Event(wMessageQueue* queue) |
62 | 0 | { |
63 | 0 | WINPR_ASSERT(queue); |
64 | 0 | return queue->event; |
65 | 0 | } |
66 | | |
67 | | /** |
68 | | * Gets the queue size |
69 | | */ |
70 | | |
71 | | size_t MessageQueue_Size(wMessageQueue* queue) |
72 | 0 | { |
73 | 0 | WINPR_ASSERT(queue); |
74 | 0 | EnterCriticalSection(&queue->lock); |
75 | 0 | const size_t ret = queue->size; |
76 | 0 | LeaveCriticalSection(&queue->lock); |
77 | 0 | return ret; |
78 | 0 | } |
79 | | |
80 | | size_t MessageQueue_Capacity(wMessageQueue* queue) |
81 | 0 | { |
82 | 0 | WINPR_ASSERT(queue); |
83 | 0 | EnterCriticalSection(&queue->lock); |
84 | 0 | const size_t ret = queue->capacity; |
85 | 0 | LeaveCriticalSection(&queue->lock); |
86 | 0 | return ret; |
87 | 0 | } |
88 | | |
89 | | /** |
90 | | * Methods |
91 | | */ |
92 | | |
93 | | BOOL MessageQueue_Wait(wMessageQueue* queue) |
94 | 0 | { |
95 | 0 | BOOL status = FALSE; |
96 | |
|
97 | 0 | WINPR_ASSERT(queue); |
98 | 0 | if (WaitForSingleObject(queue->event, INFINITE) == WAIT_OBJECT_0) |
99 | 0 | status = TRUE; |
100 | |
|
101 | 0 | return status; |
102 | 0 | } |
103 | | |
104 | | static BOOL MessageQueue_EnsureCapacity(wMessageQueue* queue, size_t count) |
105 | 38.9k | { |
106 | 38.9k | BOOL res = TRUE; |
107 | 38.9k | const size_t increment = 128; |
108 | 38.9k | WINPR_ASSERT(queue); |
109 | | |
110 | 38.9k | const size_t required = queue->size + count; |
111 | | // check for overflow |
112 | 38.9k | if ((required < queue->size) || (required < count) || |
113 | 38.9k | (required > (SIZE_MAX - increment) / sizeof(wMessage))) |
114 | 0 | return FALSE; |
115 | | |
116 | 38.9k | if (required > queue->capacity) |
117 | 38.9k | { |
118 | 38.9k | const size_t old_capacity = queue->capacity; |
119 | 38.9k | const size_t new_capacity = required + increment; |
120 | | |
121 | 38.9k | wMessage* new_arr = (wMessage*)realloc(queue->array, sizeof(wMessage) * new_capacity); |
122 | 38.9k | if (!new_arr) |
123 | 0 | return FALSE; |
124 | 38.9k | queue->array = new_arr; |
125 | 38.9k | queue->capacity = new_capacity; |
126 | 38.9k | ZeroMemory(&(queue->array[old_capacity]), (new_capacity - old_capacity) * sizeof(wMessage)); |
127 | | |
128 | | /* rearrange wrapped entries: |
129 | | * fill up the newly available space and move tail |
130 | | * back by the amount of elements that have been moved to the newly |
131 | | * allocated space. |
132 | | */ |
133 | 38.9k | if (queue->tail <= queue->head) |
134 | 38.9k | { |
135 | 38.9k | size_t tocopy = queue->tail; |
136 | 38.9k | size_t slots = new_capacity - old_capacity; |
137 | 38.9k | const size_t batch = (tocopy < slots) ? tocopy : slots; |
138 | 38.9k | CopyMemory(&(queue->array[old_capacity]), queue->array, batch * sizeof(wMessage)); |
139 | | |
140 | | /* Tail is decremented. if the whole thing is appended |
141 | | * just move the existing tail by old_capacity */ |
142 | 38.9k | if (tocopy < slots) |
143 | 38.9k | { |
144 | 38.9k | ZeroMemory(queue->array, batch * sizeof(wMessage)); |
145 | 38.9k | queue->tail += old_capacity; |
146 | 38.9k | } |
147 | 0 | else |
148 | 0 | { |
149 | 0 | const size_t remain = queue->tail - batch; |
150 | 0 | const size_t movesize = remain * sizeof(wMessage); |
151 | 0 | res = memmove_s(queue->array, queue->tail * sizeof(wMessage), &queue->array[batch], |
152 | 0 | movesize) >= 0; |
153 | |
|
154 | 0 | const size_t zerooffset = remain; |
155 | 0 | const size_t zerosize = (queue->tail - remain) * sizeof(wMessage); |
156 | 0 | ZeroMemory(&queue->array[zerooffset], zerosize); |
157 | 0 | queue->tail -= batch; |
158 | 0 | } |
159 | 38.9k | } |
160 | 38.9k | } |
161 | | |
162 | 38.9k | return res; |
163 | 38.9k | } |
164 | | |
165 | | BOOL MessageQueue_Dispatch(wMessageQueue* queue, const wMessage* message) |
166 | 0 | { |
167 | 0 | wMessage* dst = nullptr; |
168 | 0 | BOOL ret = FALSE; |
169 | 0 | WINPR_ASSERT(queue); |
170 | |
|
171 | 0 | if (!message) |
172 | 0 | return FALSE; |
173 | | |
174 | 0 | WINPR_ASSERT(queue); |
175 | 0 | EnterCriticalSection(&queue->lock); |
176 | |
|
177 | 0 | if (queue->closed) |
178 | 0 | goto out; |
179 | | |
180 | 0 | if (!MessageQueue_EnsureCapacity(queue, 1)) |
181 | 0 | goto out; |
182 | | |
183 | 0 | dst = &(queue->array[queue->tail]); |
184 | 0 | *dst = *message; |
185 | 0 | dst->time = GetTickCount64(); |
186 | |
|
187 | 0 | queue->tail = (queue->tail + 1) % queue->capacity; |
188 | 0 | queue->size++; |
189 | |
|
190 | 0 | if (queue->size > 0) |
191 | 0 | (void)SetEvent(queue->event); |
192 | |
|
193 | 0 | if (message->id == WMQ_QUIT) |
194 | 0 | queue->closed = TRUE; |
195 | |
|
196 | 0 | ret = TRUE; |
197 | 0 | out: |
198 | 0 | LeaveCriticalSection(&queue->lock); |
199 | 0 | return ret; |
200 | 0 | } |
201 | | |
202 | | BOOL MessageQueue_Post(wMessageQueue* queue, void* context, UINT32 type, void* wParam, void* lParam) |
203 | 0 | { |
204 | 0 | wMessage message = WINPR_C_ARRAY_INIT; |
205 | |
|
206 | 0 | message.context = context; |
207 | 0 | message.id = type; |
208 | 0 | message.wParam = wParam; |
209 | 0 | message.lParam = lParam; |
210 | 0 | message.Free = nullptr; |
211 | |
|
212 | 0 | return MessageQueue_Dispatch(queue, &message); |
213 | 0 | } |
214 | | |
215 | | BOOL MessageQueue_PostQuit(wMessageQueue* queue, int nExitCode) |
216 | 0 | { |
217 | 0 | return MessageQueue_Post(queue, nullptr, WMQ_QUIT, (void*)(size_t)nExitCode, nullptr); |
218 | 0 | } |
219 | | |
220 | | int MessageQueue_Get(wMessageQueue* queue, wMessage* message) |
221 | 0 | { |
222 | 0 | int status = -1; |
223 | |
|
224 | 0 | if (!MessageQueue_Wait(queue)) |
225 | 0 | return status; |
226 | | |
227 | 0 | EnterCriticalSection(&queue->lock); |
228 | |
|
229 | 0 | if (queue->size > 0) |
230 | 0 | { |
231 | 0 | CopyMemory(message, &(queue->array[queue->head]), sizeof(wMessage)); |
232 | 0 | ZeroMemory(&(queue->array[queue->head]), sizeof(wMessage)); |
233 | 0 | queue->head = (queue->head + 1) % queue->capacity; |
234 | 0 | queue->size--; |
235 | |
|
236 | 0 | if (queue->size < 1) |
237 | 0 | (void)ResetEvent(queue->event); |
238 | |
|
239 | 0 | status = (message->id != WMQ_QUIT) ? 1 : 0; |
240 | 0 | } |
241 | |
|
242 | 0 | LeaveCriticalSection(&queue->lock); |
243 | |
|
244 | 0 | return status; |
245 | 0 | } |
246 | | |
247 | | int MessageQueue_Peek(wMessageQueue* queue, wMessage* message, BOOL remove) |
248 | 0 | { |
249 | 0 | int status = 0; |
250 | |
|
251 | 0 | WINPR_ASSERT(queue); |
252 | 0 | EnterCriticalSection(&queue->lock); |
253 | |
|
254 | 0 | if (queue->size > 0) |
255 | 0 | { |
256 | 0 | CopyMemory(message, &(queue->array[queue->head]), sizeof(wMessage)); |
257 | 0 | status = 1; |
258 | |
|
259 | 0 | if (remove) |
260 | 0 | { |
261 | 0 | ZeroMemory(&(queue->array[queue->head]), sizeof(wMessage)); |
262 | 0 | queue->head = (queue->head + 1) % queue->capacity; |
263 | 0 | queue->size--; |
264 | |
|
265 | 0 | if (queue->size < 1) |
266 | 0 | { |
267 | 0 | if (ResetEvent(queue->event)) |
268 | 0 | status = -1; |
269 | 0 | } |
270 | 0 | } |
271 | 0 | } |
272 | |
|
273 | 0 | LeaveCriticalSection(&queue->lock); |
274 | |
|
275 | 0 | return status; |
276 | 0 | } |
277 | | |
278 | | /** |
279 | | * Construction, Destruction |
280 | | */ |
281 | | |
282 | | wMessageQueue* MessageQueue_New(const wObject* callback) |
283 | 38.9k | { |
284 | 38.9k | wMessageQueue* queue = nullptr; |
285 | | |
286 | 38.9k | queue = (wMessageQueue*)calloc(1, sizeof(wMessageQueue)); |
287 | 38.9k | if (!queue) |
288 | 0 | return nullptr; |
289 | | |
290 | 38.9k | if (!InitializeCriticalSectionAndSpinCount(&queue->lock, 4000)) |
291 | 0 | goto fail; |
292 | | |
293 | 38.9k | if (!MessageQueue_EnsureCapacity(queue, 32)) |
294 | 0 | goto fail; |
295 | | |
296 | 38.9k | queue->event = CreateEvent(nullptr, TRUE, FALSE, nullptr); |
297 | 38.9k | if (!queue->event) |
298 | 0 | goto fail; |
299 | | |
300 | 38.9k | if (callback) |
301 | 30.8k | queue->object = *callback; |
302 | | |
303 | 38.9k | return queue; |
304 | | |
305 | 0 | fail: |
306 | 0 | WINPR_PRAGMA_DIAG_PUSH |
307 | 0 | WINPR_PRAGMA_DIAG_IGNORED_MISMATCHED_DEALLOC |
308 | 0 | MessageQueue_Free(queue); |
309 | 0 | WINPR_PRAGMA_DIAG_POP |
310 | 0 | return nullptr; |
311 | 38.9k | } |
312 | | |
313 | | void MessageQueue_Free(wMessageQueue* queue) |
314 | 38.9k | { |
315 | 38.9k | if (!queue) |
316 | 0 | return; |
317 | | |
318 | 38.9k | if (queue->event) |
319 | 38.9k | MessageQueue_Clear(queue); |
320 | | |
321 | 38.9k | (void)CloseHandle(queue->event); |
322 | 38.9k | DeleteCriticalSection(&queue->lock); |
323 | | |
324 | 38.9k | free(queue->array); |
325 | 38.9k | free(queue); |
326 | 38.9k | } |
327 | | |
328 | | int MessageQueue_Clear(wMessageQueue* queue) |
329 | 38.9k | { |
330 | 38.9k | int status = 0; |
331 | | |
332 | 38.9k | WINPR_ASSERT(queue); |
333 | 38.9k | WINPR_ASSERT(queue->event); |
334 | | |
335 | 38.9k | EnterCriticalSection(&queue->lock); |
336 | | |
337 | 38.9k | while (queue->size > 0) |
338 | 0 | { |
339 | 0 | wMessage* msg = &(queue->array[queue->head]); |
340 | | |
341 | | /* Free resources of message. */ |
342 | 0 | if (queue->object.fnObjectUninit) |
343 | 0 | queue->object.fnObjectUninit(msg); |
344 | 0 | if (queue->object.fnObjectFree) |
345 | 0 | queue->object.fnObjectFree(msg); |
346 | |
|
347 | 0 | ZeroMemory(msg, sizeof(wMessage)); |
348 | |
|
349 | 0 | queue->head = (queue->head + 1) % queue->capacity; |
350 | 0 | queue->size--; |
351 | 0 | } |
352 | 38.9k | if (!ResetEvent(queue->event)) |
353 | 0 | status = -1; |
354 | 38.9k | queue->closed = FALSE; |
355 | | |
356 | 38.9k | LeaveCriticalSection(&queue->lock); |
357 | | |
358 | 38.9k | return status; |
359 | 38.9k | } |