Coverage Report

Created: 2025-09-08 06:42

/src/swift-nio/Sources/NIOEmbedded/Embedded.swift
Line
Count
Source (jump to first uncovered line)
1
//===----------------------------------------------------------------------===//
2
//
3
// This source file is part of the SwiftNIO open source project
4
//
5
// Copyright (c) 2017-2022 Apple Inc. and the SwiftNIO project authors
6
// Licensed under Apache License v2.0
7
//
8
// See LICENSE.txt for license information
9
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
10
//
11
// SPDX-License-Identifier: Apache-2.0
12
//
13
//===----------------------------------------------------------------------===//
14
15
import Atomics
16
import DequeModule
17
import NIOConcurrencyHelpers
18
import NIOCore
19
import _NIODataStructures
20
21
#if canImport(Dispatch)
22
import Dispatch
23
#endif
24
25
#if canImport(Darwin)
26
import Darwin
27
#elseif canImport(Glibc)
28
@preconcurrency import Glibc
29
#elseif canImport(Musl)
30
@preconcurrency import Musl
31
#elseif canImport(Android)
32
@preconcurrency import Android
33
#elseif canImport(WASILibc)
34
@preconcurrency import WASILibc
35
#elseif canImport(WinSDK)
36
@preconcurrency import WinSDK
37
#else
38
#error("Unknown C library.")
39
#endif
40
41
0
/// Writes the UTF-8 bytes of `string` to standard error (file descriptor 2).
///
/// The write is retried until every byte has been handed to the descriptor. A write that
/// fails with `EINTR` is retried; any other failure terminates the process.
///
/// - Parameter string: The static message to emit.
private func printError(_ string: StaticString) {
    string.withUTF8Buffer { utf8 in
        var remaining = utf8
        while !remaining.isEmpty {
            // File descriptor 2 is stderr.
            #if canImport(WinSDK)
            let written = _write(2, remaining.baseAddress, UInt32(truncatingIfNeeded: remaining.count))
            let errno = GetLastError()
            #else
            let written = write(2, remaining.baseAddress, remaining.count)
            #endif
            guard written >= 0 else {
                let failure = errno
                // An interrupted write made no progress; simply try again.
                if failure == EINTR { continue }
                fatalError("Unexpected error writing: \(failure)")
            }
            // Skip past the bytes that were accepted and keep going with the rest.
            remaining = UnsafeBufferPointer(rebasing: remaining.dropFirst(Int(written)))
        }
    }
}
61
62
/// A unit of work queued on an `EmbeddedEventLoop`, together with the callback used to
/// fail it if it is never run.
internal struct EmbeddedScheduledTask {
    /// Identifier of this task; equality between tasks is decided by this value alone.
    let id: UInt64
    /// The work to perform when the task becomes due.
    let task: () -> Void
    /// Invoked by `fail(_:)` to report that the task will not run.
    let failFn: (Error) -> Void
    /// The point in (event loop) time at which the task becomes due.
    let readyTime: NIODeadline
    /// Tie-breaker used to order tasks that share the same `readyTime`.
    let insertOrder: UInt64

    /// Creates a scheduled task.
    ///
    /// - Parameters:
    ///   - id: Identifier of the task.
    ///   - readyTime: When the task becomes due.
    ///   - insertOrder: Ordering key among tasks with an equal `readyTime`.
    ///   - task: The work to run.
    ///   - failFn: Callback invoked when the task is failed instead of run.
    init(
        id: UInt64,
        readyTime: NIODeadline,
        insertOrder: UInt64,
        task: @escaping () -> Void,
        _ failFn: @escaping (Error) -> Void
    ) {
        self.task = task
        self.failFn = failFn
        self.id = id
        self.insertOrder = insertOrder
        self.readyTime = readyTime
    }

    /// Reports `error` through the failure callback supplied at creation.
    func fail(_ error: Error) {
        failFn(error)
    }
}
87
88
extension EmbeddedScheduledTask: Comparable {
    /// Orders tasks by `readyTime`, falling back to `insertOrder` when the ready times match.
    static func < (lhs: EmbeddedScheduledTask, rhs: EmbeddedScheduledTask) -> Bool {
        guard lhs.readyTime == rhs.readyTime else {
            return lhs.readyTime < rhs.readyTime
        }
        // Same deadline: the task that was inserted first sorts first.
        return lhs.insertOrder < rhs.insertOrder
    }

    /// Two tasks are equal when they carry the same `id`.
    static func == (lhs: EmbeddedScheduledTask, rhs: EmbeddedScheduledTask) -> Bool {
        return lhs.id == rhs.id
    }
}
101
102
/// An `EventLoop` that is embedded in the current running context with no external
/// control.
///
/// Unlike more complex `EventLoop`s, such as `SelectableEventLoop`, the `EmbeddedEventLoop`
/// has no proper eventing mechanism. Instead, reads and writes are fully controlled by the
/// entity that instantiates the `EmbeddedEventLoop`. This property makes `EmbeddedEventLoop`
/// of limited use for many application purposes, but highly valuable for testing and other
/// kinds of mocking.
///
/// Time is controllable on an `EmbeddedEventLoop`. It begins at `NIODeadline.uptimeNanoseconds(0)`
/// and may be advanced by a fixed amount by using `advanceTime(by:)`, or advanced to a point in
/// time with `advanceTime(to:)`.
///
/// - warning: Unlike `SelectableEventLoop`, `EmbeddedEventLoop` **is not thread-safe**. This
///     is because it is intended to be run in the thread that instantiated it. Users are
///     responsible for ensuring they never call into the `EmbeddedEventLoop` in an
///     unsynchronized fashion.
public final class EmbeddedEventLoop: EventLoop, CustomStringConvertible {
    /// Backing storage for `now`; updated by `advanceTime(to:)`.
    private var _now: NIODeadline = .uptimeNanoseconds(0)
    /// The current "time" for this event loop. This is an amount in nanoseconds.
    public var now: NIODeadline { _now }

    /// Lifecycle of the loop; tasks are only accepted while `.open`.
    private enum State { case open, closing, closed }
    private var state: State = .open

    /// Incremented for every accepted task; the new value becomes the task's `id`.
    private var scheduledTaskCounter: UInt64 = 0
    /// Tasks waiting to run, ordered by `EmbeddedScheduledTask`'s `Comparable` conformance.
    private var scheduledTasks = PriorityQueue<EmbeddedScheduledTask>()

    /// Keep track of where promises are allocated to ensure we can identify their source if they leak.
    private var _promiseCreationStore: [_NIOEventLoopFutureIdentifier: (file: StaticString, line: UInt)] = [:]

    // The number of the next task to be created. We track the order so that when we execute tasks
    // scheduled at the same time, we may do so in the order in which they were submitted for
    // execution.
    private var taskNumber: UInt64 = 0

    public let description = "EmbeddedEventLoop"

    #if canImport(Darwin) || canImport(Glibc) || canImport(Musl) || canImport(Android)
    /// The thread this loop was created on.
    private let myThread: pthread_t = pthread_self()

    /// Returns `true` when called from the thread that created this loop.
    func isCorrectThread() -> Bool {
        pthread_equal(self.myThread, pthread_self()) != 0
    }
    #else
    /// No thread identity is tracked on this platform, so every caller is accepted.
    func isCorrectThread() -> Bool {
        true  // let's be conservative
    }
    #endif

