Coverage Report

Created: 2026-05-30 06:10

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/work/libde265/libde265/threads.cc
Line
Count
Source
1
/*
2
 * H.265 video codec.
3
 * Copyright (c) 2013-2014 struktur AG, Dirk Farin <farin@struktur.de>
4
 *
5
 * This file is part of libde265.
6
 *
7
 * libde265 is free software: you can redistribute it and/or modify
8
 * it under the terms of the GNU Lesser General Public License as
9
 * published by the Free Software Foundation, either version 3 of
10
 * the License, or (at your option) any later version.
11
 *
12
 * libde265 is distributed in the hope that it will be useful,
13
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15
 * GNU Lesser General Public License for more details.
16
 *
17
 * You should have received a copy of the GNU Lesser General Public License
18
 * along with libde265.  If not, see <http://www.gnu.org/licenses/>.
19
 */
20
21
#include "threads.h"
22
#include <assert.h>
23
#include <string.h>
24
25
#if defined(_MSC_VER) || defined(__MINGW32__)
26
# include <malloc.h>
27
#elif defined(HAVE_ALLOCA_H)
28
# include <alloca.h>
29
#endif
30
31
32
// Creates a progress lock whose progress counter starts at zero.
de265_progress_lock::de265_progress_lock()
  : mProgress(0)
{
}
36
37
// Nothing to release explicitly: the destructor does no work of its own.
de265_progress_lock::~de265_progress_lock() = default;
40
41
void de265_progress_lock::wait_for_progress(int progress)
42
0
{
43
0
  if (mProgress >= progress) {
44
0
    return;
45
0
  }
46
47
0
  std::unique_lock<std::mutex> lock(mutex);
48
0
  while (mProgress < progress) {
49
0
    cond.wait(lock);
50
0
  }
51
0
}
52
53
void de265_progress_lock::set_progress(int progress)
54
0
{
55
0
  std::unique_lock<std::mutex> lock(mutex);
56
57
0
  if (progress>mProgress) {
58
0
    mProgress = progress;
59
60
0
    cond.notify_all();
61
0
  }
62
0
}
63
64
void de265_progress_lock::increase_progress(int progress)
65
0
{
66
0
  std::unique_lock<std::mutex> lock(mutex);
67
68
0
  mProgress += progress;
69
0
  cond.notify_all();
70
0
}
71
72
int  de265_progress_lock::get_progress() const
73
0
{
74
0
  return mProgress;
75
0
}
76
77
78
79
80
#include "libde265/decctx.h"
81
82
#if 0
// Disabled debugging aid: dumps the state of the thread pool as an ASCII map
// of the CTB grid. Queued tasks are drawn as a digit, CTBs currently being
// processed as '*', everything else as a blank.

// Prints one horizontal border line that is exactly 'w' dashes wide.
// (The border used to be taken from a fixed 50-character string, which
// pointed before the start of that string for pictures wider than 50 CTBs.)
static void print_border(int w)
{
  printf("+");
  for (int x=0;x<w;x++) {
    printf("-");
  }
  printf("+\n");
}

void printblks(const thread_pool* pool)
{
  int w = pool->tasks[0].data.task_ctb.ctx->current_sps->PicWidthInCtbsY;
  int h = pool->tasks[0].data.task_ctb.ctx->current_sps->PicHeightInCtbsY;

  printf("active threads: %d  queue len: %d\n",pool->num_threads_working,pool->num_tasks);

  // one character per CTB, stored row by row
  char *const p = (char *)alloca(w * h * sizeof(char));
  assert(p != nullptr);
  memset(p,' ',w*h);

  // mark the CTBs of all queued tasks
  for (int i=0;i<pool->num_tasks;i++) {
    int b = 0; //pool->tasks[i].num_blockers;
    int x = pool->tasks[i].data.task_ctb.ctb_x;
    int y = pool->tasks[i].data.task_ctb.ctb_y;
    p[y*w+x] = b+'0';
  }

  // mark the CTBs that are being worked on right now
  for (int i=0;i<pool->num_threads_working;i++) {
    int x = pool->ctbx[i];
    int y = pool->ctby[i];
    p[y*w+x] = '*';
  }

  print_border(w);
  for (int y=0;y<h;y++)
    {
      printf("|");
      for (int x=0;x<w;x++)
        {
          printf("%c",p[x+y*w]);
        }
      printf("|\n");
    }
  print_border(w);
}
#endif
121
122
123
// Main loop of every pool thread: repeatedly takes the task at the front of
// the pool's queue and runs it, until the pool is stopped.
//
// A stop request takes precedence over queued work: once 'stopped' is set,
// the thread returns without dequeuing any remaining tasks.
static void worker_thread(thread_pool* pool)
{
  for (;;) {
    thread_task* next_task = nullptr;

    {
      std::unique_lock<std::mutex> guard(pool->mutex);

      // go idle until the pool has been stopped or there is a task to execute
      pool->cond_var.wait(guard, [pool] {
        return pool->stopped || pool->tasks.size()>0;
      });

      // pool was shut down -> end this thread
      if (pool->stopped) {
        return;
      }

      // dequeue the next task and count ourselves as busy
      next_task = pool->tasks.front();
      pool->tasks.pop_front();
      pool->num_threads_working++;

      //printblks(pool);
    }

    // run the task with the pool mutex released
    next_task->work();

    // task finished -> we are no longer busy

    // TODO: the num_threads_working can probably be an atomic integer
    {
      std::unique_lock<std::mutex> guard(pool->mutex);
      pool->num_threads_working--;
    }
  }
}
174
175
176
// Starts the worker threads of the pool.
//
// At most MAX_THREADS threads are started. If more were requested, the
// number is clamped and DE265_WARNING_NUMBER_OF_THREADS_LIMITED_TO_MAXIMUM
// is returned; otherwise the result is DE265_OK.
//
// NOTE(review): an exception thrown by the std::thread constructor is not
// caught here and would propagate to the caller - confirm this is intended.
de265_error thread_pool::start(int num_threads_to_start)
{
  de265_error result = DE265_OK;

  // clamp the request to the size of the thread array
  if (num_threads_to_start > MAX_THREADS) {
    num_threads_to_start = MAX_THREADS;
    result = DE265_WARNING_NUMBER_OF_THREADS_LIMITED_TO_MAXIMUM;
  }

  // counts the threads that were actually launched (see loop below)
  num_threads = 0;

  // reset the shared pool state under the lock
  {
    std::lock_guard<std::mutex> guard(mutex);
    num_threads_working = 0;
    stopped = false;
  }

  // launch the workers one by one
  for (int slot = 0; slot < num_threads_to_start; ++slot) {
    thread[slot] = std::thread(worker_thread, this);
    ++num_threads;
  }

  return result;
}
205
206
207
void thread_pool::stop()
208
1.68k
{
209
1.68k
  {
210
1.68k
    std::unique_lock<std::mutex> lock(mutex);
211
1.68k
    stopped = true;
212
1.68k
  }
213
214
1.68k
  cond_var.notify_all();
215
216
3.36k
  for (int i=0;i<num_threads;i++) {
217
1.68k
    thread[i].join();
218
1.68k
  }
219
1.68k
}
220
221
222
// Appends 'task' to the end of the task queue and wakes one worker thread.
// If the pool has been stopped, the task is not queued and nothing happens.
void thread_pool::add_task(thread_task* task)
{
  std::lock_guard<std::mutex> guard(mutex);

  if (stopped) {
    return; // pool is shut down: the task is dropped
  }

  tasks.push_back(task);

  // one new task -> waking a single worker is enough
  cond_var.notify_one();
}