Line data Source code
1 : #include "source/common/access_log/access_log_manager_impl.h" 2 : 3 : #include <string> 4 : 5 : #include "envoy/common/exception.h" 6 : 7 : #include "source/common/common/assert.h" 8 : #include "source/common/common/fmt.h" 9 : #include "source/common/common/lock_guard.h" 10 : 11 : #include "absl/container/fixed_array.h" 12 : 13 : namespace Envoy { 14 : namespace AccessLog { 15 : 16 455 : AccessLogManagerImpl::~AccessLogManagerImpl() { 17 455 : for (auto& [log_key, log_file_ptr] : access_logs_) { 18 101 : ENVOY_LOG(debug, "destroying access logger {}", log_key); 19 101 : log_file_ptr.reset(); 20 101 : } 21 455 : ENVOY_LOG(debug, "destroyed access loggers"); 22 455 : } 23 : 24 0 : void AccessLogManagerImpl::reopen() { 25 0 : for (auto& iter : access_logs_) { 26 0 : iter.second->reopen(); 27 0 : } 28 0 : } 29 : 30 : AccessLogFileSharedPtr 31 171 : AccessLogManagerImpl::createAccessLog(const Filesystem::FilePathAndType& file_info) { 32 171 : auto file = api_.fileSystem().createFile(file_info); 33 171 : std::string file_name = file->path(); 34 171 : if (access_logs_.count(file_name)) { 35 70 : return access_logs_[file_name]; 36 70 : } 37 101 : access_logs_[file_name] = 38 101 : std::make_shared<AccessLogFileImpl>(std::move(file), dispatcher_, lock_, file_stats_, 39 101 : file_flush_interval_msec_, api_.threadFactory()); 40 101 : return access_logs_[file_name]; 41 171 : } 42 : 43 : AccessLogFileImpl::AccessLogFileImpl(Filesystem::FilePtr&& file, Event::Dispatcher& dispatcher, 44 : Thread::BasicLockable& lock, AccessLogFileStats& stats, 45 : std::chrono::milliseconds flush_interval_msec, 46 : Thread::ThreadFactory& thread_factory) 47 : : file_(std::move(file)), file_lock_(lock), 48 564 : flush_timer_(dispatcher.createTimer([this]() -> void { 49 564 : stats_.flushed_by_timer_.inc(); 50 564 : flush_event_.notifyOne(); 51 564 : flush_timer_->enableTimer(flush_interval_msec_); 52 564 : })), 53 101 : thread_factory_(thread_factory), flush_interval_msec_(flush_interval_msec), stats_(stats) { 54 101 : flush_timer_->enableTimer(flush_interval_msec_); 55 101 : auto open_result = open(); 56 101 : if (!open_result.return_value_) { 57 0 : throwEnvoyExceptionOrPanic(fmt::format("unable to open file '{}': {}", file_->path(), 58 0 : open_result.err_->getErrorDetails())); 59 0 : } 60 101 : } 61 : 62 101 : Filesystem::FlagSet AccessLogFileImpl::defaultFlags() { 63 101 : static constexpr Filesystem::FlagSet default_flags{1 << Filesystem::File::Operation::Write | 64 101 : 1 << Filesystem::File::Operation::Create | 65 101 : 1 << Filesystem::File::Operation::Append}; 66 : 67 101 : return default_flags; 68 101 : } 69 : 70 101 : Api::IoCallBoolResult AccessLogFileImpl::open() { 71 101 : Api::IoCallBoolResult result = file_->open(defaultFlags()); 72 101 : return result; 73 101 : } 74 : 75 0 : void AccessLogFileImpl::reopen() { 76 0 : Thread::LockGuard lock(write_lock_); 77 0 : reopen_file_ = true; 78 0 : flush_event_.notifyOne(); 79 0 : } 80 : 81 101 : AccessLogFileImpl::~AccessLogFileImpl() { 82 101 : { 83 101 : Thread::LockGuard lock(write_lock_); 84 101 : flush_thread_exit_ = true; 85 101 : flush_event_.notifyOne(); 86 101 : } 87 : 88 101 : if (flush_thread_ != nullptr) { 89 98 : flush_thread_->join(); 90 98 : } 91 : 92 : // Flush any remaining data. If file was not opened for some reason, skip flushing part. 93 101 : if (file_->isOpen()) { 94 101 : if (flush_buffer_.length() > 0) { 95 47 : doWrite(flush_buffer_); 96 47 : } 97 101 : const Api::IoCallBoolResult result = file_->close(); 98 101 : ASSERT(result.return_value_, fmt::format("unable to close file '{}': {}", file_->path(), 99 101 : result.err_->getErrorDetails())); 100 101 : } 101 101 : } 102 : 103 251 : void AccessLogFileImpl::doWrite(Buffer::Instance& buffer) { 104 251 : Buffer::RawSliceVector slices = buffer.getRawSlices(); 105 : 106 : // We must do the actual writes to disk under lock, so that we don't intermix chunks from 107 : // different AccessLogFileImpl pointing to the same underlying file. This can happen either via 108 : // hot restart or if calling code opens the same underlying file into a different 109 : // AccessLogFileImpl in the same process. 110 : // TODO PERF: Currently, we use a single cross process lock to serialize all disk writes. This 111 : // will never block network workers, but does mean that only a single flush thread can 112 : // actually flush to disk. In the future it would be nice if we did away with the cross 113 : // process lock or had multiple locks. 114 251 : { 115 251 : Thread::LockGuard lock(file_lock_); 116 251 : for (const Buffer::RawSlice& slice : slices) { 117 251 : absl::string_view data(static_cast<char*>(slice.mem_), slice.len_); 118 251 : const Api::IoCallSizeResult result = file_->write(data); 119 251 : if (result.ok() && result.return_value_ == static_cast<ssize_t>(slice.len_)) { 120 251 : stats_.write_completed_.inc(); 121 251 : } else { 122 : // Probably disk full. 123 0 : stats_.write_failed_.inc(); 124 0 : } 125 251 : } 126 251 : } 127 : 128 251 : stats_.write_total_buffered_.sub(buffer.length()); 129 251 : buffer.drain(buffer.length()); 130 251 : } 131 : 132 98 : void AccessLogFileImpl::flushThreadFunc() { 133 : 134 : // Transfer the action from `reopen_file_` to this variable so that `reopen_file_` is only 135 : // accessed while holding the mutex while the actual operation is performed while not holding the 136 : // mutex. 137 98 : bool do_reopen = false; 138 : 139 302 : while (true) { 140 302 : std::unique_lock<Thread::BasicLockable> flush_lock; 141 : 142 302 : { 143 302 : Thread::LockGuard write_lock(write_lock_); 144 : 145 : // flush_event_ can be woken up either by large enough flush_buffer or by timer. 146 : // In case it was timer, flush_buffer_ can be empty. 147 : // 148 : // Note: do not stop waiting when only `do_reopen` is true. In this case, we tried to 149 : // reopen and failed. We don't want to retry this in a tight loop, so wait for the next 150 : // event (timer or flush). 151 866 : while (flush_buffer_.length() == 0 && !flush_thread_exit_ && !reopen_file_) { 152 : // CondVar::wait() does not throw, so it's safe to pass the mutex rather than the guard. 153 564 : flush_event_.wait(write_lock_); 154 564 : } 155 : 156 302 : if (flush_thread_exit_) { 157 98 : return; 158 98 : } 159 : 160 204 : flush_lock = std::unique_lock<Thread::BasicLockable>(flush_lock_); 161 204 : about_to_write_buffer_.move(flush_buffer_); 162 204 : ASSERT(flush_buffer_.length() == 0); 163 : 164 204 : if (reopen_file_) { 165 0 : do_reopen = true; 166 0 : reopen_file_ = false; 167 0 : } 168 204 : } 169 : 170 204 : if (do_reopen) { 171 0 : if (file_->isOpen()) { 172 0 : const Api::IoCallBoolResult result = file_->close(); 173 0 : ASSERT(result.return_value_, fmt::format("unable to close file '{}': {}", file_->path(), 174 0 : result.err_->getErrorDetails())); 175 0 : } 176 0 : const Api::IoCallBoolResult open_result = open(); 177 0 : if (!open_result.return_value_) { 178 0 : stats_.reopen_failed_.inc(); 179 0 : } else { 180 0 : do_reopen = false; 181 0 : } 182 0 : } 183 : // doWrite no matter file isOpen, if not, we can drain buffer 184 204 : doWrite(about_to_write_buffer_); 185 204 : } 186 98 : } 187 : 188 0 : void AccessLogFileImpl::flush() { 189 0 : std::unique_lock<Thread::BasicLockable> flush_buffer_lock; 190 : 191 0 : { 192 0 : Thread::LockGuard write_lock(write_lock_); 193 : 194 : // flush_lock_ must be held while checking this or else it is 195 : // possible that flushThreadFunc() has already moved data from 196 : // flush_buffer_ to about_to_write_buffer_, has unlocked write_lock_, 197 : // but has not yet completed doWrite(). This would allow flush() to 198 : // return before the pending data has actually been written to disk. 199 0 : flush_buffer_lock = std::unique_lock<Thread::BasicLockable>(flush_lock_); 200 : 201 0 : if (flush_buffer_.length() == 0) { 202 0 : return; 203 0 : } 204 : 205 0 : about_to_write_buffer_.move(flush_buffer_); 206 0 : ASSERT(flush_buffer_.length() == 0); 207 0 : } 208 : 209 0 : doWrite(about_to_write_buffer_); 210 0 : } 211 : 212 612 : void AccessLogFileImpl::write(absl::string_view data) { 213 612 : Thread::LockGuard lock(write_lock_); 214 : 215 612 : if (flush_thread_ == nullptr) { 216 98 : createFlushStructures(); 217 98 : } 218 : 219 612 : stats_.write_buffered_.inc(); 220 612 : stats_.write_total_buffered_.add(data.length()); 221 612 : flush_buffer_.add(data.data(), data.size()); 222 612 : if (flush_buffer_.length() > MIN_FLUSH_SIZE) { 223 0 : flush_event_.notifyOne(); 224 0 : } 225 612 : } 226 : 227 98 : void AccessLogFileImpl::createFlushStructures() { 228 98 : flush_thread_ = thread_factory_.createThread([this]() -> void { flushThreadFunc(); }, 229 98 : Thread::Options{"AccessLogFlush"}); 230 98 : } 231 : 232 : } // namespace AccessLog 233 : } // namespace Envoy