/src/swift-nio/Sources/NIOCore/EventLoop.swift
Line | Count | Source |
1 | | //===----------------------------------------------------------------------===// |
2 | | // |
3 | | // This source file is part of the SwiftNIO open source project |
4 | | // |
5 | | // Copyright (c) 2017-2021 Apple Inc. and the SwiftNIO project authors |
6 | | // Licensed under Apache License v2.0 |
7 | | // |
8 | | // See LICENSE.txt for license information |
9 | | // See CONTRIBUTORS.txt for the list of SwiftNIO project authors |
10 | | // |
11 | | // SPDX-License-Identifier: Apache-2.0 |
12 | | // |
13 | | //===----------------------------------------------------------------------===// |
14 | | |
15 | | import NIOConcurrencyHelpers |
16 | | |
17 | | #if canImport(Dispatch) |
18 | | import Dispatch |
19 | | #endif |
20 | | |
21 | | #if canImport(WASILibc) |
22 | | @preconcurrency import WASILibc |
23 | | import CNIOWASI |
24 | | #endif |
25 | | |
26 | | #if os(Linux) |
27 | | import CNIOLinux |
28 | | #endif // os(Linux) |
29 | | |
30 | | /// Returned once a task was scheduled on the `EventLoop` for later execution. |
31 | | /// |
32 | | /// A `Scheduled` allows the user to either `cancel()` the execution of the scheduled task (if possible) or obtain a reference to the `EventLoopFuture` that |
33 | | /// will be notified once the execution is complete. |
34 | | public struct Scheduled<T> { |
35 | | @usableFromInline typealias CancelationCallback = @Sendable () -> Void |
36 | | @usableFromInline let _promise: EventLoopPromise<T> |
37 | | @usableFromInline let _cancellationTask: CancelationCallback |
38 | | |
39 | | @inlinable |
40 | | @preconcurrency |
41 | 115k | public init(promise: EventLoopPromise<T>, cancellationTask: @escaping @Sendable () -> Void) { |
42 | 115k | self._promise = promise |
43 | 115k | self._cancellationTask = cancellationTask |
44 | 115k | } |
45 | | |
46 | | /// Try to cancel the execution of the scheduled task. |
47 | | /// |
48 | | /// Whether this is successful depends on whether the execution of the task already begun. |
49 | | /// This means that cancellation is not guaranteed. |
50 | | @inlinable |
51 | 0 | public func cancel() { |
52 | 0 | self._promise.fail(EventLoopError._cancelled) |
53 | 0 | self._cancellationTask() |
54 | 0 | } |
55 | | |
56 | | /// Returns the `EventLoopFuture` which will be notified once the execution of the scheduled task completes. |
57 | | @inlinable |
58 | 0 | public var futureResult: EventLoopFuture<T> { |
59 | 0 | self._promise.futureResult |
60 | 0 | } |
61 | | } |
62 | | |
63 | | extension Scheduled: Sendable {} |
64 | | |
65 | | /// Returned once a task was scheduled to be repeatedly executed on the `EventLoop`. |
66 | | /// |
67 | | /// A `RepeatedTask` allows the user to `cancel()` the repeated scheduling of further tasks. |
68 | | public final class RepeatedTask { |
69 | | typealias RepeatedTaskCallback = @Sendable (RepeatedTask) -> EventLoopFuture<Void> |
70 | | private let delay: TimeAmount |
71 | | private let eventLoop: EventLoop |
72 | | private let cancellationPromise: EventLoopPromise<Void>? |
73 | | private var scheduled: Optional<Scheduled<EventLoopFuture<Void>>> |
74 | | private var task: Optional<RepeatedTaskCallback> |
75 | | |
76 | | internal init( |
77 | | interval: TimeAmount, |
78 | | eventLoop: EventLoop, |
79 | | cancellationPromise: EventLoopPromise<Void>? = nil, |
80 | | task: @escaping RepeatedTaskCallback |
81 | 0 | ) { |
82 | 0 | self.delay = interval |
83 | 0 | self.eventLoop = eventLoop |
84 | 0 | self.cancellationPromise = cancellationPromise |
85 | 0 | self.task = task |
86 | 0 | self.scheduled = nil |
87 | 0 | } |
88 | | |
89 | 0 | internal func begin(in delay: TimeAmount) { |
90 | 0 | if self.eventLoop.inEventLoop { |
91 | 0 | self.begin0(in: delay) |
92 | 0 | } else { |
93 | 0 | self.eventLoop.execute { |
94 | 0 | self.begin0(in: delay) |
95 | 0 | } |
96 | 0 | } |
97 | 0 | } |
98 | | |
99 | 0 | private func begin0(in delay: TimeAmount) { |
100 | 0 | self.eventLoop.assertInEventLoop() |
101 | 0 | guard let task = self.task else { |
102 | 0 | return |
103 | 0 | } |
104 | 0 | self.scheduled = self.eventLoop.scheduleTask(in: delay) { |
105 | 0 | task(self) |
106 | 0 | } |
107 | 0 | self.reschedule() |
108 | 0 | } |
109 | | |
110 | | /// Try to cancel the execution of the repeated task. |
111 | | /// |
112 | | /// Whether the execution of the task is immediately canceled depends on whether the execution of a task has already begun. |
113 | | /// This means immediate cancellation is not guaranteed. |
114 | | /// |
115 | | /// The safest way to cancel is by using the passed reference of `RepeatedTask` inside the task closure. |
116 | | /// |
117 | | /// If the promise parameter is not `nil`, the passed promise is fulfilled when cancellation is complete. |
118 | | /// Passing a promise does not prevent fulfillment of any promise provided on original task creation. |
119 | 0 | public func cancel(promise: EventLoopPromise<Void>? = nil) { |
120 | 0 | if self.eventLoop.inEventLoop { |
121 | 0 | self.cancel0(localCancellationPromise: promise) |
122 | 0 | } else { |
123 | 0 | self.eventLoop.execute { |
124 | 0 | self.cancel0(localCancellationPromise: promise) |
125 | 0 | } |
126 | 0 | } |
127 | 0 | } |
128 | | |
129 | 0 | private func cancel0(localCancellationPromise: EventLoopPromise<Void>?) { |
130 | 0 | self.eventLoop.assertInEventLoop() |
131 | 0 | self.scheduled?.cancel() |
132 | 0 | self.scheduled = nil |
133 | 0 | self.task = nil |
134 | 0 |
|
135 | 0 | // Possible states at this time are: |
136 | 0 | // 1) Task is scheduled but has not yet executed. |
137 | 0 | // 2) Task is currently executing and invoked `cancel()` on itself. |
138 | 0 | // 3) Task is currently executing and `cancel0()` has been reentrantly invoked. |
139 | 0 | // 4) NOT VALID: Task is currently executing and has NOT invoked `cancel()` (`EventLoop` guarantees serial execution) |
140 | 0 | // 5) NOT VALID: Task has completed execution in a success state (`reschedule()` ensures state #2). |
141 | 0 | // 6) Task has completed execution in a failure state. |
142 | 0 | // 7) Task has been fully cancelled at a previous time. |
143 | 0 | // |
144 | 0 | // It is desirable that the task has fully completed any execution before any cancellation promise is |
145 | 0 | // fulfilled. States 2 and 3 occur during execution, so the requirement is implemented by deferring |
146 | 0 | // fulfillment to the next `EventLoop` cycle. The delay is harmless to other states and distinguishing |
147 | 0 | // them from 2 and 3 is not practical (or necessarily possible), so is used unconditionally. Check the |
148 | 0 | // promises for nil so as not to otherwise invoke `execute()` unnecessarily. |
149 | 0 | if self.cancellationPromise != nil || localCancellationPromise != nil { |
150 | 0 | self.eventLoop.execute { |
151 | 0 | self.cancellationPromise?.succeed(()) |
152 | 0 | localCancellationPromise?.succeed(()) |
153 | 0 | } |
154 | 0 | } |
155 | 0 | } |
156 | | |
157 | 0 | private func reschedule() { |
158 | 0 | self.eventLoop.assertInEventLoop() |
159 | 0 | guard let scheduled = self.scheduled else { |
160 | 0 | return |
161 | 0 | } |
162 | 0 |
|
163 | 0 | scheduled.futureResult.whenSuccess { future in |
164 | 0 | future.hop(to: self.eventLoop).whenComplete { (_: Result<Void, Error>) in |
165 | 0 | self.reschedule0() |
166 | 0 | } |
167 | 0 | } |
168 | 0 |
|
169 | 0 | scheduled.futureResult.whenFailure { (_: Error) in |
170 | 0 | self.cancel0(localCancellationPromise: nil) |
171 | 0 | } |
172 | 0 | } |
173 | | |
174 | 0 | private func reschedule0() { |
175 | 0 | self.eventLoop.assertInEventLoop() |
176 | 0 | guard self.task != nil else { |
177 | 0 | return |
178 | 0 | } |
179 | 0 | self.scheduled = self.eventLoop.scheduleTask(in: self.delay) { |
180 | 0 | // we need to repeat this as we might have been cancelled in the meantime |
181 | 0 | guard let task = self.task else { |
182 | 0 | return self.eventLoop.makeSucceededFuture(()) |
183 | 0 | } |
184 | 0 | return task(self) |
185 | 0 | } |
186 | 0 | self.reschedule() |
187 | 0 | } |
188 | | } |
189 | | |
190 | | extension RepeatedTask: @unchecked Sendable {} |
191 | | |
192 | | /// An iterator over the `EventLoop`s forming an `EventLoopGroup`. |
193 | | /// |
194 | | /// Usually returned by an `EventLoopGroup`'s `makeIterator()` method. |
195 | | /// |
196 | | /// let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) |
197 | | /// group.makeIterator().forEach { loop in |
198 | | /// // Do something with each loop |
199 | | /// } |
200 | | /// |
201 | | public struct EventLoopIterator: Sequence, IteratorProtocol { |
202 | | public typealias Element = EventLoop |
203 | | private var eventLoops: IndexingIterator<[EventLoop]> |
204 | | |
205 | | /// Create an `EventLoopIterator` from an array of `EventLoop`s. |
206 | 0 | public init(_ eventLoops: [EventLoop]) { |
207 | 0 | self.eventLoops = eventLoops.makeIterator() |
208 | 0 | } |
209 | | |
210 | | /// Advances to the next `EventLoop` and returns it, or `nil` if no next element exists. |
211 | | /// |
212 | | /// - Returns: The next `EventLoop` if a next element exists; otherwise, `nil`. |
213 | 0 | public mutating func next() -> EventLoop? { |
214 | 0 | self.eventLoops.next() |
215 | 0 | } |
216 | | } |
217 | | |
218 | | extension EventLoopIterator: Sendable {} |
219 | | |
220 | | /// An EventLoop processes IO / tasks in an endless loop for `Channel`s until it's closed. |
221 | | /// |
222 | | /// Usually multiple `Channel`s share the same `EventLoop` for processing IO / tasks and so share the same processing `NIOThread`. |
223 | | /// For a better understanding of how such an `EventLoop` works internally the following pseudo code may be helpful: |
224 | | /// |
225 | | /// ``` |
226 | | /// while eventLoop.isOpen { |
227 | | /// /// Block until there is something to process for 1...n Channels |
228 | | /// let readyChannels = blockUntilIoOrTasksAreReady() |
229 | | /// /// Loop through all the Channels |
230 | | /// for channel in readyChannels { |
231 | | /// /// Process IO and / or tasks for the Channel. |
232 | | /// /// This may include things like: |
233 | | /// /// - accept new connection |
234 | | /// /// - connect to a remote host |
235 | | /// /// - read from socket |
236 | | /// /// - write to socket |
237 | | /// /// - tasks that were submitted via EventLoop methods |
238 | | /// /// and others. |
239 | | /// processIoAndTasks(channel) |
240 | | /// } |
241 | | /// } |
242 | | /// ``` |
243 | | /// |
244 | | /// Because an `EventLoop` may be shared between multiple `Channel`s it's important to _NOT_ block while processing IO / tasks. This also includes long running computations which will have the same |
245 | | /// effect as blocking in this case. |
246 | | public protocol EventLoop: EventLoopGroup { |
247 | | /// Returns `true` if the current `NIOThread` is the same as the `NIOThread` that is tied to this `EventLoop`. `false` otherwise. |
248 | | /// |
249 | | /// This method is intended principally as an optimization point, allowing users to write code that can provide fast-paths when this |
250 | | /// property is true. It is _not suitable_ as a correctness guard. Code must be correct if this value returns `false` _even if_ |
251 | | /// the event loop context is actually held. That's because this property is allowed to produce false-negatives |
252 | | /// |
253 | | /// Implementers may implement this method in a way that may produce false-negatives: that is, this value may return `false` in |
254 | | /// cases where the code in question actually is executing on the event loop. It may _never_ produce false positives: this value |
255 | | /// must never return `true` when event loop context is not actually held. |
256 | | /// |
257 | | /// If it is necessary for correctness to confirm that you're on an event loop, prefer ``preconditionInEventLoop(file:line:)-7ukrq``. |
258 | | var inEventLoop: Bool { get } |
259 | | |
260 | | /// Submit a given task to be executed by the `EventLoop` |
261 | | @preconcurrency |
262 | | func execute(_ task: @escaping @Sendable () -> Void) |
263 | | |
264 | | /// Submit a given task to be executed by the `EventLoop`. Once the execution is complete the returned `EventLoopFuture` is notified. |
265 | | /// |
266 | | /// - Parameters: |
267 | | /// - task: The closure that will be submitted to the `EventLoop` for execution. |
268 | | /// - Returns: `EventLoopFuture` that is notified once the task was executed. |
269 | | @preconcurrency |
270 | | func submit<T>(_ task: @escaping @Sendable () throws -> T) -> EventLoopFuture<T> |
271 | | |
272 | | /// The current time of the event loop. |
273 | | var now: NIODeadline { get } |
274 | | |
275 | | /// Schedule a `task` that is executed by this `EventLoop` at the given time. |
276 | | /// |
277 | | /// - Parameters: |
278 | | /// - deadline: The instant in time before which the task will not execute. |
279 | | /// - task: The synchronous task to run. As with everything that runs on the `EventLoop`, it must not block. |
280 | | /// - Returns: A `Scheduled` object which may be used to cancel the task if it has not yet run, or to wait |
281 | | /// on the completion of the task. |
282 | | /// |
283 | | /// - Note: You can only cancel a task before it has started executing. |
284 | | @discardableResult |
285 | | @preconcurrency |
286 | | func scheduleTask<T>(deadline: NIODeadline, _ task: @escaping @Sendable () throws -> T) -> Scheduled<T> |
287 | | |
288 | | /// Schedule a `task` that is executed by this `EventLoop` after the given amount of time. |
289 | | /// |
290 | | /// - Parameters: |
291 | | /// - in: The amount of time before which the task will not execute. |
292 | | /// - task: The synchronous task to run. As with everything that runs on the `EventLoop`, it must not block. |
293 | | /// - Returns: A `Scheduled` object which may be used to cancel the task if it has not yet run, or to wait |
294 | | /// on the completion of the task. |
295 | | /// |
296 | | /// - Note: You can only cancel a task before it has started executing. |
297 | | /// - Note: The `in` value is clamped to a maximum when running on a Darwin-kernel. |
298 | | @discardableResult |
299 | | @preconcurrency |
300 | | func scheduleTask<T>(in: TimeAmount, _ task: @escaping @Sendable () throws -> T) -> Scheduled<T> |
301 | | |
302 | | /// Asserts that the current thread is the one tied to this `EventLoop`. |
303 | | /// Otherwise, the process will be abnormally terminated as per the semantics of `preconditionFailure(_:file:line:)`. |
304 | | /// |
305 | | /// This method may never produce false positives or false negatives in conforming implementations. It may never |
306 | | /// terminate the process when event loop context is actually held, and it may never fail to terminate the process |
307 | | /// when event loop context is not held. |
308 | | func preconditionInEventLoop(file: StaticString, line: UInt) |
309 | | |
310 | | /// Asserts that the current thread is _not_ the one tied to this `EventLoop`. |
311 | | /// Otherwise, the process will be abnormally terminated as per the semantics of `preconditionFailure(_:file:line:)`. |
312 | | /// |
313 | | /// This method may never produce false positives or false negatives in conforming implementations. It may never |
314 | | /// terminate the process when event loop context is not held, and it may never fail to terminate the process |
315 | | /// when event loop context is held. |
316 | | func preconditionNotInEventLoop(file: StaticString, line: UInt) |
317 | | |
318 | | /// Return a succeeded `Void` future. |
319 | | /// |
320 | | /// Semantically, this function is equivalent to calling `makeSucceededFuture(())`. |
321 | | /// Contrary to `makeSucceededFuture`, `makeSucceededVoidFuture` is a customization point for `EventLoop`s which |
322 | | /// allows `EventLoop`s to cache a pre-succeeded `Void` future to prevent superfluous allocations. |
323 | | func makeSucceededVoidFuture() -> EventLoopFuture<Void> |
324 | | |
325 | | /// Returns a `SerialExecutor` corresponding to this ``EventLoop``. |
326 | | /// |
327 | | /// This executor can be used to isolate an actor to a given ``EventLoop``. Implementers are encouraged to customise |
328 | | /// this implementation by conforming their ``EventLoop`` to ``NIOSerialEventLoopExecutor`` which will provide an |
329 | | /// optimised implementation of this method, and will conform their type to `SerialExecutor`. The default |
330 | | /// implementation provides suboptimal performance. |
331 | | @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) |
332 | | var executor: any SerialExecutor { get } |
333 | | |
334 | | /// Submit a job to be executed by the `EventLoop` |
335 | | @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) |
336 | | func enqueue(_ job: consuming ExecutorJob) |
337 | | |
338 | | /// Must crash if it is not safe to call `wait()` on an `EventLoopFuture`. |
339 | | /// |
340 | | /// This method is a debugging hook that can be used to override the behaviour of `EventLoopFuture.wait()` when called. |
341 | | /// By default this simply becomes `preconditionNotInEventLoop`, but some `EventLoop`s are capable of more exhaustive |
342 | | /// checking and can validate that the wait is not occurring on an entire `EventLoopGroup`, or even more broadly. |
343 | | /// |
344 | | /// This method should not be called by users directly, it should only be implemented by `EventLoop` implementers that |
345 | | /// need to customise the behaviour. |
346 | | func _preconditionSafeToWait(file: StaticString, line: UInt) |
347 | | |
348 | | /// Debug hook: track a promise creation and its location. |
349 | | /// |
350 | | /// This debug hook is called by EventLoopFutures and EventLoopPromises when they are created, and tracks the location |
351 | | /// of their creation. It combines with `_promiseCompleted` to provide high-fidelity diagnostics for debugging leaked |
352 | | /// promises. |
353 | | /// |
354 | | /// In release mode, this function will never be called. |
355 | | /// |
356 | | /// It is valid for an `EventLoop` not to implement any of the two `_promise` functions. If either of them are implemented, |
357 | | /// however, both of them should be implemented. |
358 | | func _promiseCreated(futureIdentifier: _NIOEventLoopFutureIdentifier, file: StaticString, line: UInt) |
359 | | |
360 | | /// Debug hook: complete a specific promise and return its creation location. |
361 | | /// |
362 | | /// This debug hook is called by EventLoopFutures and EventLoopPromises when they are deinited, and removes the data from |
363 | | /// the promise tracking map and, if available, provides that data as its return value. It combines with `_promiseCreated` |
364 | | /// to provide high-fidelity diagnostics for debugging leaked promises. |
365 | | /// |
366 | | /// In release mode, this function will never be called. |
367 | | /// |
368 | | /// It is valid for an `EventLoop` not to implement any of the two `_promise` functions. If either of them are implemented, |
369 | | /// however, both of them should be implemented. |
370 | | func _promiseCompleted(futureIdentifier: _NIOEventLoopFutureIdentifier) -> (file: StaticString, line: UInt)? |
371 | | |
372 | | /// Schedule a callback at a given time. |
373 | | /// |
374 | | /// - Parameters: |
375 | | /// - deadline: The instant in time before which the task will not execute. |
376 | | /// - handler: The handler that defines the behavior of the callback when executed or canceled. |
377 | | /// - Returns: A ``NIOScheduledCallback`` that can be used to cancel the scheduled callback. |
378 | | /// |
379 | | /// - NOTE: Event loops that provide a custom scheduled callback implementation **must** also implement |
380 | | /// `cancelScheduledCallback`. Failure to do so will result in a runtime error. |
381 | | @preconcurrency |
382 | | @discardableResult |
383 | | func scheduleCallback( |
384 | | at deadline: NIODeadline, |
385 | | handler: some (NIOScheduledCallbackHandler & Sendable) |
386 | | ) throws -> NIOScheduledCallback |
387 | | |
388 | | /// Schedule a callback after given time. |
389 | | /// |
390 | | /// - Parameters: |
391 | | /// - amount: The amount of time before which the task will not execute. |
392 | | /// - handler: The handler that defines the behavior of the callback when executed or canceled. |
393 | | /// - Returns: A ``NIOScheduledCallback`` that can be used to cancel the scheduled callback. |
394 | | /// |
395 | | /// - NOTE: Event loops that provide a custom scheduled callback implementation **must** also implement |
396 | | /// `cancelScheduledCallback`. Failure to do so will result in a runtime error. |
397 | | @preconcurrency |
398 | | @discardableResult |
399 | | func scheduleCallback( |
400 | | in amount: TimeAmount, |
401 | | handler: some (NIOScheduledCallbackHandler & Sendable) |
402 | | ) throws -> NIOScheduledCallback |
403 | | |
404 | | /// Cancel a scheduled callback. |
405 | | /// |
406 | | /// - NOTE: Event loops only need to implemented this if they provide a custom scheduled callback implementation. |
407 | | func cancelScheduledCallback(_ scheduledCallback: NIOScheduledCallback) |
408 | | |
409 | | /// Submit a given task to be executed by the ``EventLoop``, from a context where the caller |
410 | | /// statically knows that the context is isolated. |
411 | | /// |
412 | | /// This is an optional performance hook. ``EventLoop`` implementers are not required to implement |
413 | | /// this witness, but may choose to do so to enable better performance of the isolated EL views. If |
414 | | /// they do so, ``EventLoop/Isolated/execute`` will perform better. |
415 | | func _executeIsolatedUnsafeUnchecked(_ task: @escaping () -> Void) |
416 | | |
417 | | /// Submit a given task to be executed by the ``EventLoop```, from a context where the caller |
418 | | /// statically knows that the context is isolated. |
419 | | /// |
420 | | /// Once the execution is complete the returned ``EventLoopFuture`` is notified. |
421 | | /// |
422 | | /// This is an optional performance hook. ``EventLoop`` implementers are not required to implement |
423 | | /// this witness, but may choose to do so to enable better performance of the isolated EL views. If |
424 | | /// they do so, ``EventLoop/Isolated/submit`` will perform better. |
425 | | /// |
426 | | /// - Parameters: |
427 | | /// - task: The closure that will be submitted to the ``EventLoop`` for execution. |
428 | | /// - Returns: ``EventLoopFuture`` that is notified once the task was executed. |
429 | | func _submitIsolatedUnsafeUnchecked<T>(_ task: @escaping () throws -> T) -> EventLoopFuture<T> |
430 | | |
431 | | /// Schedule a `task` that is executed by this ``EventLoop`` at the given time, from a context where the caller |
432 | | /// statically knows that the context is isolated. |
433 | | /// |
434 | | /// This is an optional performance hook. ``EventLoop`` implementers are not required to implement |
435 | | /// this witness, but may choose to do so to enable better performance of the isolated EL views. If |
436 | | /// they do so, ``EventLoop/Isolated/scheduleTask(deadline:_:)`` will perform better. |
437 | | /// |
438 | | /// - Parameters: |
439 | | /// - deadline: The instant in time before which the task will not execute. |
440 | | /// - task: The synchronous task to run. As with everything that runs on the ``EventLoop```, it must not block. |
441 | | /// - Returns: A ``Scheduled``` object which may be used to cancel the task if it has not yet run, or to wait |
442 | | /// on the completion of the task. |
443 | | /// |
444 | | /// - Note: You can only cancel a task before it has started executing. |
445 | | @discardableResult |
446 | | func _scheduleTaskIsolatedUnsafeUnchecked<T>( |
447 | | deadline: NIODeadline, |
448 | | _ task: @escaping () throws -> T |
449 | | ) -> Scheduled<T> |
450 | | |
451 | | /// Schedule a `task` that is executed by this ``EventLoop`` after the given amount of time, from a context where the caller |
452 | | /// statically knows that the context is isolated. |
453 | | /// |
454 | | /// This is an optional performance hook. ``EventLoop`` implementers are not required to implement |
455 | | /// this witness, but may choose to do so to enable better performance of the isolated EL views. If |
456 | | /// they do so, ``EventLoop/Isolated/scheduleTask(in:_:)`` will perform better. |
457 | | /// |
458 | | /// - Parameters: |
459 | | /// - in: The amount of time before which the task will not execute. |
460 | | /// - task: The synchronous task to run. As with everything that runs on the ``EventLoop``, it must not block. |
461 | | /// - Returns: A ``Scheduled`` object which may be used to cancel the task if it has not yet run, or to wait |
462 | | /// on the completion of the task. |
463 | | /// |
464 | | /// - Note: You can only cancel a task before it has started executing. |
465 | | /// - Note: The `in` value is clamped to a maximum when running on a Darwin-kernel. |
466 | | @discardableResult |
467 | | func _scheduleTaskIsolatedUnsafeUnchecked<T>( |
468 | | in: TimeAmount, |
469 | | _ task: @escaping () throws -> T |
470 | | ) -> Scheduled<T> |
471 | | |
472 | | /// Schedule a callback that is executed by this ``EventLoop`` at a given time, from a context where the caller |
473 | | /// statically knows that the context is isolated. |
474 | | /// |
475 | | /// This is an optional performance hook. ``EventLoop`` implementers are not required to implement |
476 | | /// this witness, but may choose to do so to enable better performance of the isolated EL views. If |
477 | | /// they do so, ``EventLoop/Isolated/scheduleCallback(at:_:)`` will perform better. |
478 | | /// |
479 | | /// - Parameters: |
480 | | /// - at: The instant in time before which the task will not execute. |
481 | | /// - handler: The handler that defines the behavior of the callback when executed or canceled. |
482 | | /// - Returns: A ``NIOScheduledCallback`` that can be used to cancel the scheduled callback. |
483 | | @discardableResult |
484 | | func _scheduleCallbackIsolatedUnsafeUnchecked( |
485 | | at deadline: NIODeadline, |
486 | | handler: some NIOScheduledCallbackHandler |
487 | | ) throws -> NIOScheduledCallback |
488 | | |
489 | | /// Schedule a callback that is executed by this ``EventLoop`` after a given time, from a context where the caller |
490 | | /// statically knows that the context is isolated. |
491 | | /// |
492 | | /// This is an optional performance hook. ``EventLoop`` implementers are not required to implement |
493 | | /// this witness, but may choose to do so to enable better performance of the isolated EL views. If |
494 | | /// they do so, ``EventLoop/Isolated/scheduleCallback(in:_:)`` will perform better. |
495 | | /// |
496 | | /// - Parameters: |
497 | | /// - in: The amount of time before which the task will not execute. |
498 | | /// - handler: The handler that defines the behavior of the callback when executed or canceled. |
499 | | /// - Returns: A ``NIOScheduledCallback`` that can be used to cancel the scheduled callback. |
500 | | @discardableResult |
501 | | func _scheduleCallbackIsolatedUnsafeUnchecked( |
502 | | in amount: TimeAmount, |
503 | | handler: some NIOScheduledCallbackHandler |
504 | | ) throws -> NIOScheduledCallback |
505 | | } |
506 | | |
507 | | extension EventLoop { |
508 | | /// Default implementation of `now`: Returns `NIODeadline.now()`. |
509 | 0 | public var now: NIODeadline { .now() } |
510 | | } |
511 | | |
512 | | extension EventLoop { |
513 | | /// Default implementation of `makeSucceededVoidFuture`: Return a fresh future (which will allocate). |
514 | 67.7k | public func makeSucceededVoidFuture() -> EventLoopFuture<Void> { |
515 | 67.7k | EventLoopFuture(eventLoop: self, value: ()) |
516 | 67.7k | } |
517 | | |
518 | 0 | public func _preconditionSafeToWait(file: StaticString, line: UInt) { |
519 | 0 | self.preconditionNotInEventLoop(file: file, line: line) |
520 | 0 | } |
521 | | |
522 | | /// Default implementation of `_promiseCreated`: does nothing. |
523 | 0 | public func _promiseCreated(futureIdentifier: _NIOEventLoopFutureIdentifier, file: StaticString, line: UInt) { |
524 | 0 | return |
525 | 0 | } |
526 | | |
527 | | /// Default implementation of `_promiseCompleted`: does nothing. |
528 | | public func _promiseCompleted(futureIdentifier: _NIOEventLoopFutureIdentifier) -> (file: StaticString, line: UInt)? |
529 | 0 | { |
530 | 0 | nil |
531 | 0 | } |
532 | | |
533 | | /// Default implementation: wraps the task in an UnsafeTransfer. |
534 | | @inlinable |
535 | 0 | public func _executeIsolatedUnsafeUnchecked(_ task: @escaping () -> Void) { |
536 | 0 | self.assertInEventLoop() |
537 | 0 | let unsafeTransfer = UnsafeTransfer(task) |
538 | 0 | self.execute { |
539 | 0 | unsafeTransfer.wrappedValue() |
540 | 0 | } |
541 | 0 | } |
542 | | |
543 | | /// Default implementation: wraps the task in an UnsafeTransfer. |
544 | | @inlinable |
545 | 0 | public func _submitIsolatedUnsafeUnchecked<T>(_ task: @escaping () throws -> T) -> EventLoopFuture<T> { |
546 | 0 | self.assertInEventLoop() |
547 | 0 | let unsafeTransfer = UnsafeTransfer(task) |
548 | 0 | return self.submit { |
549 | 0 | try unsafeTransfer.wrappedValue() |
550 | 0 | } |
551 | 0 | } |
552 | | |
553 | | /// Default implementation: wraps the task in an UnsafeTransfer. |
554 | | @inlinable |
555 | | @discardableResult |
556 | | public func _scheduleTaskIsolatedUnsafeUnchecked<T>( |
557 | | deadline: NIODeadline, |
558 | | _ task: @escaping () throws -> T |
559 | 0 | ) -> Scheduled<T> { |
560 | 0 | self.assertInEventLoop() |
561 | 0 | let unsafeTransfer = UnsafeTransfer(task) |
562 | 0 | return self.scheduleTask(deadline: deadline) { |
563 | 0 | try unsafeTransfer.wrappedValue() |
564 | 0 | } |
565 | 0 | } |
566 | | |
567 | | /// Default implementation: wraps the task in an UnsafeTransfer. |
568 | | @inlinable |
569 | | @discardableResult |
570 | | public func _scheduleTaskIsolatedUnsafeUnchecked<T>( |
571 | | in delay: TimeAmount, |
572 | | _ task: @escaping () throws -> T |
573 | 0 | ) -> Scheduled<T> { |
574 | 0 | self.assertInEventLoop() |
575 | 0 | let unsafeTransfer = UnsafeTransfer(task) |
576 | 0 | return self.scheduleTask(in: delay) { |
577 | 0 | try unsafeTransfer.wrappedValue() |
578 | 0 | } |
579 | 0 | } |
580 | | |
581 | | @inlinable |
582 | | @discardableResult |
583 | | public func _scheduleCallbackIsolatedUnsafeUnchecked( |
584 | | at deadline: NIODeadline, |
585 | | handler: some NIOScheduledCallbackHandler |
586 | 0 | ) throws -> NIOScheduledCallback { |
587 | 0 | let unsafeHandlerWrapper = LoopBoundScheduledCallbackHandlerWrapper(wrapping: handler, eventLoop: self) |
588 | 0 | return try self.scheduleCallback(at: deadline, handler: unsafeHandlerWrapper) |
589 | 0 | } |
590 | | |
591 | | @inlinable |
592 | | @discardableResult |
593 | | public func _scheduleCallbackIsolatedUnsafeUnchecked( |
594 | | in amount: TimeAmount, |
595 | | handler: some NIOScheduledCallbackHandler |
596 | 0 | ) throws -> NIOScheduledCallback { |
597 | 0 | let unsafeHandlerWrapper = LoopBoundScheduledCallbackHandlerWrapper(wrapping: handler, eventLoop: self) |
598 | 0 | return try self.scheduleCallback(in: amount, handler: unsafeHandlerWrapper) |
599 | 0 | } |
600 | | } |
601 | | |
602 | | extension EventLoop { |
603 | | @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) |
604 | 0 | public var executor: any SerialExecutor { |
605 | 0 | NIODefaultSerialEventLoopExecutor(self) |
606 | 0 | } |
607 | | |
608 | | @inlinable |
609 | | @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) |
610 | 0 | public func enqueue(_ job: consuming ExecutorJob) { |
611 | 0 | // By default we are just going to use execute to run the job |
612 | 0 | // this is quite heavy since it allocates the closure for |
613 | 0 | // every single job. |
614 | 0 | let unownedJob = UnownedJob(job) |
615 | 0 | self.execute { |
616 | 0 | unownedJob.runSynchronously(on: self.executor.asUnownedSerialExecutor()) |
617 | 0 | } |
618 | 0 | } |
619 | | } |
620 | | |
621 | | extension EventLoopGroup { |
622 | 0 | public var description: String { |
623 | 0 | String(describing: self) |
624 | 0 | } |
625 | | } |
626 | | |
627 | | /// Represents a time _interval_. |
628 | | /// |
629 | | /// - Note: `TimeAmount` should not be used to represent a point in time. |
630 | | public struct TimeAmount: Hashable, Sendable { |
631 | | @available(*, deprecated, message: "This typealias doesn't serve any purpose. Please use Int64 directly.") |
632 | | public typealias Value = Int64 |
633 | | |
634 | | /// The nanoseconds representation of the `TimeAmount`. |
635 | | public let nanoseconds: Int64 |
636 | | |
637 | | @inlinable |
638 | 0 | init(_ nanoseconds: Int64) { |
639 | 0 | self.nanoseconds = nanoseconds |
640 | 0 | } |
641 | | |
642 | | /// Creates a new `TimeAmount` for the given amount of nanoseconds. |
643 | | /// |
644 | | /// - Parameters: |
645 | | /// - amount: the amount of nanoseconds this `TimeAmount` represents. |
646 | | /// - Returns: the `TimeAmount` for the given amount. |
647 | | @inlinable |
648 | 0 | public static func nanoseconds(_ amount: Int64) -> TimeAmount { |
649 | 0 | TimeAmount(amount) |
650 | 0 | } |
651 | | |
652 | | /// Creates a new `TimeAmount` for the given amount of microseconds. |
653 | | /// |
654 | | /// - Parameters: |
655 | | /// - amount: the amount of microseconds this `TimeAmount` represents. |
656 | | /// - Returns: the `TimeAmount` for the given amount. |
657 | | /// |
658 | | /// - Note: returns `TimeAmount(.max)` if the amount overflows when converted to nanoseconds and `TimeAmount(.min)` if it underflows. |
659 | | @inlinable |
660 | 0 | public static func microseconds(_ amount: Int64) -> TimeAmount { |
661 | 0 | TimeAmount(_cappedNanoseconds(amount: amount, multiplier: 1000)) |
662 | 0 | } |
663 | | |
664 | | /// Creates a new `TimeAmount` for the given amount of milliseconds. |
665 | | /// |
666 | | /// - Parameters: |
667 | | /// - amount: the amount of milliseconds this `TimeAmount` represents. |
668 | | /// - Returns: the `TimeAmount` for the given amount. |
669 | | /// |
670 | | /// - Note: returns `TimeAmount(.max)` if the amount overflows when converted to nanoseconds and `TimeAmount(.min)` if it underflows. |
671 | | @inlinable |
672 | 0 | public static func milliseconds(_ amount: Int64) -> TimeAmount { |
673 | 0 | TimeAmount(_cappedNanoseconds(amount: amount, multiplier: 1000 * 1000)) |
674 | 0 | } |
675 | | |
676 | | /// Creates a new `TimeAmount` for the given amount of seconds. |
677 | | /// |
678 | | /// - Parameters: |
679 | | /// - amount: the amount of seconds this `TimeAmount` represents. |
680 | | /// - Returns: the `TimeAmount` for the given amount. |
681 | | /// |
682 | | /// - Note: returns `TimeAmount(.max)` if the amount overflows when converted to nanoseconds and `TimeAmount(.min)` if it underflows. |
683 | | @inlinable |
684 | 0 | public static func seconds(_ amount: Int64) -> TimeAmount { |
685 | 0 | TimeAmount(_cappedNanoseconds(amount: amount, multiplier: 1000 * 1000 * 1000)) |
686 | 0 | } |
687 | | |
688 | | /// Creates a new `TimeAmount` for the given amount of minutes. |
689 | | /// |
690 | | /// - Parameters: |
691 | | /// - amount: the amount of minutes this `TimeAmount` represents. |
692 | | /// - Returns: the `TimeAmount` for the given amount. |
693 | | /// |
694 | | /// - Note: returns `TimeAmount(.max)` if the amount overflows when converted to nanoseconds and `TimeAmount(.min)` if it underflows. |
695 | | @inlinable |
696 | 0 | public static func minutes(_ amount: Int64) -> TimeAmount { |
697 | 0 | TimeAmount(_cappedNanoseconds(amount: amount, multiplier: 1000 * 1000 * 1000 * 60)) |
698 | 0 | } |
699 | | |
700 | | /// Creates a new `TimeAmount` for the given amount of hours. |
701 | | /// |
702 | | /// - Parameters: |
703 | | /// - amount: the amount of hours this `TimeAmount` represents. |
704 | | /// - Returns: the `TimeAmount` for the given amount. |
705 | | /// |
706 | | /// - Note: returns `TimeAmount(.max)` if the amount overflows when converted to nanoseconds and `TimeAmount(.min)` if it underflows. |
707 | | @inlinable |
708 | 0 | public static func hours(_ amount: Int64) -> TimeAmount { |
709 | 0 | TimeAmount(_cappedNanoseconds(amount: amount, multiplier: 1000 * 1000 * 1000 * 60 * 60)) |
710 | 0 | } |
711 | | |
712 | | /// Converts `amount` to nanoseconds multiplying it by `multiplier`. The return value is capped to `Int64.max` if the multiplication overflows and `Int64.min` if it underflows. |
713 | | /// |
714 | | /// - parameters: |
715 | | /// - amount: the amount to be converted to nanoseconds. |
716 | | /// - multiplier: the multiplier that converts the given amount to nanoseconds. |
717 | | /// - returns: the amount converted to nanoseconds within [Int64.min, Int64.max]. |
718 | | @inlinable |
719 | 0 | static func _cappedNanoseconds(amount: Int64, multiplier: Int64) -> Int64 { |
720 | 0 | let nanosecondsMultiplication = amount.multipliedReportingOverflow(by: multiplier) |
721 | 0 | if nanosecondsMultiplication.overflow { |
722 | 0 | return amount >= 0 ? .max : .min |
723 | 0 | } else { |
724 | 0 | return nanosecondsMultiplication.partialValue |
725 | 0 | } |
726 | 0 | } |
727 | | } |
728 | | |
729 | | /// Contains the logic for parsing time amounts from strings, |
730 | | /// and printing pretty strings to represent time amounts. |
731 | | extension TimeAmount: CustomStringConvertible { |
732 | | |
733 | | /// Errors thrown when parsint a TimeAmount from a string |
734 | | internal enum ValidationError: Error, Equatable { |
735 | | /// Can't parse the provided unit |
736 | | case unsupportedUnit(String) |
737 | | |
738 | | /// Can't parse the number into a Double |
739 | | case invalidNumber(String) |
740 | | } |
741 | | |
742 | | /// Creates a TimeAmount from a string representation with an optional default unit. |
743 | | /// |
744 | | /// Supports formats like: |
745 | | /// - "5s" (5 seconds) |
746 | | /// - "100ms" (100 milliseconds) |
747 | | /// - "42" (42 of default unit) |
748 | | /// - "1 hr" (1 hour) |
749 | | /// |
750 | | /// This function only supports one pair of the number and units, i.e. "5s" or "100ms" but not "5s 100ms". |
751 | | /// |
752 | | /// Supported units: |
753 | | /// - h, hr, hrs (hours) |
754 | | /// - m, min (minutes) |
755 | | /// - s, sec, secs (seconds) |
756 | | /// - ms, millis (milliseconds) |
757 | | /// - us, µs, micros (microseconds) |
758 | | /// - ns, nanos (nanoseconds) |
759 | | /// |
760 | | /// - Parameters: |
761 | | /// - userProvidedString: The string to parse |
762 | | /// |
763 | | /// - Throws: ValidationError if the string cannot be parsed |
764 | 0 | public init(_ userProvidedString: String) throws { |
765 | 0 | let lowercased = String(userProvidedString.filter { !$0.isWhitespace }).lowercased() |
766 | 0 | let parsedNumbers = lowercased.prefix(while: { $0.isWholeNumber || $0 == "," || $0 == "." }) |
767 | 0 | let parsedUnit = String(lowercased.dropFirst(parsedNumbers.count)) |
768 | 0 |
|
769 | 0 | guard let numbers = Int64(parsedNumbers) else { |
770 | 0 | throw ValidationError.invalidNumber("'\(userProvidedString)' cannot be parsed as number and unit") |
771 | 0 | } |
772 | 0 |
|
773 | 0 | switch parsedUnit { |
774 | 0 | case "h", "hr", "hrs": |
775 | 0 | self = .hours(numbers) |
776 | 0 | case "m", "min": |
777 | 0 | self = .minutes(numbers) |
778 | 0 | case "s", "sec", "secs": |
779 | 0 | self = .seconds(numbers) |
780 | 0 | case "ms", "millis": |
781 | 0 | self = .milliseconds(numbers) |
782 | 0 | case "us", "µs", "micros": |
783 | 0 | self = .microseconds(numbers) |
784 | 0 | case "ns", "nanos": |
785 | 0 | self = .nanoseconds(numbers) |
786 | 0 | default: |
787 | 0 | throw ValidationError.unsupportedUnit("Unknown unit '\(parsedUnit)' in '\(userProvidedString)'") |
788 | 0 | } |
789 | 0 | } |
790 | | |
791 | | /// Returns a human-readable string representation of the time amount |
792 | | /// using the most appropriate unit |
793 | 0 | public var description: String { |
794 | 0 | let fullNS = self.nanoseconds |
795 | 0 | let (fullUS, remUS) = fullNS.quotientAndRemainder(dividingBy: 1_000) |
796 | 0 | let (fullMS, remMS) = fullNS.quotientAndRemainder(dividingBy: 1_000_000) |
797 | 0 | let (fullS, remS) = fullNS.quotientAndRemainder(dividingBy: 1_000_000_000) |
798 | 0 |
|
799 | 0 | if remS == 0 { |
800 | 0 | return "\(fullS) s" |
801 | 0 | } else if remMS == 0 { |
802 | 0 | return "\(fullMS) ms" |
803 | 0 | } else if remUS == 0 { |
804 | 0 | return "\(fullUS) us" |
805 | 0 | } else { |
806 | 0 | return "\(fullNS) ns" |
807 | 0 | } |
808 | 0 | } |
809 | | } |
810 | | |
811 | | extension TimeAmount: Comparable { |
812 | | @inlinable |
813 | 0 | public static func < (lhs: TimeAmount, rhs: TimeAmount) -> Bool { |
814 | 0 | lhs.nanoseconds < rhs.nanoseconds |
815 | 0 | } |
816 | | } |
817 | | |
818 | | extension TimeAmount: AdditiveArithmetic { |
819 | | /// The zero value for `TimeAmount`. |
820 | | @inlinable |
821 | 0 | public static var zero: TimeAmount { |
822 | 0 | TimeAmount.nanoseconds(0) |
823 | 0 | } |
824 | | |
825 | | @inlinable |
826 | 0 | public static func + (lhs: TimeAmount, rhs: TimeAmount) -> TimeAmount { |
827 | 0 | TimeAmount(lhs.nanoseconds + rhs.nanoseconds) |
828 | 0 | } |
829 | | |
830 | | @inlinable |
831 | 0 | public static func += (lhs: inout TimeAmount, rhs: TimeAmount) { |
832 | 0 | lhs = lhs + rhs |
833 | 0 | } |
834 | | |
835 | | @inlinable |
836 | 0 | public static func - (lhs: TimeAmount, rhs: TimeAmount) -> TimeAmount { |
837 | 0 | TimeAmount(lhs.nanoseconds - rhs.nanoseconds) |
838 | 0 | } |
839 | | |
840 | | @inlinable |
841 | 0 | public static func -= (lhs: inout TimeAmount, rhs: TimeAmount) { |
842 | 0 | lhs = lhs - rhs |
843 | 0 | } |
844 | | |
845 | | @inlinable |
846 | 0 | public static func * <T: BinaryInteger>(lhs: T, rhs: TimeAmount) -> TimeAmount { |
847 | 0 | TimeAmount(Int64(lhs) * rhs.nanoseconds) |
848 | 0 | } |
849 | | |
850 | | @inlinable |
851 | 0 | public static func * <T: BinaryInteger>(lhs: TimeAmount, rhs: T) -> TimeAmount { |
852 | 0 | TimeAmount(lhs.nanoseconds * Int64(rhs)) |
853 | 0 | } |
854 | | } |
855 | | |
856 | | /// Represents a point in time. |
857 | | /// |
858 | | /// Stores the time in nanoseconds as returned by `DispatchTime.now().uptimeNanoseconds` |
859 | | /// |
860 | | /// `NIODeadline` allow chaining multiple tasks with the same deadline without needing to |
861 | | /// compute new timeouts for each step |
862 | | /// |
863 | | /// ``` |
864 | | /// func doSomething(deadline: NIODeadline) -> EventLoopFuture<Void> { |
865 | | /// return step1(deadline: deadline).flatMap { |
866 | | /// step2(deadline: deadline) |
867 | | /// } |
868 | | /// } |
869 | | /// doSomething(deadline: .now() + .seconds(5)) |
870 | | /// ``` |
871 | | /// |
872 | | /// - Note: `NIODeadline` should not be used to represent a time interval |
873 | | public struct NIODeadline: Equatable, Hashable, Sendable { |
874 | | @available(*, deprecated, message: "This typealias doesn't serve any purpose, please use UInt64 directly.") |
875 | | public typealias Value = UInt64 |
876 | | |
877 | | // This really should be an UInt63 but we model it as Int64 with >=0 assert |
878 | | @usableFromInline var _uptimeNanoseconds: Int64 { |
879 | 0 | didSet { |
880 | 0 | assert(self._uptimeNanoseconds >= 0) |
881 | 0 | } |
882 | | } |
883 | | |
884 | | /// The nanoseconds since boot representation of the `NIODeadline`. |
885 | | @inlinable |
886 | 2.52M | public var uptimeNanoseconds: UInt64 { |
887 | 2.52M | .init(self._uptimeNanoseconds) |
888 | 2.52M | } |
889 | | |
890 | | @inlinable |
891 | 0 | public static var distantPast: NIODeadline { NIODeadline(0) } |
892 | | |
893 | | @inlinable |
894 | 0 | public static var distantFuture: NIODeadline { NIODeadline(.init(Int64.max)) } |
895 | | |
896 | 1.37M | @inlinable init(_ nanoseconds: Int64) { |
897 | 1.37M | precondition(nanoseconds >= 0) |
898 | 1.37M | self._uptimeNanoseconds = nanoseconds |
899 | 1.37M | } |
900 | | |
901 | | /// Getting the time is a very common operation so it warrants optimization. |
902 | | /// |
903 | | /// Prior to this function, NIO relied on `DispatchTime.now()`, on all platforms. In addition to |
904 | | /// the overhead of making a library call, the underlying implementation has a lot of branches |
905 | | /// because `libdispatch` supports many more usecases than we are making use of here. |
906 | | /// |
907 | | /// On Linux, `DispachTime.now()` _always_ results in a simple call to `clock_gettime(3)` and so |
908 | | /// we make that call here, directly from NIO. |
909 | | /// |
910 | | /// - TODO: Investigate optimizing the call to `DispatchTime.now()` away on other platforms too. |
911 | | @inlinable |
912 | 0 | static func timeNow() -> UInt64 { |
913 | | #if os(Linux) |
914 | 0 | var ts = timespec() |
915 | 0 | clock_gettime(CLOCK_MONOTONIC, &ts) |
916 | 0 | /// We use unsafe arithmetic here because `UInt64.max` nanoseconds is more than 580 years, |
917 | 0 | /// and the odds that this code will still be running 530 years from now is very, very low, |
918 | 0 | /// so as a practical matter this will never overflow. |
919 | 0 | return UInt64(ts.tv_sec) &* 1_000_000_000 &+ UInt64(ts.tv_nsec) |
920 | | #elseif os(WASI) |
921 | | var ts = timespec() |
922 | | CNIOWASI_gettime(&ts) |
923 | | /// We use unsafe arithmetic here because `UInt64.max` nanoseconds is more than 580 years, |
924 | | /// and the odds that this code will still be running 530 years from now is very, very low, |
925 | | /// so as a practical matter this will never overflow. |
926 | | return UInt64(ts.tv_sec) &* 1_000_000_000 &+ UInt64(ts.tv_nsec) |
927 | | #else |
928 | | return DispatchTime.now().uptimeNanoseconds |
929 | | #endif // os(Linux) |
930 | 0 | } |
931 | | |
932 | | @inlinable |
933 | 0 | public static func now() -> NIODeadline { |
934 | 0 | NIODeadline.uptimeNanoseconds(timeNow()) |
935 | 0 | } |
936 | | |
937 | | @inlinable |
938 | 354k | public static func uptimeNanoseconds(_ nanoseconds: UInt64) -> NIODeadline { |
939 | 354k | NIODeadline(Int64(min(UInt64(Int64.max), nanoseconds))) |
940 | 354k | } |
941 | | |
942 | | @inlinable |
943 | 177k | public static func == (lhs: NIODeadline, rhs: NIODeadline) -> Bool { |
944 | 177k | lhs.uptimeNanoseconds == rhs.uptimeNanoseconds |
945 | 177k | } |
946 | | |
947 | | @inlinable |
948 | 0 | public func hash(into hasher: inout Hasher) { |
949 | 0 | hasher.combine(self.uptimeNanoseconds) |
950 | 0 | } |
951 | | } |
952 | | |
953 | | extension NIODeadline: Comparable { |
954 | | @inlinable |
955 | 924k | public static func < (lhs: NIODeadline, rhs: NIODeadline) -> Bool { |
956 | 924k | lhs.uptimeNanoseconds < rhs.uptimeNanoseconds |
957 | 924k | } |
958 | | |
959 | | @inlinable |
960 | 0 | public static func > (lhs: NIODeadline, rhs: NIODeadline) -> Bool { |
961 | 0 | lhs.uptimeNanoseconds > rhs.uptimeNanoseconds |
962 | 0 | } |
963 | | } |
964 | | |
965 | | extension NIODeadline: CustomStringConvertible { |
966 | | @inlinable |
967 | 0 | public var description: String { |
968 | 0 | self.uptimeNanoseconds.description |
969 | 0 | } |
970 | | } |
971 | | |
972 | | extension NIODeadline { |
973 | | @inlinable |
974 | 0 | public static func - (lhs: NIODeadline, rhs: NIODeadline) -> TimeAmount { |
975 | 0 | // This won't ever crash, NIODeadlines are guaranteed to be within 0 ..< 2^63-1 nanoseconds so the result can |
976 | 0 | // definitely be stored in a TimeAmount (which is an Int64). |
977 | 0 | .nanoseconds(Int64(lhs.uptimeNanoseconds) - Int64(rhs.uptimeNanoseconds)) |
978 | 0 | } |
979 | | |
980 | | @inlinable |
981 | 0 | public static func + (lhs: NIODeadline, rhs: TimeAmount) -> NIODeadline { |
982 | 0 | let partial: Int64 |
983 | 0 | let overflow: Bool |
984 | 0 | (partial, overflow) = Int64(lhs.uptimeNanoseconds).addingReportingOverflow(rhs.nanoseconds) |
985 | 0 | if overflow { |
986 | 0 | assert(rhs.nanoseconds > 0) // this certainly must have overflowed towards +infinity |
987 | 0 | return NIODeadline.distantFuture |
988 | 0 | } |
989 | 0 | guard partial >= 0 else { |
990 | 0 | return NIODeadline.uptimeNanoseconds(0) |
991 | 0 | } |
992 | 0 | return NIODeadline(partial) |
993 | 0 | } |
994 | | |
995 | | @inlinable |
996 | 0 | public static func - (lhs: NIODeadline, rhs: TimeAmount) -> NIODeadline { |
997 | 0 | if rhs.nanoseconds < 0 { |
998 | 0 | // The addition won't crash because the worst that could happen is `UInt64(Int64.max) + UInt64(Int64.max)` |
999 | 0 | // which fits into an UInt64 (and will then be capped to Int64.max == distantFuture by `uptimeNanoseconds`). |
1000 | 0 | return NIODeadline.uptimeNanoseconds(lhs.uptimeNanoseconds + rhs.nanoseconds.magnitude) |
1001 | 0 | } else if rhs.nanoseconds > lhs.uptimeNanoseconds { |
1002 | 0 | // Cap it at `0` because otherwise this would be negative. |
1003 | 0 | return NIODeadline.init(0) |
1004 | 0 | } else { |
1005 | 0 | // This will be positive but still fix in an Int64. |
1006 | 0 | let result = Int64(lhs.uptimeNanoseconds) - rhs.nanoseconds |
1007 | 0 | assert(result >= 0) |
1008 | 0 | return NIODeadline(result) |
1009 | 0 | } |
1010 | 0 | } |
1011 | | } |
1012 | | |
1013 | | extension EventLoop { |
1014 | | /// Submit `task` to be run on this `EventLoop`. |
1015 | | /// |
1016 | | /// The returned `EventLoopFuture` will be completed when `task` has finished running. It will be succeeded with |
1017 | | /// `task`'s return value or failed if the execution of `task` threw an error. |
1018 | | /// |
1019 | | /// - Parameters: |
1020 | | /// - task: The synchronous task to run. As everything that runs on the `EventLoop`, it must not block. |
1021 | | /// - Returns: An `EventLoopFuture` containing the result of `task`'s execution. |
1022 | | @inlinable |
1023 | | @preconcurrency |
1024 | 0 | public func submit<T>(_ task: @escaping @Sendable () throws -> T) -> EventLoopFuture<T> { |
1025 | 0 | let promise: EventLoopPromise<T> = makePromise(file: #fileID, line: #line) |
1026 | 0 |
|
1027 | 0 | self.execute { |
1028 | 0 | do { |
1029 | 0 | // UnsafeUnchecked is allowed here because we know we are on the EL. |
1030 | 0 | promise.assumeIsolatedUnsafeUnchecked().succeed(try task()) |
1031 | 0 | } catch let err { |
1032 | 0 | promise.fail(err) |
1033 | 0 | } |
1034 | 0 | } |
1035 | 0 |
|
1036 | 0 | return promise.futureResult |
1037 | 0 | } |
1038 | | |
1039 | | /// Submit `task` to be run on this `EventLoop`. |
1040 | | /// |
1041 | | /// The returned `EventLoopFuture` will be completed when `task` has finished running. It will be identical to |
1042 | | /// the `EventLoopFuture` returned by `task`. |
1043 | | /// |
1044 | | /// - Parameters: |
1045 | | /// - task: The asynchronous task to run. As with everything that runs on the `EventLoop`, it must not block. |
1046 | | /// - Returns: An `EventLoopFuture` identical to the `EventLoopFuture` returned from `task`. |
1047 | | @inlinable |
1048 | | @preconcurrency |
1049 | 0 | public func flatSubmit<T: Sendable>(_ task: @escaping @Sendable () -> EventLoopFuture<T>) -> EventLoopFuture<T> { |
1050 | 0 | self.submit(task).flatMap { $0 } |
1051 | 0 | } |
1052 | | |
1053 | | /// Schedule a `task` that is executed by this `EventLoop` at the given time. |
1054 | | /// |
1055 | | /// - Note: The `T` must be `Sendable` since the isolation domains of the event loop future returned from `task` and |
1056 | | /// this event loop might differ. |
1057 | | /// |
1058 | | /// - Parameters: |
1059 | | /// - deadline: The instant in time before which the task will not execute. |
1060 | | /// - file: The file this function was called in, for debugging purposes. |
1061 | | /// - line: The line this function was called on, for debugging purposes. |
1062 | | /// - task: The asynchronous task to run. As with everything that runs on the `EventLoop`, it must not block. |
1063 | | /// - Returns: A `Scheduled` object which may be used to cancel the task if it has not yet run, or to wait |
1064 | | /// on the full execution of the task, including its returned `EventLoopFuture`. |
1065 | | /// |
1066 | | /// - Note: You can only cancel a task before it has started executing. |
1067 | | @discardableResult |
1068 | | @inlinable |
1069 | | @preconcurrency |
1070 | | public func flatScheduleTask<T: Sendable>( |
1071 | | deadline: NIODeadline, |
1072 | | file: StaticString = #fileID, |
1073 | | line: UInt = #line, |
1074 | | _ task: @escaping @Sendable () throws -> EventLoopFuture<T> |
1075 | 0 | ) -> Scheduled<T> { |
1076 | 0 | let promise: EventLoopPromise<T> = self.makePromise(file: file, line: line) |
1077 | 0 | let scheduled = self.scheduleTask(deadline: deadline, task) |
1078 | 0 |
|
1079 | 0 | scheduled.futureResult.whenComplete { result in |
1080 | 0 | switch result { |
1081 | 0 | case .success(let futureResult): |
1082 | 0 | promise.completeWith(futureResult) |
1083 | 0 | case .failure(let error): |
1084 | 0 | promise.fail(error) |
1085 | 0 | } |
1086 | 0 | } |
1087 | 0 |
|
1088 | 0 | return .init(promise: promise, cancellationTask: { scheduled.cancel() }) |
1089 | 0 | } |
1090 | | |
1091 | | /// Schedule a `task` that is executed by this `EventLoop` after the given amount of time. |
1092 | | /// |
1093 | | /// - Note: The `T` must be `Sendable` since the isolation domains of the event loop future returned from `task` and |
1094 | | /// this event loop might differ. |
1095 | | /// |
1096 | | /// - Parameters: |
1097 | | /// - delay: The amount of time before which the task will not execute. |
1098 | | /// - file: The file this function was called in, for debugging purposes. |
1099 | | /// - line: The line this function was called on, for debugging purposes. |
1100 | | /// - task: The asynchronous task to run. As everything that runs on the `EventLoop`, it must not block. |
1101 | | /// - Returns: A `Scheduled` object which may be used to cancel the task if it has not yet run, or to wait |
1102 | | /// on the full execution of the task, including its returned `EventLoopFuture`. |
1103 | | /// |
1104 | | /// - Note: You can only cancel a task before it has started executing. |
1105 | | @discardableResult |
1106 | | @inlinable |
1107 | | @preconcurrency |
1108 | | public func flatScheduleTask<T: Sendable>( |
1109 | | in delay: TimeAmount, |
1110 | | file: StaticString = #fileID, |
1111 | | line: UInt = #line, |
1112 | | _ task: @escaping @Sendable () throws -> EventLoopFuture<T> |
1113 | 0 | ) -> Scheduled<T> { |
1114 | 0 | self._flatScheduleTask(in: delay, file: file, line: line, task) |
1115 | 0 | } |
1116 | | |
1117 | | @usableFromInline typealias FlatScheduleTaskDelayCallback<T> = @Sendable () throws -> EventLoopFuture<T> |
1118 | | |
1119 | | @inlinable |
1120 | | func _flatScheduleTask<T: Sendable>( |
1121 | | in delay: TimeAmount, |
1122 | | file: StaticString, |
1123 | | line: UInt, |
1124 | | _ task: @escaping FlatScheduleTaskDelayCallback<T> |
1125 | 0 | ) -> Scheduled<T> { |
1126 | 0 | let promise: EventLoopPromise<T> = self.makePromise(file: file, line: line) |
1127 | 0 | let scheduled = self.scheduleTask(in: delay, task) |
1128 | 0 |
|
1129 | 0 | scheduled.futureResult.whenComplete { result in |
1130 | 0 | switch result { |
1131 | 0 | case .success(let futureResult): |
1132 | 0 | promise.completeWith(futureResult) |
1133 | 0 | case .failure(let error): |
1134 | 0 | promise.fail(error) |
1135 | 0 | } |
1136 | 0 | } |
1137 | 0 |
|
1138 | 0 | return .init(promise: promise, cancellationTask: { scheduled.cancel() }) |
1139 | 0 | } |
1140 | | |
1141 | | /// Creates and returns a new `EventLoopPromise` that will be notified using this `EventLoop` as execution `NIOThread`. |
1142 | | @inlinable |
1143 | | public func makePromise<T>( |
1144 | | of type: T.Type = T.self, |
1145 | | file: StaticString = #fileID, |
1146 | | line: UInt = #line |
1147 | 1.77M | ) -> EventLoopPromise<T> { |
1148 | 1.77M | EventLoopPromise<T>(eventLoop: self, file: file, line: line) |
1149 | 1.77M | } |
1150 | | |
1151 | | /// Creates and returns a new `EventLoopFuture` that is already marked as failed. Notifications will be done using this `EventLoop` as execution `NIOThread`. |
1152 | | /// |
1153 | | /// - Parameters: |
1154 | | /// - error: the `Error` that is used by the `EventLoopFuture`. |
1155 | | /// - Returns: a failed `EventLoopFuture`. |
1156 | | @inlinable |
1157 | 0 | public func makeFailedFuture<T>(_ error: Error) -> EventLoopFuture<T> { |
1158 | 0 | EventLoopFuture<T>(eventLoop: self, error: error) |
1159 | 0 | } |
1160 | | |
1161 | | /// Creates and returns a new `EventLoopFuture` that is already marked as success. Notifications will be done using this `EventLoop` as execution `NIOThread`. |
1162 | | /// |
1163 | | /// - Parameters: |
1164 | | /// - value: the value that is used by the `EventLoopFuture`. |
1165 | | /// - Returns: a succeeded `EventLoopFuture`. |
1166 | | @preconcurrency |
1167 | | @inlinable |
1168 | 115k | public func makeSucceededFuture<Success: Sendable>(_ value: Success) -> EventLoopFuture<Success> { |
1169 | 115k | if Success.self == Void.self { |
1170 | 115k | // The as! will always succeed because we previously checked that Success.self == Void.self. |
1171 | 115k | return self.makeSucceededVoidFuture() as! EventLoopFuture<Success> |
1172 | 115k | } else { |
1173 | 0 | return EventLoopFuture<Success>(eventLoop: self, value: value) |
1174 | 0 | } |
1175 | 115k | } |
1176 | | |
1177 | | /// Creates and returns a new isolated `EventLoopFuture` that is already marked as success. Notifications will be done using this `EventLoop. |
1178 | | /// |
1179 | | /// - Parameters: |
1180 | | /// - value: the value that is used by the `EventLoopFuture.Isolated`. |
1181 | | /// - Returns: a succeeded `EventLoopFuture.Isolated`. |
1182 | | @inlinable |
1183 | | @available(*, noasync) |
1184 | 0 | public func makeSucceededIsolatedFuture<Success>(_ value: Success) -> EventLoopFuture<Success>.Isolated { |
1185 | 0 | if Success.self == Void.self { |
1186 | 0 | // The as! will always succeed because we previously checked that Success.self == Void.self. |
1187 | 0 | return self.makeSucceededVoidFuture().assumeIsolated() as! EventLoopFuture<Success>.Isolated |
1188 | 0 | } else { |
1189 | 0 | return EventLoopFuture.Isolated(_wrapped: EventLoopFuture(eventLoop: self, isolatedValue: value)) |
1190 | 0 | } |
1191 | 0 | } |
1192 | | |
1193 | | /// Creates and returns a new `EventLoopFuture` that is marked as succeeded or failed with the value held by `result`. |
1194 | | /// |
1195 | | /// - Parameters: |
1196 | | /// - result: The value that is used by the `EventLoopFuture` |
1197 | | /// - Returns: A completed `EventLoopFuture`. |
1198 | | @preconcurrency |
1199 | | @inlinable |
1200 | 33.6k | public func makeCompletedFuture<Success: Sendable>(_ result: Result<Success, Error>) -> EventLoopFuture<Success> { |
1201 | 33.6k | switch result { |
1202 | 33.6k | case .success(let value): |
1203 | 33.6k | return self.makeSucceededFuture(value) |
1204 | 33.6k | case .failure(let error): |
1205 | 0 | return self.makeFailedFuture(error) |
1206 | 33.6k | } |
1207 | 33.6k | } |
1208 | | |
1209 | | /// Creates and returns a new `EventLoopFuture` that is marked as succeeded or failed with the value returned by `body`. |
1210 | | /// |
1211 | | /// - Parameters: |
1212 | | /// - body: The function that is used to complete the `EventLoopFuture` |
1213 | | /// - Returns: A completed `EventLoopFuture`. |
1214 | | @preconcurrency |
1215 | | @inlinable |
1216 | | public func makeCompletedFuture<Success: Sendable>( |
1217 | | withResultOf body: () throws -> Success |
1218 | 0 | ) -> EventLoopFuture<Success> { |
1219 | 0 | let trans = Result(catching: body) |
1220 | 0 | return self.makeCompletedFuture(trans) |
1221 | 0 | } |
1222 | | |
1223 | | /// An `EventLoop` forms a singular `EventLoopGroup`, returning itself as the 'next' `EventLoop`. |
1224 | | /// |
1225 | | /// - Returns: Itself, because an `EventLoop` forms a singular `EventLoopGroup`. |
1226 | 0 | public func next() -> EventLoop { |
1227 | 0 | self |
1228 | 0 | } |
1229 | | |
1230 | | /// An `EventLoop` forms a singular `EventLoopGroup`, returning itself as 'any' `EventLoop`. |
1231 | | /// |
1232 | | /// - Returns: Itself, because an `EventLoop` forms a singular `EventLoopGroup`. |
1233 | 0 | public func any() -> EventLoop { |
1234 | 0 | self |
1235 | 0 | } |
1236 | | |
1237 | | /// Close this `EventLoop`. |
1238 | 0 | public func close() throws { |
1239 | 0 | // Do nothing |
1240 | 0 | } |
1241 | | |
1242 | | /// Schedule a repeated task to be executed by the `EventLoop` with a fixed delay between the end and start of each |
1243 | | /// task. |
1244 | | /// |
1245 | | /// - Parameters: |
1246 | | /// - initialDelay: The delay after which the first task is executed. |
1247 | | /// - delay: The delay between the end of one task and the start of the next. |
1248 | | /// - promise: If non-nil, a promise to fulfill when the task is cancelled and all execution is complete. |
1249 | | /// - task: The closure that will be executed. |
1250 | | /// - return: `RepeatedTask` |
1251 | | @discardableResult |
1252 | | @preconcurrency |
1253 | | public func scheduleRepeatedTask( |
1254 | | initialDelay: TimeAmount, |
1255 | | delay: TimeAmount, |
1256 | | notifying promise: EventLoopPromise<Void>? = nil, |
1257 | | _ task: @escaping @Sendable (RepeatedTask) throws -> Void |
1258 | 0 | ) -> RepeatedTask { |
1259 | 0 | self._scheduleRepeatedTask(initialDelay: initialDelay, delay: delay, notifying: promise, task) |
1260 | 0 | } |
1261 | | |
1262 | | /// Schedule a repeated task to be executed by the `EventLoop` with a fixed delay between the end and start of each |
1263 | | /// task. |
1264 | | /// |
1265 | | /// - Parameters: |
1266 | | /// - initialDelay: The delay after which the first task is executed. |
1267 | | /// - delay: The delay between the end of one task and the start of the next. |
1268 | | /// - maximumAllowableJitter: Exclusive upper bound of jitter range added to the `delay` parameter. |
1269 | | /// - promise: If non-nil, a promise to fulfill when the task is cancelled and all execution is complete. |
1270 | | /// - task: The closure that will be executed. |
1271 | | /// - return: `RepeatedTask` |
1272 | | @discardableResult |
1273 | | public func scheduleRepeatedTask( |
1274 | | initialDelay: TimeAmount, |
1275 | | delay: TimeAmount, |
1276 | | maximumAllowableJitter: TimeAmount, |
1277 | | notifying promise: EventLoopPromise<Void>? = nil, |
1278 | | _ task: @escaping @Sendable (RepeatedTask) throws -> Void |
1279 | 0 | ) -> RepeatedTask { |
1280 | 0 | let jitteredInitialDelay = Self._getJitteredDelay( |
1281 | 0 | delay: initialDelay, |
1282 | 0 | maximumAllowableJitter: maximumAllowableJitter |
1283 | 0 | ) |
1284 | 0 | let jitteredDelay = Self._getJitteredDelay(delay: delay, maximumAllowableJitter: maximumAllowableJitter) |
1285 | 0 | return self.scheduleRepeatedTask( |
1286 | 0 | initialDelay: jitteredInitialDelay, |
1287 | 0 | delay: jitteredDelay, |
1288 | 0 | notifying: promise, |
1289 | 0 | task |
1290 | 0 | ) |
1291 | 0 | } |
1292 | | typealias ScheduleRepeatedTaskCallback = @Sendable (RepeatedTask) throws -> Void |
1293 | | |
1294 | | func _scheduleRepeatedTask( |
1295 | | initialDelay: TimeAmount, |
1296 | | delay: TimeAmount, |
1297 | | notifying promise: EventLoopPromise<Void>?, |
1298 | | _ task: @escaping ScheduleRepeatedTaskCallback |
1299 | 0 | ) -> RepeatedTask { |
1300 | 0 | let futureTask: @Sendable (RepeatedTask) -> EventLoopFuture<Void> = { repeatedTask in |
1301 | 0 | do { |
1302 | 0 | try task(repeatedTask) |
1303 | 0 | return self.makeSucceededFuture(()) |
1304 | 0 | } catch { |
1305 | 0 | return self.makeFailedFuture(error) |
1306 | 0 | } |
1307 | 0 | } |
1308 | 0 | return self.scheduleRepeatedAsyncTask(initialDelay: initialDelay, delay: delay, notifying: promise, futureTask) |
1309 | 0 | } |
1310 | | |
1311 | | /// Schedule a repeated asynchronous task to be executed by the `EventLoop` with a fixed delay between the end and |
1312 | | /// start of each task. |
1313 | | /// |
1314 | | /// - Note: The delay is measured from the completion of one run's returned future to the start of the execution of |
1315 | | /// the next run. For example: If you schedule a task once per second but your task takes two seconds to |
1316 | | /// complete, the time interval between two subsequent runs will actually be three seconds (2s run time plus |
1317 | | /// the 1s delay.) |
1318 | | /// |
1319 | | /// - Parameters: |
1320 | | /// - initialDelay: The delay after which the first task is executed. |
1321 | | /// - delay: The delay between the end of one task and the start of the next. |
1322 | | /// - promise: If non-nil, a promise to fulfill when the task is cancelled and all execution is complete. |
1323 | | /// - task: The closure that will be executed. Task will keep repeating regardless of whether the future |
1324 | | /// gets fulfilled with success or error. |
1325 | | /// |
1326 | | /// - return: `RepeatedTask` |
1327 | | @discardableResult |
1328 | | @preconcurrency |
1329 | | public func scheduleRepeatedAsyncTask( |
1330 | | initialDelay: TimeAmount, |
1331 | | delay: TimeAmount, |
1332 | | notifying promise: EventLoopPromise<Void>? = nil, |
1333 | | _ task: @escaping @Sendable (RepeatedTask) -> EventLoopFuture<Void> |
1334 | 0 | ) -> RepeatedTask { |
1335 | 0 | self._scheduleRepeatedAsyncTask(initialDelay: initialDelay, delay: delay, notifying: promise, task) |
1336 | 0 | } |
1337 | | |
1338 | | /// Schedule a repeated asynchronous task to be executed by the `EventLoop` with a fixed delay between the end and |
1339 | | /// start of each task. |
1340 | | /// |
1341 | | /// - Note: The delay is measured from the completion of one run's returned future to the start of the execution of |
1342 | | /// the next run. For example: If you schedule a task once per second but your task takes two seconds to |
1343 | | /// complete, the time interval between two subsequent runs will actually be three seconds (2s run time plus |
1344 | | /// the 1s delay.) |
1345 | | /// |
1346 | | /// - Parameters: |
1347 | | /// - initialDelay: The delay after which the first task is executed. |
1348 | | /// - delay: The delay between the end of one task and the start of the next. |
1349 | | /// - maximumAllowableJitter: Exclusive upper bound of jitter range added to the `delay` parameter. |
1350 | | /// - promise: If non-nil, a promise to fulfill when the task is cancelled and all execution is complete. |
1351 | | /// - task: The closure that will be executed. Task will keep repeating regardless of whether the future |
1352 | | /// gets fulfilled with success or error. |
1353 | | /// |
1354 | | /// - return: `RepeatedTask` |
1355 | | @discardableResult |
1356 | | public func scheduleRepeatedAsyncTask( |
1357 | | initialDelay: TimeAmount, |
1358 | | delay: TimeAmount, |
1359 | | maximumAllowableJitter: TimeAmount, |
1360 | | notifying promise: EventLoopPromise<Void>? = nil, |
1361 | | _ task: @escaping @Sendable (RepeatedTask) -> EventLoopFuture<Void> |
1362 | 0 | ) -> RepeatedTask { |
1363 | 0 | let jitteredInitialDelay = Self._getJitteredDelay( |
1364 | 0 | delay: initialDelay, |
1365 | 0 | maximumAllowableJitter: maximumAllowableJitter |
1366 | 0 | ) |
1367 | 0 | let jitteredDelay = Self._getJitteredDelay(delay: delay, maximumAllowableJitter: maximumAllowableJitter) |
1368 | 0 | return self._scheduleRepeatedAsyncTask( |
1369 | 0 | initialDelay: jitteredInitialDelay, |
1370 | 0 | delay: jitteredDelay, |
1371 | 0 | notifying: promise, |
1372 | 0 | task |
1373 | 0 | ) |
1374 | 0 | } |
1375 | | typealias ScheduleRepeatedAsyncTaskCallback = @Sendable (RepeatedTask) -> EventLoopFuture<Void> |
1376 | | |
1377 | | func _scheduleRepeatedAsyncTask( |
1378 | | initialDelay: TimeAmount, |
1379 | | delay: TimeAmount, |
1380 | | notifying promise: EventLoopPromise<Void>?, |
1381 | | _ task: @escaping ScheduleRepeatedAsyncTaskCallback |
1382 | 0 | ) -> RepeatedTask { |
1383 | 0 | let repeated = RepeatedTask(interval: delay, eventLoop: self, cancellationPromise: promise, task: task) |
1384 | 0 | repeated.begin(in: initialDelay) |
1385 | 0 | return repeated |
1386 | 0 | } |
1387 | | |
1388 | | /// Adds a random amount of `.nanoseconds` (within `.zero..<maximumAllowableJitter`) to the delay. |
1389 | | /// |
1390 | | /// - Parameters: |
1391 | | /// - delay: the `TimeAmount` delay to jitter. |
1392 | | /// - maximumAllowableJitter: Exclusive upper bound of jitter range added to the `delay` parameter. |
1393 | | /// - Returns: The jittered delay. |
1394 | | @inlinable |
1395 | | static func _getJitteredDelay( |
1396 | | delay: TimeAmount, |
1397 | | maximumAllowableJitter: TimeAmount |
1398 | 0 | ) -> TimeAmount { |
1399 | 0 | let jitter = TimeAmount.nanoseconds(Int64.random(in: .zero..<maximumAllowableJitter.nanoseconds)) |
1400 | 0 | return delay + jitter |
1401 | 0 | } |
1402 | | |
1403 | | /// Returns an `EventLoopIterator` over this `EventLoop`. |
1404 | | /// |
1405 | | /// - Returns: `EventLoopIterator` |
1406 | 0 | public func makeIterator() -> EventLoopIterator { |
1407 | 0 | EventLoopIterator([self]) |
1408 | 0 | } |
1409 | | |
1410 | | /// Asserts that the current thread is the one tied to this `EventLoop`. |
1411 | | /// Otherwise, if running in debug mode, the process will be abnormally terminated as per the semantics of |
1412 | | /// `preconditionFailure(_:file:line:)`. Never has any effect in release mode. |
1413 | | /// |
1414 | | /// - Note: This is not a customization point so calls to this function can be fully optimized out in release mode. |
1415 | | @inlinable |
1416 | 5.65G | public func assertInEventLoop(file: StaticString = #fileID, line: UInt = #line) { |
1417 | 5.65G | debugOnly { |
1418 | 2.23M | self.preconditionInEventLoop(file: file, line: line) |
1419 | 2.23M | } |
1420 | 5.65G | } |
1421 | | |
1422 | | /// Asserts that the current thread is _not_ the one tied to this `EventLoop`. |
1423 | | /// Otherwise, if running in debug mode, the process will be abnormally terminated as per the semantics of |
1424 | | /// `preconditionFailure(_:file:line:)`. Never has any effect in release mode. |
1425 | | /// |
1426 | | /// - Note: This is not a customization point so calls to this function can be fully optimized out in release mode. |
1427 | | @inlinable |
1428 | 0 | public func assertNotInEventLoop(file: StaticString = #fileID, line: UInt = #line) { |
1429 | 0 | debugOnly { |
1430 | 0 | self.preconditionNotInEventLoop(file: file, line: line) |
1431 | 0 | } |
1432 | 0 | } |
1433 | | |
1434 | | /// Checks the necessary condition of currently running on the called `EventLoop` for making forward progress. |
1435 | | @inlinable |
1436 | 297M | public func preconditionInEventLoop(file: StaticString = #fileID, line: UInt = #line) { |
1437 | 297M | precondition(self.inEventLoop, file: file, line: line) |
1438 | 297M | } |
1439 | | |
1440 | | /// Checks the necessary condition of currently _not_ running on the called `EventLoop` for making forward progress. |
1441 | | @inlinable |
1442 | 0 | public func preconditionNotInEventLoop(file: StaticString = #fileID, line: UInt = #line) { |
1443 | 0 | precondition(!self.inEventLoop, file: file, line: line) |
1444 | 0 | } |
1445 | | } |
1446 | | |
1447 | | /// Provides an endless stream of `EventLoop`s to use. |
1448 | | public protocol EventLoopGroup: AnyObject, _NIOPreconcurrencySendable { |
1449 | | /// Returns the next `EventLoop` to use, this is useful for load balancing. |
1450 | | /// |
1451 | | /// The algorithm that is used to select the next `EventLoop` is specific to each `EventLoopGroup`. A common choice |
1452 | | /// is _round robin_. |
1453 | | /// |
1454 | | /// Please note that you should only be using `next()` if you want to load balance over all `EventLoop`s of the |
1455 | | /// `EventLoopGroup`. If the actual `EventLoop` does not matter much, `any()` should be preferred because it can |
1456 | | /// try to return you the _current_ `EventLoop` which usually is faster because the number of thread switches can |
1457 | | /// be reduced. |
1458 | | /// |
1459 | | /// The rule of thumb is: If you are trying to do _load balancing_, use `next()`. If you just want to create a new |
1460 | | /// future or kick off some operation, use `any()`. |
1461 | | |
1462 | | func next() -> EventLoop |
1463 | | |
1464 | | /// Returns any `EventLoop` from the `EventLoopGroup`, a common choice is the current `EventLoop`. |
1465 | | /// |
1466 | | /// - warning: You cannot rely on the returned `EventLoop` being the current one, not all `EventLoopGroup`s support |
1467 | | /// choosing the current one. Use this method only if you are truly happy with _any_ `EventLoop` of this |
1468 | | /// `EventLoopGroup` instance. |
1469 | | /// |
1470 | | /// - Note: You will only receive the current `EventLoop` here iff the current `EventLoop` belongs to the |
1471 | | /// `EventLoopGroup` you call `any()` on. |
1472 | | /// |
1473 | | /// This method is useful having access to an `EventLoopGroup` without the knowledge of which `EventLoop` would be |
1474 | | /// the best one to select to create a new `EventLoopFuture`. This commonly happens in libraries where the user |
1475 | | /// cannot indicate what `EventLoop` they would like their futures on. |
1476 | | /// |
1477 | | /// Typically, it is faster to kick off a new operation on the _current_ `EventLoop` because that minimised thread |
1478 | | /// switches. Hence, if situations where you don't need precise knowledge of what `EventLoop` some code is running |
1479 | | /// on, use `any()` to indicate this. |
1480 | | /// |
1481 | | /// The rule of thumb is: If you are trying to do _load balancing_, use `next()`. If you just want to create a new |
1482 | | /// future or kick off some operation, use `any()`. |
1483 | | func any() -> EventLoop |
1484 | | |
1485 | | #if canImport(Dispatch) |
1486 | | /// Shuts down the eventloop gracefully. This function is clearly an outlier in that it uses a completion |
1487 | | /// callback instead of an EventLoopFuture. The reason for that is that NIO's EventLoopFutures will call back on an event loop. |
1488 | | /// The virtue of this function is to shut the event loop down. To work around that we call back on a DispatchQueue |
1489 | | /// instead. |
1490 | | @preconcurrency func shutdownGracefully(queue: DispatchQueue, _ callback: @Sendable @escaping (Error?) -> Void) |
1491 | | #endif |
1492 | | |
1493 | | /// Returns an `EventLoopIterator` over the `EventLoop`s in this `EventLoopGroup`. |
1494 | | /// |
1495 | | /// - Returns: `EventLoopIterator` |
1496 | | func makeIterator() -> EventLoopIterator |
1497 | | |
1498 | | /// Must crash if it's not safe to call `syncShutdownGracefully` in the current context. |
1499 | | /// |
1500 | | /// This method is a debug hook that can be used to override the behaviour of `syncShutdownGracefully` |
1501 | | /// when called. By default it does nothing. |
1502 | | func _preconditionSafeToSyncShutdown(file: StaticString, line: UInt) |
1503 | | } |
1504 | | |
1505 | | extension EventLoopGroup { |
1506 | | /// The default implementation of `any()` just returns the `next()` EventLoop but it's highly recommended to |
1507 | | /// override this and return the current `EventLoop` if possible. |
1508 | 0 | public func any() -> EventLoop { |
1509 | 0 | self.next() |
1510 | 0 | } |
1511 | | } |
1512 | | |
1513 | | #if canImport(Dispatch) |
1514 | | extension EventLoopGroup { |
1515 | 0 | @preconcurrency public func shutdownGracefully(_ callback: @escaping @Sendable (Error?) -> Void) { |
1516 | 0 | self.shutdownGracefully(queue: .global(), callback) |
1517 | 0 | } |
1518 | | |
1519 | | @available(*, noasync, message: "this can end up blocking the calling thread", renamed: "shutdownGracefully()") |
1520 | 0 | public func syncShutdownGracefully() throws { |
1521 | 0 | try self._syncShutdownGracefully() |
1522 | 0 | } |
1523 | | |
1524 | 0 | private func _syncShutdownGracefully() throws { |
1525 | 0 | self._preconditionSafeToSyncShutdown(file: #fileID, line: #line) |
1526 | 0 | let error = NIOLockedValueBox<Error?>(nil) |
1527 | 0 | let semaphore = DispatchSemaphore(value: 0) |
1528 | 0 | self.shutdownGracefully { shutdownError in |
1529 | 0 | if let shutdownError = shutdownError { |
1530 | 0 | error.withLockedValue { |
1531 | 0 | $0 = shutdownError |
1532 | 0 | } |
1533 | 0 | } |
1534 | 0 | semaphore.signal() |
1535 | 0 | } |
1536 | 0 | semaphore.wait() |
1537 | 0 | try error.withLockedValue { error in |
1538 | 0 | if let error = error { |
1539 | 0 | throw error |
1540 | 0 | } |
1541 | 0 | } |
1542 | 0 | } |
1543 | | |
1544 | 0 | public func _preconditionSafeToSyncShutdown(file: StaticString, line: UInt) { |
1545 | 0 | return |
1546 | 0 | } |
1547 | | } |
1548 | | #endif |
1549 | | |
1550 | | /// Deprecated. |
1551 | | /// |
1552 | | /// This type was intended to be used by libraries which use NIO, and offer their users either the option |
1553 | | /// to `.share` an existing event loop group or create (and manage) a new one (`.createNew`) and let it be |
1554 | | /// managed by given library and its lifecycle. |
1555 | | /// |
1556 | | /// Please use a `group: any EventLoopGroup` parameter instead. If you want to default to a global |
1557 | | /// singleton group instead, consider group: any EventLoopGroup = MultiThreadedEventLoopGroup.singleton` or |
1558 | | /// similar. |
1559 | | /// |
1560 | | /// - See also: https://github.com/apple/swift-nio/issues/2142 |
1561 | | public enum NIOEventLoopGroupProvider { |
1562 | | /// Use an `EventLoopGroup` provided by the user. |
1563 | | /// The owner of this group is responsible for its lifecycle. |
1564 | | case shared(EventLoopGroup) |
1565 | | |
1566 | | /// Deprecated. Create a new `EventLoopGroup` when necessary. |
1567 | | /// The library which accepts this provider takes ownership of the created event loop group, |
1568 | | /// and must ensure its proper shutdown when the library is being shut down. |
1569 | | @available( |
1570 | | *, |
1571 | | deprecated, |
1572 | | message: """ |
1573 | | Please use `.shared(existingGroup)` or use the singleton via \ |
1574 | | `.shared(MultiThreadedEventLoopGroup.singleton)` or similar |
1575 | | """ |
1576 | | ) |
1577 | | case createNew |
1578 | | } |
1579 | | |
1580 | | extension NIOEventLoopGroupProvider: Sendable {} |
1581 | | |
1582 | | /// Different `Error`s that are specific to `EventLoop` operations / implementations. |
1583 | | public enum EventLoopError: Error { |
1584 | | /// An operation was executed that is not supported by the `EventLoop` |
1585 | | case unsupportedOperation |
1586 | | |
1587 | | /// An scheduled task was cancelled. |
1588 | | case cancelled |
1589 | | |
1590 | | /// The `EventLoop` was shutdown already. |
1591 | | case shutdown |
1592 | | |
1593 | | /// Shutting down the `EventLoop` failed. |
1594 | | case shutdownFailed |
1595 | | } |
1596 | | |
1597 | | extension EventLoopError { |
1598 | | @usableFromInline |
1599 | | static let _cancelled: any Error = EventLoopError.cancelled |
1600 | | } |
1601 | | |
1602 | | extension EventLoopError: CustomStringConvertible { |
1603 | 0 | public var description: String { |
1604 | 0 | switch self { |
1605 | 0 | case .unsupportedOperation: |
1606 | 0 | return "EventLoopError: the executed operation is not supported by the event loop" |
1607 | 0 | case .cancelled: |
1608 | 0 | return "EventLoopError: the scheduled task was cancelled" |
1609 | 0 | case .shutdown: |
1610 | 0 | return "EventLoopError: the event loop is shutdown" |
1611 | 0 | case .shutdownFailed: |
1612 | 0 | return "EventLoopError: failed to shutdown the event loop" |
1613 | 0 | } |
1614 | 0 | } |
1615 | | } |