    /// Returns the current `taskNumber` and then increments it.
    private func nextTaskNumber() -> UInt64 {
        defer {
            self.taskNumber += 1
        }
        return self.taskNumber
    }

    /// Always `true`; the calling thread is checked via `checkCorrectThread()` first.
    ///
    /// - see: `EventLoop.inEventLoop`
    public var inEventLoop: Bool {
        self.checkCorrectThread()
        return true
    }

    /// Verifies that the caller is on the thread that created this loop.
    ///
    /// On a mismatch this traps when `strictModeEnabled` is `true`; otherwise it prints an
    /// error to stderr and returns.
    public func checkCorrectThread() {
        guard self.isCorrectThread() else {
            if Self.strictModeEnabled {
                preconditionFailure(
                    "EmbeddedEventLoop is not thread-safe. You can only use it from the thread you created it on."
                )
            } else {
                printError(
                    """
                    ERROR: NIO API misuse: EmbeddedEventLoop is not thread-safe. \
                    You can only use it from the thread you created it on. This problem will be upgraded to a forced \
                    crash in future versions of SwiftNIO.

                    """
                )
            }
            return
        }
    }

    /// Initialize a new `EmbeddedEventLoop`.
    public init() {}

    /// Queues `task` to run once the loop's time reaches `deadline`.
    ///
    /// If the loop is closing or closed, the returned `Scheduled` is failed immediately with
    /// `EventLoopError.cancelled` and nothing is queued.
    ///
    /// - see: `EventLoop.scheduleTask(deadline:_:)`
    @discardableResult
    public func scheduleTask<T>(deadline: NIODeadline, _ task: @escaping () throws -> T) -> Scheduled<T> {
        self.checkCorrectThread()
        let promise: EventLoopPromise<T> = makePromise()

        switch self.state {
        case .open:
            break
        case .closing, .closed:
            // If the event loop is shut down, or shutting down, immediately cancel the task.
            promise.fail(EventLoopError.cancelled)
            return Scheduled(promise: promise, cancellationTask: {})
        }

        self.scheduledTaskCounter += 1
        let task = EmbeddedScheduledTask(
            id: self.scheduledTaskCounter,
            readyTime: deadline,
            insertOrder: self.nextTaskNumber(),
            task: {
                do {
                    promise.assumeIsolated().succeed(try task())
                } catch let err {
                    promise.fail(err)
                }
            },
            promise.fail
        )

        let taskId = task.id
        let scheduled = Scheduled(
            promise: promise,
            cancellationTask: {
                // Cancelling drops the queued entry with the matching id.
                self.scheduledTasks.removeFirst { $0.id == taskId }
            }
        )
        scheduledTasks.push(task)
        return scheduled
    }

    /// Queues `task` to run `in` after the loop's current time.
    ///
    /// - see: `EventLoop.scheduleTask(in:_:)`
    @discardableResult
    public func scheduleTask<T>(in: TimeAmount, _ task: @escaping () throws -> T) -> Scheduled<T> {
        self.checkCorrectThread()
        return self.scheduleTask(deadline: self._now + `in`, task)
    }

    /// Schedules `handler` to be called back `amount` after the loop's current time.
    @preconcurrency
    @discardableResult
    public func scheduleCallback(
        in amount: TimeAmount,
        handler: some (NIOScheduledCallbackHandler & Sendable)
    ) -> NIOScheduledCallback {
        self.checkCorrectThread()
        // Even though this type does not implement a custom `scheduleCallback(at:handler)`, it uses a manual clock so
        // it cannot rely on the default implementation of `scheduleCallback(in:handler:)`, which computes the deadline
        // as an offset from `NIODeadline.now`. This event loop needs the deadline to be offset from `self._now`.
        return self.scheduleCallback(at: self._now + amount, handler: handler)
    }

    /// On an `EmbeddedEventLoop`, `execute` will simply use `scheduleTask` with a deadline of _now_. This means that
    /// `task` will be run the next time you call `EmbeddedEventLoop.run`.
    public func execute(_ task: @escaping () -> Void) {
        self.checkCorrectThread()
        self.scheduleTask(deadline: self._now, task)
    }

    /// Run all tasks that have previously been submitted to this `EmbeddedEventLoop`, either by calling `execute` or
    /// events that have been enqueued using `scheduleTask`/`scheduleRepeatedTask`/`scheduleRepeatedAsyncTask` and whose
    /// deadlines have expired.
    ///
    /// - seealso: `EmbeddedEventLoop.advanceTime`.
    public func run() {
        self.checkCorrectThread()
        // Execute all tasks that are currently enqueued to be executed *now*.
        self.advanceTime(to: self._now)
    }

    /// Runs the event loop and moves "time" forward by the given amount, running any scheduled
    /// tasks that need to be run.
    public func advanceTime(by increment: TimeAmount) {
        self.checkCorrectThread()
        self.advanceTime(to: self._now + increment)
    }

    /// Runs the event loop and moves "time" forward to the given point in time, running any scheduled
    /// tasks that need to be run.
    ///
    /// - Note: If `deadline` is before the current time, the current time will not be advanced.
    public func advanceTime(to deadline: NIODeadline) {
        self.checkCorrectThread()
        let newTime = max(deadline, self._now)

        while let nextTask = self.scheduledTasks.peek() {
            guard nextTask.readyTime <= newTime else {
                break
            }

            // Now we want to grab all tasks that are ready to execute at the same
            // time as the first.
            var tasks = [EmbeddedScheduledTask]()
            while let candidateTask = self.scheduledTasks.peek(), candidateTask.readyTime == nextTask.readyTime {
                tasks.append(candidateTask)
                self.scheduledTasks.pop()
            }

            // Set the time correctly before we call into user code, then
            // call in for all tasks.
            self._now = nextTask.readyTime

            for task in tasks {
                task.task()
            }
        }

        // Finally ensure we got the time right.
        self._now = newTime
    }

    /// Empties the task queue, failing each removed task with `EventLoopError.cancelled`.
    internal func cancelRemainingScheduledTasks() {
        self.checkCorrectThread()
        while let task = self.scheduledTasks.pop() {
            task.fail(EventLoopError.cancelled)
        }
    }

    #if canImport(Dispatch)
    /// Runs the tasks that are due, cancels the rest, marks the loop closed and then invokes
    /// `callback(nil)` synchronously on `queue`.
    ///
    /// - see: `EventLoop.shutdownGracefully`
    public func shutdownGracefully(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) {
        self.checkCorrectThread()
        self.state = .closing
        run()
        cancelRemainingScheduledTasks()
        self.state = .closed
        queue.sync {
            callback(nil)
        }
    }
    #endif

    /// Only checks the calling thread; see the inline note.
    public func preconditionInEventLoop(file: StaticString, line: UInt) {
        self.checkCorrectThread()
        // Currently, inEventLoop is always true so this always passes.
    }

    /// Always traps, because this loop reports `inEventLoop` as `true` unconditionally.
    public func preconditionNotInEventLoop(file: StaticString, line: UInt) {
        // As inEventLoop always returns true, this must always precondition.
        preconditionFailure("Always in EmbeddedEventLoop", file: file, line: line)
    }

