Coverage Report

Created: 2026-03-04 06:17

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/FreeRDP/winpr/libwinpr/utils/collections/StreamPool.c
Line
Count
Source
1
/**
2
 * WinPR: Windows Portable Runtime
3
 * Object Pool
4
 *
5
 * Copyright 2012 Marc-Andre Moreau <marcandre.moreau@gmail.com>
6
 *
7
 * Licensed under the Apache License, Version 2.0 (the "License");
8
 * you may not use this file except in compliance with the License.
9
 * You may obtain a copy of the License at
10
 *
11
 *     http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 * Unless required by applicable law or agreed to in writing, software
14
 * distributed under the License is distributed on an "AS IS" BASIS,
15
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 * See the License for the specific language governing permissions and
17
 * limitations under the License.
18
 */
19
20
#include <winpr/config.h>
21
22
#include <winpr/crt.h>
23
#include <winpr/wlog.h>
24
25
#include <winpr/collections.h>
26
27
#include "../stream.h"
28
#include "../log.h"
29
15.4k
#define TAG WINPR_TAG("utils.streampool")
30
31
struct s_StreamPoolEntry
32
{
33
#if defined(WITH_STREAMPOOL_DEBUG)
34
  char** msg;
35
  size_t lines;
36
#endif
37
  wStream* s;
38
};
39
40
struct s_wStreamPool
41
{
42
  size_t aSize;
43
  size_t aCapacity;
44
  struct s_StreamPoolEntry* aArray;
45
46
  size_t uSize;
47
  size_t uCapacity;
48
  struct s_StreamPoolEntry* uArray;
49
50
  CRITICAL_SECTION lock;
51
  BOOL synchronized;
52
  size_t defaultSize;
53
};
54
55
static void discard_entry(struct s_StreamPoolEntry* entry, BOOL discardStream)
56
30.8k
{
57
30.8k
  if (!entry)
58
0
    return;
59
60
30.8k
#if defined(WITH_STREAMPOOL_DEBUG)
61
30.8k
  free((void*)entry->msg);
62
30.8k
#endif
63
64
30.8k
  if (discardStream && entry->s)
65
15.4k
    Stream_Free(entry->s, entry->s->isAllocatedStream);
66
67
30.8k
  const struct s_StreamPoolEntry empty = WINPR_C_ARRAY_INIT;
68
30.8k
  *entry = empty;
69
30.8k
}
70
71
static struct s_StreamPoolEntry add_entry(wStream* s)
72
30.8k
{
73
30.8k
  struct s_StreamPoolEntry entry = WINPR_C_ARRAY_INIT;
74
75
30.8k
#if defined(WITH_STREAMPOOL_DEBUG)
76
30.8k
  void* stack = winpr_backtrace(20);
77
30.8k
  if (stack)
78
30.8k
    entry.msg = winpr_backtrace_symbols(stack, &entry.lines);
79
30.8k
  winpr_backtrace_free(stack);
80
30.8k
#endif
81
82
30.8k
  entry.s = s;
83
30.8k
  return entry;
84
30.8k
}
85
86
/**
87
 * Lock the stream pool
88
 */
89
90
static inline void StreamPool_Lock(wStreamPool* pool)
91
61.7k
{
92
61.7k
  WINPR_ASSERT(pool);
93
61.7k
  if (pool->synchronized)
94
61.7k
    EnterCriticalSection(&pool->lock);
95
61.7k
}
96
97
/**
98
 * Unlock the stream pool
99
 */
