Coverage Report

Created: 2025-07-01 06:08

/src/logging-log4cxx/src/main/cpp/asyncappender.cpp
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one or more
3
 * contributor license agreements.  See the NOTICE file distributed with
4
 * this work for additional information regarding copyright ownership.
5
 * The ASF licenses this file to You under the Apache License, Version 2.0
6
 * (the "License"); you may not use this file except in compliance with
7
 * the License.  You may obtain a copy of the License at
8
 *
9
 *      http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17
18
#include <log4cxx/asyncappender.h>
19
20
#include <log4cxx/helpers/loglog.h>
21
#include <log4cxx/spi/loggingevent.h>
22
#include <log4cxx/helpers/stringhelper.h>
23
#include <log4cxx/helpers/optionconverter.h>
24
#include <log4cxx/helpers/threadutility.h>
25
#include <log4cxx/private/appenderskeleton_priv.h>
26
#include <thread>
27
#include <atomic>
28
#include <condition_variable>
29
30
#if LOG4CXX_EVENTS_AT_EXIT
31
#include <log4cxx/private/atexitregistry.h>
32
#endif
33
34
using namespace LOG4CXX_NS;
35
using namespace LOG4CXX_NS::helpers;
36
using namespace LOG4CXX_NS::spi;
37
38
#if 15 < LOG4CXX_ABI_VERSION
39
namespace
40
{
41
#endif
42
43
/**
44
 * The default buffer size is set to 128 events.
45
*/
46
enum { DEFAULT_BUFFER_SIZE = 128 };
47
48
class DiscardSummary
49
{
50
  private:
51
    /**
52
     * First event of the highest severity.
53
    */
54
    LoggingEventPtr maxEvent;
55
56
    /**
57
    * Total count of messages discarded.
58
    */
59
    int count;
60
61
  public:
62
    /**
63
     * Create new instance.
64
     *
65
     * @param event event, may not be null.
66
    */
67
    DiscardSummary(const LoggingEventPtr& event);
68
    /** Copy constructor.  */
69
    DiscardSummary(const DiscardSummary& src);
70
    /** Assignment operator. */
71
    DiscardSummary& operator=(const DiscardSummary& src);
72
73
    /**
74
     * Add discarded event to summary.
75
     *
76
     * @param event event, may not be null.
77
    */
78
    void add(const LoggingEventPtr& event);
79
80
    /**
81
     * Create an event with a discard count and the message from \c maxEvent.
82
     *
83
     * @return the new event.
84
     */
85
    LoggingEventPtr createEvent(Pool& p);
86
87
#if LOG4CXX_ABI_VERSION <= 15
88
    static
89
    ::LOG4CXX_NS::spi::LoggingEventPtr createEvent(::LOG4CXX_NS::helpers::Pool& p,
90
      size_t discardedCount);
91
#endif
92
93
    /**
94
    * The number of messages discarded.
95
    */
96
0
    int getCount() const { return count; }
97
};
98
99
typedef std::map<LogString, DiscardSummary> DiscardMap;
100
101
#if 15 < LOG4CXX_ABI_VERSION
102
}
103
#endif
104
105
#ifdef __cpp_lib_hardware_interference_size
106
  using std::hardware_constructive_interference_size;
107
  using std::hardware_destructive_interference_size;
108
#else
109
  // 64 bytes on x86-64 │ L1_CACHE_BYTES │ L1_CACHE_SHIFT │ __cacheline_aligned │ ...
110
  constexpr std::size_t hardware_constructive_interference_size = 64;
111
  constexpr std::size_t hardware_destructive_interference_size = 64;