    /// Only checks the calling thread; waiting is always permitted.
    public func _preconditionSafeToWait(file: StaticString, line: UInt) {
        self.checkCorrectThread()
        // EmbeddedEventLoop always allows a wait, as waiting will essentially always block
        // wait()
        return
    }

    /// Records the creation site of the promise identified by `futureIdentifier`.
    ///
    /// Traps unless `_isDebugAssertConfiguration()` is `true`.
    public func _promiseCreated(futureIdentifier: _NIOEventLoopFutureIdentifier, file: StaticString, line: UInt) {
        self.checkCorrectThread()
        precondition(_isDebugAssertConfiguration())
        self._promiseCreationStore[futureIdentifier] = (file: file, line: line)
    }

    /// Removes and returns the recorded creation site for `futureIdentifier`, if any.
    ///
    /// Traps unless `_isDebugAssertConfiguration()` is `true`.
    public func _promiseCompleted(futureIdentifier: _NIOEventLoopFutureIdentifier) -> (file: StaticString, line: UInt)?
    {
        self.checkCorrectThread()
        precondition(_isDebugAssertConfiguration())
        return self._promiseCreationStore.removeValue(forKey: futureIdentifier)
    }

    /// Only checks the calling thread; a synchronous shutdown is always permitted.
    public func _preconditionSafeToSyncShutdown(file: StaticString, line: UInt) {
        self.checkCorrectThread()
        // EmbeddedEventLoop always allows a sync shutdown.
        return
    }

    public func _executeIsolatedUnsafeUnchecked(_ task: @escaping () -> Void) {
        // Because of the way EmbeddedEL works, we can just delegate this directly
        // to execute.
        self.execute(task)
    }

    public func _submitIsolatedUnsafeUnchecked<T>(_ task: @escaping () throws -> T) -> EventLoopFuture<T> {
        // Because of the way EmbeddedEL works, we can delegate this directly to schedule. Note that I didn't
        // say submit: we don't have an override of submit here.
        self.scheduleTask(deadline: self._now, task).futureResult
    }

    @discardableResult
    public func _scheduleTaskIsolatedUnsafeUnchecked<T>(
        deadline: NIODeadline,
        _ task: @escaping () throws -> T
    ) -> Scheduled<T> {
        // Because of the way EmbeddedEL works, we can delegate this directly to schedule.
        self.scheduleTask(deadline: deadline, task)
    }

    @discardableResult
    public func _scheduleTaskIsolatedUnsafeUnchecked<T>(
        in delay: TimeAmount,
        _ task: @escaping () throws -> T
    ) -> Scheduled<T> {
        // Because of the way EmbeddedEL works, we can delegate this directly to schedule.
        self.scheduleTask(in: delay, task)
    }

    /// Traps if the loop is released while tasks are still queued.
    deinit {
        self.checkCorrectThread()
        precondition(scheduledTasks.isEmpty, "Embedded event loop freed with unexecuted scheduled tasks!")
    }

    /// Always traps: this loop cannot act as a `SerialExecutor`.
    @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
    public var executor: any SerialExecutor {
        fatalError(
            "EmbeddedEventLoop is not thread safe and cannot be used as a SerialExecutor. Use NIOAsyncTestingEventLoop instead."
        )
    }

    /// `true` when any of the environment variables `SWIFTNIO_STRICT`, `SWIFTNIO_CI` or
    /// `SWIFTNIO_STRICT_EMBEDDED` is set to `true`, `y`, `yes`, `on` or `1` (case-insensitive).
    ///
    /// Evaluated once, on first access.
    static let strictModeEnabled: Bool = {
        for ciVar in ["SWIFTNIO_STRICT", "SWIFTNIO_CI", "SWIFTNIO_STRICT_EMBEDDED"] {
            // Look up the variable currently being iterated. Querying a fixed name here would
            // make every entry after the first one in the list above ineffective.
            #if os(Windows)
            let env = Windows.getenv(ciVar)
            #else
            let env = getenv(ciVar).flatMap { String(cString: $0) }
            #endif
            switch env?.lowercased() {
            case "true", "y", "yes", "on", "1":
                return true
            default:
                ()
            }
        }
        return false
    }()
}

// EmbeddedEventLoop is extremely _not_ Sendable. However, the EventLoop protocol
// requires it to be. We are doing some runtime enforcement of correct use, but
// ultimately we can't have the compiler validating this usage.
extension EmbeddedEventLoop: @unchecked Sendable {}
429
430
/// The `ChannelCore` backing an embedded channel: instead of performing I/O it records
/// inbound and outbound data in in-memory buffers.
@usableFromInline
class EmbeddedChannelCore: ChannelCore {
    /// Whether the channel is open; stored atomically.
    var isOpen: Bool {
        get { self._isOpen.load(ordering: .sequentiallyConsistent) }
        set { self._isOpen.store(newValue, ordering: .sequentiallyConsistent) }
    }

    /// Whether the channel is active; stored atomically.
    var isActive: Bool {
        get { self._isActive.load(ordering: .sequentiallyConsistent) }
        set { self._isActive.store(newValue, ordering: .sequentiallyConsistent) }
    }

    /// The remote-half-closure setting; stored atomically.
    var allowRemoteHalfClosure: Bool {
        get { self._allowRemoteHalfClosure.load(ordering: .sequentiallyConsistent) }
        set { self._allowRemoteHalfClosure.store(newValue, ordering: .sequentiallyConsistent) }
    }

    private let _isOpen = ManagedAtomic(true)
    private let _isActive = ManagedAtomic(false)
    private let _allowRemoteHalfClosure = ManagedAtomic(false)

    let eventLoop: EventLoop
    /// Succeeded after `close0` has removed the handlers, and again on deinitialisation.
    let closePromise: EventLoopPromise<Void>
    /// The first error passed to `errorCaught0(error:)`, if any.
    var error: Error?

    private let pipeline: ChannelPipeline

    /// Creates a core for `pipeline` whose close promise is made on `eventLoop`.
    init(pipeline: ChannelPipeline, eventLoop: EventLoop) {
        self.eventLoop = eventLoop
        self.pipeline = pipeline
        self.error = nil
        self.closePromise = eventLoop.makePromise()
    }

    deinit {
        assert(
            !(self.isOpen || self.isActive),
            "leaked an open EmbeddedChannel, maybe forgot to call channel.finish()?"
        )
        self.isOpen = false
        self.closePromise.succeed(())
    }

    /// Contains the flushed items that went into the `Channel` (and on a regular channel would have hit the network).
    @usableFromInline
    var outboundBuffer: CircularBuffer<NIOAny> = CircularBuffer()

    /// Contains observers that want to consume the first element that would be appended to the `outboundBuffer`
    @usableFromInline
    var outboundBufferConsumer: Deque<(NIOAny) -> Void> = []

    /// Contains the unflushed items that went into the `Channel`
    @usableFromInline
    var pendingOutboundBuffer: MarkedCircularBuffer<(NIOAny, EventLoopPromise<Void>?)> = MarkedCircularBuffer(
        initialCapacity: 16
    )

