Coverage Report

Created: 2026-02-26 06:26

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/work/libde265/libde265/threads.cc
Line
Count
Source
1
/*
2
 * H.265 video codec.
3
 * Copyright (c) 2013-2014 struktur AG, Dirk Farin <farin@struktur.de>
4
 *
5
 * This file is part of libde265.
6
 *
7
 * libde265 is free software: you can redistribute it and/or modify
8
 * it under the terms of the GNU Lesser General Public License as
9
 * published by the Free Software Foundation, either version 3 of
10
 * the License, or (at your option) any later version.
11
 *
12
 * libde265 is distributed in the hope that it will be useful,
13
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15
 * GNU Lesser General Public License for more details.
16
 *
17
 * You should have received a copy of the GNU Lesser General Public License
18
 * along with libde265.  If not, see <http://www.gnu.org/licenses/>.
19
 */
20
21
#include "threads.h"
22
#include <assert.h>
23
#include <string.h>
24
25
#if defined(_MSC_VER) || defined(__MINGW32__)
26
# include <malloc.h>
27
#elif defined(HAVE_ALLOCA_H)
28
# include <alloca.h>
29
#endif
30
31
32
// Creates a progress lock whose progress counter starts at zero.
de265_progress_lock::de265_progress_lock()
  : mProgress(0)
{
}
36
37
// No explicit cleanup is performed here; the members are destroyed implicitly.
de265_progress_lock::~de265_progress_lock() = default;
40
41
void de265_progress_lock::wait_for_progress(int progress)
42
0
{
43
0
  if (mProgress >= progress) {
44
0
    return;
45
0
  }
46
47
0
  std::unique_lock<std::mutex> lock(mutex);
48
0
  while (mProgress < progress) {
49
0
    cond.wait(lock);
50
0
  }
51
0
}
52
53
void de265_progress_lock::set_progress(int progress)
54
0
{
55
0
  std::unique_lock<std::mutex> lock(mutex);
56
57
0
  if (progress>mProgress) {
58
0
    mProgress = progress;
59
60
0
    cond.notify_all();
61
0
  }
62
0
}
63
64
void de265_progress_lock::increase_progress(int progress)
65
0
{
66
0
  std::unique_lock<std::mutex> lock(mutex);
67
68
0
  mProgress += progress;
69
0
  cond.notify_all();
70
0
}
71
72
int  de265_progress_lock::get_progress() const
73
0
{
74
0
  return mProgress;
75
0
}
76
77
78
79
80
#include "libde265/decctx.h"
81
82
#if 0
83
const char* line="--------------------------------------------------";
84
void printblks(const thread_pool* pool)
85
{
86
  int w = pool->tasks[0].data.task_ctb.ctx->current_sps->PicWidthInCtbsY;
87
  int h = pool->tasks[0].data.task_ctb.ctx->current_sps->PicHeightInCtbsY;
88
89
  printf("active threads: %d  queue len: %d\n",pool->num_threads_working,pool->num_tasks);
90
91
  char *const p = (char *)alloca(w * h * sizeof(char));
92
  assert(p != NULL);
93
  memset(p,' ',w*h);
94
95
  for (int i=0;i<pool->num_tasks;i++) {
96
    int b = 0; //pool->tasks[i].num_blockers;
97
    int x = pool->tasks[i].data.task_ctb.ctb_x;
98
    int y = pool->tasks[i].data.task_ctb.ctb_y;
99
    p[y*w+x] = b+'0';
100
  }
101
102
  for (int i=0;i<pool->num_threads_working;i++) {
103
    int x = pool->ctbx[i];
104
    int y = pool->ctby[i];
105
    p[y*w+x] = '*';
106
  }
107
108
  printf("+%s+\n",line+50-w);
109
  for (int y=0;y<h;y++)
110
    {
111
      printf("|");
112
      for (int x=0;x<w;x++)
113
        {
114
          printf("%c",p[x+y*w]);
115
        }
116
      printf("|\n");
117
    }
118
  printf("+%s+\n",line+50-w);
119
}
120
#endif
121
122
123
// Entry point of every pool thread: repeatedly takes the task at the front of
// the pool's queue and runs it, sleeping on the pool's condition variable
// while the queue is empty. Returns as soon as the pool is flagged as stopped.
static void worker_thread(thread_pool* pool)
{
  for (;;) {
    thread_task* next = nullptr;

    {
      std::unique_lock<std::mutex> guard(pool->mutex);

      // sleep until there is a task to pick up or the pool has been stopped
      pool->cond_var.wait(guard, [pool] {
        return pool->stopped || !pool->tasks.empty();
      });

      // a stop request wins over tasks that are still queued
      if (pool->stopped) {
        return;
      }

      // take the oldest task and count this thread as busy
      next = pool->tasks.front();
      pool->tasks.pop_front();
      ++pool->num_threads_working;

      //printblks(pool);
    }

    // run the task without holding the pool mutex
    next->work();

    // TODO: the num_threads_working can probably be an atomic integer
    std::lock_guard<std::mutex> guard(pool->mutex);
    --pool->num_threads_working;
  }
}
174
175
176
// Resets the pool state and launches 'num_threads' worker threads.
//
// Requests above MAX_THREADS are clamped to MAX_THREADS; in that case
// DE265_WARNING_NUMBER_OF_THREADS_LIMITED_TO_MAXIMUM is returned instead of
// DE265_OK. pool->num_threads ends up holding the number of threads started.
de265_error start_thread_pool(thread_pool* pool, int num_threads)
{
  de265_error result = DE265_OK;

  // clamp the request to the size of the pool's thread array
  if (num_threads > MAX_THREADS) {
    result = DE265_WARNING_NUMBER_OF_THREADS_LIMITED_TO_MAXIMUM;
    num_threads = MAX_THREADS;
  }

  // counted up while the threads are created below
  pool->num_threads = 0;

  {
    std::lock_guard<std::mutex> guard(pool->mutex);

    pool->stopped = false;
    pool->num_threads_working = 0;
  }

  // launch the workers
  for (int idx = 0; idx < num_threads; ++idx) {
    pool->thread[idx] = std::thread(worker_thread, pool);
    ++pool->num_threads;
  }

  return result;
}
205
206
207
// Shuts the pool down: flags it as stopped, wakes every worker so that it
// notices the flag, and waits for all workers to finish.
//
// A worker that is currently running a task finishes that task first; tasks
// still in the queue are not executed, because workers check the stop flag
// before picking up a task.
void stop_thread_pool(thread_pool* pool)
{
  {
    // set the flag under the mutex so that a worker cannot miss it between
    // checking the flag and going to sleep
    std::unique_lock<std::mutex> lock(pool->mutex);
    pool->stopped = true;
  }

  pool->cond_var.notify_all();

  for (int i=0;i<pool->num_threads;i++) {
    // join() on a thread that is not joinable (e.g. already joined by an
    // earlier stop call) throws std::system_error, so skip those
    if (pool->thread[i].joinable()) {
      pool->thread[i].join();
    }
  }
}
220
221
222
// Appends 'task' to the pool's queue and wakes one worker to process it.
// When the pool has already been stopped, the task is silently dropped.
void   add_task(thread_pool* pool, thread_task* task)
{
  std::lock_guard<std::mutex> guard(pool->mutex);

  // a stopped pool does not accept new work
  if (pool->stopped) {
    return;
  }

  pool->tasks.push_back(task);

  // one new task -> waking a single worker is sufficient
  pool->cond_var.notify_one();
}