112
#endif
113
114
struct AsyncAppender::AsyncAppenderPriv : public AppenderSkeleton::AppenderSkeletonPrivate
115
{
116
  AsyncAppenderPriv()
117
0
    : AppenderSkeletonPrivate()
118
0
    , buffer(DEFAULT_BUFFER_SIZE)
119
0
    , bufferSize(DEFAULT_BUFFER_SIZE)
120
0
    , dispatcher()
121
0
    , locationInfo(false)
122
0
    , blocking(true)
123
#if LOG4CXX_EVENTS_AT_EXIT
124
    , atExitRegistryRaii([this]{stopDispatcher();})
125
#endif
126
0
    , eventCount(0)
127
0
    , dispatchedCount(0)
128
0
    , commitCount(0)
129
0
    { }
130
131
  ~AsyncAppenderPriv()
132
0
  {
133
0
    stopDispatcher();
134
0
  }
135
136
  /**
137
   * Event buffer.
138
  */
139
  struct EventData
140
  {
141
    LoggingEventPtr event;
142
    size_t pendingCount;
143
  };
144
  std::vector<EventData> buffer;
145
146
  /**
147
   *  Mutex used to guard access to buffer and discardMap.
148
   */
149
  std::mutex bufferMutex;
150
151
  std::condition_variable bufferNotFull;
152
  std::condition_variable bufferNotEmpty;
153
154
  /**
155
    * Map of DiscardSummary objects keyed by logger name.
156
  */
157
  DiscardMap discardMap;
158
159
  /**
160
   * The maximum number of undispatched events.
161
  */
162
  int bufferSize;
163
164
  /**
165
   * Nested appenders.
166
  */
167
  helpers::AppenderAttachableImpl appenders;
168
169
  /**
170
   *  Dispatcher.
171
   */
172
  std::thread dispatcher;
173
174
  void stopDispatcher()
175
0
  {
176
0
    this->setClosed();
177
0
    bufferNotEmpty.notify_all();
178
0
    bufferNotFull.notify_all();
179
180
0
    if (dispatcher.joinable())
181
0
    {
182
0
      dispatcher.join();
183
0
    }
184
0
  }
185
186
  /**
187
   * Should location info be included in dispatched messages.
188
  */
189
  bool locationInfo;
190
191
  /**
192
   * Does appender block when buffer is full.
193
  */
194
  bool blocking;
195
196
#if LOG4CXX_EVENTS_AT_EXIT
197
  helpers::AtExitRegistry::Raii atExitRegistryRaii;
198
#endif
199
200
  /**
201
   * Used to calculate the buffer position at which to store the next event.
202
  */
203
  alignas(hardware_constructive_interference_size) std::atomic<size_t> eventCount;
204
205
  /**
206
   * Used to calculate the buffer position from which to extract the next event.
207
  */
208
  alignas(hardware_constructive_interference_size) std::atomic<size_t> dispatchedCount;
209
210
  /**
211
   * Used to communicate to the dispatch thread when an event is committed in buffer.
212
  */
213
  alignas(hardware_constructive_interference_size) std::atomic<size_t> commitCount;
214
215
  bool isClosed()
216
0
  {
217
0
    std::lock_guard<std::mutex> lock(this->bufferMutex);
218
0
    return this->closed;
219
0
  }
220
221
  void setClosed()
222
0
  {
223
0
    std::lock_guard<std::mutex> lock(this->bufferMutex);
224
0
    this->closed = true;
225
0
  }
226
227
  /**
228
   * Used to ensure the dispatch thread does not wait when a logging thread is waiting.
229
  */
230
  int blockedCount{0};
231
};
232
233
234
IMPLEMENT_LOG4CXX_OBJECT(AsyncAppender)
235
236
0
#define priv static_cast<AsyncAppenderPriv*>(m_priv.get())
237
238
AsyncAppender::AsyncAppender()
239
0
  : AppenderSkeleton(std::make_unique<AsyncAppenderPriv>())
240
0
{
241
0
}
Unexecuted instantiation: log4cxx::AsyncAppender::AsyncAppender()
Unexecuted instantiation: log4cxx::AsyncAppender::AsyncAppender()
242
243
AsyncAppender::~AsyncAppender()
244
0
{
245
0
  finalize();
246
0
}
247
248
void AsyncAppender::addAppender(const AppenderPtr newAppender)
249
0
{
250
0
  priv->appenders.addAppender(newAppender);
251
0
}
252
253
254
void AsyncAppender::setOption(const LogString& option,
255
  const LogString& value)