    /// Contains the items that travelled the `ChannelPipeline` all the way and hit the tail channel handler. On a
    /// regular `Channel` these items would be lost.
    @usableFromInline
    var inboundBuffer: CircularBuffer<NIOAny> = CircularBuffer()

    /// Contains observers that want to consume the first element that would be appended to the `inboundBuffer`
    @usableFromInline
    var inboundBufferConsumer: Deque<(NIOAny) -> Void> = []

    @usableFromInline
    var localAddress: SocketAddress?

    @usableFromInline
    var remoteAddress: SocketAddress?

    /// Returns `localAddress`, or throws `ChannelError.operationUnsupported` when none is set.
    @usableFromInline
    func localAddress0() throws -> SocketAddress {
        self.eventLoop.preconditionInEventLoop()
        guard let address = self.localAddress else {
            throw ChannelError.operationUnsupported
        }
        return address
    }

    /// Returns `remoteAddress`, or throws `ChannelError.operationUnsupported` when none is set.
    @usableFromInline
    func remoteAddress0() throws -> SocketAddress {
        self.eventLoop.preconditionInEventLoop()
        guard let address = self.remoteAddress else {
            throw ChannelError.operationUnsupported
        }
        return address
    }

    /// Closes the channel, or fails `promise` with `ChannelError.alreadyClosed` if it is not open.
    ///
    /// Handler removal and completion of `closePromise` are deferred through `eventLoop.execute`.
    @usableFromInline
    func close0(error: Error, mode: CloseMode, promise: EventLoopPromise<Void>?) {
        self.eventLoop.preconditionInEventLoop()
        if !self.isOpen {
            promise?.fail(ChannelError.alreadyClosed)
            return
        }
        self.isOpen = false
        self.isActive = false
        promise?.succeed(())

        // As we called register() in the constructor of EmbeddedChannel we also need to ensure we call unregistered here.
        self.pipeline.syncOperations.fireChannelInactive()
        self.pipeline.syncOperations.fireChannelUnregistered()

        let boundCore = NIOLoopBound(self, eventLoop: self.eventLoop)

        self.eventLoop.execute {
            // ensure this is executed in a delayed fashion as the users code may still traverse the pipeline
            let core = boundCore.value
            core.removeHandlers(pipeline: core.pipeline)
            core.closePromise.succeed(())
        }
    }

    /// Succeeds `promise`; nothing is actually bound.
    @usableFromInline
    func bind0(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
        self.eventLoop.preconditionInEventLoop()
        promise?.succeed(())
    }

    /// Marks the channel active, succeeds `promise` and fires `channelActive`.
    @usableFromInline
    func connect0(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
        self.eventLoop.preconditionInEventLoop()
        self.isActive = true
        promise?.succeed(())
        self.pipeline.syncOperations.fireChannelActive()
    }

    /// Succeeds `promise` and fires `channelRegistered`.
    @usableFromInline
    func register0(promise: EventLoopPromise<Void>?) {
        self.eventLoop.preconditionInEventLoop()
        promise?.succeed(())
        self.pipeline.syncOperations.fireChannelRegistered()
    }

    /// Marks the channel active, registers it via `register0(promise:)` and fires `channelActive`.
    @usableFromInline
    func registerAlreadyConfigured0(promise: EventLoopPromise<Void>?) {
        self.eventLoop.preconditionInEventLoop()
        self.isActive = true
        self.register0(promise: promise)
        self.pipeline.syncOperations.fireChannelActive()
    }

    /// Appends `data` and its `promise` to `pendingOutboundBuffer`.
    @usableFromInline
    func write0(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
        self.eventLoop.preconditionInEventLoop()
        self.pendingOutboundBuffer.append((data, promise))
    }

    /// Marks the pending writes and moves them out of `pendingOutboundBuffer` while the buffer
    /// still has a mark, succeeding each write's promise after its data has been delivered.
    @usableFromInline
    func flush0() {
        self.eventLoop.preconditionInEventLoop()
        self.pendingOutboundBuffer.mark()

        while self.pendingOutboundBuffer.hasMark {
            guard let pendingWrite = self.pendingOutboundBuffer.popFirst() else {
                break
            }
            self.addToBuffer(
                buffer: &self.outboundBuffer,
                consumer: &self.outboundBufferConsumer,
                data: pendingWrite.0
            )
            pendingWrite.1?.succeed(())
        }
    }

    /// Does nothing beyond the event loop check.
    @usableFromInline
    func read0() {
        self.eventLoop.preconditionInEventLoop()
        // NOOP
    }

    /// Fails `promise` with `ChannelError.operationUnsupported`.
    public final func triggerUserOutboundEvent0(_ event: Any, promise: EventLoopPromise<Void>?) {
        self.eventLoop.preconditionInEventLoop()
        promise?.fail(ChannelError.operationUnsupported)
    }

    /// Delivers `data` to a waiting inbound consumer, or stores it in `inboundBuffer`.
    @usableFromInline
    func channelRead0(_ data: NIOAny) {
        self.eventLoop.preconditionInEventLoop()
        self.addToBuffer(
            buffer: &self.inboundBuffer,
            consumer: &self.inboundBufferConsumer,
            data: data
        )
    }

    /// Remembers `error` unless an earlier error has already been recorded.
    public func errorCaught0(error: Error) {
        self.eventLoop.preconditionInEventLoop()
        guard self.error == nil else {
            return
        }
        self.error = error
    }

    /// Hands `data` to the first waiting consumer if there is one; otherwise appends it to `buffer`.
    private func addToBuffer(
        buffer: inout CircularBuffer<NIOAny>,
        consumer: inout Deque<(NIOAny) -> Void>,
        data: NIOAny
    ) {
        self.eventLoop.preconditionInEventLoop()
        guard let consume = consumer.popFirst() else {
            buffer.append(data)
            return
        }
        consume(data)
    }
}

// ChannelCores are basically never Sendable.
@available(*, unavailable)
extension EmbeddedChannelCore: Sendable {}
654
655
/// `EmbeddedChannel` is a `Channel` implementation that neither performs any
656
/// actual IO nor has a proper eventing mechanism. The prime use-case for
657
/// `EmbeddedChannel` is in unit tests when you want to feed the inbound events
658
/// and check the outbound events manually.
659
///
660
/// Please remember to call `finish()` when you are no longer using this
661
/// `EmbeddedChannel`.
662
///
663
/// To feed events through an `EmbeddedChannel`'s `ChannelPipeline` use
664
/// `EmbeddedChannel.writeInbound` which accepts data of any type. It will then
665
/// forward that data through the `ChannelPipeline` and the subsequent
666
/// `ChannelInboundHandler` will receive it through the usual `channelRead`
667
/// event. The user is responsible for making sure the first
668
/// `ChannelInboundHandler` expects data of that type.
669
///
670
/// `EmbeddedChannel` automatically collects arriving outbound data and makes it
671
/// available one-by-one through `readOutbound`.
672
///
673
/// - Note: `EmbeddedChannel` is currently only compatible with
674
///   `EmbeddedEventLoop`s and cannot be used with `SelectableEventLoop`s from
675
///   for example `MultiThreadedEventLoopGroup`.
676
/// - warning: Unlike other `Channel`s, `EmbeddedChannel` **is not thread-safe**. This
677
///     is because it is intended to be run in the thread that instantiated it. Users are
678
///     responsible for ensuring they never call into an `EmbeddedChannel` in an
679
///     unsynchronized fashion. `EmbeddedEventLoop`s notes also apply as
680
///     `EmbeddedChannel` uses an `EmbeddedEventLoop` as its `EventLoop`.
681
public final class EmbeddedChannel: Channel {
682
    /// `LeftOverState` represents any left-over inbound, outbound, and pending outbound events that hit the
683
    /// `EmbeddedChannel` and were not consumed when `finish` was called on the `EmbeddedChannel`.
684
    ///
685
    /// `EmbeddedChannel` is most useful in testing and usually in unit tests, you want to consume all inbound and
686
    /// outbound data to verify they are what you expect. Therefore, when you `finish` an `EmbeddedChannel` it will
687
    /// return if it's either `.clean` (no left overs) or that it has `.leftOvers`.
688
    public enum LeftOverState {
        /// Nothing remained: no inbound, outbound, or pending outbound data was left on `finish`.
        case clean

