/src/ntopng/src/ThreadPool.cpp
Line | Count | Source |
1 | | /* |
2 | | * |
3 | | * (C) 2017-25 - ntop.org |
4 | | * |
5 | | * |
6 | | * This program is free software; you can redistribute it and/or modify |
7 | | * it under the terms of the GNU General Public License as published by |
8 | | * the Free Software Foundation; either version 3 of the License, or |
9 | | * (at your option) any later version. |
10 | | * |
11 | | * This program is distributed in the hope that it will be useful, |
12 | | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
13 | | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
14 | | * GNU General Public License for more details. |
15 | | * |
16 | | * You should have received a copy of the GNU General Public License |
17 | | * along with this program; if not, write to the Free Software Foundation, |
18 | | * Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. |
19 | | * |
20 | | */ |
21 | | |
22 | | #include "ntop_includes.h" |
23 | | |
24 | | // #define THREAD_DEBUG 1 |
25 | | // #define TASK_DEBUG 1 |
26 | | |
27 | | /* **************************************************** */ |
28 | | |
29 | 0 | static void *doRun(void *ptr) { |
30 | 0 | Utils::setThreadName("ntopng-th-pool"); |
31 | | |
32 | 0 | ((ThreadPool *)ptr)->run(); |
33 | 0 | return (NULL); |
34 | 0 | } |
35 | | |
36 | | /* **************************************************** */ |
37 | | |
38 | 0 | ThreadPool::ThreadPool(char *comma_separated_affinity_mask) { |
39 | 0 | if(trace_new_delete) ntop->getTrace()->traceEvent(TRACE_NORMAL, "[new] %s", __FILE__); |
40 | 0 | m = new (std::nothrow) Mutex(); |
41 | 0 | pthread_cond_init(&condvar, NULL); |
42 | 0 | terminating = false; |
43 | | |
44 | 0 | #ifdef __linux__ |
45 | 0 | CPU_ZERO(&affinity_mask); |
46 | | |
47 | 0 | if (comma_separated_affinity_mask) |
48 | 0 | Utils::setAffinityMask(comma_separated_affinity_mask, &affinity_mask); |
49 | 0 | else if (ntop->getPrefs()->get_other_cpu_affinity()) |
50 | 0 | Utils::setAffinityMask(ntop->getPrefs()->get_other_cpu_affinity(), |
51 | 0 | &affinity_mask); |
52 | 0 | #endif |
53 | | |
54 | 0 | num_threads = 0; |
55 | 0 | } |
56 | | |
57 | | /* **************************************************** */ |
58 | | |
59 | 0 | ThreadPool::~ThreadPool() { |
60 | 0 | void *res; |
61 | | |
62 | 0 | shutdown(); |
63 | | |
64 | 0 | for (vector<pthread_t>::const_iterator it = threadsState.begin(); |
65 | 0 | it != threadsState.end(); ++it) { |
66 | 0 | pthread_cond_signal(&condvar); |
67 | 0 | pthread_join(*it, &res); |
68 | 0 | } |
69 | | |
70 | 0 | threadsState.clear(); |
71 | | |
72 | 0 | while (!threads.empty()) { |
73 | 0 | QueuedThreadData *q = threads.front(); |
74 | | |
75 | 0 | threads.pop(); |
76 | 0 | delete q; |
77 | 0 | } |
78 | | |
79 | 0 | pthread_cond_destroy(&condvar); |
80 | 0 | delete m; |
81 | 0 | } |
82 | | |
83 | | /* **************************************************** */ |
84 | | |
85 | 0 | bool ThreadPool::spawn() { |
86 | 0 | pthread_t new_thread; |
87 | | |
88 | 0 | if (num_threads < CONST_MAX_NUM_THREADED_ACTIVITIES) { |
89 | 0 | int rc; |
90 | | |
91 | 0 | if ((rc = pthread_create(&new_thread, NULL, doRun, (void *)this)) == 0) { |
92 | 0 | threadsState.push_back(new_thread); |
93 | | |
94 | 0 | #ifdef __linux__ |
95 | 0 | Utils::setThreadAffinityWithMask(new_thread, &affinity_mask); |
96 | 0 | #endif |
97 | 0 | num_threads++; |
98 | | |
99 | | #ifdef THREAD_DEBUG |
100 | | ntop->getTrace()->traceEvent(TRACE_NORMAL, "Spawn thread [total: %u]", |
101 | | num_threads); |
102 | | #endif |
103 | | |
104 | 0 | return (true); |
105 | 0 | } else { |
106 | 0 | #ifndef WIN32 |
107 | 0 | ntop->getTrace()->traceEvent(TRACE_ERROR, "Failure spawning thread [%u thread(s)][rc: %d]", |
108 | 0 | num_threads, rc); |
109 | 0 | #endif |
110 | 0 | } |
111 | 0 | } |
112 | | |
113 | 0 | return (false); /* Something didn't work as expected */ |
114 | 0 | } |
115 | | |
116 | | /* **************************************************** */ |
117 | | |
118 | 0 | void ThreadPool::run() { |
119 | | #ifdef THREAD_DEBUG |
120 | | ntop->getTrace()->traceEvent(TRACE_NORMAL, "*** Starting thread [%u]", |
121 | | pthread_self()); |
122 | | #endif |
123 | | |
124 | 0 | while (!isTerminating()) { |
125 | 0 | QueuedThreadData *q; |
126 | | |
127 | | #ifdef THREAD_DEBUG |
128 | | ntop->getTrace()->traceEvent( |
129 | | TRACE_NORMAL, "*** About to dequeue job [%u][terminating=%d]", |
130 | | pthread_self(), isTerminating()); |
131 | | #endif |
132 | | |
133 | 0 | q = dequeueJob(true); |
134 | | |
135 | | #ifdef THREAD_DEBUG |
136 | | ntop->getTrace()->traceEvent( |
137 | | TRACE_NORMAL, "*** Dequeued job [%u][terminating=%d][%s][%s]", |
138 | | pthread_self(), isTerminating(), q->script_path, q->iface->get_name()); |
139 | | #endif |
140 | | |
141 | 0 | if ((q == NULL) || isTerminating()) { |
142 | 0 | if (q) delete q; |
143 | 0 | break; |
144 | 0 | } else { |
145 | 0 | q->run(); |
146 | 0 | delete q; |
147 | | |
148 | 0 | Utils::setThreadName("ntopng-idle-thpool"); |
149 | 0 | } |
150 | 0 | } |
151 | | |
152 | | #ifdef THREAD_DEBUG |
153 | | ntop->getTrace()->traceEvent(TRACE_NORMAL, "*** Terminating thread [%u]", |
154 | | pthread_self()); |
155 | | #endif |
156 | 0 | } |
157 | | |
158 | | /* ******************************************* */ |
159 | | |
160 | 0 | bool ThreadPool::isQueueable(ThreadedActivityState cur_state) { |
161 | 0 | switch (cur_state) { |
162 | 0 | case threaded_activity_state_sleeping: |
163 | 0 | case threaded_activity_state_unknown: |
164 | 0 | return (true); |
165 | 0 | break; |
166 | | |
167 | 0 | default: |
168 | | #ifdef THREAD_DEBUG |
169 | | ntop->getTrace()->traceEvent(TRACE_NORMAL, |
170 | | "ThreadedActivity::isQueueable(%s)", |
171 | | Utils::get_state_label(cur_state)); |
172 | | #endif |
173 | 0 | return (false); |
174 | 0 | break; |
175 | 0 | } |
176 | | |
177 | 0 | return (false); /* NOTREACHED */ |
178 | 0 | } |
179 | | |
180 | | /* **************************************************** */ |
181 | | |
182 | | bool ThreadPool::queueJob(ThreadedActivity *ta, char *script_path, |
183 | | NetworkInterface *iface, time_t scheduled_time, |
184 | | time_t deadline, PeriodicActivities *pa, |
185 | 0 | bool hourly_daily_activity) { |
186 | 0 | QueuedThreadData *q; |
187 | 0 | ThreadedActivityStats *stats; |
188 | 0 | ThreadedActivityState ta_state; |
189 | | |
190 | 0 | if (isTerminating()) return (false); |
191 | | |
192 | 0 | if (num_threads == 0) { |
193 | | /* Spawn an initial set of threads - do not do this in the constructor |
194 | | * as it is too early (it creates issues on FreeBSD when running as a service) */ |
195 | 0 | for (u_int i = 0; i < 5 /* Min number of threads */; i++) spawn(); |
196 | 0 | } |
197 | | |
198 | 0 | if ((stats = ta->getThreadedActivityStats(iface, script_path, true)) == NULL) |
199 | 0 | return (false); |
200 | 0 | else |
201 | 0 | ta_state = stats->getState(); |
202 | | |
203 | 0 | if (!isQueueable(ta_state)) { |
204 | | /* This task is already in queue */ |
205 | | |
206 | | #ifdef THREAD_DEBUG |
207 | | char deadline_buf[32]; |
208 | | time_t stats_deadline = stats->getDeadline(); |
209 | | struct tm deadline_tm; |
210 | | |
211 | | strftime(deadline_buf, sizeof(deadline_buf), "%H:%M:%S", |
212 | | localtime_r(&stats_deadline, &deadline_tm)); |
213 | | ntop->getTrace()->traceEvent(TRACE_WARNING, "Unable to schedule %s [running: %u][deadline: %d (%s)][now: %d][param-deadline: %d (%d)]", |
214 | | script_path, ta_state == threaded_activity_state_running ? 1 : 0, |
215 | | stats_deadline, deadline_buf, now, deadline, deadline-now); |
216 | | #endif |
217 | | |
218 | 0 | if (ta_state == threaded_activity_state_queued) { |
219 | | /* |
220 | | If here, the periodic activity has been waiting in queue for too long |
221 | | and no thread has dequeued it. Hence, we can try and spawn an additional |
222 | | thread, up to a maximum |
223 | | */ |
224 | | #ifdef THREAD_DEBUG |
225 | | ntop->getTrace()->traceEvent(TRACE_WARNING, |
226 | | "Waiting in queue for too long [len: %u/%u][num_interfaces: %u]", |
227 | | threadsState.size(), CONST_MAX_NUM_THREADED_ACTIVITIES, |
228 | | ntop->get_num_interfaces() + 1); |
229 | | #endif |
230 | 0 | stats->setNotExecutedActivity(true); |
231 | 0 | } else if (ta_state == threaded_activity_state_running && |
232 | 0 | (stats->getDeadline() < scheduled_time)) |
233 | 0 | stats->setSlowPeriodicActivity(true); |
234 | | |
235 | 0 | return (false); /* Task still running or already queued, don't re-queue it */ |
236 | 0 | } |
237 | | |
238 | | #ifdef THREAD_DEBUG |
239 | | ntop->getTrace()->traceEvent(TRACE_WARNING, "Scheduling %s [now: %d][param-deadline: %d (%d)]", |
240 | | script_path, now, deadline, deadline-now); |
241 | | #endif |
242 | | |
243 | 0 | q = new (std::nothrow) QueuedThreadData(ta, script_path, iface, deadline, pa, hourly_daily_activity); |
244 | | |
245 | 0 | if (!q) { |
246 | 0 | ntop->getTrace()->traceEvent(TRACE_WARNING, "Unable to create job"); |
247 | 0 | return (false); |
248 | 0 | } |
249 | | |
250 | | /* With limited resources we do not spawn new threads */ |
251 | 0 | if(!ntop->getPrefs()->limitResourcesUsage()) { |
252 | 0 | if ((int)threads.size() > (num_threads - 3)) { |
253 | | #ifdef THREAD_DEBUG |
254 | | ntop->getTrace()->traceEvent(TRACE_NORMAL, "Job queue: %u", threads.size()); |
255 | | #endif |
256 | | |
257 | 0 | spawn(); /* Spawn a new thread if there are queued jobs */ |
258 | 0 | } |
259 | 0 | } |
260 | | |
261 | 0 | m->lock(__FILE__, __LINE__); |
262 | 0 | if (stats) stats->setScheduledTime(scheduled_time); |
263 | | |
264 | 0 | ta->set_state_queued(iface, script_path); |
265 | 0 | threads.push(q); |
266 | | |
267 | 0 | pthread_cond_signal(&condvar); |
268 | 0 | m->unlock(__FILE__, __LINE__); |
269 | | |
270 | 0 | return (true); /* TODO: add a max queue len and return false */ |
271 | 0 | } |
272 | | |
273 | | /* **************************************************** */ |
274 | | |
275 | 0 | QueuedThreadData *ThreadPool::dequeueJob(bool waitIfEmpty) { |
276 | 0 | QueuedThreadData *q; |
277 | | |
278 | 0 | m->lock(__FILE__, __LINE__); |
279 | 0 | if (waitIfEmpty) { |
280 | | #ifdef WIN32 |
281 | | /* Use sleep() on Windows as cond_wait seems to leak */ |
282 | | sleep(1); |
283 | | #else |
284 | 0 | while (threads.empty() && (!isTerminating())) |
285 | 0 | m->cond_wait(&condvar); |
286 | | |
287 | 0 | #endif |
288 | 0 | } |
289 | | |
290 | 0 | if (threads.empty() || isTerminating()) { |
291 | 0 | q = NULL; |
292 | 0 | } else { |
293 | 0 | q = threads.front(); |
294 | 0 | threads.pop(); |
295 | 0 | } |
296 | | |
297 | | #ifdef THREAD_DEBUG |
298 | | ntop->getTrace()->traceEvent(TRACE_NORMAL, "Dequeued job [remaining: %u][%s]", |
299 | | threads.size(), q ? q->iface->get_name() : ""); |
300 | | #endif |
301 | | |
302 | 0 | m->unlock(__FILE__, __LINE__); |
303 | | |
304 | 0 | return (q); |
305 | 0 | } |
306 | | |
307 | | /* **************************************************** */ |
308 | | |
309 | 0 | void ThreadPool::shutdown() { |
310 | | #ifdef THREAD_DEBUG |
311 | | ntop->getTrace()->traceEvent(TRACE_NORMAL, "*** %s() ***", __FUNCTION__); |
312 | | #endif |
313 | | |
314 | 0 | m->lock(__FILE__, __LINE__); |
315 | 0 | terminating = true; |
316 | 0 | pthread_cond_broadcast(&condvar); |
317 | 0 | m->unlock(__FILE__, __LINE__); |
318 | 0 | } |
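
Illustrative sketch (not part of ThreadPool.cpp): the queueJob()/dequeueJob()/shutdown()
trio listed above is a classic producer/consumer queue guarded by a pthread mutex and a
condition variable. The minimal, self-contained program below shows the same pattern in
isolation; SimpleJobQueue, worker() and the integer jobs are hypothetical names used only
for this example (build with -lpthread).

#include <pthread.h>
#include <queue>
#include <cstdio>

/* A minimal producer/consumer job queue, mirroring the mutex + condition
 * variable pattern used by ThreadPool::queueJob()/dequeueJob()/shutdown(). */
class SimpleJobQueue {
 public:
  SimpleJobQueue() : terminating(false) {
    pthread_mutex_init(&mtx, NULL);
    pthread_cond_init(&condvar, NULL);
  }

  ~SimpleJobQueue() {
    pthread_cond_destroy(&condvar);
    pthread_mutex_destroy(&mtx);
  }

  /* Producer side (cf. queueJob): push under the lock, then wake one waiter */
  void enqueue(int job) {
    pthread_mutex_lock(&mtx);
    jobs.push(job);
    pthread_cond_signal(&condvar);
    pthread_mutex_unlock(&mtx);
  }

  /* Consumer side (cf. dequeueJob(true)): sleep while the queue is empty and
   * we are not shutting down; return -1 once shutdown() has been called */
  int dequeue() {
    int job = -1;

    pthread_mutex_lock(&mtx);
    while (jobs.empty() && !terminating)
      pthread_cond_wait(&condvar, &mtx);

    if (!jobs.empty() && !terminating) {
      job = jobs.front();
      jobs.pop();
    }
    pthread_mutex_unlock(&mtx);

    return job;
  }

  /* cf. shutdown(): flip the flag under the lock and wake every waiter;
   * jobs still queued at this point are simply discarded */
  void shutdown() {
    pthread_mutex_lock(&mtx);
    terminating = true;
    pthread_cond_broadcast(&condvar);
    pthread_mutex_unlock(&mtx);
  }

 private:
  std::queue<int> jobs;
  pthread_mutex_t mtx;
  pthread_cond_t condvar;
  bool terminating;
};

/* Worker thread body (cf. doRun/ThreadPool::run): loop until shutdown */
static void *worker(void *ptr) {
  SimpleJobQueue *q = (SimpleJobQueue *)ptr;
  int job;

  while ((job = q->dequeue()) != -1)
    printf("processed job %d\n", job);

  return NULL;
}

int main() {
  SimpleJobQueue q;
  pthread_t th;

  pthread_create(&th, NULL, worker, (void *)&q);
  for (int i = 0; i < 5; i++) q.enqueue(i);

  q.shutdown();          /* wakes the worker, which then exits */
  pthread_join(th, NULL);

  return 0;
}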