/src/FreeRDP/winpr/libwinpr/utils/collections/MessageQueue.c
Line | Count | Source |
1 | | /** |
2 | | * WinPR: Windows Portable Runtime |
3 | | * Message Queue |
4 | | * |
5 | | * Copyright 2012 Marc-Andre Moreau <marcandre.moreau@gmail.com> |
6 | | * |
7 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
8 | | * you may not use this file except in compliance with the License. |
9 | | * You may obtain a copy of the License at |
10 | | * |
11 | | * http://www.apache.org/licenses/LICENSE-2.0 |
12 | | * |
13 | | * Unless required by applicable law or agreed to in writing, software |
14 | | * distributed under the License is distributed on an "AS IS" BASIS, |
15 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
16 | | * See the License for the specific language governing permissions and |
17 | | * limitations under the License. |
18 | | */ |
19 | | |
20 | | #include <winpr/config.h> |
21 | | |
22 | | #include <winpr/crt.h> |
23 | | #include <winpr/sysinfo.h> |
24 | | #include <winpr/assert.h> |
25 | | |
26 | | #include <winpr/collections.h> |
27 | | |
28 | | struct s_wMessageQueue |
29 | | { |
30 | | size_t head; |
31 | | size_t tail; |
32 | | size_t size; |
33 | | size_t capacity; |
34 | | BOOL closed; |
35 | | wMessage* array; |
36 | | CRITICAL_SECTION lock; |
37 | | HANDLE event; |
38 | | |
39 | | wObject object; |
40 | | }; |
41 | | |
42 | | /** |
43 | | * Message Queue inspired from Windows: |
44 | | * http://msdn.microsoft.com/en-us/library/ms632590/ |
45 | | */ |
46 | | |
47 | | /** |
48 | | * Properties |
49 | | */ |
50 | | |
51 | | wObject* MessageQueue_Object(wMessageQueue* queue) |
52 | 8.02k | { |
53 | 8.02k | WINPR_ASSERT(queue); |
54 | 8.02k | return &queue->object; |
55 | 8.02k | } |
56 | | |
57 | | /** |
58 | | * Gets an event which is set when the queue is non-empty |
59 | | */ |
60 | | |
61 | | HANDLE MessageQueue_Event(wMessageQueue* queue) |
62 | 0 | { |
63 | 0 | WINPR_ASSERT(queue); |
64 | 0 | return queue->event; |
65 | 0 | } |
66 | | |
67 | | /** |
68 | | * Gets the queue size |
69 | | */ |
70 | | |
71 | | size_t MessageQueue_Size(wMessageQueue* queue) |
72 | 0 | { |
73 | 0 | WINPR_ASSERT(queue); |
74 | 0 | EnterCriticalSection(&queue->lock); |
75 | 0 | const size_t ret = queue->size; |
76 | 0 | LeaveCriticalSection(&queue->lock); |
77 | 0 | return ret; |
78 | 0 | } |
79 | | |
80 | | size_t MessageQueue_Capacity(wMessageQueue* queue) |
81 | 0 | { |
82 | 0 | WINPR_ASSERT(queue); |
83 | 0 | EnterCriticalSection(&queue->lock); |
84 | 0 | const size_t ret = queue->capacity; |
85 | 0 | LeaveCriticalSection(&queue->lock); |
86 | 0 | return ret; |
87 | 0 | } |
88 | | |
89 | | /** |
90 | | * Methods |
91 | | */ |
92 | | |
93 | | BOOL MessageQueue_Wait(wMessageQueue* queue) |
94 | 0 | { |
95 | 0 | BOOL status = FALSE; |
96 | |
|
97 | 0 | WINPR_ASSERT(queue); |
98 | 0 | if (WaitForSingleObject(queue->event, INFINITE) == WAIT_OBJECT_0) |
99 | 0 | status = TRUE; |
100 | |
|
101 | 0 | return status; |
102 | 0 | } |
103 | | |
104 | | static BOOL MessageQueue_EnsureCapacity(wMessageQueue* queue, size_t count) |
105 | 36.8k | { |
106 | 36.8k | const size_t increment = 128; |
107 | 36.8k | WINPR_ASSERT(queue); |
108 | | |
109 | 36.8k | const size_t required = queue->size + count; |
110 | | // check for overflow |
111 | 36.8k | if ((required < queue->size) || (required < count) || |
112 | 36.8k | (required > (SIZE_MAX - increment) / sizeof(wMessage))) |
113 | 0 | return FALSE; |
114 | | |
115 | 36.8k | if (required > queue->capacity) |
116 | 36.8k | { |
117 | 36.8k | const size_t old_capacity = queue->capacity; |
118 | 36.8k | const size_t new_capacity = required + increment; |
119 | | |
120 | 36.8k | wMessage* new_arr = (wMessage*)realloc(queue->array, sizeof(wMessage) * new_capacity); |
121 | 36.8k | if (!new_arr) |
122 | 0 | return FALSE; |
123 | 36.8k | queue->array = new_arr; |
124 | 36.8k | queue->capacity = new_capacity; |
125 | 36.8k | ZeroMemory(&(queue->array[old_capacity]), (new_capacity - old_capacity) * sizeof(wMessage)); |
126 | | |
127 | | /* rearrange wrapped entries: |
128 | | * fill up the newly available space and move tail |
129 | | * back by the amount of elements that have been moved to the newly |
130 | | * allocated space. |
131 | | */ |
132 | 36.8k | if (queue->tail <= queue->head) |
133 | 36.8k | { |
134 | 36.8k | size_t tocopy = queue->tail; |
135 | 36.8k | size_t slots = new_capacity - old_capacity; |
136 | 36.8k | const size_t batch = (tocopy < slots) ? tocopy : slots; |
137 | 36.8k | CopyMemory(&(queue->array[old_capacity]), queue->array, batch * sizeof(wMessage)); |
138 | | |
139 | | /* Tail is decremented. if the whole thing is appended |
140 | | * just move the existing tail by old_capacity */ |
141 | 36.8k | if (tocopy < slots) |
142 | 36.8k | { |
143 | 36.8k | ZeroMemory(queue->array, batch * sizeof(wMessage)); |
144 | 36.8k | queue->tail += old_capacity; |
145 | 36.8k | } |
146 | 0 | else |
147 | 0 | { |
148 | 0 | const size_t remain = queue->tail - batch; |
149 | 0 | const size_t movesize = remain * sizeof(wMessage); |
150 | 0 | memmove_s(queue->array, queue->tail * sizeof(wMessage), &queue->array[batch], |
151 | 0 | movesize); |
152 | |
|
153 | 0 | const size_t zerooffset = remain; |
154 | 0 | const size_t zerosize = (queue->tail - remain) * sizeof(wMessage); |
155 | 0 | ZeroMemory(&queue->array[zerooffset], zerosize); |
156 | 0 | queue->tail -= batch; |
157 | 0 | } |
158 | 36.8k | } |
159 | 36.8k | } |
160 | | |
161 | 36.8k | return TRUE; |
162 | 36.8k | } |
163 | | |
164 | | BOOL MessageQueue_Dispatch(wMessageQueue* queue, const wMessage* message) |
165 | 0 | { |
166 | 0 | wMessage* dst = NULL; |
167 | 0 | BOOL ret = FALSE; |
168 | 0 | WINPR_ASSERT(queue); |
169 | |
|
170 | 0 | if (!message) |
171 | 0 | return FALSE; |
172 | | |
173 | 0 | WINPR_ASSERT(queue); |
174 | 0 | EnterCriticalSection(&queue->lock); |
175 | |
|
176 | 0 | if (queue->closed) |
177 | 0 | goto out; |
178 | | |
179 | 0 | if (!MessageQueue_EnsureCapacity(queue, 1)) |
180 | 0 | goto out; |
181 | | |
182 | 0 | dst = &(queue->array[queue->tail]); |
183 | 0 | *dst = *message; |
184 | 0 | dst->time = GetTickCount64(); |
185 | |
|
186 | 0 | queue->tail = (queue->tail + 1) % queue->capacity; |
187 | 0 | queue->size++; |
188 | |
|
189 | 0 | if (queue->size > 0) |
190 | 0 | (void)SetEvent(queue->event); |
191 | |
|
192 | 0 | if (message->id == WMQ_QUIT) |
193 | 0 | queue->closed = TRUE; |
194 | |
|
195 | 0 | ret = TRUE; |
196 | 0 | out: |
197 | 0 | LeaveCriticalSection(&queue->lock); |
198 | 0 | return ret; |
199 | 0 | } |
200 | | |
201 | | BOOL MessageQueue_Post(wMessageQueue* queue, void* context, UINT32 type, void* wParam, void* lParam) |
202 | 0 | { |
203 | 0 | wMessage message = WINPR_C_ARRAY_INIT; |
204 | |
|
205 | 0 | message.context = context; |
206 | 0 | message.id = type; |
207 | 0 | message.wParam = wParam; |
208 | 0 | message.lParam = lParam; |
209 | 0 | message.Free = NULL; |
210 | |
|
211 | 0 | return MessageQueue_Dispatch(queue, &message); |
212 | 0 | } |
213 | | |
214 | | BOOL MessageQueue_PostQuit(wMessageQueue* queue, int nExitCode) |
215 | 0 | { |
216 | 0 | return MessageQueue_Post(queue, NULL, WMQ_QUIT, (void*)(size_t)nExitCode, NULL); |
217 | 0 | } |
218 | | |
219 | | int MessageQueue_Get(wMessageQueue* queue, wMessage* message) |
220 | 0 | { |
221 | 0 | int status = -1; |
222 | |
|
223 | 0 | if (!MessageQueue_Wait(queue)) |
224 | 0 | return status; |
225 | | |
226 | 0 | EnterCriticalSection(&queue->lock); |
227 | |
|
228 | 0 | if (queue->size > 0) |
229 | 0 | { |
230 | 0 | CopyMemory(message, &(queue->array[queue->head]), sizeof(wMessage)); |
231 | 0 | ZeroMemory(&(queue->array[queue->head]), sizeof(wMessage)); |
232 | 0 | queue->head = (queue->head + 1) % queue->capacity; |
233 | 0 | queue->size--; |
234 | |
|
235 | 0 | if (queue->size < 1) |
236 | 0 | (void)ResetEvent(queue->event); |
237 | |
|
238 | 0 | status = (message->id != WMQ_QUIT) ? 1 : 0; |
239 | 0 | } |
240 | |
|
241 | 0 | LeaveCriticalSection(&queue->lock); |
242 | |
|
243 | 0 | return status; |
244 | 0 | } |
245 | | |
246 | | int MessageQueue_Peek(wMessageQueue* queue, wMessage* message, BOOL remove) |
247 | 0 | { |
248 | 0 | int status = 0; |
249 | |
|
250 | 0 | WINPR_ASSERT(queue); |
251 | 0 | EnterCriticalSection(&queue->lock); |
252 | |
|
253 | 0 | if (queue->size > 0) |
254 | 0 | { |
255 | 0 | CopyMemory(message, &(queue->array[queue->head]), sizeof(wMessage)); |
256 | 0 | status = 1; |
257 | |
|
258 | 0 | if (remove) |
259 | 0 | { |
260 | 0 | ZeroMemory(&(queue->array[queue->head]), sizeof(wMessage)); |
261 | 0 | queue->head = (queue->head + 1) % queue->capacity; |
262 | 0 | queue->size--; |
263 | |
|
264 | 0 | if (queue->size < 1) |
265 | 0 | (void)ResetEvent(queue->event); |
266 | 0 | } |
267 | 0 | } |
268 | |
|
269 | 0 | LeaveCriticalSection(&queue->lock); |
270 | |
|
271 | 0 | return status; |
272 | 0 | } |
273 | | |
274 | | /** |
275 | | * Construction, Destruction |
276 | | */ |
277 | | |
278 | | wMessageQueue* MessageQueue_New(const wObject* callback) |
279 | 36.8k | { |
280 | 36.8k | wMessageQueue* queue = NULL; |
281 | | |
282 | 36.8k | queue = (wMessageQueue*)calloc(1, sizeof(wMessageQueue)); |
283 | 36.8k | if (!queue) |
284 | 0 | return NULL; |
285 | | |
286 | 36.8k | if (!InitializeCriticalSectionAndSpinCount(&queue->lock, 4000)) |
287 | 0 | goto fail; |
288 | | |
289 | 36.8k | if (!MessageQueue_EnsureCapacity(queue, 32)) |
290 | 0 | goto fail; |
291 | | |
292 | 36.8k | queue->event = CreateEvent(NULL, TRUE, FALSE, NULL); |
293 | 36.8k | if (!queue->event) |
294 | 0 | goto fail; |
295 | | |
296 | 36.8k | if (callback) |
297 | 28.8k | queue->object = *callback; |
298 | | |
299 | 36.8k | return queue; |
300 | | |
301 | 0 | fail: |
302 | 0 | WINPR_PRAGMA_DIAG_PUSH |
303 | 0 | WINPR_PRAGMA_DIAG_IGNORED_MISMATCHED_DEALLOC |
304 | 0 | MessageQueue_Free(queue); |
305 | 0 | WINPR_PRAGMA_DIAG_POP |
306 | 0 | return NULL; |
307 | 36.8k | } |
308 | | |
309 | | void MessageQueue_Free(wMessageQueue* queue) |
310 | 36.8k | { |
311 | 36.8k | if (!queue) |
312 | 0 | return; |
313 | | |
314 | 36.8k | if (queue->event) |
315 | 36.8k | MessageQueue_Clear(queue); |
316 | | |
317 | 36.8k | (void)CloseHandle(queue->event); |
318 | 36.8k | DeleteCriticalSection(&queue->lock); |
319 | | |
320 | 36.8k | free(queue->array); |
321 | 36.8k | free(queue); |
322 | 36.8k | } |
323 | | |
324 | | int MessageQueue_Clear(wMessageQueue* queue) |
325 | 36.8k | { |
326 | 36.8k | int status = 0; |
327 | | |
328 | 36.8k | WINPR_ASSERT(queue); |
329 | 36.8k | WINPR_ASSERT(queue->event); |
330 | | |
331 | 36.8k | EnterCriticalSection(&queue->lock); |
332 | | |
333 | 36.8k | while (queue->size > 0) |
334 | 0 | { |
335 | 0 | wMessage* msg = &(queue->array[queue->head]); |
336 | | |
337 | | /* Free resources of message. */ |
338 | 0 | if (queue->object.fnObjectUninit) |
339 | 0 | queue->object.fnObjectUninit(msg); |
340 | 0 | if (queue->object.fnObjectFree) |
341 | 0 | queue->object.fnObjectFree(msg); |
342 | |
|
343 | 0 | ZeroMemory(msg, sizeof(wMessage)); |
344 | |
|
345 | 0 | queue->head = (queue->head + 1) % queue->capacity; |
346 | 0 | queue->size--; |
347 | 0 | } |
348 | 36.8k | (void)ResetEvent(queue->event); |
349 | 36.8k | queue->closed = FALSE; |
350 | | |
351 | 36.8k | LeaveCriticalSection(&queue->lock); |
352 | | |
353 | 36.8k | return status; |
354 | 36.8k | } |