100
101
static inline void StreamPool_Unlock(wStreamPool* pool)
102
61.7k
{
103
61.7k
  WINPR_ASSERT(pool);
104
61.7k
  if (pool->synchronized)
105
61.7k
    LeaveCriticalSection(&pool->lock);
106
61.7k
}
107
108
static BOOL StreamPool_EnsureCapacity(wStreamPool* pool, size_t count, BOOL usedOrAvailable)
109
61.7k
{
110
61.7k
  WINPR_ASSERT(pool);
111
112
61.7k
  size_t* cap = (usedOrAvailable) ? &pool->uCapacity : &pool->aCapacity;
113
61.7k
  size_t* size = (usedOrAvailable) ? &pool->uSize : &pool->aSize;
114
61.7k
  struct s_StreamPoolEntry** array = (usedOrAvailable) ? &pool->uArray : &pool->aArray;
115
116
61.7k
  size_t new_cap = 0;
117
61.7k
  if (*cap == 0)
118
30.8k
    new_cap = *size + count;
119
30.8k
  else if (*size + count > *cap)
120
0
    new_cap = (*size + count + 2) / 2 * 3;
121
30.8k
  else if ((*size + count) < *cap / 3)
122
30.8k
    new_cap = *cap / 2;
123
124
61.7k
  if (new_cap > 0)
125
61.7k
  {
126
61.7k
    struct s_StreamPoolEntry* new_arr = nullptr;
127
128
61.7k
    if (*cap < *size + count)
129
30.8k
      *cap += count;
130
131
61.7k
    new_arr =
132
61.7k
        (struct s_StreamPoolEntry*)realloc(*array, sizeof(struct s_StreamPoolEntry) * new_cap);
133
61.7k
    if (!new_arr)
134
0
      return FALSE;
135
61.7k
    *cap = new_cap;
136
61.7k
    *array = new_arr;
137
61.7k
  }
138
61.7k
  return TRUE;
139
61.7k
}
140
141
/**
142
 * Methods
143
 */
144
145
static void StreamPool_ShiftUsed(wStreamPool* pool, size_t index)
146
15.4k
{
147
15.4k
  WINPR_ASSERT(pool);
148
149
15.4k
  const size_t pcount = 1;
150
15.4k
  const size_t off = index + pcount;
151
15.4k
  if (pool->uSize >= off)
152
15.4k
  {
153
30.8k
    for (size_t x = 0; x < pcount; x++)
154
15.4k
    {
155
15.4k
      struct s_StreamPoolEntry* cur = &pool->uArray[index + x];
156
15.4k
      discard_entry(cur, FALSE);
157
15.4k
    }
158
15.4k
    MoveMemory(&pool->uArray[index], &pool->uArray[index + pcount],
159
15.4k
               (pool->uSize - index - pcount) * sizeof(struct s_StreamPoolEntry));
160
15.4k
    pool->uSize -= pcount;
161
15.4k
  }
162
15.4k
}
163
164
/**
165
 * Adds a used stream to the pool.
166
 */
167
168
static void StreamPool_AddUsed(wStreamPool* pool, wStream* s)
169
15.4k
{
170
15.4k
  StreamPool_EnsureCapacity(pool, 1, TRUE);
171
15.4k
  pool->uArray[pool->uSize] = add_entry(s);
172
15.4k
  pool->uSize++;
173
15.4k
}
174
175
/**
176
 * Removes a used stream from the pool.
177
 */
178
179
static void StreamPool_RemoveUsed(wStreamPool* pool, wStream* s)
180
15.4k
{
181
15.4k
  WINPR_ASSERT(pool);
182
15.4k
  for (size_t index = 0; index < pool->uSize; index++)
183
15.4k
  {
184
15.4k
    struct s_StreamPoolEntry* cur = &pool->uArray[index];
185
15.4k
    if (cur->s == s)
186
15.4k
    {
187
15.4k
      StreamPool_ShiftUsed(pool, index);
188
15.4k
      break;
189
15.4k
    }
190
15.4k
  }
191
15.4k
}
192
193
static void StreamPool_ShiftAvailable(wStreamPool* pool, size_t index)
194
0
{
195
0
  WINPR_ASSERT(pool);
196
197
0
  const size_t pcount = 1;
198
0
  const size_t off = index + pcount;
199
0
  if (pool->aSize >= off)
200
0
  {
201
0
    for (size_t x = 0; x < pcount; x++)
202
0
    {
203
0
      struct s_StreamPoolEntry* cur = &pool->aArray[index + x];
204
0
      discard_entry(cur, FALSE);
205
0
    }
206
207
0
    MoveMemory(&pool->aArray[index], &pool->aArray[index + pcount],
208
0
               (pool->aSize - index - pcount) * sizeof(struct s_StreamPoolEntry));
209
0
    pool->aSize -= pcount;
210
0
  }
211
0
}
212
213
/**
214
 * Gets a stream from the pool.
215
 */
