Coverage Report

Created: 2025-10-10 06:52

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/logging-log4cxx/src/main/cpp/asyncappender.cpp
Line
Count
Source
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one or more
3
 * contributor license agreements.  See the NOTICE file distributed with
4
 * this work for additional information regarding copyright ownership.
5
 * The ASF licenses this file to You under the Apache License, Version 2.0
6
 * (the "License"); you may not use this file except in compliance with
7
 * the License.  You may obtain a copy of the License at
8
 *
9
 *      http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17
18
#include <log4cxx/asyncappender.h>
19
20
#include <log4cxx/helpers/loglog.h>
21
#include <log4cxx/spi/loggingevent.h>
22
#include <log4cxx/helpers/stringhelper.h>
23
#include <log4cxx/helpers/optionconverter.h>
24
#include <log4cxx/helpers/threadutility.h>
25
#include <log4cxx/private/appenderskeleton_priv.h>
26
#include <thread>
27
#include <atomic>
28
#include <condition_variable>
29
30
#if LOG4CXX_EVENTS_AT_EXIT
31
#include <log4cxx/private/atexitregistry.h>
32
#endif
33
34
using namespace LOG4CXX_NS;
35
using namespace LOG4CXX_NS::helpers;
36
using namespace LOG4CXX_NS::spi;
37
38
#if 15 < LOG4CXX_ABI_VERSION
39
namespace
40
{
41
#endif
42
43
/**
44
 * The default buffer size is set to 128 events.
45
*/
46
// Initial value for both the ring buffer capacity and bufferSize in AsyncAppenderPriv;
// also the fallback value passed to OptionConverter::toInt when parsing "BufferSize".
enum { DEFAULT_BUFFER_SIZE = 128 };
47
48
class DiscardSummary
49
{
50
  private:
51
    /**
52
     * First event of the highest severity.
53
    */
54
    LoggingEventPtr maxEvent;
55
56
    /**
57
    * Total count of messages discarded.
58
    */
59
    int count;
60
61
    /**
62
    * Why created
63
    */
64
    LogString reason;
65
66
  public:
67
    /**
68
     * Create new instance.
69
     *
70
     * @param event must not be null.
71
    */
72
    DiscardSummary(const LoggingEventPtr& event, const LogString& reason);
73
74
    /** Move values from \c src into a new instance.
75
    */
76
    DiscardSummary(DiscardSummary&& src);
77
#if 15 < LOG4CXX_ABI_VERSION
78
    /** Copy constructor.  */
79
    DiscardSummary(const DiscardSummary&) = delete;
80
    /** Assignment operator. */
81
    DiscardSummary& operator=(const DiscardSummary&) = delete;
82
#else
83
    /**
84
     * Create new instance.
85
     *
86
     * @param event event, may not be null.
87
    */
88
    DiscardSummary(const LoggingEventPtr& event);
89
    /** Copy constructor.  */
90
    DiscardSummary(const DiscardSummary& src);
91
    /** Assignment operator. */
92
    DiscardSummary& operator=(const DiscardSummary& src);
93
#endif
94
95
    /**
96
     * Add discarded event to summary.
97
     *
98
     * @param event event, may not be null.
99
    */
100
    void add(const LoggingEventPtr& event);
101
102
    /**
103
     * Create an event with a discard count and the message from \c maxEvent.
104
     *
105
     * @return the new event.
106
     */
107
    LoggingEventPtr createEvent(Pool& p);
108
109
#if LOG4CXX_ABI_VERSION <= 15
110
    static
111
    ::LOG4CXX_NS::spi::LoggingEventPtr createEvent(::LOG4CXX_NS::helpers::Pool& p,
112
      size_t discardedCount);
113
#endif
114
115
    /**
116
    * The number of messages discarded.
117
    */
118
0
    int getCount() const { return count; }
119
};
120
121
/** Discard summaries keyed by logger name. */
using DiscardMap = std::map<LogString, DiscardSummary>;
122
123
#if 15 < LOG4CXX_ABI_VERSION
124
}
125
#endif
126
127
// Interference-size constants used with alignas() on the atomic counters below:
// taken from the standard library when the feature-test macro is defined,
// otherwise defined locally with a fixed value.
#ifdef __cpp_lib_hardware_interference_size
128
  using std::hardware_constructive_interference_size;
129
  using std::hardware_destructive_interference_size;
