/src/FreeRDP/winpr/libwinpr/utils/collections/MessageQueue.c
Line | Count | Source (jump to first uncovered line) |
1 | | /** |
2 | | * WinPR: Windows Portable Runtime |
3 | | * Message Queue |
4 | | * |
5 | | * Copyright 2012 Marc-Andre Moreau <marcandre.moreau@gmail.com> |
6 | | * |
7 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
8 | | * you may not use this file except in compliance with the License. |
9 | | * You may obtain a copy of the License at |
10 | | * |
11 | | * http://www.apache.org/licenses/LICENSE-2.0 |
12 | | * |
13 | | * Unless required by applicable law or agreed to in writing, software |
14 | | * distributed under the License is distributed on an "AS IS" BASIS, |
15 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
16 | | * See the License for the specific language governing permissions and |
17 | | * limitations under the License. |
18 | | */ |
19 | | |
20 | | #include <winpr/config.h> |
21 | | |
22 | | #include <winpr/crt.h> |
23 | | #include <winpr/sysinfo.h> |
24 | | #include <winpr/assert.h> |
25 | | |
26 | | #include <winpr/collections.h> |
27 | | |
28 | | struct s_wMessageQueue |
29 | | { |
30 | | size_t head; |
31 | | size_t tail; |
32 | | size_t size; |
33 | | size_t capacity; |
34 | | BOOL closed; |
35 | | wMessage* array; |
36 | | CRITICAL_SECTION lock; |
37 | | HANDLE event; |
38 | | |
39 | | wObject object; |
40 | | }; |
41 | | |
42 | | /** |
43 | | * Message Queue inspired from Windows: |
44 | | * http://msdn.microsoft.com/en-us/library/ms632590/ |
45 | | */ |
46 | | |
47 | | /** |
48 | | * Properties |
49 | | */ |
50 | | |
51 | | wObject* MessageQueue_Object(wMessageQueue* queue) |
52 | 0 | { |
53 | 0 | WINPR_ASSERT(queue); |
54 | 0 | return &queue->object; |
55 | 0 | } |
56 | | |
57 | | /** |
58 | | * Gets an event which is set when the queue is non-empty |
59 | | */ |
60 | | |
61 | | HANDLE MessageQueue_Event(wMessageQueue* queue) |
62 | 0 | { |
63 | 0 | WINPR_ASSERT(queue); |
64 | 0 | return queue->event; |
65 | 0 | } |
66 | | |
67 | | /** |
68 | | * Gets the queue size |
69 | | */ |
70 | | |
71 | | size_t MessageQueue_Size(wMessageQueue* queue) |
72 | 0 | { |
73 | 0 | WINPR_ASSERT(queue); |
74 | 0 | EnterCriticalSection(&queue->lock); |
75 | 0 | const size_t ret = queue->size; |
76 | 0 | LeaveCriticalSection(&queue->lock); |
77 | 0 | return ret; |
78 | 0 | } |
79 | | |
80 | | /** |
81 | | * Methods |
82 | | */ |
83 | | |
84 | | BOOL MessageQueue_Wait(wMessageQueue* queue) |
85 | 0 | { |
86 | 0 | BOOL status = FALSE; |
87 | |
|
88 | 0 | WINPR_ASSERT(queue); |
89 | 0 | if (WaitForSingleObject(queue->event, INFINITE) == WAIT_OBJECT_0) |
90 | 0 | status = TRUE; |
91 | |
|
92 | 0 | return status; |
93 | 0 | } |
94 | | |
95 | | static BOOL MessageQueue_EnsureCapacity(wMessageQueue* queue, size_t count) |
96 | 0 | { |
97 | 0 | WINPR_ASSERT(queue); |
98 | | |
99 | 0 | if (queue->size + count >= queue->capacity) |
100 | 0 | { |
101 | 0 | wMessage* new_arr = NULL; |
102 | 0 | size_t old_capacity = queue->capacity; |
103 | 0 | size_t new_capacity = queue->capacity * 2; |
104 | |
|
105 | 0 | if (new_capacity < queue->size + count) |
106 | 0 | new_capacity = queue->size + count; |
107 | |
|
108 | 0 | new_arr = (wMessage*)realloc(queue->array, sizeof(wMessage) * new_capacity); |
109 | 0 | if (!new_arr) |
110 | 0 | return FALSE; |
111 | 0 | queue->array = new_arr; |
112 | 0 | queue->capacity = new_capacity; |
113 | 0 | ZeroMemory(&(queue->array[old_capacity]), (new_capacity - old_capacity) * sizeof(wMessage)); |
114 | | |
115 | | /* rearrange wrapped entries */ |
116 | 0 | if (queue->tail <= queue->head) |
117 | 0 | { |
118 | 0 | CopyMemory(&(queue->array[old_capacity]), queue->array, queue->tail * sizeof(wMessage)); |
119 | 0 | queue->tail += old_capacity; |
120 | 0 | } |
121 | 0 | } |
122 | | |
123 | 0 | return TRUE; |
124 | 0 | } |
125 | | |
126 | | BOOL MessageQueue_Dispatch(wMessageQueue* queue, const wMessage* message) |
127 | 0 | { |
128 | 0 | wMessage* dst = NULL; |
129 | 0 | BOOL ret = FALSE; |
130 | 0 | WINPR_ASSERT(queue); |
131 | | |
132 | 0 | if (!message) |
133 | 0 | return FALSE; |
134 | | |
135 | 0 | WINPR_ASSERT(queue); |
136 | 0 | EnterCriticalSection(&queue->lock); |
137 | |
|
138 | 0 | if (queue->closed) |
139 | 0 | goto out; |
140 | | |
141 | 0 | if (!MessageQueue_EnsureCapacity(queue, 1)) |
142 | 0 | goto out; |
143 | | |
144 | 0 | dst = &(queue->array[queue->tail]); |
145 | 0 | *dst = *message; |
146 | 0 | dst->time = GetTickCount64(); |
147 | |
|
148 | 0 | queue->tail = (queue->tail + 1) % queue->capacity; |
149 | 0 | queue->size++; |
150 | |
|
151 | 0 | if (queue->size > 0) |
152 | 0 | (void)SetEvent(queue->event); |
153 | |
|
154 | 0 | if (message->id == WMQ_QUIT) |
155 | 0 | queue->closed = TRUE; |
156 | |
|
157 | 0 | ret = TRUE; |
158 | 0 | out: |
159 | 0 | LeaveCriticalSection(&queue->lock); |
160 | 0 | return ret; |
161 | 0 | } |
162 | | |
163 | | BOOL MessageQueue_Post(wMessageQueue* queue, void* context, UINT32 type, void* wParam, void* lParam) |
164 | 0 | { |
165 | 0 | wMessage message = { 0 }; |
166 | |
|
167 | 0 | message.context = context; |
168 | 0 | message.id = type; |
169 | 0 | message.wParam = wParam; |
170 | 0 | message.lParam = lParam; |
171 | 0 | message.Free = NULL; |
172 | |
|
173 | 0 | return MessageQueue_Dispatch(queue, &message); |
174 | 0 | } |
175 | | |
176 | | BOOL MessageQueue_PostQuit(wMessageQueue* queue, int nExitCode) |
177 | 0 | { |
178 | 0 | return MessageQueue_Post(queue, NULL, WMQ_QUIT, (void*)(size_t)nExitCode, NULL); |
179 | 0 | } |
180 | | |
181 | | int MessageQueue_Get(wMessageQueue* queue, wMessage* message) |
182 | 0 | { |
183 | 0 | int status = -1; |
184 | |
|
185 | 0 | if (!MessageQueue_Wait(queue)) |
186 | 0 | return status; |
187 | | |
188 | 0 | EnterCriticalSection(&queue->lock); |
189 | |
|
190 | 0 | if (queue->size > 0) |
191 | 0 | { |
192 | 0 | CopyMemory(message, &(queue->array[queue->head]), sizeof(wMessage)); |
193 | 0 | ZeroMemory(&(queue->array[queue->head]), sizeof(wMessage)); |
194 | 0 | queue->head = (queue->head + 1) % queue->capacity; |
195 | 0 | queue->size--; |
196 | |
|
197 | 0 | if (queue->size < 1) |
198 | 0 | (void)ResetEvent(queue->event); |
199 | |
|
200 | 0 | status = (message->id != WMQ_QUIT) ? 1 : 0; |
201 | 0 | } |
202 | |
|
203 | 0 | LeaveCriticalSection(&queue->lock); |
204 | |
|
205 | 0 | return status; |
206 | 0 | } |
207 | | |
208 | | int MessageQueue_Peek(wMessageQueue* queue, wMessage* message, BOOL remove) |
209 | 0 | { |
210 | 0 | int status = 0; |
211 | |
|
212 | 0 | WINPR_ASSERT(queue); |
213 | 0 | EnterCriticalSection(&queue->lock); |
214 | |
|
215 | 0 | if (queue->size > 0) |
216 | 0 | { |
217 | 0 | CopyMemory(message, &(queue->array[queue->head]), sizeof(wMessage)); |
218 | 0 | status = 1; |
219 | |
|
220 | 0 | if (remove) |
221 | 0 | { |
222 | 0 | ZeroMemory(&(queue->array[queue->head]), sizeof(wMessage)); |
223 | 0 | queue->head = (queue->head + 1) % queue->capacity; |
224 | 0 | queue->size--; |
225 | |
|
226 | 0 | if (queue->size < 1) |
227 | 0 | (void)ResetEvent(queue->event); |
228 | 0 | } |
229 | 0 | } |
230 | |
|
231 | 0 | LeaveCriticalSection(&queue->lock); |
232 | |
|
233 | 0 | return status; |
234 | 0 | } |
235 | | |
236 | | /** |
237 | | * Construction, Destruction |
238 | | */ |
239 | | |
240 | | wMessageQueue* MessageQueue_New(const wObject* callback) |
241 | 0 | { |
242 | 0 | wMessageQueue* queue = NULL; |
243 | |
|
244 | 0 | queue = (wMessageQueue*)calloc(1, sizeof(wMessageQueue)); |
245 | 0 | if (!queue) |
246 | 0 | return NULL; |
247 | | |
248 | 0 | if (!InitializeCriticalSectionAndSpinCount(&queue->lock, 4000)) |
249 | 0 | goto fail; |
250 | | |
251 | 0 | if (!MessageQueue_EnsureCapacity(queue, 32)) |
252 | 0 | goto fail; |
253 | | |
254 | 0 | queue->event = CreateEvent(NULL, TRUE, FALSE, NULL); |
255 | 0 | if (!queue->event) |
256 | 0 | goto fail; |
257 | | |
258 | 0 | if (callback) |
259 | 0 | queue->object = *callback; |
260 | |
|
261 | 0 | return queue; |
262 | | |
263 | 0 | fail: |
264 | 0 | WINPR_PRAGMA_DIAG_PUSH |
265 | 0 | WINPR_PRAGMA_DIAG_IGNORED_MISMATCHED_DEALLOC |
266 | 0 | MessageQueue_Free(queue); |
267 | 0 | WINPR_PRAGMA_DIAG_POP |
268 | 0 | return NULL; |
269 | 0 | } |
270 | | |
271 | | void MessageQueue_Free(wMessageQueue* queue) |
272 | 0 | { |
273 | 0 | if (!queue) |
274 | 0 | return; |
275 | | |
276 | 0 | if (queue->event) |
277 | 0 | MessageQueue_Clear(queue); |
278 | |
|
279 | 0 | (void)CloseHandle(queue->event); |
280 | 0 | DeleteCriticalSection(&queue->lock); |
281 | |
|
282 | 0 | free(queue->array); |
283 | 0 | free(queue); |
284 | 0 | } |
285 | | |
286 | | int MessageQueue_Clear(wMessageQueue* queue) |
287 | 0 | { |
288 | 0 | int status = 0; |
289 | |
|
290 | 0 | WINPR_ASSERT(queue); |
291 | 0 | WINPR_ASSERT(queue->event); |
292 | | |
293 | 0 | EnterCriticalSection(&queue->lock); |
294 | |
|
295 | 0 | while (queue->size > 0) |
296 | 0 | { |
297 | 0 | wMessage* msg = &(queue->array[queue->head]); |
298 | | |
299 | | /* Free resources of message. */ |
300 | 0 | if (queue->object.fnObjectUninit) |
301 | 0 | queue->object.fnObjectUninit(msg); |
302 | 0 | if (queue->object.fnObjectFree) |
303 | 0 | queue->object.fnObjectFree(msg); |
304 | |
|
305 | 0 | ZeroMemory(msg, sizeof(wMessage)); |
306 | |
|
307 | 0 | queue->head = (queue->head + 1) % queue->capacity; |
308 | 0 | queue->size--; |
309 | 0 | } |
310 | 0 | (void)ResetEvent(queue->event); |
311 | 0 | queue->closed = FALSE; |
312 | |
|
313 | 0 | LeaveCriticalSection(&queue->lock); |
314 | |
|
315 | 0 | return status; |
316 | 0 | } |