Coverage Report

Created: 2025-07-01 06:46

/src/FreeRDP/winpr/libwinpr/utils/collections/StreamPool.c
Line
Count
Source (jump to first uncovered line)
1
/**
2
 * WinPR: Windows Portable Runtime
3
 * Object Pool
4
 *
5
 * Copyright 2012 Marc-Andre Moreau <marcandre.moreau@gmail.com>
6
 *
7
 * Licensed under the Apache License, Version 2.0 (the "License");
8
 * you may not use this file except in compliance with the License.
9
 * You may obtain a copy of the License at
10
 *
11
 *     http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 * Unless required by applicable law or agreed to in writing, software
14
 * distributed under the License is distributed on an "AS IS" BASIS,
15
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 * See the License for the specific language governing permissions and
17
 * limitations under the License.
18
 */
19
20
#include <winpr/config.h>
21
22
#include <winpr/crt.h>
23
#include <winpr/wlog.h>
24
25
#include <winpr/collections.h>
26
27
#include "../stream.h"
28
#include "../log.h"
29
0
#define TAG WINPR_TAG("utils.streampool")
30
31
struct s_StreamPoolEntry
32
{
33
#if defined(WITH_STREAMPOOL_DEBUG)
34
  char** msg;
35
  size_t lines;
36
#endif
37
  wStream* s;
38
};
39
40
struct s_wStreamPool
41
{
42
  size_t aSize;
43
  size_t aCapacity;
44
  struct s_StreamPoolEntry* aArray;
45
46
  size_t uSize;
47
  size_t uCapacity;
48
  struct s_StreamPoolEntry* uArray;
49
50
  CRITICAL_SECTION lock;
51
  BOOL synchronized;
52
  size_t defaultSize;
53
};
54
55
/**
 * Reset a pool entry to its empty (all-zero) state.
 *
 * Debug builds release the backtrace recorded for the entry. The stream is
 * only freed when discardStream is set; otherwise the entry merely forgets it.
 *
 * @param entry         entry to reset, NULL is ignored
 * @param discardStream TRUE to also free the stream held by the entry
 */
static void discard_entry(struct s_StreamPoolEntry* entry, BOOL discardStream)
{
  if (entry == NULL)
    return;

#if defined(WITH_STREAMPOOL_DEBUG)
  free((void*)entry->msg);
#endif

  wStream* stream = entry->s;
  if (discardStream && (stream != NULL))
    Stream_Free(stream, stream->isAllocatedStream);

  *entry = (struct s_StreamPoolEntry){ 0 };
}
70
71
/**
 * Build a pool entry for a stream.
 *
 * Debug builds additionally capture up to 20 frames of the current call
 * stack so the entry's origin can be reported later.
 *
 * @param s stream to wrap
 * @return the populated entry (returned by value)
 */
static struct s_StreamPoolEntry add_entry(wStream* s)
{
  struct s_StreamPoolEntry result = { 0 };

  result.s = s;

#if defined(WITH_STREAMPOOL_DEBUG)
  void* trace = winpr_backtrace(20);
  if (trace != NULL)
    result.msg = winpr_backtrace_symbols(trace, &result.lines);
  winpr_backtrace_free(trace);
#endif

  return result;
}
85
86
/**
87
 * Lock the stream pool
88
 */
89
90
static INLINE void StreamPool_Lock(wStreamPool* pool)
91
0
{
92
0
  WINPR_ASSERT(pool);
93
0
  if (pool->synchronized)
94
0
    EnterCriticalSection(&pool->lock);
95
0
}
96
97
/**
98
 * Unlock the stream pool
99
 */
100
101
static INLINE void StreamPool_Unlock(wStreamPool* pool)
102
0
{
103
0
  WINPR_ASSERT(pool);
104
0
  if (pool->synchronized)
105
0
    LeaveCriticalSection(&pool->lock);
106
0
}
107
108
/**
 * Make sure one of the pool arrays has room for `count` more entries.
 *
 * The array is allocated on first use, grown (with headroom) when the
 * required size exceeds the capacity, and halved when less than a third of
 * the capacity would be in use.
 *
 * The recorded capacity is only updated after realloc succeeded, so a failed
 * allocation leaves both the array and its capacity untouched.
 *
 * @param pool            the pool to operate on
 * @param count           number of entries about to be added
 * @param usedOrAvailable TRUE selects the used array, FALSE the available array
 *
 * @return TRUE if the selected array can hold `count` more entries, FALSE if
 *         it had to grow and the allocation failed
 */