130
#else
131
  // Fallback value: 64 bytes, the cache line size on x86-64
132
  constexpr std::size_t hardware_constructive_interference_size = 64;
133
  constexpr std::size_t hardware_destructive_interference_size = 64;
134
#endif
135
136
/**
 * Private state of an AsyncAppender: the ring buffer, its counters,
 * the discard map, the attached appenders and the dispatch thread.
 */
struct AsyncAppender::AsyncAppenderPriv : public AppenderSkeleton::AppenderSkeletonPrivate
{
  /** Start with a DEFAULT_BUFFER_SIZE slot buffer, blocking enabled and no dispatch thread. */
  AsyncAppenderPriv()
    : AppenderSkeletonPrivate()
    , buffer(DEFAULT_BUFFER_SIZE)
    , bufferSize(DEFAULT_BUFFER_SIZE)
    , dispatcher()
    , locationInfo(false)
    , blocking(true)
#if LOG4CXX_EVENTS_AT_EXIT
    , atExitRegistryRaii([this] { stopDispatcher(); })
#endif
    , eventCount(0)
    , dispatchedCount(0)
    , commitCount(0)
  {
  }

  /** Stops (and joins) the dispatch thread before the members are destroyed. */
  ~AsyncAppenderPriv()
  {
    stopDispatcher();
  }

  /**
   * One slot of the event buffer.
  */
  struct EventData
  {
    LoggingEventPtr event;
    size_t pendingCount;
  };

  /** The event ring buffer. */
  std::vector<EventData> buffer;

  /**
   *  Guards access to buffer and discardMap.
   */
  std::mutex bufferMutex;

  std::condition_variable bufferNotFull;
  std::condition_variable bufferNotEmpty;

  /**
    * DiscardSummary objects keyed by logger name.
  */
  DiscardMap discardMap;

  /**
   * The maximum number of undispatched events.
  */
  int bufferSize;

  /**
   * The appenders attached to this appender.
  */
  helpers::AppenderAttachableImpl appenders;

  /**
   *  The dispatch thread.
   */
  std::thread dispatcher;

  /**
   * Mark this appender closed, wake every thread waiting on either
   * condition variable, then join the dispatch thread if it is joinable.
   */
  void stopDispatcher()
  {
    setClosed();
    bufferNotEmpty.notify_all();
    bufferNotFull.notify_all();

    if (dispatcher.joinable())
      dispatcher.join();
  }

  /**
   * Should location info be included in dispatched messages.
  */
  bool locationInfo;

  /**
   * Does the appender block when the buffer is full.
  */
  bool blocking;

#if LOG4CXX_EVENTS_AT_EXIT
  helpers::AtExitRegistry::Raii atExitRegistryRaii;
#endif

  /**
   * Determines the buffer position at which the next event is stored.
  */
  alignas(hardware_constructive_interference_size) std::atomic<size_t> eventCount;

  /**
   * Determines the buffer position from which the next event is extracted.
  */
  alignas(hardware_constructive_interference_size) std::atomic<size_t> dispatchedCount;

  /**
   * Tells the dispatch thread that an event has been committed to the buffer.
  */
  alignas(hardware_constructive_interference_size) std::atomic<size_t> commitCount;

  /** Read the closed flag while holding bufferMutex. */
  bool isClosed()
  {
    std::lock_guard<std::mutex> guard(bufferMutex);
    return closed;
  }

  /** Set the closed flag while holding bufferMutex. */
  void setClosed()
  {
    std::lock_guard<std::mutex> guard(bufferMutex);
    closed = true;
  }

  /**
   * Ensures the dispatch thread does not wait while a logging thread is waiting.
  */
  alignas(hardware_constructive_interference_size) int blockedCount{0};
};
254
255
256
IMPLEMENT_LOG4CXX_OBJECT(AsyncAppender)
257
258
0
// Shorthand used by the member functions below: the appender's private data (m_priv)
// downcast to AsyncAppenderPriv. The pointer type is not checked at run time.
#define priv static_cast<AsyncAppenderPriv*>(m_priv.get())
259
260
/** Create an appender whose private data is a new AsyncAppenderPriv. */
AsyncAppender::AsyncAppender()
  : AppenderSkeleton( std::make_unique<AsyncAppenderPriv>() )
{}
Unexecuted instantiation: log4cxx::AsyncAppender::AsyncAppender()
Unexecuted instantiation: log4cxx::AsyncAppender::AsyncAppender()
264
265
/** Calls finalize() before the appender is destroyed. */
AsyncAppender::~AsyncAppender()
{
  this->finalize();
}
269
270
void AsyncAppender::addAppender(const AppenderPtr newAppender)
271
0
{
272
0
  priv->appenders.addAppender(newAppender);
273
0
}
274
275
276
void AsyncAppender::setOption(const LogString& option,
277
  const LogString& value)
