Coverage Report

Created: 2026-05-11 06:55

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/FreeRDP/winpr/libwinpr/utils/collections/StreamPool.c
Line
Count
Source
1
/**
2
 * WinPR: Windows Portable Runtime
3
 * Object Pool
4
 *
5
 * Copyright 2012 Marc-Andre Moreau <marcandre.moreau@gmail.com>
6
 *
7
 * Licensed under the Apache License, Version 2.0 (the "License");
8
 * you may not use this file except in compliance with the License.
9
 * You may obtain a copy of the License at
10
 *
11
 *     http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 * Unless required by applicable law or agreed to in writing, software
14
 * distributed under the License is distributed on an "AS IS" BASIS,
15
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 * See the License for the specific language governing permissions and
17
 * limitations under the License.
18
 */
19
20
#include <winpr/config.h>
21
22
#include <winpr/crt.h>
23
#include <winpr/wlog.h>
24
25
#include <winpr/collections.h>
26
27
#include "../stream.h"
28
#include "../log.h"
29
0
#define XTAG WINPR_TAG("utils.streampool")
30
31
struct s_StreamPoolEntry
32
{
33
#if defined(WITH_STREAMPOOL_DEBUG)
34
  char** msg;
35
  size_t lines;
36
#endif
37
  wStream* s;
38
};
39
40
struct s_wStreamPool
41
{
42
  size_t aSize;
43
  size_t aCapacity;
44
  struct s_StreamPoolEntry* aArray;
45
46
  size_t uSize;
47
  size_t uCapacity;
48
  struct s_StreamPoolEntry* uArray;
49
50
  CRITICAL_SECTION lock;
51
  BOOL synchronized;
52
  size_t defaultSize;
53
  wLog* log;
54
};
55
56
/**
 * Reset a pool slot to its empty (zeroed) state.
 *
 * @param entry         slot to clear; nullptr is ignored
 * @param discardStream when TRUE the stream held by the slot is freed as well
 */
static void discard_entry(struct s_StreamPoolEntry* entry, BOOL discardStream)
{
  if (entry == nullptr)
    return;

#if defined(WITH_STREAMPOOL_DEBUG)
  /* release the backtrace recorded for this slot */
  free((void*)entry->msg);
#endif

  wStream* stream = entry->s;
  if (discardStream && (stream != nullptr))
    Stream_Free(stream, stream->isAllocatedStream);

  const struct s_StreamPoolEntry blank = WINPR_C_ARRAY_INIT;
  *entry = blank;
}
71
72
/**
 * Build a pool slot referencing @p s.
 * In WITH_STREAMPOOL_DEBUG builds the current backtrace is recorded in the slot.
 */
static struct s_StreamPoolEntry add_entry(wStream* s)
{
  struct s_StreamPoolEntry slot = WINPR_C_ARRAY_INIT;
  slot.s = s;

#if defined(WITH_STREAMPOOL_DEBUG)
  void* trace = winpr_backtrace(20);
  if (trace != nullptr)
    slot.msg = winpr_backtrace_symbols(trace, &slot.lines);
  winpr_backtrace_free(trace);
#endif

  return slot;
}
86
87
/**
88
 * Lock the stream pool
89
 */
90
91
static inline void StreamPool_Lock(wStreamPool* pool)
92
0
{
93
0
  WINPR_ASSERT(pool);
94
0
  if (pool->synchronized)
95
0
    EnterCriticalSection(&pool->lock);
96
0
}
97
98
/**
99
 * Unlock the stream pool
100
 */
101
102
static inline void StreamPool_Unlock(wStreamPool* pool)
103
0
{
104
0
  WINPR_ASSERT(pool);
105
0
  if (pool->synchronized)
106
0
    LeaveCriticalSection(&pool->lock);
107
0
}
108
109
/**
 * Resize one of the pool arrays so that @p count more entries fit.
 *
 * The array is grown when it is too small and halved when fewer than a third
 * of its slots would be occupied. On failure the array and its recorded
 * capacity are left untouched.
 *
 * @param pool            the pool, must not be nullptr
 * @param count           number of entries about to be added
 * @param usedOrAvailable TRUE selects the used array, FALSE the available array
 * @return TRUE on success (including "nothing to do"), FALSE if allocation failed
 */