256
0
{
257
0
  if (StringHelper::equalsIgnoreCase(option, LOG4CXX_STR("LOCATIONINFO"), LOG4CXX_STR("locationinfo")))
258
0
  {
259
0
    setLocationInfo(OptionConverter::toBoolean(value, false));
260
0
  }
261
262
0
  if (StringHelper::equalsIgnoreCase(option, LOG4CXX_STR("BUFFERSIZE"), LOG4CXX_STR("buffersize")))
263
0
  {
264
0
    setBufferSize(OptionConverter::toInt(value, DEFAULT_BUFFER_SIZE));
265
0
  }
266
267
0
  if (StringHelper::equalsIgnoreCase(option, LOG4CXX_STR("BLOCKING"), LOG4CXX_STR("blocking")))
268
0
  {
269
0
    setBlocking(OptionConverter::toBoolean(value, true));
270
0
  }
271
0
  else
272
0
  {
273
0
    AppenderSkeleton::setOption(option, value);
274
0
  }
275
0
}
276
277
278
void AsyncAppender::doAppend(const spi::LoggingEventPtr& event, Pool& pool1)
279
0
{
280
0
  doAppendImpl(event, pool1);
281
0
}
282
283
void AsyncAppender::append(const spi::LoggingEventPtr& event, Pool& p)
284
0
{
285
0
  if (priv->bufferSize <= 0)
286
0
  {
287
0
    priv->appenders.appendLoopOnAppenders(event, p);
288
0
  }
289
290
  // Get a copy of this thread's diagnostic context
291
0
  event->LoadDC();
292
293
0
  if (!priv->dispatcher.joinable())
294
0
  {
295
0
    std::lock_guard<std::recursive_mutex> lock(priv->mutex);
296
0
    if (!priv->dispatcher.joinable())
297
0
      priv->dispatcher = ThreadUtility::instance()->createThread( LOG4CXX_STR("AsyncAppender"), &AsyncAppender::dispatch, this );
298
0
  }
299
0
  while (true)
300
0
  {
301
0
    auto pendingCount = priv->eventCount - priv->dispatchedCount;
302
0
    if (0 <= pendingCount && pendingCount < priv->bufferSize)
303
0
    {
304
      // Claim a slot in the ring buffer
305
0
      auto oldEventCount = priv->eventCount++;
306
0
      auto index = oldEventCount % priv->buffer.size();
307
      // Wait for a free slot
308
0
      while (priv->bufferSize <= oldEventCount - priv->dispatchedCount)
309
0
        std::this_thread::yield(); // Allow the dispatch thread to free a slot
310
      // Write to the ring buffer
311
0
      priv->buffer[index] = AsyncAppenderPriv::EventData{event, pendingCount};
312
      // Notify the dispatch thread that an event has been added
313
0
      auto failureCount = 0;
314
0
      auto savedEventCount = oldEventCount;
315
0
      while (!priv->commitCount.compare_exchange_weak(oldEventCount, oldEventCount + 1, std::memory_order_release))
316
0
      {
317
0
        oldEventCount = savedEventCount;
318
0
        if (2 < ++failureCount) // Did the scheduler suspend a thread between claiming a slot and advancing commitCount?
319
0
          std::this_thread::yield(); // Wait a bit
320
0
      }
321
0
      priv->bufferNotEmpty.notify_all();
322
0
      break;
323
0
    }
324
    //
325
    //   Following code is only reachable if buffer is full or eventCount has overflowed
326
    //
327
0
    std::unique_lock<std::mutex> lock(priv->bufferMutex);
328
0
    priv->bufferNotEmpty.notify_all();
329
    //
330
    //   if blocking and thread is not already interrupted
331
    //      and not the dispatcher then
332
    //      wait for a buffer notification
333
0
    bool discard = true;
334
335
0
    if (priv->blocking
336
0
      && !priv->closed
337
0
      && (priv->dispatcher.get_id() != std::this_thread::get_id()) )
338
0
    {
339
0
      ++priv->blockedCount;
340
0
      priv->bufferNotFull.wait(lock, [this]()
341
0
      {
342
0
        return priv->eventCount - priv->dispatchedCount < priv->bufferSize;
343
0
      });
344
0
      --priv->blockedCount;
345
0
      discard = false;
346
0
    }
347
348
    //
349
    //   if blocking is false or thread has been interrupted
350
    //   add event to discard map.
351
    //
352
0
    if (discard)
353
0
    {
354
0
      LogString loggerName = event->getLoggerName();
355
0
      DiscardMap::iterator iter = priv->discardMap.find(loggerName);
356
357
0
      if (iter == priv->discardMap.end())
358
0
      {
359
0
        DiscardSummary summary(event);
360
0
        priv->discardMap.insert(DiscardMap::value_type(loggerName, summary));
361
0
      }
362
0
      else
363
0
      {
364
0
        (*iter).second.add(event);
365
0
      }
366
367
0
      break;
368
0
    }
369
0
  }
370
0
}
371
372
void AsyncAppender::close()
373
0
{
374
0
  priv->stopDispatcher();
375
0
  for (auto item : priv->appenders.getAllAppenders())
376
0
  {
377
0
    item->close();
378
0
  }
379
0
}
380
381
AppenderList AsyncAppender::getAllAppenders() const
382
0
{
383
0
  return priv->appenders.getAllAppenders();
384
0
}
385
386
AppenderPtr AsyncAppender::getAppender(const LogString& n) const
387
0
{
388
0
  return priv->appenders.getAppender(n);
389
0
}
390
391
bool AsyncAppender::isAttached(const AppenderPtr appender) const
392
0
{
393
0
  return priv->appenders.isAttached(appender);
394
0
}
395
396
bool AsyncAppender::requiresLayout() const
397
0
{
398
0
  return false;
399
0
}
400
401
void AsyncAppender::removeAllAppenders()
402
0
{
403
0
  priv->appenders.removeAllAppenders();
404
0
}
405
406
void AsyncAppender::removeAppender(const AppenderPtr appender)
407
0
{
408
0
  priv->appenders.removeAppender(appender);
409
0
}
410
411
void AsyncAppender::removeAppender(const LogString& n)
412
0
{
413
0
  priv->appenders.removeAppender(n);
414
0
}
415
416
bool AsyncAppender::replaceAppender(const AppenderPtr& oldAppender, const AppenderPtr& newAppender)
417
0
{
418
0
  return priv->appenders.replaceAppender(oldAppender, newAppender);
419
0
}
420
421
void AsyncAppender::replaceAppenders( const AppenderList& newList)
422
0
{
423
0
  priv->appenders.replaceAppenders(newList);
424
0
}
425
426
bool AsyncAppender::getLocationInfo() const
427
0
{
428
0
  return priv->locationInfo;
429
0
}
430
431
void AsyncAppender::setLocationInfo(bool flag)
432
0
{
433
0
  priv->locationInfo = flag;
434
0
}
435
436
437
void AsyncAppender::setBufferSize(int size)
438
0
{
439
0
  if (size < 0)
440
0
  {
441
0
    throw IllegalArgumentException(LOG4CXX_STR("size argument must be non-negative"));
442
0
  }
443
444
0
  std::lock_guard<std::mutex> lock(priv->bufferMutex);
445
0
  priv->bufferSize = (size < 1) ? 1 : size;
446
0
  priv->buffer.resize(priv->bufferSize);
447
0
  priv->bufferNotFull.notify_all();
448
0
}
449
450
int AsyncAppender::getBufferSize() const
451
0
{
452
0
  return priv->bufferSize;
453
0
}
454
455
void AsyncAppender::setBlocking(bool value)
456
0
{
457
0
  std::lock_guard<std::mutex> lock(priv->bufferMutex);
458
0
  priv->blocking = value;
459
0
  priv->bufferNotFull.notify_all();
460
0
}
461
462
bool AsyncAppender::getBlocking() const
463
0
{
464
0
  return priv->blocking;
465
0
}
466
467
DiscardSummary::DiscardSummary(const LoggingEventPtr& event) :
468
0
  maxEvent(event), count(1)