278
0
{
279
0
  if (StringHelper::equalsIgnoreCase(option, LOG4CXX_STR("LOCATIONINFO"), LOG4CXX_STR("locationinfo")))
280
0
  {
281
0
    setLocationInfo(OptionConverter::toBoolean(value, false));
282
0
  }
283
284
0
  if (StringHelper::equalsIgnoreCase(option, LOG4CXX_STR("BUFFERSIZE"), LOG4CXX_STR("buffersize")))
285
0
  {
286
0
    setBufferSize(OptionConverter::toInt(value, DEFAULT_BUFFER_SIZE));
287
0
  }
288
289
0
  if (StringHelper::equalsIgnoreCase(option, LOG4CXX_STR("BLOCKING"), LOG4CXX_STR("blocking")))
290
0
  {
291
0
    setBlocking(OptionConverter::toBoolean(value, true));
292
0
  }
293
0
  else
294
0
  {
295
0
    AppenderSkeleton::setOption(option, value);
296
0
  }
297
0
}
298
299
300
/** Hand \c event straight to doAppendImpl. */
void AsyncAppender::doAppend(const spi::LoggingEventPtr& event, Pool& pool1)
{
  this->doAppendImpl(event, pool1);
}
304
305
/**
 * Queue \c event for the dispatch thread.
 *
 * - When bufferSize is not positive the event is sent to the attached appenders directly.
 * - The dispatch thread is started on first use.
 * - An event logged from the dispatch thread itself (i.e. by an attached appender)
 *   is never queued; it is recorded in the discard map instead.
 * - Otherwise the event is stored in the ring buffer. When the buffer is full the
 *   calling thread waits for space if blocking is enabled and the appender is open,
 *   else the event is recorded in the discard map.
 *
 * @param event the event to queue.
 * @param p pool used only when the event is forwarded synchronously.
 */