        /// Unconsumed inbound, outbound, or pending outbound data was still present on `finish`.
        case leftOvers(inbound: [NIOAny], outbound: [NIOAny], pendingOutbound: [NIOAny])

        /// `true` exactly when this value is `.clean`, i.e. no unconsumed inbound, outbound, or pending
        /// outbound data was left on the `Channel`.
        public var isClean: Bool {
            switch self {
            case .clean:
                return true
            case .leftOvers:
                return false
            }
        }

        /// `true` if there was unconsumed inbound, outbound, or pending outbound data left on the `Channel`
        /// when it was `finish`ed. Always the negation of `isClean`.
        public var hasLeftOvers: Bool {
            switch self {
            case .clean:
                return false
            case .leftOvers:
                return true
            }
        }
    }
711
712
    /// `BufferState` represents the state of either the inbound, or the outbound `EmbeddedChannel` buffer. These
713
    /// buffers contain data that travelled the `ChannelPipeline` all the way.
714
    ///
715
    /// If the last `ChannelHandler` explicitly (by calling `fireChannelRead`) or implicitly (by not implementing
716
    /// `channelRead`) sends inbound data into the end of the `EmbeddedChannel`, it will be held in the
717
    /// `EmbeddedChannel`'s inbound buffer. Similarly for `write` on the outbound side. The state of the respective
718
    /// buffer will be returned from `writeInbound`/`writeOutbound` as a `BufferState`.
719
    public enum BufferState {
        /// The buffer holds no elements.
        case empty

        /// The buffer holds at least one element; the associated array carries its contents.
        case full([NIOAny])

        /// Returns `true` if the buffer was empty.
        public var isEmpty: Bool {
            switch self {
            case .empty:
                return true
            case .full:
                return false
            }
        }

        /// Returns `true` if the buffer was non-empty. Always the negation of `isEmpty`.
        public var isFull: Bool {
            switch self {
            case .empty:
                return false
            case .full:
                return true
            }
        }
    }
740
741
    /// `WrongTypeError` is thrown if you use `readInbound` or `readOutbound` and request a certain type but the first
742
    /// item in the respective buffer is of a different type.
743
    public struct WrongTypeError: Error, Equatable {
        /// The type the caller asked for.
        public let expected: Any.Type

        /// The type of the element that was actually at the front of the buffer.
        public let actual: Any.Type

        /// Creates an error describing a mismatch between the requested and the actual type.
        public init(expected: Any.Type, actual: Any.Type) {
            self.actual = actual
            self.expected = expected
        }