469
0
{
470
0
}
471
472
DiscardSummary::DiscardSummary(const DiscardSummary& src) :
473
0
  maxEvent(src.maxEvent), count(src.count)
474
0
{
475
0
}
476
477
DiscardSummary& DiscardSummary::operator=(const DiscardSummary& src)
478
0
{
479
0
  maxEvent = src.maxEvent;
480
0
  count = src.count;
481
0
  return *this;
482
0
}
483
484
void DiscardSummary::add(const LoggingEventPtr& event)
485
0
{
486
0
  if (event->getLevel()->toInt() > maxEvent->getLevel()->toInt())
487
0
  {
488
0
    maxEvent = event;
489
0
  }
490
491
0
  count++;
492
0
}
493
494
LoggingEventPtr DiscardSummary::createEvent(Pool& p)
495
0
 {
496
0
  LogString msg(LOG4CXX_STR("Discarded "));
497
0
  StringHelper::toString(count, p, msg);
498
0
  msg.append(LOG4CXX_STR(" messages due to a full event buffer including: "));
499
0
  msg.append(maxEvent->getMessage());
500
0
  return std::make_shared<LoggingEvent>(
501
0
        maxEvent->getLoggerName(),
502
0
        maxEvent->getLevel(),
503
0
        msg,
504
0
        LocationInfo::getLocationUnavailable() );
505
0
}
506
507
#if LOG4CXX_ABI_VERSION <= 15
508
::LOG4CXX_NS::spi::LoggingEventPtr
509
DiscardSummary::createEvent(::LOG4CXX_NS::helpers::Pool& p,
510
  size_t discardedCount)