void AsyncAppender::append(const spi::LoggingEventPtr& event, Pool& p)
{
  if (priv->bufferSize <= 0)
  {
    priv->appenders.appendLoopOnAppenders(event, p);
    return;
  }

  // Get a copy of this thread's diagnostic context
  event->LoadDC();

  if (!priv->dispatcher.joinable())
  {
    // Re-check under the lock so only one thread starts the dispatcher
    std::lock_guard<std::recursive_mutex> lock(priv->mutex);
    if (!priv->dispatcher.joinable())
      priv->dispatcher = ThreadUtility::instance()->createThread( LOG4CXX_STR("AsyncAppender"), &AsyncAppender::dispatch, this );
  }
  if (priv->dispatcher.get_id() == std::this_thread::get_id()) // From an appender attached to this?
  {
    std::unique_lock<std::mutex> lock(priv->bufferMutex);
    const auto& loggerName = event->getLoggerName();
    auto iter = priv->discardMap.find(loggerName);
    if (priv->discardMap.end() == iter)
      priv->discardMap.emplace(loggerName, DiscardSummary{ event, LOG4CXX_STR("from an attached appender") });
    else
      iter->second.add(event);
  }
  else while (true)
  {
    // Unsigned difference of the two counters (so it can never be negative)
    auto pendingCount = priv->eventCount - priv->dispatchedCount;
    if (pendingCount < static_cast<size_t>(priv->bufferSize))
    {
      // Claim a slot in the ring buffer
      auto oldEventCount = priv->eventCount++;
      auto index = oldEventCount % priv->buffer.size();
      // Wait for a free slot
      while (static_cast<size_t>(priv->bufferSize) <= oldEventCount - priv->dispatchedCount)
        std::this_thread::yield(); // Allow the dispatch thread to free a slot
      // Write to the ring buffer
      priv->buffer[index] = AsyncAppenderPriv::EventData{event, pendingCount};
      // Notify the dispatch thread that an event has been added.
      // Slots are committed in claim order: wait until commitCount reaches this slot's number.
      auto failureCount = 0;
      auto savedEventCount = oldEventCount;
      while (!priv->commitCount.compare_exchange_weak(oldEventCount, oldEventCount + 1, std::memory_order_release))
      {
        oldEventCount = savedEventCount; // compare_exchange_weak overwrote the expected value
        if (2 < ++failureCount) // Did the scheduler suspend a thread between claiming a slot and advancing commitCount?
          std::this_thread::yield(); // Wait a bit
      }
      priv->bufferNotEmpty.notify_all();
      break;
    }
    //
    //   Following code is only reachable if buffer is full
    //
    std::unique_lock<std::mutex> lock(priv->bufferMutex);
    priv->bufferNotEmpty.notify_all();
    //
    //   if blocking and the appender is not closed then
    //      wait for a buffer notification
    bool discard = true;

    if (priv->blocking
      && !priv->closed)
    {
      ++priv->blockedCount;
      // Also wake up when the appender is closed or blocking is switched off.
      // Both are signalled through bufferNotFull; without these tests a waiter could
      // sleep forever once the dispatch thread has exited. The loop then re-evaluates
      // the state and takes the discard path below if the buffer is still full.
      priv->bufferNotFull.wait(lock, [this]()
      {
        return priv->closed
          || !priv->blocking
          || priv->eventCount - priv->dispatchedCount < static_cast<size_t>(priv->bufferSize);
      });
      --priv->blockedCount;
      discard = false;
    }

    //
    //   if blocking is false or the appender is closed
    //   add event to discard map.
    //
    if (discard)
    {
      const auto& loggerName = event->getLoggerName();
      DiscardMap::iterator iter = priv->discardMap.find(loggerName);

      if (iter == priv->discardMap.end())
      {
        priv->discardMap.emplace(loggerName, DiscardSummary{ event, LOG4CXX_STR("due to a full event buffer") });
      }
      else
      {
        iter->second.add(event);
      }

      break;
    }
  }
}
402
403
void AsyncAppender::close()
404
0
{
405
0
  priv->stopDispatcher();
406
0
  for (auto item : priv->appenders.getAllAppenders())
407
0
  {
408
0
    item->close();
409
0
  }
410
0
}
411
412
/** @return the list of attached appenders. */
AppenderList AsyncAppender::getAllAppenders() const
{
  const auto& attached = priv->appenders;
  return attached.getAllAppenders();
}
416
417
/** Look up an attached appender by the name \c n. */
AppenderPtr AsyncAppender::getAppender(const LogString& n) const
{
  const auto& attached = priv->appenders;
  return attached.getAppender(n);
}
421
422
bool AsyncAppender::isAttached(const AppenderPtr appender) const
423
0
{
424
0
  return priv->appenders.isAttached(appender);
425
0
}
426
427
bool AsyncAppender::requiresLayout() const
428
0
{
429
0
  return false;
430
0
}
431
432
void AsyncAppender::removeAllAppenders()
433
0
{
434
0
  priv->appenders.removeAllAppenders();
435
0
}
436
437
void AsyncAppender::removeAppender(const AppenderPtr appender)
438
0
{
439
0
  priv->appenders.removeAppender(appender);
440
0
}
441
442
void AsyncAppender::removeAppender(const LogString& n)
443
0
{
444
0
  priv->appenders.removeAppender(n);
445
0
}
446
447
bool AsyncAppender::replaceAppender(const AppenderPtr& oldAppender, const AppenderPtr& newAppender)
448
0
{
449
0
  return priv->appenders.replaceAppender(oldAppender, newAppender);
450
0
}
451
452
void AsyncAppender::replaceAppenders( const AppenderList& newList)
453
0
{
454
0
  priv->appenders.replaceAppenders(newList);
455
0
}
456
457
bool AsyncAppender::getLocationInfo() const
458
0
{
459
0
  return priv->locationInfo;
460
0
}
461
462
void AsyncAppender::setLocationInfo(bool flag)
463
0
{
464
0
  priv->locationInfo = flag;
465
0
}
466
467
468
void AsyncAppender::setBufferSize(int size)
469
0
{
470
0
  if (size < 0)
471
0
  {
472
0
    throw IllegalArgumentException(LOG4CXX_STR("size argument must be non-negative"));
473
0
  }
474
475
0
  std::lock_guard<std::mutex> lock(priv->bufferMutex);
476
0
  priv->bufferSize = (size < 1) ? 1 : size;
477
0
  priv->buffer.resize(priv->bufferSize);
478
0
  priv->bufferNotFull.notify_all();
479
0
}
480
481
int AsyncAppender::getBufferSize() const
482
0
{
483
0
  return priv->bufferSize;
484
0
}
485
486
void AsyncAppender::setBlocking(bool value)
487
0
{
488
0
  std::lock_guard<std::mutex> lock(priv->bufferMutex);
489
0
  priv->blocking = value;
490
0
  priv->bufferNotFull.notify_all();
491
0
}
492
493
bool AsyncAppender::getBlocking() const
494
0
{
495
0
  return priv->blocking;
496
0
}
497
498
/** Start a summary with \c event as the only discarded event and \c reasonArg as the reason. */
DiscardSummary::DiscardSummary(const LoggingEventPtr& event, const LogString& reasonArg)
  : maxEvent( event ),
    count( 1 ),
    reason( reasonArg )
{}
504
505
/** Move the event and reason out of \c other and copy its count. */
DiscardSummary::DiscardSummary(DiscardSummary&& other)
  : maxEvent( std::move(other.maxEvent) ),
    count( other.count ),
    reason( std::move(other.reason) )
{}
511
512
#if LOG4CXX_ABI_VERSION <= 15
513
/** Start a summary with \c event as the only discarded event and an empty reason. */
DiscardSummary::DiscardSummary(const LoggingEventPtr& event)
  : maxEvent( event )
  , count( 1 )
  , reason()
{}
517
518
/**
 * Copy every value of \c src, including the discard reason
 * (which createEvent inserts into the summary message).
 */