        /// Two errors are equal when both their `expected` and their `actual` metatypes match.
        public static func == (lhs: WrongTypeError, rhs: WrongTypeError) -> Bool {
            guard lhs.expected == rhs.expected else {
                return false
            }
            return lhs.actual == rhs.actual
        }
    }
759
760
    /// Returns `true` if the `EmbeddedChannel` is 'active'.
761
    ///
762
    /// An active `EmbeddedChannel` can be closed by calling `close` or `finish` on the `EmbeddedChannel`.
763
    ///
764
    /// - Note: An `EmbeddedChannel` starts _inactive_ and can be activated, for example by calling `connect`.
765
0
    public var isActive: Bool {
        // Plain read-through of the flag kept on the channel core.
        return self.channelcore.isActive
    }
766
767
    /// - see: `ChannelOptions.Types.AllowRemoteHalfClosureOption`
768
    public var allowRemoteHalfClosure: Bool {
        get {
            // Check the calling thread first, then read the flag stored on the channel core.
            self.embeddedEventLoop.checkCorrectThread()
            let core = self.channelcore
            return core.allowRemoteHalfClosure
        }
        set(enabled) {
            // Same thread check as the getter before forwarding the new value to the core.
            self.embeddedEventLoop.checkCorrectThread()
            self.channelcore.allowRemoteHalfClosure = enabled
        }
    }
778
779
    /// - see: `Channel.closeFuture`
780
0
    public var closeFuture: EventLoopFuture<Void> {
        // The future belongs to the close promise owned by the channel core.
        let closePromise = self.channelcore.closePromise
        return closePromise.futureResult
    }
781
782
    // Built lazily on first access from this channel's pipeline and event loop.
    @usableFromInline
    lazy var channelcore: EmbeddedChannelCore = .init(pipeline: self._pipeline, eventLoop: self.eventLoop)
787
788
    /// - see: `Channel._channelCore`
789
2.44M
    public var _channelCore: ChannelCore {
        // Thread check first; the concrete core is then handed out as the `ChannelCore` existential.
        self.embeddedEventLoop.checkCorrectThread()
        let core: ChannelCore = self.channelcore
        return core
    }
793
794
    /// - see: `Channel.pipeline`
795
69.4k
    public var pipeline: ChannelPipeline {
        // Thread check first, then return the privately stored pipeline.
        self.embeddedEventLoop.checkCorrectThread()
        let current: ChannelPipeline = self._pipeline
        return current
    }
799
800
    /// - see: `Channel.isWritable`
801
34.7k
    public var isWritable: Bool = true
802
803
    @usableFromInline
804
34.7k
    internal var _options: [(option: any ChannelOption, value: any Sendable)] = []
805
806
    /// The `ChannelOption`s set on this channel.
807
    /// - see: `EmbeddedChannel.setOption`
808
0
    public var options: [(option: any ChannelOption, value: any Sendable)] {
        // Read-only view of the internally stored option/value pairs.
        return self._options
    }
809
810
    /// Synchronously closes the `EmbeddedChannel`.
811
    ///
812
    /// Errors in the `EmbeddedChannel` can be consumed using `throwIfErrorCaught`.
813
    ///
814
    /// - Parameters:
815
    ///   - acceptAlreadyClosed: Whether `finish` should throw if the `EmbeddedChannel` has been previously `close`d.
816
    /// - Returns: The `LeftOverState` of the `EmbeddedChannel`. If all the inbound and outbound events have been
817
    ///            consumed (using `readInbound` / `readOutbound`) and there are no pending outbound events (unflushed
818
    ///            writes) this will be `.clean`. If there are any unconsumed inbound, outbound, or pending outbound
819
    ///            events, the `EmbeddedChannel` will return those as `.leftOvers(inbound:outbound:pendingOutbound:)`.
820
13.8k
    public func finish(acceptAlreadyClosed: Bool) throws -> LeftOverState {
        self.embeddedEventLoop.checkCorrectThread()

        do {
            try self.close().wait()
        } catch let channelError as ChannelError {
            // Only `.alreadyClosed` is tolerated, and only when the caller opted in; any other
            // `ChannelError` is rethrown. Errors of other types propagate without being caught here.
            let tolerated = channelError == .alreadyClosed && acceptAlreadyClosed
            if !tolerated {
                throw channelError
            }
        }

        // Run the loop, cancel whatever is still scheduled, then surface any stored error.
        self.embeddedEventLoop.run()
        self.embeddedEventLoop.cancelRemainingScheduledTasks()
        try self.throwIfErrorCaught()

        let core = self.channelcore
        let hasLeftOvers =
            !core.outboundBuffer.isEmpty || !core.inboundBuffer.isEmpty || !core.pendingOutboundBuffer.isEmpty
        guard hasLeftOvers else {
            return .clean
        }

        // Pending outbound entries are tuples; only their first component (the data) is reported.
        return .leftOvers(
            inbound: Array(core.inboundBuffer),
            outbound: Array(core.outboundBuffer),
            pendingOutbound: core.pendingOutboundBuffer.map { entry in entry.0 }
        )
    }
843
844
    /// Synchronously closes the `EmbeddedChannel`.
845
    ///
846
    /// This method will throw if the `Channel` hit any unconsumed errors or if the `close` fails. Errors in the
847
    /// `EmbeddedChannel` can be consumed using `throwIfErrorCaught`.
848
    ///
849
    /// - Returns: The `LeftOverState` of the `EmbeddedChannel`. If all the inbound and outbound events have been
850
    ///            consumed (using `readInbound` / `readOutbound`) and there are no pending outbound events (unflushed
851
    ///            writes) this will be `.clean`. If there are any unconsumed inbound, outbound, or pending outbound
852
    ///            events, the `EmbeddedChannel` will return those as `.leftOvers(inbound:outbound:pendingOutbound:)`.
853
0
    public func finish() throws -> LeftOverState {
        self.embeddedEventLoop.checkCorrectThread()
        // Strict variant: an already-closed channel is not accepted.
        let state = try self.finish(acceptAlreadyClosed: false)
        return state
    }
857
858
    private var _pipeline: ChannelPipeline!
859
860
    /// - see: `Channel.allocator`
861
34.7k
    public var allocator: ByteBufferAllocator = ByteBufferAllocator()
862
863
    /// - see: `Channel.eventLoop`
864
55.5k
    public var eventLoop: EventLoop {
        // Hands out the embedded loop as an `EventLoop` after the thread check.
        let loop = self.embeddedEventLoop
        loop.checkCorrectThread()
        return loop
    }
868
869
    /// Returns the `EmbeddedEventLoop` that this `EmbeddedChannel` uses. This will return the same instance as
870
    /// `EmbeddedChannel.eventLoop` but as the concrete `EmbeddedEventLoop` rather than as `EventLoop` existential.
871
34.7k
    public var embeddedEventLoop: EmbeddedEventLoop = EmbeddedEventLoop()
872
873
    /// - see: `Channel.localAddress`
874
    public var localAddress: SocketAddress? {
        get {
            // Thread check, then read the address stored on the channel core.
            self.embeddedEventLoop.checkCorrectThread()
            let core = self.channelcore
            return core.localAddress
        }
        set(address) {
            // Thread check, then store the address on the channel core.
            self.embeddedEventLoop.checkCorrectThread()
            self.channelcore.localAddress = address
        }
    }
884
885
    /// - see: `Channel.remoteAddress`
886
    public var remoteAddress: SocketAddress? {
        get {
            // Thread check, then read the address stored on the channel core.
            self.embeddedEventLoop.checkCorrectThread()
            let core = self.channelcore
            return core.remoteAddress
        }
        set(address) {
            // Thread check, then store the address on the channel core.
            self.embeddedEventLoop.checkCorrectThread()
            self.channelcore.remoteAddress = address
        }
    }
896
897
    /// `nil` because `EmbeddedChannel`s don't have parents.
898
34.7k
    public let parent: Channel? = nil
899
900
    /// If available, this method reads one element of type `T` out of the `EmbeddedChannel`'s outbound buffer. If the
901
    /// first element was of a different type than requested, `EmbeddedChannel.WrongTypeError` will be thrown, if there
902
    /// are no elements in the outbound buffer, `nil` will be returned.
903
    ///
904
    /// Data hits the `EmbeddedChannel`'s outbound buffer when data was written using `write`, then `flush`ed, and
905
    /// then travelled the `ChannelPipeline` all the way to the front. For data to hit the outbound buffer, the very
906
    /// first `ChannelHandler` must have written and flushed it either explicitly (by calling
907
    /// `ChannelHandlerContext.write` and `flush`) or implicitly by not implementing `write`/`flush`.
908
    ///
909
    /// - Note: Outbound events travel the `ChannelPipeline` _back to front_.
910
    /// - Note: `EmbeddedChannel.writeOutbound` will `write` data through the `ChannelPipeline`, starting with last
911
    ///         `ChannelHandler`.
912
    @inlinable
    public func readOutbound<T>(as type: T.Type = T.self) throws -> T? {
        self.embeddedEventLoop.checkCorrectThread()
        // Delegate to the shared reader, pointing it at the core's outbound buffer.
        let element: T? = try self._readFromBuffer(buffer: &self.channelcore.outboundBuffer)
        return element
    }
917
918
    /// If available, this method reads one element of type `T` out of the `EmbeddedChannel`'s inbound buffer. If the
919
    /// first element was of a different type than requested, `EmbeddedChannel.WrongTypeError` will be thrown, if there
920
    /// are no elements in the inbound buffer, `nil` will be returned.
921
    ///
922
    /// Data hits the `EmbeddedChannel`'s inbound buffer when data was sent through the pipeline using `fireChannelRead`
923
    /// and then travelled the `ChannelPipeline` all the way to the back. For data to hit the inbound buffer, the
924
    /// last `ChannelHandler` must have sent the event either explicitly (by calling
925
    /// `ChannelHandlerContext.fireChannelRead`) or implicitly by not implementing `channelRead`.
926
    ///
927
    /// - Note: `EmbeddedChannel.writeInbound` will fire data through the `ChannelPipeline` using `fireChannelRead`.
928
    @inlinable
    public func readInbound<T>(as type: T.Type = T.self) throws -> T? {
        self.embeddedEventLoop.checkCorrectThread()
        // Delegate to the shared reader, pointing it at the core's inbound buffer.
        let element: T? = try self._readFromBuffer(buffer: &self.channelcore.inboundBuffer)
        return element
    }
933
934
    /// Sends an inbound `channelRead` event followed by a `channelReadComplete` event through the `ChannelPipeline`.
935
    ///
936
    /// The immediate effect being that the first `ChannelInboundHandler` will get its `channelRead` method called
937
    /// with the data you provide.
938
    ///
939
    /// - Parameters:
940
    ///    - data: The data to fire through the pipeline.
941
    /// - Returns: The state of the inbound buffer which contains all the events that travelled the `ChannelPipeline`
942
    ///            all the way.
943
    @inlinable
    @discardableResult public func writeInbound<T>(_ data: T) throws -> BufferState {
        self.embeddedEventLoop.checkCorrectThread()

        // Fire the read followed by read-complete, then surface any error the channel stored.
        let wrapped = NIOAny(data)
        self.pipeline.syncOperations.fireChannelRead(wrapped)
        self.pipeline.syncOperations.fireChannelReadComplete()
        try self.throwIfErrorCaught()

        // Report a snapshot of whatever is in the inbound buffer at this point.
        if self.channelcore.inboundBuffer.isEmpty {
            return .empty
        }
        return .full(Array(self.channelcore.inboundBuffer))
    }
951
952
    /// Sends an outbound `writeAndFlush` event through the `ChannelPipeline`.
953
    ///
954
    /// The immediate effect being that the first `ChannelOutboundHandler` will get its `write` method called
955
    /// with the data you provide. Note that the first `ChannelOutboundHandler` in the pipeline is the _last_ handler
956
    /// because outbound events travel the pipeline from back to front.
957
    ///
958
    /// - Parameters:
959
    ///    - data: The data to fire through the pipeline.
960
    /// - Returns: The state of the outbound buffer which contains all the events that travelled the `ChannelPipeline`
961
    ///            all the way.
962
    @inlinable
    @discardableResult public func writeOutbound<T>(_ data: T) throws -> BufferState {
        self.embeddedEventLoop.checkCorrectThread()

        // Write and flush, waiting for the result so a failed write is thrown to the caller.
        let written = self.writeAndFlush(data)
        try written.wait()

        // Report a snapshot of whatever is in the outbound buffer at this point.
        if self.channelcore.outboundBuffer.isEmpty {
            return .empty
        }
        return .full(Array(self.channelcore.outboundBuffer))
    }
968
969
    /// This method will throw the error that is stored in the `EmbeddedChannel` if any.
970
    ///
971
    /// The `EmbeddedChannel` will store an error if it travels the `ChannelPipeline` all the way past its end.
972
27.7k
    public func throwIfErrorCaught() throws {
        self.embeddedEventLoop.checkCorrectThread()
        guard let pending = self.channelcore.error else {
            return
        }
        // Clear the stored error before throwing so it is reported only once.
        self.channelcore.error = nil
        throw pending
    }
979
980
    // Removes the first element of `buffer` and returns it as `T`; returns `nil` when the buffer is
    // empty and throws `WrongTypeError` when the element is not a `T`.
    @inlinable
    func _readFromBuffer<T>(buffer: inout CircularBuffer<NIOAny>) throws -> T? {
        self.embeddedEventLoop.checkCorrectThread()
        guard !buffer.isEmpty else {
            return nil
        }

        // The element is removed before its type is checked, so a mismatch still consumes it.
        let head = buffer.removeFirst()
        if let typed = self._channelCore.tryUnwrapData(head, as: T.self) {
            return typed
        }

        // Unwrap as `Any` to learn the dynamic type for the error report.
        let erased = self._channelCore.tryUnwrapData(head, as: Any.self)!
        throw WrongTypeError(expected: T.self, actual: type(of: erased))
    }
995
996
    /// Create a new instance.
997
    ///
998
    /// During creation it will automatically also register itself on the `EmbeddedEventLoop`.
999
    ///
1000
    /// - Parameters:
1001
    ///   - handler: The `ChannelHandler` to add to the `ChannelPipeline` before register or `nil` if none should be added.
1002
    ///   - loop: The `EmbeddedEventLoop` to use.
1003
13.8k
    public convenience init(handler: ChannelHandler? = nil, loop: EmbeddedEventLoop = EmbeddedEventLoop()) {
        // Turn the optional single handler into a list of zero or one handlers.
        var handlers: [ChannelHandler] = []
        if let handler = handler {
            handlers.append(handler)
        }
        self.init(handlers: handlers, loop: loop)
        self.embeddedEventLoop.checkCorrectThread()
    }
1008
1009
    /// Create a new instance.
1010
    ///
1011
    /// During creation it will automatically also register itself on the `EmbeddedEventLoop`.
1012
    ///
1013
    /// - Parameters:
1014
    ///   - handlers: The `ChannelHandler`s to add to the `ChannelPipeline` before register.
1015
    ///   - loop: The `EmbeddedEventLoop` to use.
1016
13.8k
    public init(handlers: [ChannelHandler], loop: EmbeddedEventLoop = EmbeddedEventLoop()) {
        self.embeddedEventLoop = loop

        // The pipeline must be stored before handlers are added or the channel is registered.
        let newPipeline = ChannelPipeline(channel: self)
        self._pipeline = newPipeline
        try! newPipeline.syncOperations.addHandlers(handlers)

        // This will never throw...
        let registration = self.register()
        try! registration.wait()

        self.embeddedEventLoop.checkCorrectThread()
    }
1026
1027
    /// - see: `Channel.setOption`
1028
    @inlinable
    public func setOption<Option: ChannelOption>(_ option: Option, value: Option.Value) -> EventLoopFuture<Void> {
        self.embeddedEventLoop.checkCorrectThread()
        // The option is applied synchronously; the returned future is already succeeded.
        self.setOptionSync(option, value: value)
        let done: EventLoopFuture<Void> = self.eventLoop.makeSucceededVoidFuture()
        return done
    }
1034
1035
    // Records the option/value pair and, for `AllowRemoteHalfClosureOption`, also applies the value
    // to `allowRemoteHalfClosure`.
    @inlinable
    internal func setOptionSync<Option: ChannelOption>(_ option: Option, value: Option.Value) {
        self.embeddedEventLoop.checkCorrectThread()

        self.addOption(option, value: value)

        guard option is ChannelOptions.Types.AllowRemoteHalfClosureOption else {
            return
        }
        self.allowRemoteHalfClosure = value as! Bool
    }
1046
1047
    /// - see: `Channel.getOption`
1048
    @inlinable
    public func getOption<Option: ChannelOption>(_ option: Option) -> EventLoopFuture<Option.Value> {
        self.embeddedEventLoop.checkCorrectThread()
        // The value is looked up synchronously and wrapped in an already-succeeded future.
        let loop = self.eventLoop
        let current = self.getOptionSync(option)
        return loop.makeSucceededFuture(current)
    }
1053
1054
    // Returns the value for `option`. Three option types are answered directly; anything else must
    // have been stored via `addOption`, otherwise this traps.
    @inlinable
    internal func getOptionSync<Option: ChannelOption>(_ option: Option) -> Option.Value {
        self.embeddedEventLoop.checkCorrectThread()

        switch option {
        case is ChannelOptions.Types.AutoReadOption:
            // Always reported as `true`.
            return true as! Option.Value
        case is ChannelOptions.Types.AllowRemoteHalfClosureOption:
            return self.allowRemoteHalfClosure as! Option.Value
        case is ChannelOptions.Types.BufferedWritableBytesOption:
            // Sum the readable bytes of every pending outbound write; each entry is unwrapped as a `ByteBuffer`.
            var total = 0
            for pendingWrite in self.channelcore.pendingOutboundBuffer {
                let bytes = self.channelcore.unwrapData(pendingWrite.0, as: ByteBuffer.self)
                total += bytes.readableBytes
            }
            return total as! Option.Value
        default:
            break
        }

        if let stored = self.optionValue(for: option) {
            return stored
        }
        fatalError("option \(option) not supported")
    }
1078
1079
    // Stores the option/value pair, replacing the existing entry for the same option type if there is one.
    @inlinable
    internal func addOption<Option: ChannelOption>(_ option: Option, value: Option.Value) {
        guard let existing = self._options.firstIndex(where: { $0.option is Option }) else {
            // No entry of this option type yet: add a new one at the end.
            self._options.append((option: option, value: value))
            return
        }
        self._options[existing] = (option: option, value: value)
    }
1087
1088
    // Looks up the first stored entry whose option is of type `Option` and casts its value to `Option.Value`.
    @inlinable
    internal func optionValue<Option: ChannelOption>(for option: Option) -> Option.Value? {
        let match = self.options.first { $0.option is Option }
        return match?.value as? Option.Value
    }
1092
1093
    /// Fires the (outbound) `bind` event through the `ChannelPipeline`. If the event hits the `EmbeddedChannel` which
1094
    /// happens when it travels the `ChannelPipeline` all the way to the front, this will also set the
1095
    /// `EmbeddedChannel`'s `localAddress`.
1096
    ///
1097
    /// - Parameters:
1098
    ///   - address: The address to fake-bind to.
1099
    ///   - promise: The `EventLoopPromise` which will be fulfilled when the fake-bind operation has been done.
1100
0
    public func bind(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
        self.embeddedEventLoop.checkCorrectThread()
        // A promise is always needed so the local address can be recorded on success.
        let bindPromise: EventLoopPromise<Void> = promise ?? self.embeddedEventLoop.makePromise()
        bindPromise.futureResult.whenSuccess {
            self.localAddress = address
        }
        self.pipeline.bind(to: address, promise: bindPromise)
    }
1108
1109
    /// Fires the (outbound) `connect` event through the `ChannelPipeline`. If the event hits the `EmbeddedChannel`
1110
    /// which happens when it travels the `ChannelPipeline` all the way to the front, this will also set the
1111
    /// `EmbeddedChannel`'s `remoteAddress`.
1112
    ///
1113
    /// - Parameters:
1114
    ///   - address: The address to fake-connect to.
1115
    ///   - promise: The `EventLoopPromise` which will be fulfilled when the fake-connect operation has been done.
1116
0
    public func connect(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
        self.embeddedEventLoop.checkCorrectThread()
        // A promise is always needed so the remote address can be recorded on success.
        let connectPromise: EventLoopPromise<Void> = promise ?? self.embeddedEventLoop.makePromise()
        connectPromise.futureResult.whenSuccess {
            self.remoteAddress = address
        }
        self.pipeline.connect(to: address, promise: connectPromise)
    }
1124
1125
    /// An overload of `Channel.write` that does not require a Sendable type, as ``EmbeddedEventLoop``
1126
    /// is bound to a single thread.
1127
    @inlinable
    public func write<T>(_ data: T, promise: EventLoopPromise<Void>?) {
        self.embeddedEventLoop.checkCorrectThread()
        // Wrap the value and hand it to the pipeline's synchronous operations.
        let payload = NIOAny(data)
        self.pipeline.syncOperations.write(payload, promise: promise)
    }
1132
1133
    /// An overload of `Channel.write` that does not require a Sendable type, as ``EmbeddedEventLoop``
1134
    /// is bound to a single thread.
1135
    @inlinable
    public func write<T>(_ data: T) -> EventLoopFuture<Void> {
        self.embeddedEventLoop.checkCorrectThread()
        // Create the promise here so its future can be returned to the caller.
        let writePromise: EventLoopPromise<Void> = self.eventLoop.makePromise(of: Void.self)
        let payload = NIOAny(data)
        self.pipeline.syncOperations.write(payload, promise: writePromise)
        return writePromise.futureResult
    }
1142
1143
    /// An overload of `Channel.writeAndFlush` that does not require a Sendable type, as ``EmbeddedEventLoop``
1144
    /// is bound to a single thread.
1145
    @inlinable
    public func writeAndFlush<T>(_ data: T, promise: EventLoopPromise<Void>?) {
        self.embeddedEventLoop.checkCorrectThread()
        // Wrap the value and hand it to the pipeline's synchronous operations.
        let payload = NIOAny(data)
        self.pipeline.syncOperations.writeAndFlush(payload, promise: promise)
    }
1150
1151
    /// An overload of `Channel.writeAndFlush` that does not require a Sendable type, as ``EmbeddedEventLoop``
1152
    /// is bound to a single thread.
1153
    @inlinable
    public func writeAndFlush<T>(_ data: T) -> EventLoopFuture<Void> {
        self.embeddedEventLoop.checkCorrectThread()
        // Create the promise here so its future can be returned to the caller.
        let flushPromise: EventLoopPromise<Void> = self.eventLoop.makePromise(of: Void.self)
        let payload = NIOAny(data)
        self.pipeline.syncOperations.writeAndFlush(payload, promise: flushPromise)
        return flushPromise.futureResult
    }
1160
}
1161
1162
extension EmbeddedChannel {
    /// Synchronous access to the channel's options, forwarding to `setOptionSync` / `getOptionSync`
    /// on the wrapped `EmbeddedChannel`.
    public struct SynchronousOptions: NIOSynchronousChannelOptions {
        /// The channel whose options are read and written.
        @usableFromInline
        internal let channel: EmbeddedChannel

