Coverage Report

Created: 2026-06-15 06:22

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/logging-log4cxx/src/main/cpp/threadutility.cpp
Line
Count
Source
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one or more
3
 * contributor license agreements.  See the NOTICE file distributed with
4
 * this work for additional information regarding copyright ownership.
5
 * The ASF licenses this file to You under the Apache License, Version 2.0
6
 * (the "License"); you may not use this file except in compliance with
7
 * the License.  You may obtain a copy of the License at
8
 *
9
 *      http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17
18
#include "log4cxx/helpers/threadutility.h"
19
#if !defined(LOG4CXX)
20
  #define LOG4CXX 1
21
#endif
22
#include "log4cxx/private/log4cxx_private.h"
23
#include "log4cxx/helpers/loglog.h"
24
#include "log4cxx/helpers/transcoder.h"
25
26
#include <atomic>
27
#include <signal.h>
28
#include <mutex>
29
#include <list>
30
#include <condition_variable>
31
#include <algorithm>
32
33
#ifdef _WIN32
34
  #include <windows.h>
35
  #include <processthreadsapi.h>
36
#endif
37
38
#if LOG4CXX_EVENTS_AT_EXIT
39
#include <log4cxx/private/atexitregistry.h>
40
#endif
41
#if !defined(LOG4CXX)
42
  #define LOG4CXX 1