static BOOL StreamPool_EnsureCapacity(wStreamPool* pool, size_t count, BOOL usedOrAvailable)
{
  WINPR_ASSERT(pool);

  size_t* cap = (usedOrAvailable) ? &pool->uCapacity : &pool->aCapacity;
  const size_t size = (usedOrAvailable) ? pool->uSize : pool->aSize;
  struct s_StreamPoolEntry** array = (usedOrAvailable) ? &pool->uArray : &pool->aArray;

  const size_t required = size + count;

  size_t new_cap = 0;
  if (*cap == 0)
    new_cap = required;
  else if (required > *cap)
    new_cap = (required + 2) / 2 * 3;
  else if (required < *cap / 3)
    new_cap = *cap / 2;

  /* Capacity is already adequate, nothing to resize. */
  if (new_cap == 0)
    return TRUE;

  struct s_StreamPoolEntry* new_arr =
      (struct s_StreamPoolEntry*)realloc(*array, sizeof(struct s_StreamPoolEntry) * new_cap);
  if (!new_arr)
  {
    /* realloc leaves the old block intact on failure. A failed shrink is
     * harmless (the existing array is still large enough); only a failed
     * grow is an error. */
    return (required <= *cap) ? TRUE : FALSE;
  }

  *cap = new_cap;
  *array = new_arr;
  return TRUE;
}
140
141
/**
142
 * Methods
143
 */
144
145
/**
 * Drop the entry at `index` from the used array and close the gap by moving
 * the following entries down one slot. The stream itself is not freed.
 *
 * Nothing happens when `index` is not below the current used count.
 */
static void StreamPool_ShiftUsed(wStreamPool* pool, size_t index)
{
  WINPR_ASSERT(pool);

  const size_t removed = 1;
  const size_t next = index + removed;
  if (pool->uSize < next)
    return;

  for (size_t i = index; i < next; i++)
    discard_entry(&pool->uArray[i], FALSE);

  const size_t tail = pool->uSize - next;
  MoveMemory(&pool->uArray[index], &pool->uArray[next], tail * sizeof(struct s_StreamPoolEntry));
  pool->uSize -= removed;
}
163
164
/**
165
 * Adds a used stream to the pool.
166
 */
167
168
static void StreamPool_AddUsed(wStreamPool* pool, wStream* s)
169
0
{
170
0
  StreamPool_EnsureCapacity(pool, 1, TRUE);
171
0
  pool->uArray[pool->uSize] = add_entry(s);
172
0
  pool->uSize++;
173
0
}
174
175
/**
176
 * Removes a used stream from the pool.
177
 */
178
179
static void StreamPool_RemoveUsed(wStreamPool* pool, wStream* s)
180
0
{
181
0
  WINPR_ASSERT(pool);
182
0
  for (size_t index = 0; index < pool->uSize; index++)
183
0
  {
184
0
    struct s_StreamPoolEntry* cur = &pool->uArray[index];
185
0
    if (cur->s == s)
186
0
    {
187
0
      StreamPool_ShiftUsed(pool, index);
188
0
      break;
189
0
    }
190
0
  }
191
0
}
192
193
/**
 * Drop the entry at `index` from the available array and close the gap by
 * moving the following entries down one slot. The stream itself is not freed.
 *
 * Nothing happens when `index` is not below the current available count.
 */
static void StreamPool_ShiftAvailable(wStreamPool* pool, size_t index)
{
  WINPR_ASSERT(pool);

  const size_t removed = 1;
  const size_t next = index + removed;
  if (pool->aSize < next)
    return;

  for (size_t i = index; i < next; i++)
    discard_entry(&pool->aArray[i], FALSE);

  const size_t tail = pool->aSize - next;
  MoveMemory(&pool->aArray[index], &pool->aArray[next], tail * sizeof(struct s_StreamPoolEntry));
  pool->aSize -= removed;
}
212
213
/**
214
 * Gets a stream from the pool.
215
 */