        /// Wraps `channel`; only code in this file can create an instance.
        fileprivate init(channel: EmbeddedChannel) {
            self.channel = channel
        }

        /// Sets `option` to `value` on the wrapped channel. Declared `throws`, but this
        /// implementation contains no throwing call.
        @inlinable
        public func setOption<Option: ChannelOption>(_ option: Option, value: Option.Value) throws {
            let target = self.channel
            target.setOptionSync(option, value: value)
        }

        /// Returns the current value of `option` from the wrapped channel. Declared `throws`, but this
        /// implementation contains no throwing call.
        @inlinable
        public func getOption<Option: ChannelOption>(_ option: Option) throws -> Option.Value {
            let target = self.channel
            return target.getOptionSync(option)
        }
    }

    /// A fresh `SynchronousOptions` wrapper around this channel; never `nil`.
    public final var syncOptions: NIOSynchronousChannelOptions? {
        let wrapper = SynchronousOptions(channel: self)
        return wrapper
    }
}
1186
1187
// EmbeddedChannel is extremely _not_ Sendable. However, the Channel protocol
1188
// requires it to be. We are doing some runtime enforcement of correct use, but
1189
// ultimately we can't have the compiler validating this usage.
1190
extension EmbeddedChannel: @unchecked Sendable {}
1191
1192
@available(*, unavailable)
1193
extension EmbeddedChannel.LeftOverState: @unchecked Sendable {}
1194
1195
@available(*, unavailable)
1196
extension EmbeddedChannel.BufferState: @unchecked Sendable {}
1197
1198
@available(*, unavailable)
1199
extension EmbeddedChannel.SynchronousOptions: Sendable {}