43
#endif
44
#include <log4cxx/helpers/aprinitializer.h>
45
46
namespace LOG4CXX_NS
47
{
48
namespace helpers
49
{
50
51
struct ThreadUtility::priv_data
52
{
53
  priv_data()
54
#if LOG4CXX_EVENTS_AT_EXIT
55
    : atExitRegistryRaii{ [this]{ stopThread(); } }
56
#endif
57
1
  {
58
1
  }
59
60
  ~priv_data()
61
1
  { stopThread(); }
62
63
  ThreadStartPre  start_pre{nullptr};
64
  ThreadStarted   started{nullptr};
65
  ThreadStartPost start_post{nullptr};
66
67
  using TimePoint = std::chrono::time_point<std::chrono::system_clock>;
68
  struct NamedPeriodicFunction
69
  {
70
    LogString             name;
71
    Period                delay;
72
    TimePoint             nextRun;
73
    std::function<void()> f;
74
    int                   errorCount;
75
    bool                  removed;
76
  };
77
  using JobStore = std::list<NamedPeriodicFunction>;
78
  JobStore                  jobs;
79
  std::recursive_mutex      job_mutex;
80
  std::thread               thread;
81
  std::condition_variable   interrupt;
82
  std::mutex                interrupt_mutex;
83
  std::atomic<bool>         terminated{ false };
84
  int                       retryCount{ 2 };
85
  Period                    maxDelay{ 0 };
86
  std::atomic<bool>         threadIsActive{ false };
87
  LoggerPtr                 log;
88
89
  void doPeriodicTasks();
90
91
  void setTerminated()
92
913
  {
93
913
    std::lock_guard<std::recursive_mutex> lock(job_mutex);
94
913
    terminated.store(true);
95
913
  }
96
97
  void stopThread()
98
913
  {
99
913
    LOGLOG_DEBUG(log, "stopThread");
100
913
    setTerminated();
101
913
    interrupt.notify_all();
102
913
    if (thread.joinable())
103
0
      thread.join();
104
913
  }
105
106
#if LOG4CXX_EVENTS_AT_EXIT
107
  helpers::AtExitRegistry::Raii atExitRegistryRaii;
108
#endif
109
};
110
111
#if LOG4CXX_HAS_PTHREAD_SIGMASK
  // Signal mask captured by preThreadBlockSignals() and put back by
  // postThreadUnblockSignals(); both are per-thread.
  static thread_local sigset_t old_mask;
  // Set by preThreadBlockSignals(); postThreadUnblockSignals() restores old_mask only when true.
  static thread_local bool sigmask_valid = false;
#endif
115
116
/**
 * Create the private state and install the default hooks:
 * block signals before a thread starts and restore them afterwards.
 */
ThreadUtility::ThreadUtility()
  : m_priv( std::make_unique<priv_data>() )
{
  auto blockSignals   = std::bind( &ThreadUtility::preThreadBlockSignals, this );
  auto restoreSignals = std::bind( &ThreadUtility::postThreadUnblockSignals, this );

  // No "thread started" hook by default.
  configureFuncs( blockSignals, nullptr, restoreSignals );
}
124
125
1
// Defined here, where priv_data is a complete type.
ThreadUtility::~ThreadUtility() = default;
126
127
/**
 * Fetch the Manager registered with APRInitializer, supplying a factory
 * that builds one with std::make_shared.
 */
auto ThreadUtility::instancePtr() -> ManagerPtr
{
  const auto makeManager = []() -> ObjectPtr
    {
      return std::make_shared<Manager>();
    };
  return APRInitializer::getOrAddUnique<Manager>( makeManager );
}
135
136
/**
 * Address of the ThreadUtility held by the manager that instancePtr() returns.
 */
ThreadUtility* ThreadUtility::instance()
{
  ManagerPtr manager = instancePtr();
  return &manager->value();
}
140
141
/**
 * Install one of the predefined hook combinations on the shared instance.
 * A \c type that matches none of the cases below leaves the hooks untouched.
 */
void ThreadUtility::configure( ThreadConfigurationType type )
{
  auto utility = instance();

  // Factories for the individual hooks, each bound to the shared instance.
  const auto blockSignals = [utility]
    {
      return std::bind( &ThreadUtility::preThreadBlockSignals, utility );
    };
  const auto nameThread = [utility]
    {
      return std::bind( &ThreadUtility::threadStartedNameThread, utility,
        std::placeholders::_1,
        std::placeholders::_2,
        std::placeholders::_3 );
    };
  const auto unblockSignals = [utility]
    {
      return std::bind( &ThreadUtility::postThreadUnblockSignals, utility );
    };

  switch ( type )
  {
    case ThreadConfigurationType::NoConfiguration:
      utility->configureFuncs( nullptr, nullptr, nullptr );
      break;

    case ThreadConfigurationType::NameThreadOnly:
      utility->configureFuncs( nullptr, nameThread(), nullptr );
      break;

    case ThreadConfigurationType::BlockSignalsOnly:
      utility->configureFuncs( blockSignals(), nullptr, unblockSignals() );
      break;

    case ThreadConfigurationType::BlockSignalsAndNameThread:
      utility->configureFuncs( blockSignals(), nameThread(), unblockSignals() );
      break;

    default:
      break;
  }
}
174
175
void ThreadUtility::configureFuncs( ThreadStartPre pre_start,
176
  ThreadStarted started,
177
  ThreadStartPost post_start )
178
1
{
179
1
  m_priv->start_pre = pre_start;
180
1
  m_priv->started = started;
181
1
  m_priv->start_post = post_start;
182
1
}
183
184
/**
 * Block every signal on the calling thread, saving the previous mask in
 * the thread-local \c old_mask so postThreadUnblockSignals() can restore it.
 * Does nothing when LOG4CXX_HAS_PTHREAD_SIGMASK is not set.
 */
void ThreadUtility::preThreadBlockSignals()
{
#if LOG4CXX_HAS_PTHREAD_SIGMASK
  sigset_t set;
  sigfillset(&set);

  // pthread_sigmask() returns 0 on success and a positive error number on
  // failure (it does not return -1), so the result is compared against 0.
  if ( pthread_sigmask(SIG_SETMASK, &set, &old_mask) != 0 )
  {
    LOGLOG_ERROR( LOG4CXX_STR("Unable to set thread sigmask") );
    sigmask_valid = false;
  }
  else
  {
    sigmask_valid = true;
  }

#endif /* LOG4CXX_HAS_PTHREAD_SIGMASK */
}
202
203
void ThreadUtility::threadStartedNameThread(LogString threadName,
204
  std::thread::id /*threadId*/,
205
  std::thread::native_handle_type nativeHandle)
206
0
{
207
0
#if LOG4CXX_HAS_PTHREAD_SETNAME && !(defined(_WIN32) && defined(_LIBCPP_VERSION))
208
0
  LOG4CXX_ENCODE_CHAR(sthreadName, threadName);
209
0
  if (pthread_setname_np(static_cast<pthread_t>(nativeHandle), sthreadName.c_str()) < 0) {
210
0
    LOGLOG_ERROR(LOG4CXX_STR("unable to set thread name"));
211
0
  }
212
#elif defined(_WIN32)
213
  typedef HRESULT (WINAPI *TSetThreadDescription)(HANDLE, PCWSTR);
214
  static struct initialiser
215
  {
216
    HMODULE hKernelBase;
217
    TSetThreadDescription SetThreadDescription;
218
    initialiser()
219
      : hKernelBase(GetModuleHandleA("KernelBase.dll"))
220
      , SetThreadDescription(nullptr)
221
    {
222
      if (hKernelBase)
223
        SetThreadDescription = reinterpret_cast<TSetThreadDescription>(GetProcAddress(hKernelBase, "SetThreadDescription"));
224
    }
225
  } win32Func;
226
  if (win32Func.SetThreadDescription)
227
  {
228
    LOG4CXX_ENCODE_WCHAR(wthreadName, threadName);
229
    if(FAILED(win32Func.SetThreadDescription(static_cast<HANDLE>(nativeHandle), wthreadName.c_str())))
230
      LOGLOG_ERROR( LOG4CXX_STR("unable to set thread name") );
231
  }
232
#endif
233
0
}
234
235
/**
 * Restore the signal mask saved in the thread-local \c old_mask.
 * Does nothing when LOG4CXX_HAS_PTHREAD_SIGMASK is not set.
 */
void ThreadUtility::postThreadUnblockSignals()
{
#if LOG4CXX_HAS_PTHREAD_SIGMASK

  // Only restore the signal mask if we were able to set it in the first place.
  if ( sigmask_valid )
  {
    // pthread_sigmask() returns 0 on success and a positive error number on
    // failure (it does not return -1), so the result is compared against 0.
    if ( pthread_sigmask(SIG_SETMASK, &old_mask, nullptr) != 0 )
    {
      LOGLOG_ERROR( LOG4CXX_STR("Unable to set thread sigmask") );
    }
  }

#endif /* LOG4CXX_HAS_PTHREAD_SIGMASK */
}
250
251
252
/// A copy of the hook stored as \c start_pre by configureFuncs().
ThreadStartPre ThreadUtility::preStartFunction()
{
  const ThreadStartPre& hook = m_priv->start_pre;
  return hook;
}
256
257
/// A copy of the hook stored as \c started by configureFuncs().
ThreadStarted ThreadUtility::threadStartedFunction()
{
  const ThreadStarted& hook = m_priv->started;
  return hook;
}
261
262
/// A copy of the hook stored as \c start_post by configureFuncs().
ThreadStartPost ThreadUtility::postStartFunction()
{
  const ThreadStartPost& hook = m_priv->start_post;
  return hook;
}
266
267
/**
268
 * Add a periodic task
269
 */
270
void ThreadUtility::addPeriodicTask(const LogString& name, std::function<void()> f, const Period& delay)
271
0
{
272
0
  if (!m_priv->log)
273
0
    m_priv->log = Logger::getLogger("ThreadUtility");
274
0
  LOGLOG_DEBUG(m_priv->log, LOG4CXX_STR("addPeriodicTask: ") << name);
275
0
  std::lock_guard<std::recursive_mutex> lock(m_priv->job_mutex);
276
0
  if (m_priv->maxDelay < delay)
277
0
    m_priv->maxDelay = delay;
278
0
  auto currentTime = std::chrono::system_clock::now();
279
0
  m_priv->jobs.push_back( priv_data::NamedPeriodicFunction{name, delay, currentTime + delay, f, 0, false} );
280
281
  // Restart thread if it has stopped.
282
0
  if (!m_priv->threadIsActive.load() && m_priv->thread.joinable())
283
0
    m_priv->thread.join();
284
285
0
  if (!m_priv->thread.joinable())
286
0
  {
287
0
    m_priv->terminated.store(false);
288
0
    m_priv->threadIsActive.store(true);
289
0
    m_priv->thread = createThread(LOG4CXX_STR("log4cxx"), [this]()
290
0
      {
291
0
        LOGLOG_DEBUG(m_priv->log, "doPeriodicTasks: " << "started");
292
0
        m_priv->doPeriodicTasks();
293
0
        LOGLOG_DEBUG(m_priv->log, "doPeriodicTasks: " << "stopped");
294
0
        m_priv->threadIsActive.store(false);
295
0
      });
296
0
  }
297
0
  else
298
0
    m_priv->interrupt.notify_one();
299
0
}
300
301
/**
302
 * Is this already running a \c taskName periodic task?
303
 */
304
bool ThreadUtility::hasPeriodicTask(const LogString& name)
305
0
{
306
0
  std::lock_guard<std::recursive_mutex> lock(m_priv->job_mutex);
307
0
  auto pItem = std::find_if(m_priv->jobs.begin(), m_priv->jobs.end()
308
0
    , [&name](const priv_data::NamedPeriodicFunction& item)
309
0
    { return !item.removed && name == item.name; }
310
0
    );
311
0
  return m_priv->jobs.end() != pItem;
312
0
}
313
314
/**
315
 * Remove all periodic tasks and stop the processing thread
316
 */
317
void ThreadUtility::removeAllPeriodicTasks()
318
912
{
319
912
  LOGLOG_DEBUG(m_priv->log, "removeAllPeriodicTasks");
320
912
  {
321
912
    std::lock_guard<std::recursive_mutex> lock(m_priv->job_mutex);
322
912
    while (!m_priv->jobs.empty())
323
0
      m_priv->jobs.pop_back();
324
912
  }
325
912
  m_priv->stopThread();
326
912
}
327
328
/**
329
 * Remove the \c taskName periodic task
330
 */
331
void ThreadUtility::removePeriodicTask(const LogString& name)
332
0
{
333
0
  std::lock_guard<std::recursive_mutex> lock(m_priv->job_mutex);
334
0
  auto pItem = std::find_if(m_priv->jobs.begin(), m_priv->jobs.end()
335
0
    , [&name](const priv_data::NamedPeriodicFunction& item)
336
0
    { return !item.removed && name == item.name; }
337
0
    );
338
0
  if (m_priv->jobs.end() != pItem)
339
0
  {
340
0
    LOGLOG_DEBUG(m_priv->log, LOG4CXX_STR("removePeriodicTask: ") << name);
341
0
    pItem->removed = true;
342
0
    m_priv->interrupt.notify_one();
343
0
  }
344
0
}
345
346
/**
347
 * Remove any periodic task matching \c namePrefix
348
 */
349
void ThreadUtility::removePeriodicTasksMatching(const LogString& namePrefix)
350
0
{
351
0
  while (1)
352
0
  {
353
0
    std::lock_guard<std::recursive_mutex> lock(m_priv->job_mutex);
354
0
    auto pItem = std::find_if(m_priv->jobs.begin(), m_priv->jobs.end()
355
0
      , [&namePrefix](const priv_data::NamedPeriodicFunction& item)
356
0
      { return !item.removed && namePrefix.size() <= item.name.size() && item.name.substr(0, namePrefix.size()) == namePrefix; }
357
0
      );
358
0
    if (m_priv->jobs.end() == pItem)
359
0
      break;
360
0
    pItem->removed = true;
361
0
  }
362
0
  m_priv->interrupt.notify_one();
363
0
}
364
365
// Run ready tasks
366
void ThreadUtility::priv_data::doPeriodicTasks()
367
0
{
368
0
  while (!this->terminated.load())
369
0
  {
370
0
    auto currentTime = std::chrono::system_clock::now();
371
0
    TimePoint nextOperationTime = currentTime + this->maxDelay;
372
373
    // Run each due task with job_mutex released, so a long-running callback
374
    // (e.g. a reconnect blocked on network I/O) does not stall removePeriodicTask()
375
0
    while (true)
376
0
    {
377
0
      std::function<void()> taskCallback;
378
0
      LogString taskName;
379
0
      Period taskDelay;
380
0
      bool foundTask = false;
381
382
      // Take a copy of the next due task while holding the lock
383
0
      {
384
0
        if (this->terminated.load())
385
0
          return;
386
0
        std::lock_guard<std::recursive_mutex> lock(this->job_mutex);
387
0
        auto pItem = std::find_if(this->jobs.begin(), this->jobs.end()
388
0
          , [currentTime](const NamedPeriodicFunction& item)
389
0
          { return !item.removed && item.nextRun <= currentTime; }
390
0
          );
391
0
        if (pItem != this->jobs.end())
392
0
        {
393
0
          taskCallback = pItem->f;
394
0
          taskName = pItem->name;
395
0
          taskDelay = pItem->delay;
396
0
          foundTask = true;
397
0
        }
398
0
      }
399
400
      // No more tasks are due, leave the loop
401
0
      if (!foundTask)
402
0
      {
403
0
        break;
404
0
      }
405
406
      // Execute the callback outside the lock
407
0
      bool success = false;
408
0
      try
409
0
      {
410
0
        taskCallback();
411
0
        success = true;
412
0
      }
413
0
      catch (std::exception& ex)
414
0
      {
415
0
        LogLog::warn(taskName, ex);
416
0
      }
417
0
      catch (...)
418
0
      {
419
0
        LogLog::warn(taskName + LOG4CXX_STR(" threw an exception"));
420
0
      }
421
422
      // Re-find the task (it may have been removed while running) and reschedule it
423
0
      {
424
0
        std::lock_guard<std::recursive_mutex> lock(this->job_mutex);
425
0
        auto pItem = std::find_if(this->jobs.begin(), this->jobs.end()
426
0
          , [&taskName](const NamedPeriodicFunction& item)
427
0
          { return !item.removed && taskName == item.name; }
428
0
          );
429
430
0
        if (pItem != this->jobs.end())
431
0
        {
432
          // Always push nextRun out, so a failing task waits before the next retry
433
0
          pItem->nextRun = std::chrono::system_clock::now() + taskDelay;
434
0
          if (success)
435
0
            pItem->errorCount = 0;
436
0
          else
437
0
            ++pItem->errorCount;
438
0
        }
439
0
      }
440
0
    }
441
442
    // Update nextOperationTime under the lock
443
0
    {
444
0
      std::lock_guard<std::recursive_mutex> lock(this->job_mutex);
445
0
      for (const auto& item : this->jobs)
446
0
      {
447
0
        if (!item.removed && item.nextRun < nextOperationTime)
448
0
        {
449
0
          nextOperationTime = item.nextRun;
450
0
        }
451
0
      }
452
0
    }
453
454
    // Delete removed and faulty tasks
455
0
    while (1)
456
0
    {
457
0
      std::lock_guard<std::recursive_mutex> lock(this->job_mutex);
458
0
      auto pItem = std::find_if(this->jobs.begin(), this->jobs.end()
459
0
        , [this](const NamedPeriodicFunction& item)
460
0
        { return item.removed || this->retryCount < item.errorCount; }
461
0
        );
462
0
      if (this->jobs.end() == pItem)
463
0
        break;
464
0
      LOGLOG_DEBUG(this->log, LOG4CXX_STR("doPeriodicTasks: erase ") << pItem->name);
465
0
      this->jobs.erase(pItem);
466
0
      if (this->jobs.empty())
467
0
        return;
468
0
    }
469
470
    // Wait until the next task is due or an add/remove/shutdown wakes us
471
0
    std::unique_lock<std::mutex> lock(this->interrupt_mutex);
472
0
    this->interrupt.wait_until(lock, nextOperationTime);
473
0
  }
474
0
}
475
476
} //namespace helpers
477
} //namespace log4cxx