216
217
wStream* StreamPool_Take(wStreamPool* pool, size_t size)
218
15.4k
{
219
15.4k
  BOOL found = FALSE;
220
15.4k
  size_t foundIndex = 0;
221
15.4k
  wStream* s = nullptr;
222
223
15.4k
  StreamPool_Lock(pool);
224
225
15.4k
  if (size == 0)
226
15.4k
    size = pool->defaultSize;
227
228
15.4k
  for (size_t index = 0; index < pool->aSize; index++)
229
0
  {
230
0
    struct s_StreamPoolEntry* cur = &pool->aArray[index];
231
0
    s = cur->s;
232
233
0
    if (Stream_Capacity(s) >= size)
234
0
    {
235
0
      found = TRUE;
236
0
      foundIndex = index;
237
0
      break;
238
0
    }
239
0
  }
240
241
15.4k
  if (!found)
242
15.4k
  {
243
15.4k
    s = Stream_New(nullptr, size);
244
15.4k
    if (!s)
245
0
      goto out_fail;
246
15.4k
  }
247
0
  else if (s)
248
0
  {
249
0
    Stream_ResetPosition(s);
250
0
    if (!Stream_SetLength(s, Stream_Capacity(s)))
251
0
      goto out_fail;
252
0
    StreamPool_ShiftAvailable(pool, foundIndex);
253
0
  }
254
255
15.4k
  if (s)
256
15.4k
  {
257
15.4k
    s->pool = pool;
258
15.4k
    s->count = 1;
259
15.4k
    StreamPool_AddUsed(pool, s);
260
15.4k
  }
261
262
15.4k
out_fail:
263
15.4k
  StreamPool_Unlock(pool);
264
265
15.4k
  return s;
266
15.4k
}
267
268
/**
269
 * Returns an object to the pool.
270
 */
271
272
static void StreamPool_Remove(wStreamPool* pool, wStream* s)
273
15.4k
{
274
15.4k
  StreamPool_EnsureCapacity(pool, 1, FALSE);
275
15.4k
  Stream_EnsureValidity(s);
276
15.4k
  for (size_t x = 0; x < pool->aSize; x++)
277
0
  {
278
0
    wStream* cs = pool->aArray[x].s;
279
0
    if (cs == s)
280
0
      return;
281
0
  }
282
15.4k
  pool->aArray[(pool->aSize)++] = add_entry(s);
283
15.4k
  StreamPool_RemoveUsed(pool, s);
284
15.4k
}
285
286
static void StreamPool_ReleaseOrReturn(wStreamPool* pool, wStream* s)
287
15.4k
{
288
15.4k
  StreamPool_Lock(pool);
289
15.4k
  StreamPool_Remove(pool, s);
290
15.4k
  StreamPool_Unlock(pool);
291
15.4k
}
292
293
void StreamPool_Return(wStreamPool* pool, wStream* s)
294
0
{
295
0
  WINPR_ASSERT(pool);
296
0
  if (!s)
297
0
    return;
298
299
0
  StreamPool_Lock(pool);
300
0
  StreamPool_Remove(pool, s);
301
0
  StreamPool_Unlock(pool);
302
0
}
303
304
/**
305
 * Increment stream reference count
306
 */
307
308
void Stream_AddRef(wStream* s)
309
123
{
310
123
  WINPR_ASSERT(s);
311
123
  s->count++;
312
123
}
313
314
/**
315
 * Decrement stream reference count
316
 */
317
318
void Stream_Release(wStream* s)
319
20.4k
{
320
20.4k
  WINPR_ASSERT(s);
321
322
20.4k
  if (s->count > 0)
323
20.4k
    s->count--;
324
20.4k
  if (s->count == 0)
325
20.3k
  {
326
20.3k
    if (s->pool)
327
15.4k
      StreamPool_ReleaseOrReturn(s->pool, s);
328
4.94k
    else
329
4.94k
      Stream_Free(s, TRUE);
330
20.3k
  }
331
20.4k
}
332
333
/**
334
 * Find stream in pool using pointer inside buffer
335
 */
