Coverage Report

Created: 2025-11-16 06:59

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/ntopng/src/ThreadPool.cpp
Line
Count
Source
1
/*
2
 *
3
 * (C) 2017-25 - ntop.org
4
 *
5
 *
6
 * This program is free software; you can redistribute it and/or modify
7
 * it under the terms of the GNU General Public License as published by
8
 * the Free Software Foundation; either version 3 of the License, or
9
 * (at your option) any later version.
10
 *
11
 * This program is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 * GNU General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU General Public License
17
 * along with this program; if not, write to the Free Software Foundation,
18
 * Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
19
 *
20
 */
21
22
#include "ntop_includes.h"
23
24
// #define THREAD_DEBUG 1
25
// #define TASK_DEBUG 1
26
27
/* **************************************************** */
28
29
0
static void *doRun(void *ptr) {
30
0
  Utils::setThreadName("ntopng-th-pool");
31
32
0
  ((ThreadPool *)ptr)->run();
33
0
  return (NULL);
34
0
}
35
36
/* **************************************************** */
37
38
0
ThreadPool::ThreadPool(char *comma_separated_affinity_mask) {
39
0
  if(trace_new_delete) ntop->getTrace()->traceEvent(TRACE_NORMAL, "[new] %s", __FILE__);
40
0
  m = new (std::nothrow) Mutex();
41
0
  pthread_cond_init(&condvar, NULL);
42
0
  terminating = false;
43
44
0
#ifdef __linux__
45
0
  CPU_ZERO(&affinity_mask);
46
47
0
  if (comma_separated_affinity_mask)
48
0
    Utils::setAffinityMask(comma_separated_affinity_mask, &affinity_mask);
49
0
  else if (ntop->getPrefs()->get_other_cpu_affinity())
50
0
    Utils::setAffinityMask(ntop->getPrefs()->get_other_cpu_affinity(),
51
0
                           &affinity_mask);
52
0
#endif
53
54
0
  num_threads = 0;
55
0
}
56
57
/* **************************************************** */
58
59
0
ThreadPool::~ThreadPool() {
60
0
  void *res;
61
62
0
  shutdown();
63
64
0
  for (vector<pthread_t>::const_iterator it = threadsState.begin();
65
0
       it != threadsState.end(); ++it) {
66
0
    pthread_cond_signal(&condvar);
67
0
    pthread_join(*it, &res);
68
0
  }
69
70
0
  threadsState.clear();
71
72
0
  while (!threads.empty()) {
73
0
    QueuedThreadData *q = threads.front();
74
75
0
    threads.pop();
76
0
    delete q;
77
0
  }
78
79
0
  pthread_cond_destroy(&condvar);
80
0
  delete m;
81
0
}
82
83
/* **************************************************** */
84
85
0
bool ThreadPool::spawn() {
86
0
  pthread_t new_thread;
87
88
0
  if (num_threads < CONST_MAX_NUM_THREADED_ACTIVITIES) {
89
0
    int rc;
90
91
0
    if ((rc = pthread_create(&new_thread, NULL, doRun, (void *)this)) == 0) {
92
0
      threadsState.push_back(new_thread);
93
94
0
#ifdef __linux__
95
0
      Utils::setThreadAffinityWithMask(new_thread, &affinity_mask);
96
0
#endif
97
0
      num_threads++;
98
99
#ifdef THREAD_DEBUG
100
      ntop->getTrace()->traceEvent(TRACE_NORMAL, "Spawn thread [total: %u]",
101
                                   num_threads);
102
#endif
103
104
0
      return (true);
105
0
    } else {
106
0
#ifndef WIN32
107
0
      ntop->getTrace()->traceEvent(TRACE_ERROR, "Failure spawning thread [%u thread(s)][rc: %d]",
108
0
           num_threads, rc);
109
0
#endif
110
0
    }
111
0
  }
112
113
0
  return (false); /* Something didn't work as expected */
114
0
}
115
116
/* **************************************************** */
117
118
0
void ThreadPool::run() {
119
#ifdef THREAD_DEBUG
120
  ntop->getTrace()->traceEvent(TRACE_NORMAL, "*** Starting thread [%u]",
121
                               pthread_self());
122
#endif
123
124
0
  while (!isTerminating()) {
125
0
    QueuedThreadData *q;
126
127
#ifdef THREAD_DEBUG
128
    ntop->getTrace()->traceEvent(
129
         TRACE_NORMAL, "*** About to dequeue job [%u][terminating=%d]",
130
         pthread_self(), isTerminating());
131
#endif
132
133
0
    q = dequeueJob(true);
134
135
#ifdef THREAD_DEBUG
136
    ntop->getTrace()->traceEvent(
137
         TRACE_NORMAL, "*** Dequeued job [%u][terminating=%d][%s][%s]",
138
         pthread_self(), isTerminating(), q->script_path, q->iface->get_name());
139
#endif
140
141
0
    if ((q == NULL) || isTerminating()) {
142
0
      if (q) delete q;
143
0
      break;
144
0
    } else {
145
0
      q->run();
146
0
      delete q;
147
148
0
      Utils::setThreadName("ntopng-idle-thpool");
149
0
    }
150
0
  }
151
152
#ifdef THREAD_DEBUG
153
  ntop->getTrace()->traceEvent(TRACE_NORMAL, "*** Terminating thread [%u]",
154
                               pthread_self());
155
#endif
156
0
}
157
158
/* ******************************************* */
159
160
0
bool ThreadPool::isQueueable(ThreadedActivityState cur_state) {
161
0
  switch (cur_state) {
162
0
  case threaded_activity_state_sleeping:
163
0
  case threaded_activity_state_unknown:
164
0
    return (true);
165
0
    break;
166
167
0
  default:
168
#ifdef THREAD_DEBUG
169
    ntop->getTrace()->traceEvent(TRACE_NORMAL,
170
         "ThreadedActivity::isQueueable(%s)",
171
         Utils::get_state_label(cur_state));
172
#endif
173
0
    return (false);
174
0
    break;
175
0
  }
176
177
0
  return (false); /* NOTREACHED */
178
0
}
179
180
/* **************************************************** */
181
182
bool ThreadPool::queueJob(ThreadedActivity *ta, char *script_path,
183
                          NetworkInterface *iface, time_t scheduled_time,
184
                          time_t deadline, PeriodicActivities *pa,
185
0
        bool hourly_daily_activity) {
186
0
  QueuedThreadData *q;
187
0
  ThreadedActivityStats *stats;
188
0
  ThreadedActivityState ta_state;
189
190
0
  if (isTerminating()) return (false);
191
192
0
  if (num_threads == 0) {
193
    /* Spawn an initial set of threads - do not do this in the constructor
194
     * as it is too early (creates issues on freebsd when running as service) */
195
0
    for (u_int i = 0; i < 5 /* Min number of threads */; i++) spawn();
196
0
  }
197
198
0
  if ((stats = ta->getThreadedActivityStats(iface, script_path, true)) == NULL)
199
0
    return (false);
200
0
  else
201
0
    ta_state = stats->getState();
202
203
0
  if (!isQueueable(ta_state)) {
204
    /* This task is already in queue */
205
206
#ifdef THREAD_DEBUG
207
    char deadline_buf[32];
208
    time_t stats_deadline = stats->getDeadline();
209
    struct tm deadline_tm;
210
211
    strftime(deadline_buf, sizeof(deadline_buf), "%H:%M:%S",
212
             localtime_r(&stats_deadline, &deadline_tm));
213
    ntop->getTrace()->traceEvent(TRACE_WARNING, "Unable to schedule %s [running: %u][deadline: %d (%s)][now: %d][param-deadline: %d (%d)]",
214
         script_path, ta_state == threaded_activity_state_running ? 1 : 0,
215
         stats_deadline, deadline_buf, now, deadline, deadline-now);
216
#endif
217
218
0
    if (ta_state == threaded_activity_state_queued) {
219
      /*
220
        If here, the periodic activity has been waiting in queue for too long
221
        and no thread has dequeued it. Hence, we can try and spawn an additional
222
        thread, up to a maximum
223
      */
224
#ifdef THREAD_DEBUG
225
      ntop->getTrace()->traceEvent(TRACE_WARNING,
226
           "Waiting in queue for too long [len: %u/%u][num_interfaces: %u]",
227
           threadsState.size(), CONST_MAX_NUM_THREADED_ACTIVITIES,
228
           ntop->get_num_interfaces() + 1);
229
#endif
230
0
      stats->setNotExecutedActivity(true);
231
0
    } else if (ta_state == threaded_activity_state_running &&
232
0
               (stats->getDeadline() < scheduled_time))
233
0
      stats->setSlowPeriodicActivity(true);
234
235
0
    return (false); /* Task still running or already queued, don't re-queue it */
236
0
  }
237
238
#ifdef THREAD_DEBUG
239
  ntop->getTrace()->traceEvent(TRACE_WARNING, "Scheduling %s [now: %d][param-deadline: %d (%d)]",
240
             script_path, now, deadline, deadline-now);
241
#endif
242
243
0
  q = new (std::nothrow) QueuedThreadData(ta, script_path, iface, deadline, pa, hourly_daily_activity);
244
245
0
  if (!q) {
246
0
    ntop->getTrace()->traceEvent(TRACE_WARNING, "Unable to create job");
247
0
    return (false);
248
0
  }
249
250
  /* With limited resorces we do not spawn new threads */
251
0
  if(!ntop->getPrefs()->limitResourcesUsage()) {
252
0
    if ((int)threads.size() > (num_threads - 3)) {
253
#ifdef THREAD_DEBUG
254
      ntop->getTrace()->traceEvent(TRACE_NORMAL, "Job queue: %u", threads.size());
255
#endif
256
      
257
0
      spawn(); /* Spawn a new thread if there are queued jobs */
258
0
    }
259
0
  }
260
261
0
  m->lock(__FILE__, __LINE__);
262
0
  if (stats) stats->setScheduledTime(scheduled_time);
263
264
0
  ta->set_state_queued(iface, script_path);
265
0
  threads.push(q);
266
267
0
  pthread_cond_signal(&condvar);
268
0
  m->unlock(__FILE__, __LINE__);
269
270
0
  return (true); /*  TODO: add a max queue len and return false */
271
0
}
272
273
/* **************************************************** */
274
275
0
QueuedThreadData *ThreadPool::dequeueJob(bool waitIfEmpty) {
276
0
  QueuedThreadData *q;
277
278
0
  m->lock(__FILE__, __LINE__);
279
0
  if (waitIfEmpty) {
280
#ifdef WIN32
281
    /*  Use sleep() on window as cond_wait seems to leak */
282
    sleep(1);
283
#else
284
0
    while (threads.empty() && (!isTerminating()))
285
0
      m->cond_wait(&condvar);
286
287
0
#endif
288
0
  }
289
290
0
  if (threads.empty() || isTerminating()) {
291
0
    q = NULL;
292
0
  } else {
293
0
    q = threads.front();
294
0
    threads.pop();
295
0
  }
296
297
#ifdef THREAD_DEBUG
298
  ntop->getTrace()->traceEvent(TRACE_NORMAL, "Dequeued job [remaining: %u][%s]",
299
                               threads.size(), q ? q->iface->get_name() : "");
300
#endif
301
302
0
  m->unlock(__FILE__, __LINE__);
303
304
0
  return (q);
305
0
}
306
307
/* **************************************************** */
308
309
0
void ThreadPool::shutdown() {
310
#ifdef THREAD_DEBUG
311
  ntop->getTrace()->traceEvent(TRACE_NORMAL, "*** %s() ***", __FUNCTION__);
312
#endif
313
314
0
  m->lock(__FILE__, __LINE__);
315
0
  terminating = true;
316
0
  pthread_cond_broadcast(&condvar);
317
0
  m->unlock(__FILE__, __LINE__);
318
0
}