216
217
wStream* StreamPool_Take(wStreamPool* pool, size_t size)
218
0
{
219
0
  BOOL found = FALSE;
220
0
  size_t foundIndex = 0;
221
0
  wStream* s = NULL;
222
223
0
  StreamPool_Lock(pool);
224
225
0
  if (size == 0)
226
0
    size = pool->defaultSize;
227
228
0
  for (size_t index = 0; index < pool->aSize; index++)
229
0
  {
230
0
    struct s_StreamPoolEntry* cur = &pool->aArray[index];
231
0
    s = cur->s;
232
233
0
    if (Stream_Capacity(s) >= size)
234
0
    {
235
0
      found = TRUE;
236
0
      foundIndex = index;
237
0
      break;
238
0
    }
239
0
  }
240
241
0
  if (!found)
242
0
  {
243
0
    s = Stream_New(NULL, size);
244
0
    if (!s)
245
0
      goto out_fail;
246
0
  }
247
0
  else if (s)
248
0
  {
249
0
    Stream_SetPosition(s, 0);
250
0
    Stream_SetLength(s, Stream_Capacity(s));
251
0
    StreamPool_ShiftAvailable(pool, foundIndex);
252
0
  }
253
254
0
  if (s)
255
0
  {
256
0
    s->pool = pool;
257
0
    s->count = 1;
258
0
    StreamPool_AddUsed(pool, s);
259
0
  }
260
261
0
out_fail:
262
0
  StreamPool_Unlock(pool);
263
264
0
  return s;
265
0
}
266
267
/**
268
 * Returns an object to the pool.
269
 */
270
271
static void StreamPool_Remove(wStreamPool* pool, wStream* s)
272
0
{
273
0
  StreamPool_EnsureCapacity(pool, 1, FALSE);
274
0
  Stream_EnsureValidity(s);
275
0
  for (size_t x = 0; x < pool->aSize; x++)
276
0
  {
277
0
    wStream* cs = pool->aArray[x].s;
278
0
    if (cs == s)
279
0
      return;
280
0
  }
281
0
  pool->aArray[(pool->aSize)++] = add_entry(s);
282
0
  StreamPool_RemoveUsed(pool, s);
283
0
}
284
285
/**
 * Give a stream whose reference count dropped to zero back to its pool.
 * The pool lock is held for the duration of the transfer.
 */
static void StreamPool_ReleaseOrReturn(wStreamPool* pool, wStream* s)
{
  StreamPool_Lock(pool);

  StreamPool_Remove(pool, s);

  StreamPool_Unlock(pool);
}
291
292
/**
 * Return a stream to the pool regardless of its reference count.
 *
 * @param pool the pool the stream was taken from, must not be NULL
 * @param s    the stream to return, NULL is ignored
 */
void StreamPool_Return(wStreamPool* pool, wStream* s)
{
  WINPR_ASSERT(pool);

  if (s != NULL)
  {
    StreamPool_Lock(pool);
    StreamPool_Remove(pool, s);
    StreamPool_Unlock(pool);
  }
}
302
303
/**
304
 * Increment stream reference count
305
 */
306
307
void Stream_AddRef(wStream* s)
308
0
{
309
0
  WINPR_ASSERT(s);
310
0
  s->count++;
311
0
}
312
313
/**
314
 * Decrement stream reference count
315
 */
316
317
void Stream_Release(wStream* s)
318
0
{
319
0
  WINPR_ASSERT(s);
320
321
0
  if (s->count > 0)
322
0
    s->count--;
323
0
  if (s->count == 0)
324
0
  {
325
0
    if (s->pool)
326
0
      StreamPool_ReleaseOrReturn(s->pool, s);
327
0
    else
328
0
      Stream_Free(s, TRUE);
329
0
  }
330
0
}
331
332
/**
333
 * Find stream in pool using pointer inside buffer
334
 */
335
336
wStream* StreamPool_Find(wStreamPool* pool, const BYTE* ptr)
337
0
{
338
0
  wStream* s = NULL;
339
340
0
  StreamPool_Lock(pool);
341
342
0
  for (size_t index = 0; index < pool->uSize; index++)
343
0
  {
344
0
    struct s_StreamPoolEntry* cur = &pool->uArray[index];
345
346
0
    if ((ptr >= Stream_Buffer(cur->s)) &&
347
0
        (ptr < (Stream_Buffer(cur->s) + Stream_Capacity(cur->s))))
348
0
    {
349
0
      s = cur->s;
350
0
      break;
351
0
    }
352
0
  }
353
354
0
  StreamPool_Unlock(pool);
355
356
0
  return s;
357
0
}
358
359
/**
360
 * Releases the streams currently cached in the pool.
361
 */