511
0
{
512
0
  LogString msg(LOG4CXX_STR("Discarded "));
513
0
  StringHelper::toString(discardedCount, p, msg);
514
0
  msg.append(LOG4CXX_STR(" messages due to a full event buffer"));
515
516
0
  return std::make_shared<LoggingEvent>(
517
0
        LOG4CXX_STR(""),
518
0
        LOG4CXX_NS::Level::getError(),
519
0
        msg,
520
0
        LocationInfo::getLocationUnavailable() );
521
0
}
522
#endif
523
524
525
void AsyncAppender::dispatch()
526
0
{
527
0
  size_t discardCount = 0;
528
0
  std::vector<size_t> pendingCountHistogram(priv->bufferSize, 0);
529
0
  bool isActive = true;
530
531
0
  while (isActive)
532
0
  {
533
0
    Pool p;
534
0
    LoggingEventList events;
535
0
    events.reserve(priv->bufferSize);
536
0
    for (int count = 0; count < 2 && priv->dispatchedCount == priv->commitCount; ++count)
537
0
      std::this_thread::yield(); // Wait a bit
538
0
    if (priv->dispatchedCount == priv->commitCount)
539
0
    {
540
0
      std::unique_lock<std::mutex> lock(priv->bufferMutex);
541
0
      priv->bufferNotEmpty.wait(lock, [this]() -> bool
542
0
        { return 0 < priv->blockedCount || priv->dispatchedCount != priv->commitCount || priv->closed; }
543
0
      );
544
0
    }
545
0
    isActive = !priv->isClosed();
546
547
0
    while (events.size() < priv->bufferSize && priv->dispatchedCount != priv->commitCount)
548
0
    {
549
0
      auto index = priv->dispatchedCount % priv->buffer.size();
550
0
      const auto& data = priv->buffer[index];
551
0
      events.push_back(data.event);
552
0
      if (data.pendingCount < pendingCountHistogram.size())
553
0
        ++pendingCountHistogram[data.pendingCount];
554
0
      ++priv->dispatchedCount;
555
0
    }
556
0
    priv->bufferNotFull.notify_all();
557
0
    {
558
0
      std::lock_guard<std::mutex> lock(priv->bufferMutex);
559
0
      for (auto discardItem : priv->discardMap)
560
0
      {
561
0
        events.push_back(discardItem.second.createEvent(p));
562
0
        discardCount += discardItem.second.getCount();
563
0
      }
564
0
      priv->discardMap.clear();
565
0
    }
566
567
0
    for (auto item : events)
568
0
    {
569
0
      try
570
0
      {
571
0
        priv->appenders.appendLoopOnAppenders(item, p);
572
0
      }
573
0
      catch (std::exception& ex)
574
0
      {
575
0
        if (!priv->isClosed())
576
0
        {
577
0
          priv->errorHandler->error(LOG4CXX_STR("async dispatcher"), ex, 0, item);
578
0
          isActive = false;
579
0
        }
580
0
      }
581
0
      catch (...)
582
0
      {
583
0
        if (!priv->isClosed())
584
0
        {
585
0
          priv->errorHandler->error(LOG4CXX_STR("async dispatcher"));
586
0
          isActive = false;
587
0
        }
588
0
      }
589
0
    }
590
0
  }
591
0
  if (LogLog::isDebugEnabled())
592
0
  {
593
0
    Pool p;
594
0
    LogString msg(LOG4CXX_STR("AsyncAppender"));
595
0
    msg += LOG4CXX_STR(" discardCount ");
596
0
    StringHelper::toString(discardCount, p, msg);
597
0
    msg += LOG4CXX_STR(" pendingCountHistogram");
598
0
    for (auto item : pendingCountHistogram)
599
0
    {
600
0
      msg += logchar(' ');
601
0
      StringHelper::toString(item, p, msg);
602
0
    }
603
0
    LogLog::debug(msg);
604
0
  }
605
606
0
}