DiscardSummary::DiscardSummary(const DiscardSummary& src) :
  maxEvent(src.maxEvent), count(src.count), reason(src.reason)
{
}
522
523
/**
 * Replace every value of this summary with the one held by \c src,
 * including the discard reason (which createEvent inserts into the summary message).
 *
 * @return a reference to this instance.
 */
DiscardSummary& DiscardSummary::operator=(const DiscardSummary& src)
{
  maxEvent = src.maxEvent;
  count = src.count;
  reason = src.reason;
  return *this;
}
529
#endif
530
531
void DiscardSummary::add(const LoggingEventPtr& event)
532
0
{
533
0
  if (this->maxEvent->getLevel()->toInt() < event->getLevel()->toInt())
534
0
    this->maxEvent = event;
535
0
  ++this->count;
536
0
}
537
538
/**
 * Build the summary event: its message holds the discard count, the reason and
 * the rendered message of the held event; logger name and level come from the held event.
 *
 * @param p pool used for the number to text conversion.
 * @return the newly created event.
 */
LoggingEventPtr DiscardSummary::createEvent(Pool& p)
{
  LogString text(LOG4CXX_STR("Discarded "));
  StringHelper::toString(this->count, p, text);
  text += LOG4CXX_STR(" messages ");
  text += this->reason;
  text += LOG4CXX_STR(" including: ");
  text += this->maxEvent->getRenderedMessage();

  return std::make_shared<LoggingEvent>(
    this->maxEvent->getLoggerName(),
    this->maxEvent->getLevel(),
    text,
    LocationInfo::getLocationUnavailable());
}
551
552
#if LOG4CXX_ABI_VERSION <= 15
553
/**
 * Build an error level event, with an empty logger name, reporting that
 * \c discardedCount messages were discarded due to a full event buffer.
 *
 * @param p pool used for the number to text conversion.
 * @param discardedCount the number placed in the message.
 * @return the newly created event.
 */