362
363
void StreamPool_Clear(wStreamPool* pool)
364
0
{
365
0
  StreamPool_Lock(pool);
366
367
0
  for (size_t x = 0; x < pool->aSize; x++)
368
0
  {
369
0
    struct s_StreamPoolEntry* cur = &pool->aArray[x];
370
0
    discard_entry(cur, TRUE);
371
0
  }
372
0
  pool->aSize = 0;
373
374
0
  if (pool->uSize > 0)
375
0
  {
376
0
    WLog_WARN(TAG, "Clearing StreamPool, but there are %" PRIuz " streams currently in use",
377
0
              pool->uSize);
378
0
    for (size_t x = 0; x < pool->uSize; x++)
379
0
    {
380
0
      struct s_StreamPoolEntry* cur = &pool->uArray[x];
381
0
      discard_entry(cur, TRUE);
382
0
    }
383
0
    pool->uSize = 0;
384
0
  }
385
386
0
  StreamPool_Unlock(pool);
387
0
}
388
389
/**
 * Report how many streams are currently handed out.
 *
 * @param pool the pool to query
 * @return the number of entries in the used array, read under the pool lock
 */
size_t StreamPool_UsedCount(wStreamPool* pool)
{
  size_t count = 0;

  StreamPool_Lock(pool);
  count = pool->uSize;
  StreamPool_Unlock(pool);

  return count;
}
396
397
/**
398
 * Construction, Destruction
399
 */
400
401
/**
 * Create a stream pool.
 *
 * Both bookkeeping arrays are pre-allocated with room for 32 entries.
 *
 * @param synchronized TRUE to guard pool operations with the pool's lock
 * @param defaultSize  stream size used when StreamPool_Take is called with 0
 *
 * @return the new pool (release with StreamPool_Free) or NULL on failure
 */
wStreamPool* StreamPool_New(BOOL synchronized, size_t defaultSize)
{
  wStreamPool* pool = (wStreamPool*)calloc(1, sizeof(wStreamPool));
  if (!pool)
    return NULL;

  pool->synchronized = synchronized;
  pool->defaultSize = defaultSize;

  /* The lock has to exist before anything can jump to the fail label:
   * StreamPool_Free enters it (for synchronized pools) and deletes it. */
  if (!InitializeCriticalSectionAndSpinCount(&pool->lock, 4000))
  {
    free(pool);
    return NULL;
  }

  if (!StreamPool_EnsureCapacity(pool, 32, FALSE))
    goto fail;
  if (!StreamPool_EnsureCapacity(pool, 32, TRUE))
    goto fail;

  return pool;

fail:
  WINPR_PRAGMA_DIAG_PUSH
  WINPR_PRAGMA_DIAG_IGNORED_MISMATCHED_DEALLOC
  StreamPool_Free(pool);
  WINPR_PRAGMA_DIAG_POP
  return NULL;
}
428
429
/**
 * Destroy a stream pool: free all streams it knows about, delete its lock
 * and release the bookkeeping arrays and the pool itself.
 *
 * @param pool the pool to destroy, NULL is ignored
 */
void StreamPool_Free(wStreamPool* pool)
{
  if (pool == NULL)
    return;

  StreamPool_Clear(pool);

  DeleteCriticalSection(&pool->lock);

  free(pool->aArray);
  free(pool->uArray);
  free(pool);
}
443
444
/**
 * Write a human readable summary of the pool into a caller supplied buffer.
 *
 * The summary contains the sizes and capacities of both arrays. Builds with
 * WITH_STREAMPOOL_DEBUG append the backtraces recorded for all streams in use
 * and the backtrace of the caller. The pool lock is held while the pool is
 * inspected so the reported numbers belong to one consistent state.
 *
 * A piece of output is only kept if the formatting call reports a length
 * that fits the remaining space; otherwise it is cut off by the final
 * terminator. The result is always NUL terminated.
 *
 * @param pool   the pool to describe, must not be NULL
 * @param buffer destination buffer
 * @param size   size of the destination buffer in bytes
 *
 * @return buffer, or NULL if buffer is NULL or size is 0
 */