static BOOL StreamPool_EnsureCapacity(wStreamPool* pool, size_t count, BOOL usedOrAvailable)
{
  WINPR_ASSERT(pool);

  size_t* cap = (usedOrAvailable) ? &pool->uCapacity : &pool->aCapacity;
  size_t* size = (usedOrAvailable) ? &pool->uSize : &pool->aSize;
  struct s_StreamPoolEntry** array = (usedOrAvailable) ? &pool->uArray : &pool->aArray;

  const size_t required = *size + count;

  size_t new_cap = 0;
  if (*cap == 0)
    new_cap = required;
  else if (required > *cap)
    new_cap = (required + 2) / 2 * 3;
  else if (required < *cap / 3)
    new_cap = *cap / 2;

  if (new_cap == 0)
    return TRUE;

  /* refuse element counts whose byte size would overflow size_t */
  if (new_cap > ((size_t)-1) / sizeof(struct s_StreamPoolEntry))
    return FALSE;

  /* The capacity is only updated after realloc succeeded, so a failed
   * allocation never leaves a capacity larger than the real array. */
  struct s_StreamPoolEntry* new_arr =
      (struct s_StreamPoolEntry*)realloc(*array, sizeof(struct s_StreamPoolEntry) * new_cap);
  if (!new_arr)
    return FALSE;

  *cap = new_cap;
  *array = new_arr;
  return TRUE;
}
141
142
/**
143
 * Methods
144
 */
145
146
/**
 * Drop the entry at @p index from the used array and close the gap.
 * The stream itself is not freed; an index past the last entry is ignored.
 */
static void StreamPool_ShiftUsed(wStreamPool* pool, size_t index)
{
  WINPR_ASSERT(pool);

  const size_t removed = 1;
  const size_t next = index + removed;
  if (pool->uSize < next)
    return;

  for (size_t i = index; i < next; i++)
    discard_entry(&pool->uArray[i], FALSE);

  /* move the entries behind the removed one down by one slot */
  const size_t tail = pool->uSize - index - removed;
  MoveMemory(&pool->uArray[index], &pool->uArray[next], tail * sizeof(struct s_StreamPoolEntry));
  pool->uSize -= removed;
}
164
165
/**
166
 * Adds a used stream to the pool.
167
 */
168
169
static void StreamPool_AddUsed(wStreamPool* pool, wStream* s)
170
0
{
171
0
  StreamPool_EnsureCapacity(pool, 1, TRUE);
172
0
  pool->uArray[pool->uSize] = add_entry(s);
173
0
  pool->uSize++;
174
0
}
175
176
/**
177
 * Removes a used stream from the pool.
178
 */
179
180
static void StreamPool_RemoveUsed(wStreamPool* pool, wStream* s)
181
0
{
182
0
  WINPR_ASSERT(pool);
183
0
  for (size_t index = 0; index < pool->uSize; index++)
184
0
  {
185
0
    struct s_StreamPoolEntry* cur = &pool->uArray[index];
186
0
    if (cur->s == s)
187
0
    {
188
0
      StreamPool_ShiftUsed(pool, index);
189
0
      break;
190
0
    }
191
0
  }
192
0
}
193
194
/**
 * Drop the entry at @p index from the available array and close the gap.
 * The stream itself is not freed; an index past the last entry is ignored.
 */
static void StreamPool_ShiftAvailable(wStreamPool* pool, size_t index)
{
  WINPR_ASSERT(pool);

  const size_t removed = 1;
  const size_t next = index + removed;
  if (pool->aSize < next)
    return;

  for (size_t i = index; i < next; i++)
    discard_entry(&pool->aArray[i], FALSE);

  /* move the entries behind the removed one down by one slot */
  const size_t tail = pool->aSize - index - removed;
  MoveMemory(&pool->aArray[index], &pool->aArray[next], tail * sizeof(struct s_StreamPoolEntry));
  pool->aSize -= removed;
}
213
214
/**
215
 * Gets a stream from the pool.
216
 */