336
337
wStream* StreamPool_Find(wStreamPool* pool, const BYTE* ptr)
338
0
{
339
0
  wStream* s = nullptr;
340
341
0
  StreamPool_Lock(pool);
342
343
0
  for (size_t index = 0; index < pool->uSize; index++)
344
0
  {
345
0
    struct s_StreamPoolEntry* cur = &pool->uArray[index];
346
347
0
    if ((ptr >= Stream_Buffer(cur->s)) &&
348
0
        (ptr < (Stream_Buffer(cur->s) + Stream_Capacity(cur->s))))
349
0
    {
350
0
      s = cur->s;
351
0
      break;
352
0
    }
353
0
  }
354
355
0
  StreamPool_Unlock(pool);
356
357
0
  return s;
358
0
}
359
360
/**
361
 * Releases the streams currently cached in the pool.
362
 */
363
364
void StreamPool_Clear(wStreamPool* pool)
365
15.4k
{
366
15.4k
  StreamPool_Lock(pool);
367
368
30.8k
  for (size_t x = 0; x < pool->aSize; x++)
369
15.4k
  {
370
15.4k
    struct s_StreamPoolEntry* cur = &pool->aArray[x];
371
15.4k
    discard_entry(cur, TRUE);
372
15.4k
  }
373
15.4k
  pool->aSize = 0;
374
375
15.4k
  if (pool->uSize > 0)
376
0
  {
377
0
    WLog_WARN(TAG, "Clearing StreamPool, but there are %" PRIuz " streams currently in use",
378
0
              pool->uSize);
379
0
    for (size_t x = 0; x < pool->uSize; x++)
380
0
    {
381
0
      struct s_StreamPoolEntry* cur = &pool->uArray[x];
382
0
      discard_entry(cur, TRUE);
383
0
    }
384
0
    pool->uSize = 0;
385
0
  }
386
387
15.4k
  StreamPool_Unlock(pool);
388
15.4k
}
389
390
size_t StreamPool_UsedCount(wStreamPool* pool)
391
15.4k
{
392
15.4k
  StreamPool_Lock(pool);
393
15.4k
  size_t usize = pool->uSize;
394
15.4k
  StreamPool_Unlock(pool);
395
15.4k
  return usize;
396
15.4k
}
397
398
/**
399
 * Construction, Destruction
400
 */
