/src/swift-nio/Sources/NIOEmbedded/Embedded.swift
Line | Count | Source (jump to first uncovered line) |
1 | | //===----------------------------------------------------------------------===// |
2 | | // |
3 | | // This source file is part of the SwiftNIO open source project |
4 | | // |
5 | | // Copyright (c) 2017-2022 Apple Inc. and the SwiftNIO project authors |
6 | | // Licensed under Apache License v2.0 |
7 | | // |
8 | | // See LICENSE.txt for license information |
9 | | // See CONTRIBUTORS.txt for the list of SwiftNIO project authors |
10 | | // |
11 | | // SPDX-License-Identifier: Apache-2.0 |
12 | | // |
13 | | //===----------------------------------------------------------------------===// |
14 | | |
15 | | import Atomics |
16 | | import DequeModule |
17 | | import NIOConcurrencyHelpers |
18 | | import NIOCore |
19 | | import _NIODataStructures |
20 | | |
21 | | #if canImport(Dispatch) |
22 | | import Dispatch |
23 | | #endif |
24 | | |
25 | | #if canImport(Darwin) |
26 | | import Darwin |
27 | | #elseif canImport(Glibc) |
28 | | @preconcurrency import Glibc |
29 | | #elseif canImport(Musl) |
30 | | @preconcurrency import Musl |
31 | | #elseif canImport(Android) |
32 | | @preconcurrency import Android |
33 | | #elseif canImport(WASILibc) |
34 | | @preconcurrency import WASILibc |
35 | | #elseif canImport(WinSDK) |
36 | | @preconcurrency import WinSDK |
37 | | #else |
38 | | #error("Unknown C library.") |
39 | | #endif |
40 | | |
41 | 0 | private func printError(_ string: StaticString) { |
42 | 0 | string.withUTF8Buffer { buf in |
43 | 0 | var buf = buf |
44 | 0 | while buf.count > 0 { |
45 | 0 | // 2 is stderr |
46 | 0 | #if canImport(WinSDK) |
47 | 0 | let rc = _write(2, buf.baseAddress, UInt32(truncatingIfNeeded: buf.count)) |
48 | 0 | let errno = GetLastError() |
49 | 0 | #else |
50 | 0 | let rc = write(2, buf.baseAddress, buf.count) |
51 | 0 | #endif |
52 | 0 | if rc < 0 { |
53 | 0 | let err = errno |
54 | 0 | if err == EINTR { continue } |
55 | 0 | fatalError("Unexpected error writing: \(err)") |
56 | 0 | } |
57 | 0 | buf = .init(rebasing: buf.dropFirst(Int(rc))) |
58 | 0 | } |
59 | 0 | } |
60 | 0 | } |
61 | | |
62 | | internal struct EmbeddedScheduledTask { |
63 | | let id: UInt64 |
64 | | let task: () -> Void |
65 | | let failFn: (Error) -> Void |
66 | | let readyTime: NIODeadline |
67 | | let insertOrder: UInt64 |
68 | | |
69 | | init( |
70 | | id: UInt64, |
71 | | readyTime: NIODeadline, |
72 | | insertOrder: UInt64, |
73 | | task: @escaping () -> Void, |
74 | | _ failFn: @escaping (Error) -> Void |
75 | 6.92k | ) { |
76 | 6.92k | self.id = id |
77 | 6.92k | self.readyTime = readyTime |
78 | 6.92k | self.insertOrder = insertOrder |
79 | 6.92k | self.task = task |
80 | 6.92k | self.failFn = failFn |
81 | 6.92k | } |
82 | | |
83 | 0 | func fail(_ error: Error) { |
84 | 0 | self.failFn(error) |
85 | 0 | } |
86 | | } |
87 | | |
88 | | extension EmbeddedScheduledTask: Comparable { |
89 | 0 | static func < (lhs: EmbeddedScheduledTask, rhs: EmbeddedScheduledTask) -> Bool { |
90 | 0 | if lhs.readyTime == rhs.readyTime { |
91 | 0 | return lhs.insertOrder < rhs.insertOrder |
92 | 0 | } else { |
93 | 0 | return lhs.readyTime < rhs.readyTime |
94 | 0 | } |
95 | 0 | } |
96 | | |
97 | 0 | static func == (lhs: EmbeddedScheduledTask, rhs: EmbeddedScheduledTask) -> Bool { |
98 | 0 | lhs.id == rhs.id |
99 | 0 | } |
100 | | } |
101 | | |
102 | | /// An `EventLoop` that is embedded in the current running context with no external |
103 | | /// control. |
104 | | /// |
105 | | /// Unlike more complex `EventLoop`s, such as `SelectableEventLoop`, the `EmbeddedEventLoop` |
106 | | /// has no proper eventing mechanism. Instead, reads and writes are fully controlled by the |
107 | | /// entity that instantiates the `EmbeddedEventLoop`. This property makes `EmbeddedEventLoop` |
108 | | /// of limited use for many application purposes, but highly valuable for testing and other |
109 | | /// kinds of mocking. |
110 | | /// |
111 | | /// Time is controllable on an `EmbeddedEventLoop`. It begins at `NIODeadline.uptimeNanoseconds(0)` |
112 | | /// and may be advanced by a fixed amount by using `advanceTime(by:)`, or advanced to a point in |
113 | | /// time with `advanceTime(to:)`. |
114 | | /// |
115 | | /// - warning: Unlike `SelectableEventLoop`, `EmbeddedEventLoop` **is not thread-safe**. This |
116 | | /// is because it is intended to be run in the thread that instantiated it. Users are |
117 | | /// responsible for ensuring they never call into the `EmbeddedEventLoop` in an |
118 | | /// unsynchronized fashion. |
119 | | public final class EmbeddedEventLoop: EventLoop, CustomStringConvertible { |
120 | 69.5k | private var _now: NIODeadline = .uptimeNanoseconds(0) |
121 | | /// The current "time" for this event loop. This is an amount in nanoseconds. |
122 | 0 | public var now: NIODeadline { _now } |
123 | | |
124 | | private enum State { case open, closing, closed } |
125 | 69.5k | private var state: State = .open |
126 | | |
127 | 69.5k | private var scheduledTaskCounter: UInt64 = 0 |
128 | 69.5k | private var scheduledTasks = PriorityQueue<EmbeddedScheduledTask>() |
129 | | |
130 | | /// Keep track of where promises are allocated to ensure we can identify their source if they leak. |
131 | 69.5k | private var _promiseCreationStore: [_NIOEventLoopFutureIdentifier: (file: StaticString, line: UInt)] = [:] |
132 | | |
133 | | // The number of the next task to be created. We track the order so that when we execute tasks |
134 | | // scheduled at the same time, we may do so in the order in which they were submitted for |
135 | | // execution. |
136 | 69.5k | private var taskNumber: UInt64 = 0 |
137 | | |
138 | 69.5k | public let description = "EmbeddedEventLoop" |
139 | | |
140 | | #if canImport(Darwin) || canImport(Glibc) || canImport(Musl) || canImport(Android) |
141 | 69.5k | private let myThread: pthread_t = pthread_self() |
142 | | |
143 | 4.36M | func isCorrectThread() -> Bool { |
144 | 4.36M | pthread_equal(self.myThread, pthread_self()) != 0 |
145 | 4.36M | } |
146 | | #else |
147 | | func isCorrectThread() -> Bool { |
148 | | true // let's be conservative |
149 | | } |
150 | | #endif |
151 | | |
152 | 13.8k | private func nextTaskNumber() -> UInt64 { |
153 | 13.8k | defer { |
154 | 13.8k | self.taskNumber += 1 |
155 | 13.8k | } |
156 | 13.8k | return self.taskNumber |
157 | 13.8k | } |
158 | | |
159 | | /// - see: `EventLoop.inEventLoop` |
160 | 5.02M | public var inEventLoop: Bool { |
161 | 5.02M | self.checkCorrectThread() |
162 | 5.02M | return true |
163 | 5.02M | } |
164 | | |
165 | 9.67M | public func checkCorrectThread() { |
166 | 9.67M | guard self.isCorrectThread() else { |
167 | 0 | if Self.strictModeEnabled { |
168 | 0 | preconditionFailure( |
169 | 0 | "EmbeddedEventLoop is not thread-safe. You can only use it from the thread you created it on." |
170 | 0 | ) |
171 | 0 | } else { |
172 | 0 | printError( |
173 | 0 | """ |
174 | 0 | ERROR: NIO API misuse: EmbeddedEventLoop is not thread-safe. \ |
175 | 0 | You can only use it from the thread you created it on. This problem will be upgraded to a forced \ |
176 | 0 | crash in future versions of SwiftNIO. |
177 | 0 |
|
178 | 0 | """ |
179 | 0 | ) |
180 | 0 | } |
181 | 0 | return |
182 | 9.67M | } |
183 | 9.67M | } |
184 | | |
185 | | /// Initialize a new `EmbeddedEventLoop`. |
186 | 27.7k | public init() {} |
187 | | |
188 | | /// - see: `EventLoop.scheduleTask(deadline:_:)` |
189 | | @discardableResult |
190 | 13.8k | public func scheduleTask<T>(deadline: NIODeadline, _ task: @escaping () throws -> T) -> Scheduled<T> { |
191 | 13.8k | self.checkCorrectThread() |
192 | 13.8k | let promise: EventLoopPromise<T> = makePromise() |
193 | 13.8k | |
194 | 13.8k | switch self.state { |
195 | 13.8k | case .open: |
196 | 13.8k | break |
197 | 13.8k | case .closing, .closed: |
198 | 0 | // If the event loop is shut down, or shutting down, immediately cancel the task. |
199 | 0 | promise.fail(EventLoopError.cancelled) |
200 | 0 | return Scheduled(promise: promise, cancellationTask: {}) |
201 | 13.8k | } |
202 | 13.8k | |
203 | 13.8k | self.scheduledTaskCounter += 1 |
204 | 13.8k | let task = EmbeddedScheduledTask( |
205 | 13.8k | id: self.scheduledTaskCounter, |
206 | 13.8k | readyTime: deadline, |
207 | 13.8k | insertOrder: self.nextTaskNumber(), |
208 | 13.8k | task: { |
209 | 13.8k | do { |
210 | 13.8k | promise.assumeIsolated().succeed(try task()) |
211 | 13.8k | } catch let err { |
212 | 0 | promise.fail(err) |
213 | 13.8k | } |
214 | 13.8k | }, |
215 | 13.8k | promise.fail $s11NIOEmbedded17EmbeddedEventLoopC12scheduleTask8deadline_7NIOCore9ScheduledVyxGAF11NIODeadlineV_xyKctlFys5Error_pcAF0cD7PromiseVyxGcfu_ Line | Count | Source | 215 | 13.8k | promise.fail |
Unexecuted instantiation: $s11NIOEmbedded17EmbeddedEventLoopC12scheduleTask8deadline_7NIOCore9ScheduledVyxGAF11NIODeadlineV_xyKctlFys5Error_pcAF0cD7PromiseVyxGcfu_ysAL_pcfu0_ |
216 | 13.8k | ) |
217 | 13.8k | |
218 | 13.8k | let taskId = task.id |
219 | 13.8k | let scheduled = Scheduled( |
220 | 13.8k | promise: promise, |
221 | 13.8k | cancellationTask: { |
222 | 0 | self.scheduledTasks.removeFirst { $0.id == taskId } |
223 | 0 | } |
224 | 13.8k | ) |
225 | 13.8k | scheduledTasks.push(task) |
226 | 13.8k | return scheduled |
227 | 13.8k | } |
228 | | |
229 | | /// - see: `EventLoop.scheduleTask(in:_:)` |
230 | | @discardableResult |
231 | 0 | public func scheduleTask<T>(in: TimeAmount, _ task: @escaping () throws -> T) -> Scheduled<T> { |
232 | 0 | self.checkCorrectThread() |
233 | 0 | return self.scheduleTask(deadline: self._now + `in`, task) |
234 | 0 | } |
235 | | |
236 | | @preconcurrency |
237 | | @discardableResult |
238 | | public func scheduleCallback( |
239 | | in amount: TimeAmount, |
240 | | handler: some (NIOScheduledCallbackHandler & Sendable) |
241 | 0 | ) -> NIOScheduledCallback { |
242 | 0 | self.checkCorrectThread() |
243 | 0 | /// Even though this type does not implement a custom `scheduleCallback(at:handler)`, it uses a manual clock so |
244 | 0 | /// it cannot rely on the default implementation of `scheduleCallback(in:handler:)`, which computes the deadline |
245 | 0 | /// as an offset from `NIODeadline.now`. This event loop needs the deadline to be offset from `self._now`. |
246 | 0 | return self.scheduleCallback(at: self._now + amount, handler: handler) |
247 | 0 | } |
248 | | |
249 | | /// On an `EmbeddedEventLoop`, `execute` will simply use `scheduleTask` with a deadline of _now_. This means that |
250 | | /// `task` will be run the next time you call `EmbeddedEventLoop.run`. |
251 | 13.8k | public func execute(_ task: @escaping () -> Void) { |
252 | 13.8k | self.checkCorrectThread() |
253 | 13.8k | self.scheduleTask(deadline: self._now, task) |
254 | 13.8k | } |
255 | | |
256 | | /// Run all tasks that have previously been submitted to this `EmbeddedEventLoop`, either by calling `execute` or |
257 | | /// events that have been enqueued using `scheduleTask`/`scheduleRepeatedTask`/`scheduleRepeatedAsyncTask` and whose |
258 | | /// deadlines have expired. |
259 | | /// |
260 | | /// - seealso: `EmbeddedEventLoop.advanceTime`. |
261 | 23.4k | public func run() { |
262 | 23.4k | self.checkCorrectThread() |
263 | 23.4k | // Execute all tasks that are currently enqueued to be executed *now*. |
264 | 23.4k | self.advanceTime(to: self._now) |
265 | 23.4k | } |
266 | | |
267 | | /// Runs the event loop and moves "time" forward by the given amount, running any scheduled |
268 | | /// tasks that need to be run. |
269 | 0 | public func advanceTime(by increment: TimeAmount) { |
270 | 0 | self.checkCorrectThread() |
271 | 0 | self.advanceTime(to: self._now + increment) |
272 | 0 | } |
273 | | |
274 | | /// Runs the event loop and moves "time" forward to the given point in time, running any scheduled |
275 | | /// tasks that need to be run. |
276 | | /// |
277 | | /// - Note: If `deadline` is before the current time, the current time will not be advanced. |
278 | 23.4k | public func advanceTime(to deadline: NIODeadline) { |
279 | 23.4k | self.checkCorrectThread() |
280 | 23.4k | let newTime = max(deadline, self._now) |
281 | 23.4k | |
282 | 23.4k | while let nextTask = self.scheduledTasks.peek() { |
283 | 13.8k | guard nextTask.readyTime <= newTime else { |
284 | 0 | break |
285 | 13.8k | } |
286 | 13.8k | |
287 | 13.8k | // Now we want to grab all tasks that are ready to execute at the same |
288 | 13.8k | // time as the first. |
289 | 13.8k | var tasks = [EmbeddedScheduledTask]() |
290 | 13.8k | while let candidateTask = self.scheduledTasks.peek(), candidateTask.readyTime == nextTask.readyTime { |
291 | 13.8k | tasks.append(candidateTask) |
292 | 13.8k | self.scheduledTasks.pop() |
293 | 13.8k | } |
294 | 13.8k | |
295 | 13.8k | // Set the time correctly before we call into user code, then |
296 | 13.8k | // call in for all tasks. |
297 | 13.8k | self._now = nextTask.readyTime |
298 | 13.8k | |
299 | 13.8k | for task in tasks { |
300 | 13.8k | task.task() |
301 | 13.8k | } |
302 | 23.4k | } |
303 | 23.4k | |
304 | 23.4k | // Finally ensure we got the time right. |
305 | 23.4k | self._now = newTime |
306 | 23.4k | } |
307 | | |
308 | 6.92k | internal func cancelRemainingScheduledTasks() { |
309 | 6.92k | self.checkCorrectThread() |
310 | 6.92k | while let task = self.scheduledTasks.pop() { |
311 | 0 | task.fail(EventLoopError.cancelled) |
312 | 6.92k | } |
313 | 6.92k | } |
314 | | |
315 | | #if canImport(Dispatch) |
316 | | /// - see: `EventLoop.shutdownGracefully` |
317 | 0 | public func shutdownGracefully(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) { |
318 | 0 | self.checkCorrectThread() |
319 | 0 | self.state = .closing |
320 | 0 | run() |
321 | 0 | cancelRemainingScheduledTasks() |
322 | 0 | self.state = .closed |
323 | 0 | queue.sync { |
324 | 0 | callback(nil) |
325 | 0 | } |
326 | 0 | } |
327 | | #endif |
328 | | |
329 | 1.78M | public func preconditionInEventLoop(file: StaticString, line: UInt) { |
330 | 1.78M | self.checkCorrectThread() |
331 | 1.78M | // Currently, inEventLoop is always true so this always passes. |
332 | 1.78M | } |
333 | | |
334 | 0 | public func preconditionNotInEventLoop(file: StaticString, line: UInt) { |
335 | 0 | // As inEventLoop always returns true, this must always precondition. |
336 | 0 | preconditionFailure("Always in EmbeddedEventLoop", file: file, line: line) |
337 | 0 | } |
338 | | |
339 | 41.6k | public func _preconditionSafeToWait(file: StaticString, line: UInt) { |
340 | 41.6k | self.checkCorrectThread() |
341 | 41.6k | // EmbeddedEventLoop always allows a wait, as waiting will essentially always block |
342 | 41.6k | // wait() |
343 | 41.6k | return |
344 | 41.6k | } |
345 | | |
346 | 27.6k | public func _promiseCreated(futureIdentifier: _NIOEventLoopFutureIdentifier, file: StaticString, line: UInt) { |
347 | 27.6k | self.checkCorrectThread() |
348 | 27.6k | precondition(_isDebugAssertConfiguration()) |
349 | 27.6k | self._promiseCreationStore[futureIdentifier] = (file: file, line: line) |
350 | 27.6k | } |
351 | | |
352 | | public func _promiseCompleted(futureIdentifier: _NIOEventLoopFutureIdentifier) -> (file: StaticString, line: UInt)? |
353 | 34.6k | { |
354 | 34.6k | self.checkCorrectThread() |
355 | 34.6k | precondition(_isDebugAssertConfiguration()) |
356 | 34.6k | return self._promiseCreationStore.removeValue(forKey: futureIdentifier) |
357 | 34.6k | } |
358 | | |
359 | 0 | public func _preconditionSafeToSyncShutdown(file: StaticString, line: UInt) { |
360 | 0 | self.checkCorrectThread() |
361 | 0 | // EmbeddedEventLoop always allows a sync shutdown. |
362 | 0 | return |
363 | 0 | } |
364 | | |
365 | 0 | public func _executeIsolatedUnsafeUnchecked(_ task: @escaping () -> Void) { |
366 | 0 | // Because of the way EmbeddedEL works, we can just delegate this directly |
367 | 0 | // to execute. |
368 | 0 | self.execute(task) |
369 | 0 | } |
370 | | |
371 | 0 | public func _submitIsolatedUnsafeUnchecked<T>(_ task: @escaping () throws -> T) -> EventLoopFuture<T> { |
372 | 0 | // Because of the way EmbeddedEL works, we can delegate this directly to schedule. Note that I didn't |
373 | 0 | // say submit: we don't have an override of submit here. |
374 | 0 | self.scheduleTask(deadline: self._now, task).futureResult |
375 | 0 | } |
376 | | |
377 | | @discardableResult |
378 | | public func _scheduleTaskIsolatedUnsafeUnchecked<T>( |
379 | | deadline: NIODeadline, |
380 | | _ task: @escaping () throws -> T |
381 | 0 | ) -> Scheduled<T> { |
382 | 0 | // Because of the way EmbeddedEL works, we can delegate this directly to schedule. |
383 | 0 | self.scheduleTask(deadline: deadline, task) |
384 | 0 | } |
385 | | |
386 | | @discardableResult |
387 | | public func _scheduleTaskIsolatedUnsafeUnchecked<T>( |
388 | | in delay: TimeAmount, |
389 | | _ task: @escaping () throws -> T |
390 | 0 | ) -> Scheduled<T> { |
391 | 0 | // Because of the way EmbeddedEL works, we can delegate this directly to schedule. |
392 | 0 | self.scheduleTask(in: delay, task) |
393 | 0 | } |
394 | | |
395 | 27.7k | deinit { |
396 | 27.7k | self.checkCorrectThread() |
397 | 27.7k | precondition(scheduledTasks.isEmpty, "Embedded event loop freed with unexecuted scheduled tasks!") |
398 | 27.7k | } |
399 | | |
400 | | @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) |
401 | 0 | public var executor: any SerialExecutor { |
402 | 0 | fatalError( |
403 | 0 | "EmbeddedEventLoop is not thread safe and cannot be used as a SerialExecutor. Use NIOAsyncTestingEventLoop instead." |
404 | 0 | ) |
405 | 0 | } |
406 | | |
407 | 0 | static let strictModeEnabled: Bool = { |
408 | 0 | for ciVar in ["SWIFTNIO_STRICT", "SWIFTNIO_CI", "SWIFTNIO_STRICT_EMBEDDED"] { |
409 | 0 | #if os(Windows) |
410 | 0 | let env = Windows.getenv("SWIFTNIO_STRICT") |
411 | 0 | #else |
412 | 0 | let env = getenv("SWIFTNIO_STRICT").flatMap { String(cString: $0) } |
413 | 0 | #endif |
414 | 0 | switch env?.lowercased() { |
415 | 0 | case "true", "y", "yes", "on", "1": |
416 | 0 | return true |
417 | 0 | default: |
418 | 0 | () |
419 | 0 | } |
420 | 0 | } |
421 | 0 | return false |
422 | 0 | }() |
423 | | } |
424 | | |
425 | | // EmbeddedEventLoop is extremely _not_ Sendable. However, the EventLoop protocol |
426 | | // requires it to be. We are doing some runtime enforcement of correct use, but |
427 | | // ultimately we can't have the compiler validating this usage. |
428 | | extension EmbeddedEventLoop: @unchecked Sendable {} |
429 | | |
430 | | @usableFromInline |
431 | | class EmbeddedChannelCore: ChannelCore { |
432 | | var isOpen: Bool { |
433 | 13.8k | get { |
434 | 13.8k | self._isOpen.load(ordering: .sequentiallyConsistent) |
435 | 13.8k | } |
436 | 13.8k | set { |
437 | 13.8k | self._isOpen.store(newValue, ordering: .sequentiallyConsistent) |
438 | 13.8k | } |
439 | | } |
440 | | |
441 | | var isActive: Bool { |
442 | 6.92k | get { |
443 | 6.92k | self._isActive.load(ordering: .sequentiallyConsistent) |
444 | 6.92k | } |
445 | 6.92k | set { |
446 | 6.92k | self._isActive.store(newValue, ordering: .sequentiallyConsistent) |
447 | 6.92k | } |
448 | | } |
449 | | |
450 | | var allowRemoteHalfClosure: Bool { |
451 | 0 | get { |
452 | 0 | self._allowRemoteHalfClosure.load(ordering: .sequentiallyConsistent) |
453 | 0 | } |
454 | 0 | set { |
455 | 0 | self._allowRemoteHalfClosure.store(newValue, ordering: .sequentiallyConsistent) |
456 | 0 | } |
457 | | } |
458 | | |
459 | 13.8k | private let _isOpen = ManagedAtomic(true) |
460 | 13.8k | private let _isActive = ManagedAtomic(false) |
461 | 13.8k | private let _allowRemoteHalfClosure = ManagedAtomic(false) |
462 | | |
463 | | let eventLoop: EventLoop |
464 | | let closePromise: EventLoopPromise<Void> |
465 | | var error: Optional<Error> |
466 | | |
467 | | private let pipeline: ChannelPipeline |
468 | | |
469 | 6.92k | init(pipeline: ChannelPipeline, eventLoop: EventLoop) { |
470 | 6.92k | closePromise = eventLoop.makePromise() |
471 | 6.92k | self.pipeline = pipeline |
472 | 6.92k | self.eventLoop = eventLoop |
473 | 6.92k | self.error = nil |
474 | 6.92k | } |
475 | | |
476 | 13.8k | deinit { |
477 | 13.8k | assert( |
478 | 13.8k | !self.isOpen && !self.isActive, |
479 | 13.8k | "leaked an open EmbeddedChannel, maybe forgot to call channel.finish()?" |
480 | 13.8k | ) |
481 | 13.8k | isOpen = false |
482 | 13.8k | closePromise.succeed(()) |
483 | 13.8k | } |
484 | | |
485 | | /// Contains the flushed items that went into the `Channel` (and on a regular channel would have hit the network). |
486 | | @usableFromInline |
487 | 13.8k | var outboundBuffer: CircularBuffer<NIOAny> = CircularBuffer() |
488 | | |
489 | | /// Contains observers that want to consume the first element that would be appended to the `outboundBuffer` |
490 | | @usableFromInline |
491 | 13.8k | var outboundBufferConsumer: Deque<(NIOAny) -> Void> = [] |
492 | | |
493 | | /// Contains the unflushed items that went into the `Channel` |
494 | | @usableFromInline |
495 | 13.8k | var pendingOutboundBuffer: MarkedCircularBuffer<(NIOAny, EventLoopPromise<Void>?)> = MarkedCircularBuffer( |
496 | 13.8k | initialCapacity: 16 |
497 | 13.8k | ) |
498 | | |
499 | | /// Contains the items that travelled the `ChannelPipeline` all the way and hit the tail channel handler. On a |
500 | | /// regular `Channel` these items would be lost. |
501 | | @usableFromInline |
502 | 13.8k | var inboundBuffer: CircularBuffer<NIOAny> = CircularBuffer() |
503 | | |
504 | | /// Contains observers that want to consume the first element that would be appended to the `inboundBuffer` |
505 | | @usableFromInline |
506 | 13.8k | var inboundBufferConsumer: Deque<(NIOAny) -> Void> = [] |
507 | | |
508 | | @usableFromInline |
509 | | var localAddress: SocketAddress? |
510 | | |
511 | | @usableFromInline |
512 | | var remoteAddress: SocketAddress? |
513 | | |
514 | | @usableFromInline |
515 | 0 | func localAddress0() throws -> SocketAddress { |
516 | 0 | self.eventLoop.preconditionInEventLoop() |
517 | 0 | if let localAddress = self.localAddress { |
518 | 0 | return localAddress |
519 | 0 | } else { |
520 | 0 | throw ChannelError.operationUnsupported |
521 | 0 | } |
522 | 0 | } |
523 | | |
524 | | @usableFromInline |
525 | 0 | func remoteAddress0() throws -> SocketAddress { |
526 | 0 | self.eventLoop.preconditionInEventLoop() |
527 | 0 | if let remoteAddress = self.remoteAddress { |
528 | 0 | return remoteAddress |
529 | 0 | } else { |
530 | 0 | throw ChannelError.operationUnsupported |
531 | 0 | } |
532 | 0 | } |
533 | | |
534 | | @usableFromInline |
535 | 13.8k | func close0(error: Error, mode: CloseMode, promise: EventLoopPromise<Void>?) { |
536 | 13.8k | self.eventLoop.preconditionInEventLoop() |
537 | 13.8k | guard self.isOpen else { |
538 | 0 | promise?.fail(ChannelError.alreadyClosed) |
539 | 0 | return |
540 | 13.8k | } |
541 | 13.8k | isOpen = false |
542 | 13.8k | isActive = false |
543 | 13.8k | promise?.succeed(()) |
544 | 13.8k | |
545 | 13.8k | // As we called register() in the constructor of EmbeddedChannel we also need to ensure we call unregistered here. |
546 | 13.8k | self.pipeline.syncOperations.fireChannelInactive() |
547 | 13.8k | self.pipeline.syncOperations.fireChannelUnregistered() |
548 | 13.8k | |
549 | 13.8k | let loopBoundSelf = NIOLoopBound(self, eventLoop: self.eventLoop) |
550 | 13.8k | |
551 | 13.8k | eventLoop.execute { |
552 | 13.8k | // ensure this is executed in a delayed fashion as the users code may still traverse the pipeline |
553 | 13.8k | let `self` = loopBoundSelf.value |
554 | 13.8k | self.removeHandlers(pipeline: self.pipeline) |
555 | 13.8k | self.closePromise.succeed(()) |
556 | 13.8k | } |
557 | 13.8k | } |
558 | | |
559 | | @usableFromInline |
560 | 0 | func bind0(to address: SocketAddress, promise: EventLoopPromise<Void>?) { |
561 | 0 | self.eventLoop.preconditionInEventLoop() |
562 | 0 | promise?.succeed(()) |
563 | 0 | } |
564 | | |
565 | | @usableFromInline |
566 | 0 | func connect0(to address: SocketAddress, promise: EventLoopPromise<Void>?) { |
567 | 0 | self.eventLoop.preconditionInEventLoop() |
568 | 0 | isActive = true |
569 | 0 | promise?.succeed(()) |
570 | 0 | self.pipeline.syncOperations.fireChannelActive() |
571 | 0 | } |
572 | | |
573 | | @usableFromInline |
574 | 13.8k | func register0(promise: EventLoopPromise<Void>?) { |
575 | 13.8k | self.eventLoop.preconditionInEventLoop() |
576 | 13.8k | promise?.succeed(()) |
577 | 13.8k | self.pipeline.syncOperations.fireChannelRegistered() |
578 | 13.8k | } |
579 | | |
580 | | @usableFromInline |
581 | 0 | func registerAlreadyConfigured0(promise: EventLoopPromise<Void>?) { |
582 | 0 | self.eventLoop.preconditionInEventLoop() |
583 | 0 | isActive = true |
584 | 0 | register0(promise: promise) |
585 | 0 | self.pipeline.syncOperations.fireChannelActive() |
586 | 0 | } |
587 | | |
588 | | @usableFromInline |
589 | 0 | func write0(_ data: NIOAny, promise: EventLoopPromise<Void>?) { |
590 | 0 | self.eventLoop.preconditionInEventLoop() |
591 | 0 | self.pendingOutboundBuffer.append((data, promise)) |
592 | 0 | } |
593 | | |
594 | | @usableFromInline |
595 | 0 | func flush0() { |
596 | 0 | self.eventLoop.preconditionInEventLoop() |
597 | 0 | self.pendingOutboundBuffer.mark() |
598 | 0 |
|
599 | 0 | while self.pendingOutboundBuffer.hasMark, let dataAndPromise = self.pendingOutboundBuffer.popFirst() { |
600 | 0 | self.addToBuffer( |
601 | 0 | buffer: &self.outboundBuffer, |
602 | 0 | consumer: &self.outboundBufferConsumer, |
603 | 0 | data: dataAndPromise.0 |
604 | 0 | ) |
605 | 0 | dataAndPromise.1?.succeed(()) |
606 | 0 | } |
607 | 0 | } |
608 | | |
609 | | @usableFromInline |
610 | 0 | func read0() { |
611 | 0 | self.eventLoop.preconditionInEventLoop() |
612 | 0 | // NOOP |
613 | 0 | } |
614 | | |
615 | 0 | public final func triggerUserOutboundEvent0(_ event: Any, promise: EventLoopPromise<Void>?) { |
616 | 0 | self.eventLoop.preconditionInEventLoop() |
617 | 0 | promise?.fail(ChannelError.operationUnsupported) |
618 | 0 | } |
619 | | |
620 | | @usableFromInline |
621 | 2.40M | func channelRead0(_ data: NIOAny) { |
622 | 2.40M | self.eventLoop.preconditionInEventLoop() |
623 | 2.40M | self.addToBuffer( |
624 | 2.40M | buffer: &self.inboundBuffer, |
625 | 2.40M | consumer: &self.inboundBufferConsumer, |
626 | 2.40M | data: data |
627 | 2.40M | ) |
628 | 2.40M | } |
629 | | |
630 | 13.2k | public func errorCaught0(error: Error) { |
631 | 13.2k | self.eventLoop.preconditionInEventLoop() |
632 | 13.2k | if self.error == nil { |
633 | 12.9k | self.error = error |
634 | 13.2k | } |
635 | 13.2k | } |
636 | | |
637 | | private func addToBuffer( |
638 | | buffer: inout CircularBuffer<NIOAny>, |
639 | | consumer: inout Deque<(NIOAny) -> Void>, |
640 | | data: NIOAny |
641 | 2.40M | ) { |
642 | 2.40M | self.eventLoop.preconditionInEventLoop() |
643 | 2.40M | if let consume = consumer.popFirst() { |
644 | 0 | consume(data) |
645 | 2.40M | } else { |
646 | 2.40M | buffer.append(data) |
647 | 2.40M | } |
648 | 2.40M | } |
649 | | } |
650 | | |
651 | | // ChannelCores are basically never Sendable. |
652 | | @available(*, unavailable) |
653 | | extension EmbeddedChannelCore: Sendable {} |
654 | | |
655 | | /// `EmbeddedChannel` is a `Channel` implementation that does neither any |
656 | | /// actual IO nor has a proper eventing mechanism. The prime use-case for |
657 | | /// `EmbeddedChannel` is in unit tests when you want to feed the inbound events |
658 | | /// and check the outbound events manually. |
659 | | /// |
660 | | /// Please remember to call `finish()` when you are no longer using this |
661 | | /// `EmbeddedChannel`. |
662 | | /// |
663 | | /// To feed events through an `EmbeddedChannel`'s `ChannelPipeline` use |
664 | | /// `EmbeddedChannel.writeInbound` which accepts data of any type. It will then |
665 | | /// forward that data through the `ChannelPipeline` and the subsequent |
666 | | /// `ChannelInboundHandler` will receive it through the usual `channelRead` |
667 | | /// event. The user is responsible for making sure the first |
668 | | /// `ChannelInboundHandler` expects data of that type. |
669 | | /// |
670 | | /// `EmbeddedChannel` automatically collects arriving outbound data and makes it |
671 | | /// available one-by-one through `readOutbound`. |
672 | | /// |
673 | | /// - Note: `EmbeddedChannel` is currently only compatible with |
674 | | /// `EmbeddedEventLoop`s and cannot be used with `SelectableEventLoop`s from |
675 | | /// for example `MultiThreadedEventLoopGroup`. |
676 | | /// - warning: Unlike other `Channel`s, `EmbeddedChannel` **is not thread-safe**. This |
677 | | /// is because it is intended to be run in the thread that instantiated it. Users are |
678 | | /// responsible for ensuring they never call into an `EmbeddedChannel` in an |
679 | | /// unsynchronized fashion. `EmbeddedEventLoop`s notes also apply as |
680 | | /// `EmbeddedChannel` uses an `EmbeddedEventLoop` as its `EventLoop`. |
681 | | public final class EmbeddedChannel: Channel { |
682 | | /// `LeftOverState` represents any left-over inbound, outbound, and pending outbound events that hit the |
683 | | /// `EmbeddedChannel` and were not consumed when `finish` was called on the `EmbeddedChannel`. |
684 | | /// |
685 | | /// `EmbeddedChannel` is most useful in testing and usually in unit tests, you want to consume all inbound and |
686 | | /// outbound data to verify they are what you expect. Therefore, when you `finish` an `EmbeddedChannel` it will |
687 | | /// return if it's either `.clean` (no left overs) or that it has `.leftOvers`. |
688 | | public enum LeftOverState { |
689 | | /// The `EmbeddedChannel` is clean, ie. no inbound, outbound, or pending outbound data left on `finish`. |
690 | | case clean |
691 | | |
692 | | /// The `EmbeddedChannel` has inbound, outbound, or pending outbound data left on `finish`. |
693 | | case leftOvers(inbound: [NIOAny], outbound: [NIOAny], pendingOutbound: [NIOAny]) |
694 | | |
695 | | /// `true` if the `EmbeddedChannel` was `clean` on `finish`, ie. there is no unconsumed inbound, outbound, or |
696 | | /// pending outbound data left on the `Channel`. |
697 | 0 | public var isClean: Bool { |
698 | 0 | if case .clean = self { |
699 | 0 | return true |
700 | 0 | } else { |
701 | 0 | return false |
702 | 0 | } |
703 | 0 | } |
704 | | |
705 | | /// `true` if the `EmbeddedChannel` if there was unconsumed inbound, outbound, or pending outbound data left |
706 | | /// on the `Channel` when it was `finish`ed. |
707 | 0 | public var hasLeftOvers: Bool { |
708 | 0 | !self.isClean |
709 | 0 | } |
710 | | } |
711 | | |
712 | | /// `BufferState` represents the state of either the inbound, or the outbound `EmbeddedChannel` buffer. These |
713 | | /// buffers contain data that travelled the `ChannelPipeline` all the way. |
714 | | /// |
715 | | /// If the last `ChannelHandler` explicitly (by calling `fireChannelRead`) or implicitly (by not implementing |
716 | | /// `channelRead`) sends inbound data into the end of the `EmbeddedChannel`, it will be held in the |
717 | | /// `EmbeddedChannel`'s inbound buffer. Similarly for `write` on the outbound side. The state of the respective |
718 | | /// buffer will be returned from `writeInbound`/`writeOutbound` as a `BufferState`. |
719 | | public enum BufferState { |
720 | | /// The buffer is empty. |
721 | | case empty |
722 | | |
723 | | /// The buffer is non-empty. |
724 | | case full([NIOAny]) |
725 | | |
726 | | /// Returns `true` is the buffer was empty. |
727 | 0 | public var isEmpty: Bool { |
728 | 0 | if case .empty = self { |
729 | 0 | return true |
730 | 0 | } else { |
731 | 0 | return false |
732 | 0 | } |
733 | 0 | } |
734 | | |
735 | | /// Returns `true` if the buffer was non-empty. |
736 | 0 | public var isFull: Bool { |
737 | 0 | !self.isEmpty |
738 | 0 | } |
739 | | } |
740 | | |
741 | | /// `WrongTypeError` is throws if you use `readInbound` or `readOutbound` and request a certain type but the first |
742 | | /// item in the respective buffer is of a different type. |
743 | | public struct WrongTypeError: Error, Equatable { |
744 | | /// The type you expected. |
745 | | public let expected: Any.Type |
746 | | |
747 | | /// The type of the actual first element. |
748 | | public let actual: Any.Type |
749 | | |
750 | 0 | public init(expected: Any.Type, actual: Any.Type) { |
751 | 0 | self.expected = expected |
752 | 0 | self.actual = actual |
753 | 0 | } |
754 | | |
755 | 0 | public static func == (lhs: WrongTypeError, rhs: WrongTypeError) -> Bool { |
756 | 0 | lhs.expected == rhs.expected && lhs.actual == rhs.actual |
757 | 0 | } |
758 | | } |
759 | | |
760 | | /// Returns `true` if the `EmbeddedChannel` is 'active'. |
761 | | /// |
762 | | /// An active `EmbeddedChannel` can be closed by calling `close` or `finish` on the `EmbeddedChannel`. |
763 | | /// |
764 | | /// - Note: An `EmbeddedChannel` starts _inactive_ and can be activated, for example by calling `connect`. |
765 | 0 | public var isActive: Bool { channelcore.isActive } |
766 | | |
767 | | /// - see: `ChannelOptions.Types.AllowRemoteHalfClosureOption` |
768 | | public var allowRemoteHalfClosure: Bool { |
769 | 0 | get { |
770 | 0 | self.embeddedEventLoop.checkCorrectThread() |
771 | 0 | return channelcore.allowRemoteHalfClosure |
772 | 0 | } |
773 | 0 | set { |
774 | 0 | self.embeddedEventLoop.checkCorrectThread() |
775 | 0 | channelcore.allowRemoteHalfClosure = newValue |
776 | 0 | } |
777 | | } |
778 | | |
779 | | /// - see: `Channel.closeFuture` |
780 | 0 | public var closeFuture: EventLoopFuture<Void> { channelcore.closePromise.futureResult } |
781 | | |
782 | | @usableFromInline |
783 | 13.8k | lazy var channelcore: EmbeddedChannelCore = EmbeddedChannelCore( |
784 | 13.8k | pipeline: self._pipeline, |
785 | 13.8k | eventLoop: self.eventLoop |
786 | 13.8k | ) |
787 | | |
788 | | /// - see: `Channel._channelCore` |
789 | 2.44M | public var _channelCore: ChannelCore { |
790 | 2.44M | self.embeddedEventLoop.checkCorrectThread() |
791 | 2.44M | return self.channelcore |
792 | 2.44M | } |
793 | | |
794 | | /// - see: `Channel.pipeline` |
795 | 69.4k | public var pipeline: ChannelPipeline { |
796 | 69.4k | self.embeddedEventLoop.checkCorrectThread() |
797 | 69.4k | return self._pipeline |
798 | 69.4k | } |
799 | | |
800 | | /// - see: `Channel.isWritable` |
801 | 34.7k | public var isWritable: Bool = true |
802 | | |
803 | | @usableFromInline |
804 | 34.7k | internal var _options: [(option: any ChannelOption, value: any Sendable)] = [] |
805 | | |
806 | | /// The `ChannelOption`s set on this channel. |
807 | | /// - see: `Embedded.setOption` |
808 | 0 | public var options: [(option: any ChannelOption, value: any Sendable)] { self._options } |
809 | | |
810 | | /// Synchronously closes the `EmbeddedChannel`. |
811 | | /// |
812 | | /// Errors in the `EmbeddedChannel` can be consumed using `throwIfErrorCaught`. |
813 | | /// |
814 | | /// - Parameters: |
815 | | /// - acceptAlreadyClosed: Whether `finish` should throw if the `EmbeddedChannel` has been previously `close`d. |
816 | | /// - Returns: The `LeftOverState` of the `EmbeddedChannel`. If all the inbound and outbound events have been |
817 | | /// consumed (using `readInbound` / `readOutbound`) and there are no pending outbound events (unflushed |
818 | | /// writes) this will be `.clean`. If there are any unconsumed inbound, outbound, or pending outbound |
819 | | /// events, the `EmbeddedChannel` will returns those as `.leftOvers(inbound:outbound:pendingOutbound:)`. |
820 | 13.8k | public func finish(acceptAlreadyClosed: Bool) throws -> LeftOverState { |
821 | 13.8k | self.embeddedEventLoop.checkCorrectThread() |
822 | 13.8k | do { |
823 | 13.8k | try close().wait() |
824 | 13.8k | } catch let error as ChannelError { |
825 | 0 | guard error == .alreadyClosed && acceptAlreadyClosed else { |
826 | 0 | throw error |
827 | 0 | } |
828 | 13.8k | } |
829 | 13.8k | self.embeddedEventLoop.run() |
830 | 13.8k | self.embeddedEventLoop.cancelRemainingScheduledTasks() |
831 | 13.8k | try throwIfErrorCaught() |
832 | 13.8k | let c = self.channelcore |
833 | 13.8k | if c.outboundBuffer.isEmpty && c.inboundBuffer.isEmpty && c.pendingOutboundBuffer.isEmpty { |
834 | 2.78k | return .clean |
835 | 11.1k | } else { |
836 | 11.1k | return .leftOvers( |
837 | 11.1k | inbound: Array(c.inboundBuffer), |
838 | 11.1k | outbound: Array(c.outboundBuffer), |
839 | 11.1k | pendingOutbound: c.pendingOutboundBuffer.map { $0.0 } |
840 | 11.1k | ) |
841 | 11.1k | } |
842 | 11.1k | } |
843 | | |
844 | | /// Synchronously closes the `EmbeddedChannel`. |
845 | | /// |
846 | | /// This method will throw if the `Channel` hit any unconsumed errors or if the `close` fails. Errors in the |
847 | | /// `EmbeddedChannel` can be consumed using `throwIfErrorCaught`. |
848 | | /// |
849 | | /// - Returns: The `LeftOverState` of the `EmbeddedChannel`. If all the inbound and outbound events have been |
850 | | /// consumed (using `readInbound` / `readOutbound`) and there are no pending outbound events (unflushed |
851 | | /// writes) this will be `.clean`. If there are any unconsumed inbound, outbound, or pending outbound |
852 | | /// events, the `EmbeddedChannel` will returns those as `.leftOvers(inbound:outbound:pendingOutbound:)`. |
853 | 0 | public func finish() throws -> LeftOverState { |
854 | 0 | self.embeddedEventLoop.checkCorrectThread() |
855 | 0 | return try self.finish(acceptAlreadyClosed: false) |
856 | 0 | } |
857 | | |
858 | | private var _pipeline: ChannelPipeline! |
859 | | |
860 | | /// - see: `Channel.allocator` |
861 | 34.7k | public var allocator: ByteBufferAllocator = ByteBufferAllocator() |
862 | | |
863 | | /// - see: `Channel.eventLoop` |
864 | 55.5k | public var eventLoop: EventLoop { |
865 | 55.5k | self.embeddedEventLoop.checkCorrectThread() |
866 | 55.5k | return self.embeddedEventLoop |
867 | 55.5k | } |
868 | | |
869 | | /// Returns the `EmbeddedEventLoop` that this `EmbeddedChannel` uses. This will return the same instance as |
870 | | /// `EmbeddedChannel.eventLoop` but as the concrete `EmbeddedEventLoop` rather than as `EventLoop` existential. |
871 | 34.7k | public var embeddedEventLoop: EmbeddedEventLoop = EmbeddedEventLoop() |
872 | | |
873 | | /// - see: `Channel.localAddress` |
874 | | public var localAddress: SocketAddress? { |
875 | 0 | get { |
876 | 0 | self.embeddedEventLoop.checkCorrectThread() |
877 | 0 | return self.channelcore.localAddress |
878 | 0 | } |
879 | 0 | set { |
880 | 0 | self.embeddedEventLoop.checkCorrectThread() |
881 | 0 | self.channelcore.localAddress = newValue |
882 | 0 | } |
883 | | } |
884 | | |
885 | | /// - see: `Channel.remoteAddress` |
886 | | public var remoteAddress: SocketAddress? { |
887 | 0 | get { |
888 | 0 | self.embeddedEventLoop.checkCorrectThread() |
889 | 0 | return self.channelcore.remoteAddress |
890 | 0 | } |
891 | 0 | set { |
892 | 0 | self.embeddedEventLoop.checkCorrectThread() |
893 | 0 | self.channelcore.remoteAddress = newValue |
894 | 0 | } |
895 | | } |
896 | | |
897 | | /// `nil` because `EmbeddedChannel`s don't have parents. |
898 | 34.7k | public let parent: Channel? = nil |
899 | | |
900 | | /// If available, this method reads one element of type `T` out of the `EmbeddedChannel`'s outbound buffer. If the |
901 | | /// first element was of a different type than requested, `EmbeddedChannel.WrongTypeError` will be thrown, if there |
902 | | /// are no elements in the outbound buffer, `nil` will be returned. |
903 | | /// |
904 | | /// Data hits the `EmbeddedChannel`'s outbound buffer when data was written using `write`, then `flush`ed, and |
905 | | /// then travelled the `ChannelPipeline` all the way too the front. For data to hit the outbound buffer, the very |
906 | | /// first `ChannelHandler` must have written and flushed it either explicitly (by calling |
907 | | /// `ChannelHandlerContext.write` and `flush`) or implicitly by not implementing `write`/`flush`. |
908 | | /// |
909 | | /// - Note: Outbound events travel the `ChannelPipeline` _back to front_. |
910 | | /// - Note: `EmbeddedChannel.writeOutbound` will `write` data through the `ChannelPipeline`, starting with last |
911 | | /// `ChannelHandler`. |
912 | | @inlinable |
913 | 0 | public func readOutbound<T>(as type: T.Type = T.self) throws -> T? { |
914 | 0 | self.embeddedEventLoop.checkCorrectThread() |
915 | 0 | return try _readFromBuffer(buffer: &channelcore.outboundBuffer) |
916 | 0 | } |
917 | | |
918 | | /// If available, this method reads one element of type `T` out of the `EmbeddedChannel`'s inbound buffer. If the |
919 | | /// first element was of a different type than requested, `EmbeddedChannel.WrongTypeError` will be thrown, if there |
920 | | /// are no elements in the outbound buffer, `nil` will be returned. |
921 | | /// |
922 | | /// Data hits the `EmbeddedChannel`'s inbound buffer when data was send through the pipeline using `fireChannelRead` |
923 | | /// and then travelled the `ChannelPipeline` all the way too the back. For data to hit the inbound buffer, the |
924 | | /// last `ChannelHandler` must have send the event either explicitly (by calling |
925 | | /// `ChannelHandlerContext.fireChannelRead`) or implicitly by not implementing `channelRead`. |
926 | | /// |
927 | | /// - Note: `EmbeddedChannel.writeInbound` will fire data through the `ChannelPipeline` using `fireChannelRead`. |
928 | | @inlinable |
929 | 0 | public func readInbound<T>(as type: T.Type = T.self) throws -> T? { |
930 | 0 | self.embeddedEventLoop.checkCorrectThread() |
931 | 0 | return try _readFromBuffer(buffer: &channelcore.inboundBuffer) |
932 | 0 | } |
933 | | |
934 | | /// Sends an inbound `channelRead` event followed by a `channelReadComplete` event through the `ChannelPipeline`. |
935 | | /// |
936 | | /// The immediate effect being that the first `ChannelInboundHandler` will get its `channelRead` method called |
937 | | /// with the data you provide. |
938 | | /// |
939 | | /// - Parameters: |
940 | | /// - data: The data to fire through the pipeline. |
941 | | /// - Returns: The state of the inbound buffer which contains all the events that travelled the `ChannelPipeline` |
942 | | // all the way. |
943 | | @inlinable |
944 | 34.7k | @discardableResult public func writeInbound<T>(_ data: T) throws -> BufferState { |
945 | 34.7k | self.embeddedEventLoop.checkCorrectThread() |
946 | 34.7k | self.pipeline.syncOperations.fireChannelRead(NIOAny(data)) |
947 | 34.7k | self.pipeline.syncOperations.fireChannelReadComplete() |
948 | 34.7k | try self.throwIfErrorCaught() |
949 | 34.7k | return self.channelcore.inboundBuffer.isEmpty ? .empty : .full(Array(self.channelcore.inboundBuffer)) |
950 | 34.7k | } |
951 | | |
952 | | /// Sends an outbound `writeAndFlush` event through the `ChannelPipeline`. |
953 | | /// |
954 | | /// The immediate effect being that the first `ChannelOutboundHandler` will get its `write` method called |
955 | | /// with the data you provide. Note that the first `ChannelOutboundHandler` in the pipeline is the _last_ handler |
956 | | /// because outbound events travel the pipeline from back to front. |
957 | | /// |
958 | | /// - Parameters: |
959 | | /// - data: The data to fire through the pipeline. |
960 | | /// - Returns: The state of the outbound buffer which contains all the events that travelled the `ChannelPipeline` |
961 | | // all the way. |
962 | | @inlinable |
963 | 0 | @discardableResult public func writeOutbound<T>(_ data: T) throws -> BufferState { |
964 | 0 | self.embeddedEventLoop.checkCorrectThread() |
965 | 0 | try self.writeAndFlush(data).wait() |
966 | 0 | return self.channelcore.outboundBuffer.isEmpty ? .empty : .full(Array(self.channelcore.outboundBuffer)) |
967 | 0 | } |
968 | | |
969 | | /// This method will throw the error that is stored in the `EmbeddedChannel` if any. |
970 | | /// |
971 | | /// The `EmbeddedChannel` will store an error some error travels the `ChannelPipeline` all the way past its end. |
972 | 27.7k | public func throwIfErrorCaught() throws { |
973 | 27.7k | self.embeddedEventLoop.checkCorrectThread() |
974 | 27.7k | if let error = channelcore.error { |
975 | 12.9k | self.channelcore.error = nil |
976 | 12.9k | throw error |
977 | 14.8k | } |
978 | 14.8k | } |
979 | | |
980 | | @inlinable |
981 | 0 | func _readFromBuffer<T>(buffer: inout CircularBuffer<NIOAny>) throws -> T? { |
982 | 0 | self.embeddedEventLoop.checkCorrectThread() |
983 | 0 | if buffer.isEmpty { |
984 | 0 | return nil |
985 | 0 | } |
986 | 0 | let elem = buffer.removeFirst() |
987 | 0 | guard let t = self._channelCore.tryUnwrapData(elem, as: T.self) else { |
988 | 0 | throw WrongTypeError( |
989 | 0 | expected: T.self, |
990 | 0 | actual: type(of: self._channelCore.tryUnwrapData(elem, as: Any.self)!) |
991 | 0 | ) |
992 | 0 | } |
993 | 0 | return t |
994 | 0 | } |
995 | | |
996 | | /// Create a new instance. |
997 | | /// |
998 | | /// During creation it will automatically also register itself on the `EmbeddedEventLoop`. |
999 | | /// |
1000 | | /// - Parameters: |
1001 | | /// - handler: The `ChannelHandler` to add to the `ChannelPipeline` before register or `nil` if none should be added. |
1002 | | /// - loop: The `EmbeddedEventLoop` to use. |
1003 | 13.8k | public convenience init(handler: ChannelHandler? = nil, loop: EmbeddedEventLoop = EmbeddedEventLoop()) { |
1004 | 13.8k | let handlers = handler.map { [$0] } ?? [] |
1005 | 13.8k | self.init(handlers: handlers, loop: loop) |
1006 | 13.8k | self.embeddedEventLoop.checkCorrectThread() |
1007 | 13.8k | } |
1008 | | |
1009 | | /// Create a new instance. |
1010 | | /// |
1011 | | /// During creation it will automatically also register itself on the `EmbeddedEventLoop`. |
1012 | | /// |
1013 | | /// - Parameters: |
1014 | | /// - handlers: The `ChannelHandler`s to add to the `ChannelPipeline` before register. |
1015 | | /// - loop: The `EmbeddedEventLoop` to use. |
1016 | 13.8k | public init(handlers: [ChannelHandler], loop: EmbeddedEventLoop = EmbeddedEventLoop()) { |
1017 | 13.8k | self.embeddedEventLoop = loop |
1018 | 13.8k | self._pipeline = ChannelPipeline(channel: self) |
1019 | 13.8k | |
1020 | 13.8k | try! self._pipeline.syncOperations.addHandlers(handlers) |
1021 | 13.8k | |
1022 | 13.8k | // This will never throw... |
1023 | 13.8k | try! register().wait() |
1024 | 13.8k | self.embeddedEventLoop.checkCorrectThread() |
1025 | 13.8k | } |
1026 | | |
1027 | | /// - see: `Channel.setOption` |
1028 | | @inlinable |
1029 | 0 | public func setOption<Option: ChannelOption>(_ option: Option, value: Option.Value) -> EventLoopFuture<Void> { |
1030 | 0 | self.embeddedEventLoop.checkCorrectThread() |
1031 | 0 | self.setOptionSync(option, value: value) |
1032 | 0 | return self.eventLoop.makeSucceededVoidFuture() |
1033 | 0 | } |
1034 | | |
1035 | | @inlinable |
1036 | 0 | internal func setOptionSync<Option: ChannelOption>(_ option: Option, value: Option.Value) { |
1037 | 0 | self.embeddedEventLoop.checkCorrectThread() |
1038 | 0 |
|
1039 | 0 | self.addOption(option, value: value) |
1040 | 0 |
|
1041 | 0 | if option is ChannelOptions.Types.AllowRemoteHalfClosureOption { |
1042 | 0 | self.allowRemoteHalfClosure = value as! Bool |
1043 | 0 | return |
1044 | 0 | } |
1045 | 0 | } |
1046 | | |
1047 | | /// - see: `Channel.getOption` |
1048 | | @inlinable |
1049 | 0 | public func getOption<Option: ChannelOption>(_ option: Option) -> EventLoopFuture<Option.Value> { |
1050 | 0 | self.embeddedEventLoop.checkCorrectThread() |
1051 | 0 | return self.eventLoop.makeSucceededFuture(self.getOptionSync(option)) |
1052 | 0 | } |
1053 | | |
1054 | | @inlinable |
1055 | 0 | internal func getOptionSync<Option: ChannelOption>(_ option: Option) -> Option.Value { |
1056 | 0 | self.embeddedEventLoop.checkCorrectThread() |
1057 | 0 | if option is ChannelOptions.Types.AutoReadOption { |
1058 | 0 | return true as! Option.Value |
1059 | 0 | } |
1060 | 0 | if option is ChannelOptions.Types.AllowRemoteHalfClosureOption { |
1061 | 0 | return self.allowRemoteHalfClosure as! Option.Value |
1062 | 0 | } |
1063 | 0 | if option is ChannelOptions.Types.BufferedWritableBytesOption { |
1064 | 0 | let result = self.channelcore.pendingOutboundBuffer.reduce(0) { partialResult, dataAndPromise in |
1065 | 0 | let buffer = self.channelcore.unwrapData(dataAndPromise.0, as: ByteBuffer.self) |
1066 | 0 | return partialResult + buffer.readableBytes |
1067 | 0 | } |
1068 | 0 |
|
1069 | 0 | return result as! Option.Value |
1070 | 0 | } |
1071 | 0 |
|
1072 | 0 | guard let value = optionValue(for: option) else { |
1073 | 0 | fatalError("option \(option) not supported") |
1074 | 0 | } |
1075 | 0 |
|
1076 | 0 | return value |
1077 | 0 | } |
1078 | | |
1079 | | @inlinable |
1080 | 0 | internal func addOption<Option: ChannelOption>(_ option: Option, value: Option.Value) { |
1081 | 0 | if let optionIndex = self._options.firstIndex(where: { $0.option is Option }) { |
1082 | 0 | self._options[optionIndex] = (option: option, value: value) |
1083 | 0 | } else { |
1084 | 0 | self._options.append((option: option, value: value)) |
1085 | 0 | } |
1086 | 0 | } |
1087 | | |
1088 | | @inlinable |
1089 | 0 | internal func optionValue<Option: ChannelOption>(for option: Option) -> Option.Value? { |
1090 | 0 | self.options.first(where: { $0.option is Option })?.value as? Option.Value |
1091 | 0 | } |
1092 | | |
1093 | | /// Fires the (outbound) `bind` event through the `ChannelPipeline`. If the event hits the `EmbeddedChannel` which |
1094 | | /// happens when it travels the `ChannelPipeline` all the way to the front, this will also set the |
1095 | | /// `EmbeddedChannel`'s `localAddress`. |
1096 | | /// |
1097 | | /// - Parameters: |
1098 | | /// - address: The address to fake-bind to. |
1099 | | /// - promise: The `EventLoopPromise` which will be fulfilled when the fake-bind operation has been done. |
1100 | 0 | public func bind(to address: SocketAddress, promise: EventLoopPromise<Void>?) { |
1101 | 0 | self.embeddedEventLoop.checkCorrectThread() |
1102 | 0 | let promise = promise ?? self.embeddedEventLoop.makePromise() |
1103 | 0 | promise.futureResult.whenSuccess { |
1104 | 0 | self.localAddress = address |
1105 | 0 | } |
1106 | 0 | self.pipeline.bind(to: address, promise: promise) |
1107 | 0 | } |
1108 | | |
1109 | | /// Fires the (outbound) `connect` event through the `ChannelPipeline`. If the event hits the `EmbeddedChannel` |
1110 | | /// which happens when it travels the `ChannelPipeline` all the way to the front, this will also set the |
1111 | | /// `EmbeddedChannel`'s `remoteAddress`. |
1112 | | /// |
1113 | | /// - Parameters: |
1114 | | /// - address: The address to fake-bind to. |
1115 | | /// - promise: The `EventLoopPromise` which will be fulfilled when the fake-bind operation has been done. |
1116 | 0 | public func connect(to address: SocketAddress, promise: EventLoopPromise<Void>?) { |
1117 | 0 | self.embeddedEventLoop.checkCorrectThread() |
1118 | 0 | let promise = promise ?? self.embeddedEventLoop.makePromise() |
1119 | 0 | promise.futureResult.whenSuccess { |
1120 | 0 | self.remoteAddress = address |
1121 | 0 | } |
1122 | 0 | self.pipeline.connect(to: address, promise: promise) |
1123 | 0 | } |
1124 | | |
1125 | | /// An overload of `Channel.write` that does not require a Sendable type, as ``EmbeddedEventLoop`` |
1126 | | /// is bound to a single thread. |
1127 | | @inlinable |
1128 | 0 | public func write<T>(_ data: T, promise: EventLoopPromise<Void>?) { |
1129 | 0 | self.embeddedEventLoop.checkCorrectThread() |
1130 | 0 | self.pipeline.syncOperations.write(NIOAny(data), promise: promise) |
1131 | 0 | } |
1132 | | |
1133 | | /// An overload of `Channel.write` that does not require a Sendable type, as ``EmbeddedEventLoop`` |
1134 | | /// is bound to a single thread. |
1135 | | @inlinable |
1136 | 0 | public func write<T>(_ data: T) -> EventLoopFuture<Void> { |
1137 | 0 | self.embeddedEventLoop.checkCorrectThread() |
1138 | 0 | let promise = self.eventLoop.makePromise(of: Void.self) |
1139 | 0 | self.pipeline.syncOperations.write(NIOAny(data), promise: promise) |
1140 | 0 | return promise.futureResult |
1141 | 0 | } |
1142 | | |
1143 | | /// An overload of `Channel.writeAndFlush` that does not require a Sendable type, as ``EmbeddedEventLoop`` |
1144 | | /// is bound to a single thread. |
1145 | | @inlinable |
1146 | 0 | public func writeAndFlush<T>(_ data: T, promise: EventLoopPromise<Void>?) { |
1147 | 0 | self.embeddedEventLoop.checkCorrectThread() |
1148 | 0 | self.pipeline.syncOperations.writeAndFlush(NIOAny(data), promise: promise) |
1149 | 0 | } |
1150 | | |
1151 | | /// An overload of `Channel.writeAndFlush` that does not require a Sendable type, as ``EmbeddedEventLoop`` |
1152 | | /// is bound to a single thread. |
1153 | | @inlinable |
1154 | 0 | public func writeAndFlush<T>(_ data: T) -> EventLoopFuture<Void> { |
1155 | 0 | self.embeddedEventLoop.checkCorrectThread() |
1156 | 0 | let promise = self.eventLoop.makePromise(of: Void.self) |
1157 | 0 | self.pipeline.syncOperations.writeAndFlush(NIOAny(data), promise: promise) |
1158 | 0 | return promise.futureResult |
1159 | 0 | } |
1160 | | } |
1161 | | |
1162 | | extension EmbeddedChannel { |
1163 | | public struct SynchronousOptions: NIOSynchronousChannelOptions { |
1164 | | @usableFromInline |
1165 | | internal let channel: EmbeddedChannel |
1166 | | |
1167 | 0 | fileprivate init(channel: EmbeddedChannel) { |
1168 | 0 | self.channel = channel |
1169 | 0 | } |
1170 | | |
1171 | | @inlinable |
1172 | 0 | public func setOption<Option: ChannelOption>(_ option: Option, value: Option.Value) throws { |
1173 | 0 | self.channel.setOptionSync(option, value: value) |
1174 | 0 | } |
1175 | | |
1176 | | @inlinable |
1177 | 0 | public func getOption<Option: ChannelOption>(_ option: Option) throws -> Option.Value { |
1178 | 0 | self.channel.getOptionSync(option) |
1179 | 0 | } |
1180 | | } |
1181 | | |
1182 | 0 | public final var syncOptions: NIOSynchronousChannelOptions? { |
1183 | 0 | SynchronousOptions(channel: self) |
1184 | 0 | } |
1185 | | } |
1186 | | |
1187 | | // EmbeddedChannel is extremely _not_ Sendable. However, the Channel protocol |
1188 | | // requires it to be. We are doing some runtime enforcement of correct use, but |
1189 | | // ultimately we can't have the compiler validating this usage. |
1190 | | extension EmbeddedChannel: @unchecked Sendable {} |
1191 | | |
1192 | | @available(*, unavailable) |
1193 | | extension EmbeddedChannel.LeftOverState: @unchecked Sendable {} |
1194 | | |
1195 | | @available(*, unavailable) |
1196 | | extension EmbeddedChannel.BufferState: @unchecked Sendable {} |
1197 | | |
1198 | | @available(*, unavailable) |
1199 | | extension EmbeddedChannel.SynchronousOptions: Sendable {} |