217
218
wStream* StreamPool_Take(wStreamPool* pool, size_t size)
219
0
{
220
0
  BOOL found = FALSE;
221
0
  size_t foundIndex = 0;
222
0
  wStream* s = nullptr;
223
224
0
  StreamPool_Lock(pool);
225
226
0
  if (size == 0)
227
0
    size = pool->defaultSize;
228
229
0
  for (size_t index = 0; index < pool->aSize; index++)
230
0
  {
231
0
    struct s_StreamPoolEntry* cur = &pool->aArray[index];
232
0
    s = cur->s;
233
234
0
    if (Stream_Capacity(s) >= size)
235
0
    {
236
0
      found = TRUE;
237
0
      foundIndex = index;
238
0
      break;
239
0
    }
240
0
  }
241
242
0
  if (!found)
243
0
  {
244
0
    s = Stream_New(nullptr, size);
245
0
    if (!s)
246
0
      goto out_fail;
247
0
  }
248
0
  else if (s)
249
0
  {
250
0
    Stream_ResetPosition(s);
251
0
    if (!Stream_SetLength(s, Stream_Capacity(s)))
252
0
      goto out_fail;
253
0
    StreamPool_ShiftAvailable(pool, foundIndex);
254
0
  }
255
256
0
  if (s)
257
0
  {
258
0
    s->pool = pool;
259
0
    s->count = 1;
260
0
    StreamPool_AddUsed(pool, s);
261
0
  }
262
263
0
out_fail:
264
0
  StreamPool_Unlock(pool);
265
266
0
  return s;
267
0
}
268
269
/**
270
 * Returns an object to the pool.
271
 */
272
273
static void StreamPool_Remove(wStreamPool* pool, wStream* s)
274
0
{
275
0
  StreamPool_EnsureCapacity(pool, 1, FALSE);
276
0
  Stream_EnsureValidity(s);
277
0
  for (size_t x = 0; x < pool->aSize; x++)
278
0
  {
279
0
    wStream* cs = pool->aArray[x].s;
280
0
    if (cs == s)
281
0
      return;
282
0
  }
283
0
  pool->aArray[(pool->aSize)++] = add_entry(s);
284
0
  StreamPool_RemoveUsed(pool, s);
285
0
}
286
287
/**
 * Hand a stream back to its pool while holding the pool lock.
 * Called by Stream_Release() once the reference count reached zero.
 */
static void StreamPool_ReleaseOrReturn(wStreamPool* pool, wStream* s)
{
  StreamPool_Lock(pool);

  StreamPool_Remove(pool, s);

  StreamPool_Unlock(pool);
}
293
294
/**
 * Return a stream to the pool regardless of its reference count.
 *
 * @param pool the pool, must not be nullptr
 * @param s    the stream to return; nullptr is ignored
 */
void StreamPool_Return(wStreamPool* pool, wStream* s)
{
  WINPR_ASSERT(pool);

  if (s == nullptr)
    return;

  /* lock, move the stream to the available array, unlock */
  StreamPool_ReleaseOrReturn(pool, s);
}
304
305
/**
306
 * Increment stream reference count
307
 */
308
309
void Stream_AddRef(wStream* s)
310
0
{
311
0
  WINPR_ASSERT(s);
312
0
  s->count++;
313
0
}
314
315
/**
316
 * Decrement stream reference count
317
 */
318
319
void Stream_Release(wStream* s)
320
0
{
321
0
  WINPR_ASSERT(s);
322
323
0
  if (s->count > 0)
324
0
    s->count--;
325
0
  if (s->count == 0)
326
0
  {
327
0
    if (s->pool)
328
0
      StreamPool_ReleaseOrReturn(s->pool, s);
329
0
    else
330
0
      Stream_Free(s, TRUE);
331
0
  }
332
0
}
333
334
/**
335
 * Find stream in pool using pointer inside buffer
336
 */
337
338
wStream* StreamPool_Find(wStreamPool* pool, const BYTE* ptr)
339
0
{
340
0
  wStream* s = nullptr;
341
342
0
  StreamPool_Lock(pool);
343
344
0
  for (size_t index = 0; index < pool->uSize; index++)
345
0
  {
346
0
    struct s_StreamPoolEntry* cur = &pool->uArray[index];
347
348
0
    if ((ptr >= Stream_Buffer(cur->s)) &&
349
0
        (ptr < (Stream_Buffer(cur->s) + Stream_Capacity(cur->s))))
350
0
    {
351
0
      s = cur->s;
352
0
      break;
353
0
    }
354
0
  }
355
356
0
  StreamPool_Unlock(pool);
357
358
0
  return s;
359
0
}
360
361
/**
362
 * Releases the streams currently cached in the pool.
363
 */
