Coverage Report

Created: 2024-07-27 06:31

/src/libwebp/src/utils/thread_utils.c
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2011 Google Inc. All Rights Reserved.
2
//
3
// Use of this source code is governed by a BSD-style license
4
// that can be found in the COPYING file in the root of the source
5
// tree. An additional intellectual property rights grant can be found
6
// in the file PATENTS. All contributing project authors may
7
// be found in the AUTHORS file in the root of the source tree.
8
// -----------------------------------------------------------------------------
9
//
10
// Multi-threaded worker
11
//
12
// Author: Skal (pascal.massimino@gmail.com)
13
14
#include <assert.h>
15
#include <string.h>   // for memset()
16
#include "src/utils/thread_utils.h"
17
#include "src/utils/utils.h"
18
19
#ifdef WEBP_USE_THREAD
20
21
#if defined(_WIN32)
22
23
#include <windows.h>
24
typedef HANDLE pthread_t;
25
typedef CRITICAL_SECTION pthread_mutex_t;
26
27
#if _WIN32_WINNT >= 0x0600  // Windows Vista / Server 2008 or greater
28
#define USE_WINDOWS_CONDITION_VARIABLE
29
typedef CONDITION_VARIABLE pthread_cond_t;
30
#else
31
typedef struct {
32
  HANDLE waiting_sem_;
33
  HANDLE received_sem_;
34
  HANDLE signal_event_;
35
} pthread_cond_t;
36
#endif  // _WIN32_WINNT >= 0x600
37
38
#ifndef WINAPI_FAMILY_PARTITION
39
#define WINAPI_PARTITION_DESKTOP 1
40
#define WINAPI_FAMILY_PARTITION(x) x
41
#endif
42
43
#if !WINAPI_FAMILY_PARTITION(WINAPI_PARTITION_DESKTOP)
44
#define USE_CREATE_THREAD
45
#endif
46
47
#else  // !_WIN32
48
49
#include <pthread.h>
50
51
#endif  // _WIN32
52
53
typedef struct {
54
  pthread_mutex_t mutex_;
55
  pthread_cond_t  condition_;
56
  pthread_t       thread_;
57
} WebPWorkerImpl;
58
59
#if defined(_WIN32)
60
61
//------------------------------------------------------------------------------
62
// simplistic pthread emulation layer
63
64
#include <process.h>
65
66
// _beginthreadex requires __stdcall
67
#define THREADFN unsigned int __stdcall
68
#define THREAD_RETURN(val) (unsigned int)((DWORD_PTR)val)
69
70
#if _WIN32_WINNT >= 0x0501  // Windows XP or greater
71
#define WaitForSingleObject(obj, timeout) \
72
  WaitForSingleObjectEx(obj, timeout, FALSE /*bAlertable*/)