char* StreamPool_GetStatistics(wStreamPool* pool, char* buffer, size_t size)
{
  WINPR_ASSERT(pool);

  if (!buffer || (size < 1))
    return NULL;

  size_t used = 0;

  StreamPool_Lock(pool);

  int offset = _snprintf(buffer, size - 1,
                         "aSize    =%" PRIuz ", uSize    =%" PRIuz ", aCapacity=%" PRIuz
                         ", uCapacity=%" PRIuz,
                         pool->aSize, pool->uSize, pool->aCapacity, pool->uCapacity);
  if ((offset > 0) && ((size_t)offset < size))
    used += (size_t)offset;

#if defined(WITH_STREAMPOOL_DEBUG)
  offset = _snprintf(&buffer[used], size - 1 - used, "\n-- dump used array take locations --\n");
  if ((offset > 0) && ((size_t)offset < size - used))
    used += (size_t)offset;

  for (size_t x = 0; x < pool->uSize; x++)
  {
    const struct s_StreamPoolEntry* cur = &pool->uArray[x];
    WINPR_ASSERT(cur->msg || (cur->lines == 0));

    for (size_t y = 0; y < cur->lines; y++)
    {
      offset = _snprintf(&buffer[used], size - 1 - used, "[%" PRIuz " | %" PRIuz "]: %s\n", x,
                         y, cur->msg[y]);
      if ((offset > 0) && ((size_t)offset < size - used))
        used += (size_t)offset;
    }
  }

  offset = _snprintf(&buffer[used], size - 1 - used, "\n-- statistics called from --\n");
  if ((offset > 0) && ((size_t)offset < size - used))
    used += (size_t)offset;

  /* Capture where the statistics were requested from. */
  struct s_StreamPoolEntry entry = { 0 };
  void* stack = winpr_backtrace(20);
  if (stack)
    entry.msg = winpr_backtrace_symbols(stack, &entry.lines);
  winpr_backtrace_free(stack);

  for (size_t x = 0; x < entry.lines; x++)
  {
    const char* msg = entry.msg[x];
    offset = _snprintf(&buffer[used], size - 1 - used, "[%" PRIuz "]: %s\n", x, msg);
    if ((offset > 0) && ((size_t)offset < size - used))
      used += (size_t)offset;
  }
  free((void*)entry.msg);
#endif

  StreamPool_Unlock(pool);

  buffer[used] = '\0';
  return buffer;
}
502
503
/**
 * Wait until every stream taken from the pool has been returned.
 *
 * The pool is polled in steps of at most 10 ms without holding the pool lock
 * between checks. The state is checked one final time when the timeout has
 * elapsed, so a timeout of 0 performs a single non-blocking check.
 *
 * @param pool      the pool to wait for
 * @param timeoutMS maximum time to wait in milliseconds, INFINITE to wait
 *                  until all streams are back
 *
 * @return TRUE if no stream is in use anymore, FALSE if the timeout expired
 *         while streams were still in use
 */
BOOL StreamPool_WaitForReturn(wStreamPool* pool, UINT32 timeoutMS)
{
  wLog* log = WLog_Get(TAG);

  while (timeoutMS > 0)
  {
    const size_t used = StreamPool_UsedCount(pool);
    if (used == 0)
      return TRUE;
    WLog_Print(log, WLOG_DEBUG, "%" PRIuz " streams still in use, sleeping...", used);

    /* Building the statistics can be costly (backtraces in debug builds),
     * so only do it when the result is actually going to be logged. */
    if (WLog_IsLevelActive(log, WLOG_TRACE))
    {
      char buffer[4096] = { 0 };
      StreamPool_GetStatistics(pool, buffer, sizeof(buffer));
      WLog_Print(log, WLOG_TRACE, "Pool statistics: %s", buffer);
    }

    UINT32 diff = 10;
    if (timeoutMS != INFINITE)
    {
      diff = timeoutMS > 10 ? 10 : timeoutMS;
      timeoutMS -= diff;
    }
    Sleep(diff);
  }

  /* Streams may have been returned during the last sleep (or the caller
   * asked for a zero timeout): report the current state. */
  return (StreamPool_UsedCount(pool) == 0) ? TRUE : FALSE;
}