::LOG4CXX_NS::spi::LoggingEventPtr
DiscardSummary::createEvent(::LOG4CXX_NS::helpers::Pool& p,
  size_t discardedCount)
{
  LogString text(LOG4CXX_STR("Discarded "));
  StringHelper::toString(discardedCount, p, text);
  text += LOG4CXX_STR(" messages due to a full event buffer");

  return std::make_shared<LoggingEvent>
    ( LOG4CXX_STR("")
    , LOG4CXX_NS::Level::getError()
    , text
    , LocationInfo::getLocationUnavailable()
    );
}
567
#endif
568
569
570
void AsyncAppender::dispatch()
571
0
{
572
0
  size_t discardCount = 0;
573
0
  size_t iterationCount = 0;
574
0
  size_t waitCount = 0;
575
0
  size_t blockedCount = 0;
576
0
  std::vector<size_t> pendingCountHistogram(priv->bufferSize, 0);
577
0
  bool isActive = true;
578
579
0
  while (isActive)
580
0
  {
581
0
    Pool p;
582
0
    LoggingEventList events;
583
0
    events.reserve(priv->bufferSize);
584
0
    for (int count = 0; count < 2 && priv->dispatchedCount == priv->commitCount; ++count)
585
0
      std::this_thread::yield(); // Wait a bit
586
0
    if (priv->dispatchedCount == priv->commitCount)
587
0
    {
588
0
      ++waitCount;
589
0
      std::unique_lock<std::mutex> lock(priv->bufferMutex);
590
0
      priv->bufferNotEmpty.wait(lock, [this]() -> bool
591
0
        { return 0 < priv->blockedCount || priv->dispatchedCount != priv->commitCount || priv->closed; }
592
0
      );
593
0
    }
594
0
    isActive = !priv->isClosed();
595
596
0
    while (events.size() < priv->bufferSize && priv->dispatchedCount != priv->commitCount)
597
0
    {
598
0
      auto index = priv->dispatchedCount % priv->buffer.size();
599
0
      const auto& data = priv->buffer[index];
600
0
      events.push_back(data.event);
601
0
      if (data.pendingCount < pendingCountHistogram.size())
602
0
        ++pendingCountHistogram[data.pendingCount];
603
0
      ++priv->dispatchedCount;
604
0
    }
605
0
    priv->bufferNotFull.notify_all();
606
0
    {
607
0
      std::lock_guard<std::mutex> lock(priv->bufferMutex);
608
0
      blockedCount += priv->blockedCount;
609
0
      for (auto& discardItem : priv->discardMap)
610
0
      {
611
0
        events.push_back(discardItem.second.createEvent(p));
612
0
        discardCount += discardItem.second.getCount();
613
0
      }
614
0
      priv->discardMap.clear();
615
0
    }
616
617
0
    for (auto item : events)
618
0
    {
619
0
      try
620
0
      {
621
0
        priv->appenders.appendLoopOnAppenders(item, p);
622
0
      }
623
0
      catch (std::exception& ex)
624
0
      {
625
0
        if (!priv->isClosed())
626
0
        {
627
0
          priv->errorHandler->error(LOG4CXX_STR("async dispatcher"), ex, 0, item);
628
0
          isActive = false;
629
0
        }
630
0
      }
631
0
      catch (...)
632
0
      {
633
0
        if (!priv->isClosed())
634
0
        {
635
0
          priv->errorHandler->error(LOG4CXX_STR("async dispatcher"));
636
0
          isActive = false;
637
0
        }
638
0
      }
639
0
    }
640
0
    ++iterationCount;
641
0
  }
642
0
  if (LogLog::isDebugEnabled())
643
0
  {
644
0
    Pool p;
645
0
    LogString msg(LOG4CXX_STR("[") + getName() + LOG4CXX_STR("] AsyncAppender"));
646
#ifdef _DEBUG
647
    msg += LOG4CXX_STR(" iterationCount ");
648
    StringHelper::toString(iterationCount, p, msg);
649
    msg += LOG4CXX_STR(" waitCount ");
650
    StringHelper::toString(waitCount, p, msg);
651
    msg += LOG4CXX_STR(" blockedCount ");
652
    StringHelper::toString(blockedCount, p, msg);
653
    msg += LOG4CXX_STR(" commitCount ");
654
    StringHelper::toString(priv->commitCount, p, msg);
655
#endif
656
0
    msg += LOG4CXX_STR(" dispatchedCount ");
657
0
    StringHelper::toString(priv->dispatchedCount, p, msg);
658
0
    msg += LOG4CXX_STR(" discardCount ");
659
0
    StringHelper::toString(discardCount, p, msg);
660
0
    msg += LOG4CXX_STR(" pendingCountHistogram");
661
0
    for (auto item : pendingCountHistogram)
662
0
    {
663
0
      msg += logchar(' ');
664
0
      StringHelper::toString(item, p, msg);
665
0
    }
666
0
    LogLog::debug(msg);
667
0
  }
668
669
0
}