73
#endif
74
75
static int pthread_create(pthread_t* const thread, const void* attr,
76
                          unsigned int (__stdcall* start)(void*), void* arg) {
77
  (void)attr;
78
#ifdef USE_CREATE_THREAD
79
  *thread = CreateThread(NULL,   /* lpThreadAttributes */
80
                         0,      /* dwStackSize */
81
                         start,
82
                         arg,
83
                         0,      /* dwStackSize */
84
                         NULL);  /* lpThreadId */
85
#else
86
  *thread = (pthread_t)_beginthreadex(NULL,   /* void *security */
87
                                      0,      /* unsigned stack_size */
88
                                      start,
89
                                      arg,
90
                                      0,      /* unsigned initflag */
91
                                      NULL);  /* unsigned *thrdaddr */
92
#endif
93
  if (*thread == NULL) return 1;
94
  SetThreadPriority(*thread, THREAD_PRIORITY_ABOVE_NORMAL);
95
  return 0;
96
}
97
98
static int pthread_join(pthread_t thread, void** value_ptr) {
99
  (void)value_ptr;
100
  return (WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0 ||
101
          CloseHandle(thread) == 0);
102
}
103
104
// Mutex
105
static int pthread_mutex_init(pthread_mutex_t* const mutex, void* mutexattr) {
106
  (void)mutexattr;
107
#if _WIN32_WINNT >= 0x0600  // Windows Vista / Server 2008 or greater
108
  InitializeCriticalSectionEx(mutex, 0 /*dwSpinCount*/, 0 /*Flags*/);
109
#else
110
  InitializeCriticalSection(mutex);
111
#endif
112
  return 0;
113
}
114
115
static int pthread_mutex_lock(pthread_mutex_t* const mutex) {
116
  EnterCriticalSection(mutex);
117
  return 0;
118
}
119
120
static int pthread_mutex_unlock(pthread_mutex_t* const mutex) {
121
  LeaveCriticalSection(mutex);
122
  return 0;
123
}
124
125
static int pthread_mutex_destroy(pthread_mutex_t* const mutex) {
126
  DeleteCriticalSection(mutex);
127
  return 0;
128
}
129
130
// Condition
131
static int pthread_cond_destroy(pthread_cond_t* const condition) {
132
  int ok = 1;
133
#ifdef USE_WINDOWS_CONDITION_VARIABLE
134
  (void)condition;
135
#else
136
  ok &= (CloseHandle(condition->waiting_sem_) != 0);
137
  ok &= (CloseHandle(condition->received_sem_) != 0);
138
  ok &= (CloseHandle(condition->signal_event_) != 0);
139
#endif
140
  return !ok;
141
}
142
143
static int pthread_cond_init(pthread_cond_t* const condition, void* cond_attr) {
144
  (void)cond_attr;
145
#ifdef USE_WINDOWS_CONDITION_VARIABLE
146
  InitializeConditionVariable(condition);
147
#else
148
  condition->waiting_sem_ = CreateSemaphore(NULL, 0, 1, NULL);
149
  condition->received_sem_ = CreateSemaphore(NULL, 0, 1, NULL);
150
  condition->signal_event_ = CreateEvent(NULL, FALSE, FALSE, NULL);
151
  if (condition->waiting_sem_ == NULL ||
152
      condition->received_sem_ == NULL ||
153
      condition->signal_event_ == NULL) {
154
    pthread_cond_destroy(condition);
155
    return 1;
156
  }
157
#endif
158
  return 0;
159
}
160
161
static int pthread_cond_signal(pthread_cond_t* const condition) {
162
  int ok = 1;
163
#ifdef USE_WINDOWS_CONDITION_VARIABLE
164
  WakeConditionVariable(condition);
165
#else
166
  if (WaitForSingleObject(condition->waiting_sem_, 0) == WAIT_OBJECT_0) {
167
    // a thread is waiting in pthread_cond_wait: allow it to be notified
168
    ok = SetEvent(condition->signal_event_);
169
    // wait until the event is consumed so the signaler cannot consume
170
    // the event via its own pthread_cond_wait.
171
    ok &= (WaitForSingleObject(condition->received_sem_, INFINITE) !=
172
           WAIT_OBJECT_0);
173
  }
174
#endif
175
  return !ok;
176
}
177
178
static int pthread_cond_wait(pthread_cond_t* const condition,
179
                             pthread_mutex_t* const mutex) {
180
  int ok;
181
#ifdef USE_WINDOWS_CONDITION_VARIABLE
182
  ok = SleepConditionVariableCS(condition, mutex, INFINITE);
183
#else
184
  // note that there is a consumer available so the signal isn't dropped in
185
  // pthread_cond_signal
186
  if (!ReleaseSemaphore(condition->waiting_sem_, 1, NULL)) return 1;
187
  // now unlock the mutex so pthread_cond_signal may be issued
188
  pthread_mutex_unlock(mutex);
189
  ok = (WaitForSingleObject(condition->signal_event_, INFINITE) ==
190
        WAIT_OBJECT_0);
191
  ok &= ReleaseSemaphore(condition->received_sem_, 1, NULL);
192
  pthread_mutex_lock(mutex);
193
#endif
194
  return !ok;
195
}
196
197
#else  // !_WIN32
198
# define THREADFN void*
199
0
# define THREAD_RETURN(val) val
200
#endif  // _WIN32
201
202
//------------------------------------------------------------------------------
203
204
0
static THREADFN ThreadLoop(void* ptr) {
205
0
  WebPWorker* const worker = (WebPWorker*)ptr;
206
0
  WebPWorkerImpl* const impl = (WebPWorkerImpl*)worker->impl_;
207
0
  int done = 0;
208
0
  while (!done) {
209
0
    pthread_mutex_lock(&impl->mutex_);
210
0
    while (worker->status_ == OK) {   // wait in idling mode
211
0
      pthread_cond_wait(&impl->condition_, &impl->mutex_);
212
0
    }
213
0
    if (worker->status_ == WORK) {
214
0
      WebPGetWorkerInterface()->Execute(worker);
215
0
      worker->status_ = OK;
216
0
    } else if (worker->status_ == NOT_OK) {   // finish the worker
217
0
      done = 1;
218
0
    }
219
    // signal to the main thread that we're done (for Sync())
220
    // Note the associated mutex does not need to be held when signaling the
221
    // condition. Unlocking the mutex first may improve performance in some
222
    // implementations, avoiding the case where the waiting thread can't
223
    // reacquire the mutex when woken.
224
0
    pthread_mutex_unlock(&impl->mutex_);
225
0
    pthread_cond_signal(&impl->condition_);
226
0
  }
227
0
  return THREAD_RETURN(NULL);    // Thread is finished
228
0
}
229
230
// main thread state control
231
0
static void ChangeState(WebPWorker* const worker, WebPWorkerStatus new_status) {
232
  // No-op when attempting to change state on a thread that didn't come up.
233
  // Checking status_ without acquiring the lock first would result in a data
234
  // race.
235
0
  WebPWorkerImpl* const impl = (WebPWorkerImpl*)worker->impl_;
236
0
  if (impl == NULL) return;
237
238
0
  pthread_mutex_lock(&impl->mutex_);
239
0
  if (worker->status_ >= OK) {
240
    // wait for the worker to finish
241
0
    while (worker->status_ != OK) {
242
0
      pthread_cond_wait(&impl->condition_, &impl->mutex_);
243
0
    }
244
    // assign new status and release the working thread if needed
245
0
    if (new_status != OK) {
246
0
      worker->status_ = new_status;
247
      // Note the associated mutex does not need to be held when signaling the
248
      // condition. Unlocking the mutex first may improve performance in some
249
      // implementations, avoiding the case where the waiting thread can't
250
      // reacquire the mutex when woken.
251
0
      pthread_mutex_unlock(&impl->mutex_);
252
0
      pthread_cond_signal(&impl->condition_);
253
0
      return;
254
0
    }
255
0
  }
256
0
  pthread_mutex_unlock(&impl->mutex_);
257
0
}
258
259
#endif  // WEBP_USE_THREAD
260
261
//------------------------------------------------------------------------------
262
263
296
static void Init(WebPWorker* const worker) {
264
296
  memset(worker, 0, sizeof(*worker));
265
296
  worker->status_ = NOT_OK;
266
296
}
267
268
0
static int Sync(WebPWorker* const worker) {
269
0
#ifdef WEBP_USE_THREAD
270
0
  ChangeState(worker, OK);
271
0
#endif
272
0
  assert(worker->status_ <= OK);
273
0
  return !worker->had_error;
274
0
}
275
276
0
static int Reset(WebPWorker* const worker) {
277
0
  int ok = 1;
278
0
  worker->had_error = 0;
279
0
  if (worker->status_ < OK) {
280
0
#ifdef WEBP_USE_THREAD
281
0
    WebPWorkerImpl* const impl =
282
0
        (WebPWorkerImpl*)WebPSafeCalloc(1, sizeof(WebPWorkerImpl));
283
0
    worker->impl_ = (void*)impl;
284
0
    if (worker->impl_ == NULL) {
285
0
      return 0;
286
0
    }
287
0
    if (pthread_mutex_init(&impl->mutex_, NULL)) {
288
0
      goto Error;
289
0
    }
290
0
    if (pthread_cond_init(&impl->condition_, NULL)) {
291
0
      pthread_mutex_destroy(&impl->mutex_);
292
0
      goto Error;
293
0
    }
294
0
    pthread_mutex_lock(&impl->mutex_);
295
0
    ok = !pthread_create(&impl->thread_, NULL, ThreadLoop, worker);
296
0
    if (ok) worker->status_ = OK;
297
0
    pthread_mutex_unlock(&impl->mutex_);
298
0
    if (!ok) {
299
0
      pthread_mutex_destroy(&impl->mutex_);
300
0
      pthread_cond_destroy(&impl->condition_);
301
0
 Error:
302
0
      WebPSafeFree(impl);
303
0
      worker->impl_ = NULL;
304
0
      return 0;
305
0
    }
306
#else
307
    worker->status_ = OK;
308
#endif
309
0
  } else if (worker->status_ > OK) {
310
0
    ok = Sync(worker);
311
0
  }
312
0
  assert(!ok || (worker->status_ == OK));
313
0
  return ok;
314
0
}
315
316
0
static void Execute(WebPWorker* const worker) {
317
0
  if (worker->hook != NULL) {
318
0
    worker->had_error |= !worker->hook(worker->data1, worker->data2);
319
0
  }
320
0
}
321
322
0
static void Launch(WebPWorker* const worker) {
323
0
#ifdef WEBP_USE_THREAD
324
0
  ChangeState(worker, WORK);
325
#else
326
  Execute(worker);
327
#endif
328
0
}
329
330
573
static void End(WebPWorker* const worker) {
331
573
#ifdef WEBP_USE_THREAD
332
573
  if (worker->impl_ != NULL) {
333
0
    WebPWorkerImpl* const impl = (WebPWorkerImpl*)worker->impl_;
334
0
    ChangeState(worker, NOT_OK);
335
0
    pthread_join(impl->thread_, NULL);
336
0
    pthread_mutex_destroy(&impl->mutex_);
337
0
    pthread_cond_destroy(&impl->condition_);
338
0
    WebPSafeFree(impl);
339
0
    worker->impl_ = NULL;
340
0
  }
341
#else
342
  worker->status_ = NOT_OK;
343
  assert(worker->impl_ == NULL);
344
#endif
345
573
  assert(worker->status_ == NOT_OK);
346
573
}
347
348
//------------------------------------------------------------------------------
349
350
static WebPWorkerInterface g_worker_interface = {
351
  Init, Reset, Sync, Launch, Execute, End
352
};
353
354
0
int WebPSetWorkerInterface(const WebPWorkerInterface* const winterface) {
355
0
  if (winterface == NULL ||
356
0
      winterface->Init == NULL || winterface->Reset == NULL ||
357
0
      winterface->Sync == NULL || winterface->Launch == NULL ||
358
0
      winterface->Execute == NULL || winterface->End == NULL) {
359
0
    return 0;
360
0
  }
361
0
  g_worker_interface = *winterface;
362
0
  return 1;
363
0
}
364
365
869
const WebPWorkerInterface* WebPGetWorkerInterface(void) {
366
869
  return &g_worker_interface;
367
869
}
368
369
//------------------------------------------------------------------------------