364
365
void StreamPool_Clear(wStreamPool* pool)
366
0
{
367
0
  StreamPool_Lock(pool);
368
369
0
  for (size_t x = 0; x < pool->aSize; x++)
370
0
  {
371
0
    struct s_StreamPoolEntry* cur = &pool->aArray[x];
372
0
    discard_entry(cur, TRUE);
373
0
  }
374
0
  pool->aSize = 0;
375
376
0
  if (pool->uSize > 0)
377
0
  {
378
0
    WLog_Print(pool->log, WLOG_WARN,
379
0
               "Clearing StreamPool, but there are %" PRIuz " streams currently in use",
380
0
               pool->uSize);
381
0
    for (size_t x = 0; x < pool->uSize; x++)
382
0
    {
383
0
      struct s_StreamPoolEntry* cur = &pool->uArray[x];
384
0
      discard_entry(cur, TRUE);
385
0
    }
386
0
    pool->uSize = 0;
387
0
  }
388
389
0
  StreamPool_Unlock(pool);
390
0
}
391
392
/**
 * Number of streams currently recorded as used, read under the pool lock.
 */
size_t StreamPool_UsedCount(wStreamPool* pool)
{
  StreamPool_Lock(pool);
  const size_t inUse = pool->uSize;
  StreamPool_Unlock(pool);

  return inUse;
}
399
400
/**
401
 * Construction, Destruction
402
 */
403
404
/**
 * Create a stream pool.
 *
 * @param synchronized TRUE to guard pool operations with the pool's critical section
 * @param defaultSize  stream size used by StreamPool_Take() when 0 is requested
 * @return the new pool, or nullptr on failure
 */
wStreamPool* StreamPool_New(BOOL synchronized, size_t defaultSize)
{
  wStreamPool* pool = calloc(1, sizeof(wStreamPool));

  if (!pool)
    return nullptr;

  /* The lock is set up before anything that can jump to 'fail':
   * StreamPool_Free() enters (for synchronized pools) and deletes the
   * critical section, so it must already be initialised there. */
  if (!InitializeCriticalSectionAndSpinCount(&pool->lock, 4000))
  {
    free(pool);
    return nullptr;
  }

  pool->log = WLog_Create(XTAG, WLog_GetRoot());
  if (!pool->log)
    goto fail;

  pool->synchronized = synchronized;
  pool->defaultSize = defaultSize;

  if (!StreamPool_EnsureCapacity(pool, 32, FALSE))
    goto fail;
  if (!StreamPool_EnsureCapacity(pool, 32, TRUE))
    goto fail;

  return pool;
fail:
  WINPR_PRAGMA_DIAG_PUSH
  WINPR_PRAGMA_DIAG_IGNORED_MISMATCHED_DEALLOC
  StreamPool_Free(pool);
  WINPR_PRAGMA_DIAG_POP
  return nullptr;
}
434
435
/**
 * Destroy a stream pool: clear it, delete its lock and release its arrays,
 * its logger and the pool itself.
 *
 * @param pool the pool to destroy; nullptr is ignored
 */
void StreamPool_Free(wStreamPool* pool)
{
  if (pool == nullptr)
    return;

  StreamPool_Clear(pool);
  DeleteCriticalSection(&pool->lock);

  free(pool->uArray);
  free(pool->aArray);
  WLog_Discard(pool->log);

  free(pool);
}
450
451
/**
 * Write a textual summary of the pool into @p buffer.
 *
 * The summary holds the sizes and capacities of both arrays. In
 * WITH_STREAMPOOL_DEBUG builds it is followed by the backtraces recorded for
 * the used streams and the backtrace of the caller. A piece that does not fit
 * into the remaining space is left out. The pool lock is held while the pool
 * is read so the reported values belong together.
 *
 * @param pool   the pool, must not be nullptr
 * @param buffer destination buffer
 * @param size   size of @p buffer in bytes
 * @return @p buffer (always '\0' terminated), or nullptr if @p buffer is
 *         nullptr or @p size is 0
 */
