LCOV - code coverage report
Current view: top level - source/common/access_log - access_log_manager_impl.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 106 151 70.2 %
Date: 2024-01-05 06:35:25 Functions: 12 15 80.0 %

          Line data    Source code
       1             : #include "source/common/access_log/access_log_manager_impl.h"
       2             : 
       3             : #include <string>
       4             : 
       5             : #include "envoy/common/exception.h"
       6             : 
       7             : #include "source/common/common/assert.h"
       8             : #include "source/common/common/fmt.h"
       9             : #include "source/common/common/lock_guard.h"
      10             : 
      11             : #include "absl/container/fixed_array.h"
      12             : 
      13             : namespace Envoy {
      14             : namespace AccessLog {
      15             : 
      16         455 : AccessLogManagerImpl::~AccessLogManagerImpl() {
      17         455 :   for (auto& [log_key, log_file_ptr] : access_logs_) {
      18         101 :     ENVOY_LOG(debug, "destroying access logger {}", log_key);
      19         101 :     log_file_ptr.reset();
      20         101 :   }
      21         455 :   ENVOY_LOG(debug, "destroyed access loggers");
      22         455 : }
      23             : 
      24           0 : void AccessLogManagerImpl::reopen() {
      25           0 :   for (auto& iter : access_logs_) {
      26           0 :     iter.second->reopen();
      27           0 :   }
      28           0 : }
      29             : 
      30             : AccessLogFileSharedPtr
      31         171 : AccessLogManagerImpl::createAccessLog(const Filesystem::FilePathAndType& file_info) {
      32         171 :   auto file = api_.fileSystem().createFile(file_info);
      33         171 :   std::string file_name = file->path();
      34         171 :   if (access_logs_.count(file_name)) {
      35          70 :     return access_logs_[file_name];
      36          70 :   }
      37         101 :   access_logs_[file_name] =
      38         101 :       std::make_shared<AccessLogFileImpl>(std::move(file), dispatcher_, lock_, file_stats_,
      39         101 :                                           file_flush_interval_msec_, api_.threadFactory());
      40         101 :   return access_logs_[file_name];
      41         171 : }
      42             : 
      43             : AccessLogFileImpl::AccessLogFileImpl(Filesystem::FilePtr&& file, Event::Dispatcher& dispatcher,
      44             :                                      Thread::BasicLockable& lock, AccessLogFileStats& stats,
      45             :                                      std::chrono::milliseconds flush_interval_msec,
      46             :                                      Thread::ThreadFactory& thread_factory)
      47             :     : file_(std::move(file)), file_lock_(lock),
      48         564 :       flush_timer_(dispatcher.createTimer([this]() -> void {
      49         564 :         stats_.flushed_by_timer_.inc();
      50         564 :         flush_event_.notifyOne();
      51         564 :         flush_timer_->enableTimer(flush_interval_msec_);
      52         564 :       })),
      53         101 :       thread_factory_(thread_factory), flush_interval_msec_(flush_interval_msec), stats_(stats) {
      54         101 :   flush_timer_->enableTimer(flush_interval_msec_);
      55         101 :   auto open_result = open();
      56         101 :   if (!open_result.return_value_) {
      57           0 :     throwEnvoyExceptionOrPanic(fmt::format("unable to open file '{}': {}", file_->path(),
      58           0 :                                            open_result.err_->getErrorDetails()));
      59           0 :   }
      60         101 : }
      61             : 
      62         101 : Filesystem::FlagSet AccessLogFileImpl::defaultFlags() {
      63         101 :   static constexpr Filesystem::FlagSet default_flags{1 << Filesystem::File::Operation::Write |
      64         101 :                                                      1 << Filesystem::File::Operation::Create |
      65         101 :                                                      1 << Filesystem::File::Operation::Append};
      66             : 
      67         101 :   return default_flags;
      68         101 : }
      69             : 
      70         101 : Api::IoCallBoolResult AccessLogFileImpl::open() {
      71         101 :   Api::IoCallBoolResult result = file_->open(defaultFlags());
      72         101 :   return result;
      73         101 : }
      74             : 
      75           0 : void AccessLogFileImpl::reopen() {
      76           0 :   Thread::LockGuard lock(write_lock_);
      77           0 :   reopen_file_ = true;
      78           0 :   flush_event_.notifyOne();
      79           0 : }
      80             : 
      81         101 : AccessLogFileImpl::~AccessLogFileImpl() {
      82         101 :   {
      83         101 :     Thread::LockGuard lock(write_lock_);
      84         101 :     flush_thread_exit_ = true;
      85         101 :     flush_event_.notifyOne();
      86         101 :   }
      87             : 
      88         101 :   if (flush_thread_ != nullptr) {
      89          98 :     flush_thread_->join();
      90          98 :   }
      91             : 
      92             :   // Flush any remaining data. If file was not opened for some reason, skip flushing part.
      93         101 :   if (file_->isOpen()) {
      94         101 :     if (flush_buffer_.length() > 0) {
      95          47 :       doWrite(flush_buffer_);
      96          47 :     }
      97         101 :     const Api::IoCallBoolResult result = file_->close();
      98         101 :     ASSERT(result.return_value_, fmt::format("unable to close file '{}': {}", file_->path(),
      99         101 :                                              result.err_->getErrorDetails()));
     100         101 :   }
     101         101 : }
     102             : 
     103         251 : void AccessLogFileImpl::doWrite(Buffer::Instance& buffer) {
     104         251 :   Buffer::RawSliceVector slices = buffer.getRawSlices();
     105             : 
     106             :   // We must do the actual writes to disk under lock, so that we don't intermix chunks from
     107             :   // different AccessLogFileImpl pointing to the same underlying file. This can happen either via
     108             :   // hot restart or if calling code opens the same underlying file into a different
     109             :   // AccessLogFileImpl in the same process.
     110             :   // TODO PERF: Currently, we use a single cross process lock to serialize all disk writes. This
     111             :   //            will never block network workers, but does mean that only a single flush thread can
     112             :   //            actually flush to disk. In the future it would be nice if we did away with the cross
     113             :   //            process lock or had multiple locks.
     114         251 :   {
     115         251 :     Thread::LockGuard lock(file_lock_);
     116         251 :     for (const Buffer::RawSlice& slice : slices) {
     117         251 :       absl::string_view data(static_cast<char*>(slice.mem_), slice.len_);
     118         251 :       const Api::IoCallSizeResult result = file_->write(data);
     119         251 :       if (result.ok() && result.return_value_ == static_cast<ssize_t>(slice.len_)) {
     120         251 :         stats_.write_completed_.inc();
     121         251 :       } else {
     122             :         // Probably disk full.
     123           0 :         stats_.write_failed_.inc();
     124           0 :       }
     125         251 :     }
     126         251 :   }
     127             : 
     128         251 :   stats_.write_total_buffered_.sub(buffer.length());
     129         251 :   buffer.drain(buffer.length());
     130         251 : }
     131             : 
     132          98 : void AccessLogFileImpl::flushThreadFunc() {
     133             : 
     134             :   // Transfer the action from `reopen_file_` to this variable so that `reopen_file_` is only
     135             :   // accessed while holding the mutex while the actual operation is performed while not holding the
     136             :   // mutex.
     137          98 :   bool do_reopen = false;
     138             : 
     139         302 :   while (true) {
     140         302 :     std::unique_lock<Thread::BasicLockable> flush_lock;
     141             : 
     142         302 :     {
     143         302 :       Thread::LockGuard write_lock(write_lock_);
     144             : 
     145             :       // flush_event_ can be woken up either by large enough flush_buffer or by timer.
     146             :       // In case it was timer, flush_buffer_ can be empty.
     147             :       //
     148             :       // Note: do not stop waiting when only `do_reopen` is true. In this case, we tried to
     149             :       // reopen and failed. We don't want to retry this in a tight loop, so wait for the next
     150             :       // event (timer or flush).
     151         866 :       while (flush_buffer_.length() == 0 && !flush_thread_exit_ && !reopen_file_) {
     152             :         // CondVar::wait() does not throw, so it's safe to pass the mutex rather than the guard.
     153         564 :         flush_event_.wait(write_lock_);
     154         564 :       }
     155             : 
     156         302 :       if (flush_thread_exit_) {
     157          98 :         return;
     158          98 :       }
     159             : 
     160         204 :       flush_lock = std::unique_lock<Thread::BasicLockable>(flush_lock_);
     161         204 :       about_to_write_buffer_.move(flush_buffer_);
     162         204 :       ASSERT(flush_buffer_.length() == 0);
     163             : 
     164         204 :       if (reopen_file_) {
     165           0 :         do_reopen = true;
     166           0 :         reopen_file_ = false;
     167           0 :       }
     168         204 :     }
     169             : 
     170         204 :     if (do_reopen) {
     171           0 :       if (file_->isOpen()) {
     172           0 :         const Api::IoCallBoolResult result = file_->close();
     173           0 :         ASSERT(result.return_value_, fmt::format("unable to close file '{}': {}", file_->path(),
     174           0 :                                                  result.err_->getErrorDetails()));
     175           0 :       }
     176           0 :       const Api::IoCallBoolResult open_result = open();
     177           0 :       if (!open_result.return_value_) {
     178           0 :         stats_.reopen_failed_.inc();
     179           0 :       } else {
     180           0 :         do_reopen = false;
     181           0 :       }
     182           0 :     }
     183             :     // doWrite no matter file isOpen, if not, we can drain buffer
     184         204 :     doWrite(about_to_write_buffer_);
     185         204 :   }
     186          98 : }
     187             : 
     188           0 : void AccessLogFileImpl::flush() {
     189           0 :   std::unique_lock<Thread::BasicLockable> flush_buffer_lock;
     190             : 
     191           0 :   {
     192           0 :     Thread::LockGuard write_lock(write_lock_);
     193             : 
     194             :     // flush_lock_ must be held while checking this or else it is
     195             :     // possible that flushThreadFunc() has already moved data from
     196             :     // flush_buffer_ to about_to_write_buffer_, has unlocked write_lock_,
     197             :     // but has not yet completed doWrite(). This would allow flush() to
     198             :     // return before the pending data has actually been written to disk.
     199           0 :     flush_buffer_lock = std::unique_lock<Thread::BasicLockable>(flush_lock_);
     200             : 
     201           0 :     if (flush_buffer_.length() == 0) {
     202           0 :       return;
     203           0 :     }
     204             : 
     205           0 :     about_to_write_buffer_.move(flush_buffer_);
     206           0 :     ASSERT(flush_buffer_.length() == 0);
     207           0 :   }
     208             : 
     209           0 :   doWrite(about_to_write_buffer_);
     210           0 : }
     211             : 
     212         612 : void AccessLogFileImpl::write(absl::string_view data) {
     213         612 :   Thread::LockGuard lock(write_lock_);
     214             : 
     215         612 :   if (flush_thread_ == nullptr) {
     216          98 :     createFlushStructures();
     217          98 :   }
     218             : 
     219         612 :   stats_.write_buffered_.inc();
     220         612 :   stats_.write_total_buffered_.add(data.length());
     221         612 :   flush_buffer_.add(data.data(), data.size());
     222         612 :   if (flush_buffer_.length() > MIN_FLUSH_SIZE) {
     223           0 :     flush_event_.notifyOne();
     224           0 :   }
     225         612 : }
     226             : 
     227          98 : void AccessLogFileImpl::createFlushStructures() {
     228          98 :   flush_thread_ = thread_factory_.createThread([this]() -> void { flushThreadFunc(); },
     229          98 :                                                Thread::Options{"AccessLogFlush"});
     230          98 : }
     231             : 
     232             : } // namespace AccessLog
     233             : } // namespace Envoy

Generated by: LCOV version 1.15