Line data Source code
1 : #pragma once 2 : 3 : #include <cstdint> 4 : #include <functional> 5 : #include <list> 6 : #include <memory> 7 : #include <vector> 8 : 9 : #include "envoy/api/api.h" 10 : #include "envoy/common/scope_tracker.h" 11 : #include "envoy/common/time.h" 12 : #include "envoy/event/deferred_deletable.h" 13 : #include "envoy/event/dispatcher.h" 14 : #include "envoy/network/connection_handler.h" 15 : #include "envoy/stats/scope.h" 16 : 17 : #include "source/common/common/logger.h" 18 : #include "source/common/common/thread.h" 19 : #include "source/common/event/libevent.h" 20 : #include "source/common/event/libevent_scheduler.h" 21 : #include "source/common/signal/fatal_error_handler.h" 22 : 23 : #include "absl/container/inlined_vector.h" 24 : 25 : namespace Envoy { 26 : namespace Event { 27 : 28 : // The tracked object stack likely won't grow larger than this initial 29 : // reservation; this should make appends constant time since the stack 30 : // shouldn't have to grow larger. 31 : inline constexpr size_t ExpectedMaxTrackedObjectStackDepth = 10; 32 : 33 : /** 34 : * libevent implementation of Event::Dispatcher. 35 : */ 36 : class DispatcherImpl : Logger::Loggable<Logger::Id::main>, 37 : public Dispatcher, 38 : public FatalErrorHandlerInterface { 39 : public: 40 : DispatcherImpl(const std::string& name, Api::Api& api, Event::TimeSystem& time_system); 41 : DispatcherImpl(const std::string& name, Api::Api& api, Event::TimeSystem& time_systems, 42 : const Buffer::WatermarkFactorySharedPtr& watermark_factory); 43 : DispatcherImpl(const std::string& name, Api::Api& api, Event::TimeSystem& time_system, 44 : const ScaledRangeTimerManagerFactory& scaled_timer_factory, 45 : const Buffer::WatermarkFactorySharedPtr& watermark_factory); 46 : // TODO(alyssawilk) remove random_generator. 47 : DispatcherImpl(const std::string& name, Thread::ThreadFactory& thread_factory, 48 : TimeSource& time_source, Random::RandomGenerator& random_generator, 49 : Filesystem::Instance& file_system, Event::TimeSystem& time_system, 50 : const ScaledRangeTimerManagerFactory& scaled_timer_factory, 51 : const Buffer::WatermarkFactorySharedPtr& watermark_factory); 52 : ~DispatcherImpl() override; 53 : 54 : /** 55 : * @return event_base& the libevent base. 56 : */ 57 3803 : event_base& base() { return base_scheduler_.base(); } 58 : 59 : // Event::Dispatcher 60 1521 : const std::string& name() override { return name_; } 61 : void registerWatchdog(const Server::WatchDogSharedPtr& watchdog, 62 : std::chrono::milliseconds min_touch_interval) override; 63 14879 : TimeSource& timeSource() override { return time_source_; } 64 : void initializeStats(Stats::Scope& scope, const absl::optional<std::string>& prefix) override; 65 : void clearDeferredDeleteList() override; 66 : Network::ServerConnectionPtr 67 : createServerConnection(Network::ConnectionSocketPtr&& socket, 68 : Network::TransportSocketPtr&& transport_socket, 69 : StreamInfo::StreamInfo& stream_info) override; 70 : Network::ClientConnectionPtr createClientConnection( 71 : Network::Address::InstanceConstSharedPtr address, 72 : Network::Address::InstanceConstSharedPtr source_address, 73 : Network::TransportSocketPtr&& transport_socket, 74 : const Network::ConnectionSocket::OptionsSharedPtr& options, 75 : const Network::TransportSocketOptionsConstSharedPtr& transport_options) override; 76 : FileEventPtr createFileEvent(os_fd_t fd, FileReadyCb cb, FileTriggerType trigger, 77 : uint32_t events) override; 78 : Filesystem::WatcherPtr createFilesystemWatcher() override; 79 : TimerPtr createTimer(TimerCb cb) override; 80 : TimerPtr createScaledTimer(ScaledTimerType timer_type, TimerCb cb) override; 81 : TimerPtr createScaledTimer(ScaledTimerMinimum minimum, TimerCb cb) override; 82 : 83 : Event::SchedulableCallbackPtr createSchedulableCallback(std::function<void()> cb) override; 84 : void deferredDelete(DeferredDeletablePtr&& to_delete) override; 85 : void exit() override; 86 : SignalEventPtr listenForSignal(signal_t signal_num, SignalCb cb) override; 87 : void post(PostCb callback) override; 88 : void deleteInDispatcherThread(DispatcherThreadDeletableConstPtr deletable) override; 89 : void run(RunType type) override; 90 7053 : Buffer::WatermarkFactory& getWatermarkFactory() override { return *buffer_factory_; } 91 : void pushTrackedObject(const ScopeTrackedObject* object) override; 92 : void popTrackedObject(const ScopeTrackedObject* expected_object) override; 93 178 : bool trackedObjectStackIsEmpty() const override { return tracked_object_stack_.empty(); } 94 : MonotonicTime approximateMonotonicTime() const override; 95 : void updateApproximateMonotonicTime() override; 96 : void shutdown() override; 97 : 98 : // FatalErrorInterface 99 : void onFatalError(std::ostream& os) const override; 100 : void 101 : runFatalActionsOnTrackedObject(const FatalAction::FatalActionPtrList& actions) const override; 102 : 103 : private: 104 : // Holds a reference to the watchdog registered with this dispatcher and the timer used to ensure 105 : // that the dog is touched periodically. 106 : class WatchdogRegistration { 107 : public: 108 : WatchdogRegistration(const Server::WatchDogSharedPtr& watchdog, Scheduler& scheduler, 109 : std::chrono::milliseconds timer_interval, Dispatcher& dispatcher) 110 192 : : watchdog_(watchdog), timer_interval_(timer_interval) { 111 192 : touch_timer_ = scheduler.createTimer( 112 483 : [this]() -> void { 113 483 : watchdog_->touch(); 114 483 : touch_timer_->enableTimer(timer_interval_); 115 483 : }, 116 192 : dispatcher); 117 192 : touch_timer_->enableTimer(timer_interval_); 118 192 : } 119 : 120 7018 : void touchWatchdog() { watchdog_->touch(); } 121 : 122 : private: 123 : Server::WatchDogSharedPtr watchdog_; 124 : const std::chrono::milliseconds timer_interval_; 125 : TimerPtr touch_timer_; 126 : }; 127 : using WatchdogRegistrationPtr = std::unique_ptr<WatchdogRegistration>; 128 : 129 : TimerPtr createTimerInternal(TimerCb cb); 130 : void updateApproximateMonotonicTimeInternal(); 131 : void runPostCallbacks(); 132 : void runThreadLocalDelete(); 133 : 134 : // Helper used to touch the watchdog after most schedulable, fd, and timer callbacks. 135 : void touchWatchdog(); 136 : 137 : // Validate that an operation is thread safe, i.e. it's invoked on the same thread that the 138 : // dispatcher run loop is executing on. We allow run_tid_ to be empty for tests where we don't 139 : // invoke run(). 140 0 : bool isThreadSafe() const override { 141 0 : return run_tid_.isEmpty() || run_tid_ == thread_factory_.currentThreadId(); 142 0 : } 143 : 144 : const std::string name_; 145 : Thread::ThreadFactory& thread_factory_; 146 : TimeSource& time_source_; 147 : Filesystem::Instance& file_system_; 148 : std::string stats_prefix_; 149 : DispatcherStatsPtr stats_; 150 : Thread::ThreadId run_tid_; 151 : Buffer::WatermarkFactorySharedPtr buffer_factory_; 152 : LibeventScheduler base_scheduler_; 153 : SchedulerPtr scheduler_; 154 : 155 : SchedulableCallbackPtr thread_local_delete_cb_; 156 : Thread::MutexBasicLockable thread_local_deletable_lock_; 157 : // `deletables_in_dispatcher_thread` must be destroyed last to allow other callbacks populate. 158 : std::list<DispatcherThreadDeletableConstPtr> 159 : deletables_in_dispatcher_thread_ ABSL_GUARDED_BY(thread_local_deletable_lock_); 160 : bool shutdown_called_{false}; 161 : 162 : SchedulableCallbackPtr deferred_delete_cb_; 163 : 164 : SchedulableCallbackPtr post_cb_; 165 : Thread::MutexBasicLockable post_lock_; 166 : std::list<PostCb> post_callbacks_ ABSL_GUARDED_BY(post_lock_); 167 : 168 : std::vector<DeferredDeletablePtr> to_delete_1_; 169 : std::vector<DeferredDeletablePtr> to_delete_2_; 170 : std::vector<DeferredDeletablePtr>* current_to_delete_; 171 : 172 : absl::InlinedVector<const ScopeTrackedObject*, ExpectedMaxTrackedObjectStackDepth> 173 : tracked_object_stack_; 174 : bool deferred_deleting_{}; 175 : MonotonicTime approximate_monotonic_time_; 176 : WatchdogRegistrationPtr watchdog_registration_; 177 : const ScaledRangeTimerManagerPtr scaled_timer_manager_; 178 : }; 179 : 180 : } // namespace Event 181 : } // namespace Envoy