char* StreamPool_GetStatistics(wStreamPool* pool, char* buffer, size_t size)
{
  WINPR_ASSERT(pool);

  if (!buffer || (size < 1))
    return nullptr;

  size_t used = 0;

  StreamPool_Lock(pool);

  int offset = _snprintf(buffer, size - 1,
                         "aSize    =%" PRIuz ", uSize    =%" PRIuz ", aCapacity=%" PRIuz
                         ", uCapacity=%" PRIuz,
                         pool->aSize, pool->uSize, pool->aCapacity, pool->uCapacity);
  if ((offset > 0) && ((size_t)offset < size))
    used += (size_t)offset;

#if defined(WITH_STREAMPOOL_DEBUG)
  offset = _snprintf(&buffer[used], size - 1 - used, "\n-- dump used array take locations --\n");
  if ((offset > 0) && ((size_t)offset < size - used))
    used += (size_t)offset;
  for (size_t x = 0; x < pool->uSize; x++)
  {
    const struct s_StreamPoolEntry* cur = &pool->uArray[x];
    WINPR_ASSERT(cur->msg || (cur->lines == 0));

    for (size_t y = 0; y < cur->lines; y++)
    {
      offset = _snprintf(&buffer[used], size - 1 - used, "[%" PRIuz " | %" PRIuz "]: %s\n", x,
                         y, cur->msg[y]);
      if ((offset > 0) && ((size_t)offset < size - used))
        used += (size_t)offset;
    }
  }

  offset = _snprintf(&buffer[used], size - 1 - used, "\n-- statistics called from --\n");
  if ((offset > 0) && ((size_t)offset < size - used))
    used += (size_t)offset;

  struct s_StreamPoolEntry entry = WINPR_C_ARRAY_INIT;
  void* stack = winpr_backtrace(20);
  if (stack)
    entry.msg = winpr_backtrace_symbols(stack, &entry.lines);
  winpr_backtrace_free(stack);

  for (size_t x = 0; x < entry.lines; x++)
  {
    const char* msg = entry.msg[x];
    offset = _snprintf(&buffer[used], size - 1 - used, "[%" PRIuz "]: %s\n", x, msg);
    if ((offset > 0) && ((size_t)offset < size - used))
      used += (size_t)offset;
  }
  free((void*)entry.msg);
#endif

  StreamPool_Unlock(pool);

  /* terminate after the last piece that was accepted */
  buffer[used] = '\0';
  return buffer;
}
509
510
/**
 * Wait until no stream of the pool is recorded as used any more.
 *
 * The used count is polled, sleeping at most 10 ms between checks. A final
 * check is done after the timeout elapsed, so a timeout of 0 performs a single
 * non-blocking check and streams returned during the last sleep are noticed.
 *
 * @param pool      the pool to wait for
 * @param timeoutMS maximum time to wait in milliseconds, INFINITE to wait forever
 * @return TRUE if no stream is in use, FALSE if the timeout elapsed first
 */
BOOL StreamPool_WaitForReturn(wStreamPool* pool, UINT32 timeoutMS)
{
  while (timeoutMS > 0)
  {
    const size_t used = StreamPool_UsedCount(pool);
    if (used == 0)
      return TRUE;
    WLog_Print(pool->log, WLOG_DEBUG, "%" PRIuz " streams still in use, sleeping...", used);

    char buffer[4096] = WINPR_C_ARRAY_INIT;
    StreamPool_GetStatistics(pool, buffer, sizeof(buffer));
    WLog_Print(pool->log, WLOG_TRACE, "Pool statistics: %s", buffer);

    /* an INFINITE timeout is never counted down */
    UINT32 diff = 10;
    if (timeoutMS != INFINITE)
    {
      diff = timeoutMS > 10 ? 10 : timeoutMS;
      timeoutMS -= diff;
    }
    Sleep(diff);
  }

  /* last look after the timeout ran out (or when it was 0 to begin with) */
  return (StreamPool_UsedCount(pool) == 0) ? TRUE : FALSE;
}