401
402
wStreamPool* StreamPool_New(BOOL synchronized, size_t defaultSize)
403
15.4k
{
404
15.4k
  wStreamPool* pool = nullptr;
405
406
15.4k
  pool = (wStreamPool*)calloc(1, sizeof(wStreamPool));
407
408
15.4k
  if (pool)
409
15.4k
  {
410
15.4k
    pool->synchronized = synchronized;
411
15.4k
    pool->defaultSize = defaultSize;
412
413
15.4k
    if (!StreamPool_EnsureCapacity(pool, 32, FALSE))
414
0
      goto fail;
415
15.4k
    if (!StreamPool_EnsureCapacity(pool, 32, TRUE))
416
0
      goto fail;
417
418
15.4k
    if (!InitializeCriticalSectionAndSpinCount(&pool->lock, 4000))
419
0
      goto fail;
420
15.4k
  }
421
422
15.4k
  return pool;
423
0
fail:
424
0
  WINPR_PRAGMA_DIAG_PUSH
425
0
  WINPR_PRAGMA_DIAG_IGNORED_MISMATCHED_DEALLOC
426
0
  StreamPool_Free(pool);
427
0
  WINPR_PRAGMA_DIAG_POP
428
0
  return nullptr;
429
15.4k
}
430
431
void StreamPool_Free(wStreamPool* pool)
432
15.4k
{
433
15.4k
  if (pool)
434
15.4k
  {
435
15.4k
    StreamPool_Clear(pool);
436
437
15.4k
    DeleteCriticalSection(&pool->lock);
438
439
15.4k
    free(pool->aArray);
440
15.4k
    free(pool->uArray);
441
442
15.4k
    free(pool);
443
15.4k
  }
444
15.4k
}
445
446
char* StreamPool_GetStatistics(wStreamPool* pool, char* buffer, size_t size)
447
0
{
448
0
  WINPR_ASSERT(pool);
449
450
0
  if (!buffer || (size < 1))
451
0
    return nullptr;
452
453
0
  size_t used = 0;
454
0
  int offset = _snprintf(buffer, size - 1,
455
0
                         "aSize    =%" PRIuz ", uSize    =%" PRIuz ", aCapacity=%" PRIuz
456
0
                         ", uCapacity=%" PRIuz,
457
0
                         pool->aSize, pool->uSize, pool->aCapacity, pool->uCapacity);
458
0
  if ((offset > 0) && ((size_t)offset < size))
459
0
    used += (size_t)offset;
460
461
0
#if defined(WITH_STREAMPOOL_DEBUG)
462
0
  StreamPool_Lock(pool);
463
464
0
  offset = _snprintf(&buffer[used], size - 1 - used, "\n-- dump used array take locations --\n");
465
0
  if ((offset > 0) && ((size_t)offset < size - used))
466
0
    used += (size_t)offset;
467
0
  for (size_t x = 0; x < pool->uSize; x++)
468
0
  {
469
0
    const struct s_StreamPoolEntry* cur = &pool->uArray[x];
470
0
    WINPR_ASSERT(cur->msg || (cur->lines == 0));
471
472
0
    for (size_t y = 0; y < cur->lines; y++)
473
0
    {
474
0
      offset = _snprintf(&buffer[used], size - 1 - used, "[%" PRIuz " | %" PRIuz "]: %s\n", x,
475
0
                         y, cur->msg[y]);
476
0
      if ((offset > 0) && ((size_t)offset < size - used))
477
0
        used += (size_t)offset;
478
0
    }
479
0
  }
480
481
0
  offset = _snprintf(&buffer[used], size - 1 - used, "\n-- statistics called from --\n");
482
0
  if ((offset > 0) && ((size_t)offset < size - used))
483
0
    used += (size_t)offset;
484
485
0
  struct s_StreamPoolEntry entry = WINPR_C_ARRAY_INIT;
486
0
  void* stack = winpr_backtrace(20);
487
0
  if (stack)
488
0
    entry.msg = winpr_backtrace_symbols(stack, &entry.lines);
489
0
  winpr_backtrace_free(stack);
490
491
0
  for (size_t x = 0; x < entry.lines; x++)
492
0
  {
493
0
    const char* msg = entry.msg[x];
494
0
    offset = _snprintf(&buffer[used], size - 1 - used, "[%" PRIuz "]: %s\n", x, msg);
495
0
    if ((offset > 0) && ((size_t)offset < size - used))
496
0
      used += (size_t)offset;
497
0
  }
498
0
  free((void*)entry.msg);
499
0
  StreamPool_Unlock(pool);
500
0
#endif
501
0
  buffer[used] = '\0';
502
0
  return buffer;
503
0
}
504
505
BOOL StreamPool_WaitForReturn(wStreamPool* pool, UINT32 timeoutMS)
506
15.4k
{
507
15.4k
  wLog* log = WLog_Get(TAG);
508
509
  /* HACK: We disconnected the transport above, now wait without a read or write lock until all
510
   * streams in use have been returned to the pool. */
511
15.4k
  while (timeoutMS > 0)
512
15.4k
  {
513
15.4k
    const size_t used = StreamPool_UsedCount(pool);
514
15.4k
    if (used == 0)
515
15.4k
      return TRUE;
516
0
    WLog_Print(log, WLOG_DEBUG, "%" PRIuz " streams still in use, sleeping...", used);
517
518
0
    char buffer[4096] = WINPR_C_ARRAY_INIT;
519
0
    StreamPool_GetStatistics(pool, buffer, sizeof(buffer));
520
0
    WLog_Print(log, WLOG_TRACE, "Pool statistics: %s", buffer);
521
522
0
    UINT32 diff = 10;
523
0
    if (timeoutMS != INFINITE)
524
0
    {
525
0
      diff = timeoutMS > 10 ? 10 : timeoutMS;
526
0
      timeoutMS -= diff;
527
0
    }
528
0
    Sleep(diff);
529
0
  }
530
531
0
  return FALSE;
532
15.4k
}