Coverage Report

Created: 2026-02-11 06:21

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/swift-nio/Sources/NIOEmbedded/AsyncTestingChannel.swift
Line
Count
Source
1
//===----------------------------------------------------------------------===//
2
//
3
// This source file is part of the SwiftNIO open source project
4
//
5
// Copyright (c) 2017-2022 Apple Inc. and the SwiftNIO project authors
6
// Licensed under Apache License v2.0
7
//
8
// See LICENSE.txt for license information
9
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
10
//
11
// SPDX-License-Identifier: Apache-2.0
12
//
13
//===----------------------------------------------------------------------===//
14
15
#if canImport(Dispatch)
16
import NIOConcurrencyHelpers
17
import NIOCore
18
19
/// A `Channel` with fine-grained control for testing.
20
///
21
/// ``NIOAsyncTestingChannel`` is a `Channel` implementation that does no
22
/// actual IO but that does have proper eventing mechanism, albeit one that users can
23
/// control. The prime use-case for ``NIOAsyncTestingChannel`` is in unit tests when you
24
/// want to feed the inbound events and check the outbound events manually.
25
///
26
/// Please remember to call ``finish()`` when you are no longer using this
27
/// ``NIOAsyncTestingChannel``.
28
///
29
/// To feed events through an ``NIOAsyncTestingChannel``'s `ChannelPipeline` use
30
/// ``NIOAsyncTestingChannel/writeInbound(_:)`` which accepts data of any type. It will then
31
/// forward that data through the `ChannelPipeline` and the subsequent
32
/// `ChannelInboundHandler` will receive it through the usual `channelRead`
33
/// event. The user is responsible for making sure the first
34
/// `ChannelInboundHandler` expects data of that type.
35
///
36
/// Unlike in a regular `ChannelPipeline`, it is expected that the test code will act
37
/// as the "network layer", using ``readOutbound(as:)`` to observe the data that the
38
/// `Channel` has "written" to the network, and using ``writeInbound(_:)`` to simulate
39
/// receiving data from the network. There are also facilities to make it a bit easier
40
/// to handle the logic for `write` and `flush` (using ``writeOutbound(_:)``), and to
41
/// extract data that passed the whole way along the channel in `channelRead` (using
42
/// ``readOutbound(as:)``. Below is a diagram showing the layout of a `ChannelPipeline`
43
/// inside a ``NIOAsyncTestingChannel``, including the functions that can be used to
44
/// inject and extract data at each end.
45
///
46
/// ```
47
///
48
///            Extract data                         Inject data
49
///         using readInbound()                using writeOutbound()
50
///                  ▲                                   |
51
///  +---------------+-----------------------------------+---------------+
52
///  |               |           ChannelPipeline         |               |
53
///  |               |                TAIL               ▼               |
54
///  |    +---------------------+            +-----------+----------+    |
55
///  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
56
///  |    +----------+----------+            +-----------+----------+    |
57
///  |               ▲                                   |               |
58
///  |               |                                   ▼               |
59
///  |    +----------+----------+            +-----------+----------+    |
60
///  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
61
///  |    +----------+----------+            +-----------+----------+    |
62
///  |               ▲                                   .               |
63
///  |               .                                   .               |
64
///  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
65
///  |        [ method call]                       [method call]         |
66
///  |               .                                   .               |
67
///  |               .                                   ▼               |
68
///  |    +----------+----------+            +-----------+----------+    |
69
///  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
70
///  |    +----------+----------+            +-----------+----------+    |
71
///  |               ▲                                   |               |
72
///  |               |                                   ▼               |
73
///  |    +----------+----------+            +-----------+----------+    |
74
///  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
75
///  |    +----------+----------+            +-----------+----------+    |
76
///  |               ▲              HEAD                 |               |
77
///  +---------------+-----------------------------------+---------------+
78
///                  |                                   ▼
79
///             Inject data                         Extract data
80
///         using writeInbound()                using readOutbound()
81
/// ```
82
///
83
/// - Note: ``NIOAsyncTestingChannel`` is currently only compatible with
84
///   ``NIOAsyncTestingEventLoop``s and cannot be used with `SelectableEventLoop`s from
85
///   for example `MultiThreadedEventLoopGroup`.
86
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
87
public final class NIOAsyncTestingChannel: Channel {
88
    /// ``LeftOverState`` represents any left-over inbound, outbound, and pending outbound events that hit the
89
    /// ``NIOAsyncTestingChannel`` and were not consumed when ``finish()`` was called on the ``NIOAsyncTestingChannel``.
90
    ///
91
    /// ``NIOAsyncTestingChannel`` is most useful in testing and usually in unit tests, you want to consume all inbound and
92
    /// outbound data to verify they are what you expect. Therefore, when you ``finish()`` a ``NIOAsyncTestingChannel`` it will
93
    /// return if it's either ``LeftOverState/clean`` (no left overs) or that it has ``LeftOverState/leftOvers(inbound:outbound:pendingOutbound:)``.
94
    public enum LeftOverState {
95
        /// The ``NIOAsyncTestingChannel`` is clean, ie. no inbound, outbound, or pending outbound data left on ``NIOAsyncTestingChannel/finish()``.
96
        case clean
97
98
        /// The ``NIOAsyncTestingChannel`` has inbound, outbound, or pending outbound data left on ``NIOAsyncTestingChannel/finish()``.
99
        case leftOvers(inbound: CircularBuffer<NIOAny>, outbound: CircularBuffer<NIOAny>, pendingOutbound: [NIOAny])
100
101
        /// `true` if the ``NIOAsyncTestingChannel`` was `clean` on ``NIOAsyncTestingChannel/finish()``, ie. there is no unconsumed inbound, outbound, or
102
        /// pending outbound data left on the `Channel`.
103
0
        public var isClean: Bool {
104
0
            if case .clean = self {
105
0
                return true
106
0
            } else {
107
0
                return false
108
0
            }
109
0
        }
110
111
        /// `true` if the ``NIOAsyncTestingChannel`` if there was unconsumed inbound, outbound, or pending outbound data left
112
        /// on the `Channel` when it was `finish`ed.
113
0
        public var hasLeftOvers: Bool {
114
0
            !self.isClean
115
0
        }
116
    }
117
118
    /// ``BufferState`` represents the state of either the inbound, or the outbound ``NIOAsyncTestingChannel`` buffer.
119
    ///
120
    /// These buffers contain data that travelled the `ChannelPipeline` all the way through..
121
    ///
122
    /// If the last `ChannelHandler` explicitly (by calling `fireChannelRead`) or implicitly (by not implementing
123
    /// `channelRead`) sends inbound data into the end of the ``NIOAsyncTestingChannel``, it will be held in the
124
    /// ``NIOAsyncTestingChannel``'s inbound buffer. Similarly for `write` on the outbound side. The state of the respective
125
    /// buffer will be returned from ``writeInbound(_:)``/``writeOutbound(_:)`` as a ``BufferState``.
126
    public enum BufferState {
127
        /// The buffer is empty.
128
        case empty
129
130
        /// The buffer is non-empty.
131
        case full(CircularBuffer<NIOAny>)
132
133
        /// Returns `true` is the buffer was empty.
134
0
        public var isEmpty: Bool {
135
0
            if case .empty = self {
136
0
                return true
137
0
            } else {
138
0
                return false
139
0
            }
140
0
        }
141
142
        /// Returns `true` if the buffer was non-empty.
143
0
        public var isFull: Bool {
144
0
            !self.isEmpty
145
0
        }
146
    }
147
148
    /// ``WrongTypeError`` is thrown if you use ``readInbound(as:)`` or ``readOutbound(as:)`` and request a certain type but the first
149
    /// item in the respective buffer is of a different type.
150
    public struct WrongTypeError: Error, Equatable {
151
        /// The type you expected.
152
        public let expected: Any.Type
153
154
        /// The type of the actual first element.
155
        public let actual: Any.Type
156
157
0
        public init(expected: Any.Type, actual: Any.Type) {
158
0
            self.expected = expected
159
0
            self.actual = actual
160
0
        }
161
162
0
        public static func == (lhs: WrongTypeError, rhs: WrongTypeError) -> Bool {
163
0
            lhs.expected == rhs.expected && lhs.actual == rhs.actual
164
0
        }
165
    }
166
167
    /// Returns `true` if the ``NIOAsyncTestingChannel`` is 'active'.
168
    ///
169
    /// An active ``NIOAsyncTestingChannel`` can be closed by calling `close` or ``finish()`` on the ``NIOAsyncTestingChannel``.
170
    ///
171
    /// - Note: An ``NIOAsyncTestingChannel`` starts _inactive_ and can be activated, for example by calling `connect`.
172
0
    public var isActive: Bool { channelcore.isActive }
173
174
    /// - see: `ChannelOptions.Types.AllowRemoteHalfClosureOption`
175
    public var allowRemoteHalfClosure: Bool {
176
0
        get {
177
0
            channelcore.allowRemoteHalfClosure
178
0
        }
179
0
        set {
180
0
            channelcore.allowRemoteHalfClosure = newValue
181
0
        }
182
    }
183
184
    /// - see: `Channel.closeFuture`
185
0
    public var closeFuture: EventLoopFuture<Void> { channelcore.closePromise.futureResult }
186
187
    /// - see: `Channel.allocator`
188
0
    public let allocator: ByteBufferAllocator = ByteBufferAllocator()
189
190
    /// - see: `Channel.eventLoop`
191
0
    public var eventLoop: EventLoop {
192
0
        self.testingEventLoop
193
0
    }
194
195
    /// Returns the ``NIOAsyncTestingEventLoop`` that this ``NIOAsyncTestingChannel`` uses. This will return the same instance as
196
    /// ``NIOAsyncTestingChannel/eventLoop`` but as the concrete ``NIOAsyncTestingEventLoop`` rather than as `EventLoop` existential.
197
    public let testingEventLoop: NIOAsyncTestingEventLoop
198
199
    /// `nil` because ``NIOAsyncTestingChannel``s don't have parents.
200
0
    public let parent: Channel? = nil
201
202
    // These two variables are only written once, from a single thread, and never written again, so they're _technically_ thread-safe. Most methods cannot safely
203
    // be used from multiple threads, but `isActive`, `isOpen`, `eventLoop`, and `closeFuture` can all safely be used from any thread. Just.
204
    // `EmbeddedChannelCore`'s localAddress and remoteAddress fields are protected by a lock so they are safe to access.
205
    @usableFromInline
206
    nonisolated(unsafe) var channelcore: EmbeddedChannelCore!
207
    nonisolated(unsafe) private var _pipeline: ChannelPipeline!
208
209
    @usableFromInline
210
    internal struct State: Sendable {
211
        var isWritable: Bool
212
213
        @usableFromInline
214
        var options: [(option: any ChannelOption, value: any Sendable)]
215
    }
216
217
    /// Guards any of the getters/setters that can be accessed from any thread.
218
    @usableFromInline
219
0
    internal let _stateLock = NIOLockedValueBox(
220
0
        State(isWritable: true, options: [])
221
0
    )
222
223
    /// - see: `Channel._channelCore`
224
0
    public var _channelCore: ChannelCore {
225
0
        channelcore
226
0
    }
227
228
    /// - see: `Channel.pipeline`
229
0
    public var pipeline: ChannelPipeline {
230
0
        _pipeline
231
0
    }
232
233
    /// - see: `Channel.isWritable`
234
    public var isWritable: Bool {
235
0
        get {
236
0
            self._stateLock.withLockedValue { $0.isWritable }
237
0
        }
238
0
        set {
239
0
            self._stateLock.withLockedValue {
240
0
                $0.isWritable = newValue
241
0
            }
242
0
        }
243
    }
244
245
    /// - see: `Channel.localAddress`
246
    public var localAddress: SocketAddress? {
247
0
        get {
248
0
            self.channelcore.localAddress
249
0
        }
250
0
        set {
251
0
            self.channelcore.localAddress = newValue
252
0
        }
253
    }
254
255
    /// - see: `Channel.remoteAddress`
256
    public var remoteAddress: SocketAddress? {
257
0
        get {
258
0
            self.channelcore.remoteAddress
259
0
        }
260
0
        set {
261
0
            self.channelcore.remoteAddress = newValue
262
0
        }
263
    }
264
265
    /// The `ChannelOption`s set on this channel.
266
    /// - see: `NIOAsyncTestingChannel.setOption`
267
0
    public var options: [(option: any ChannelOption, value: any Sendable)] {
268
0
        self._stateLock.withLockedValue { $0.options }
269
0
    }
270
    /// Create a new instance.
271
    ///
272
    /// During creation it will automatically also register itself on the ``NIOAsyncTestingEventLoop``.
273
    ///
274
    /// - Parameters:
275
    ///   - loop: The ``NIOAsyncTestingEventLoop`` to use.
276
0
    public init(loop: NIOAsyncTestingEventLoop = NIOAsyncTestingEventLoop()) {
277
0
        self.testingEventLoop = loop
278
0
        self._pipeline = ChannelPipeline(channel: self)
279
0
        self.channelcore = EmbeddedChannelCore(pipeline: self._pipeline, eventLoop: self.eventLoop)
280
0
    }
281
282
    /// Create a new instance.
283
    ///
284
    /// During creation it will automatically also register itself on the ``NIOAsyncTestingEventLoop``.
285
    ///
286
    /// - Parameters:
287
    ///   - handler: The `ChannelHandler` to add to the `ChannelPipeline` before register.
288
    ///   - loop: The ``NIOAsyncTestingEventLoop`` to use.
289
    @preconcurrency
290
    public convenience init(
291
        handler: ChannelHandler & Sendable,
292
        loop: NIOAsyncTestingEventLoop = NIOAsyncTestingEventLoop()
293
0
    ) async {
294
0
        await self.init(handlers: [handler], loop: loop)
295
0
    }
296
297
    /// Create a new instance.
298
    ///
299
    /// During creation it will automatically also register itself on the ``NIOAsyncTestingEventLoop``.
300
    ///
301
    /// - Parameters:
302
    ///   - handlers: The `ChannelHandler`s to add to the `ChannelPipeline` before register.
303
    ///   - loop: The ``NIOAsyncTestingEventLoop`` to use.
304
    @preconcurrency
305
    public convenience init(
306
        handlers: [ChannelHandler & Sendable],
307
        loop: NIOAsyncTestingEventLoop = NIOAsyncTestingEventLoop()
308
0
    ) async {
309
0
        try! await self.init(loop: loop) { channel in
310
0
            try channel.pipeline.syncOperations.addHandlers(handlers)
311
0
        }
312
0
    }
313
314
    /// Create a new instance.
315
    ///
316
    /// During creation it will automatically also register itself on the ``NIOAsyncTestingEventLoop``.
317
    ///
318
    /// - Parameters:
319
    ///   - loop: The ``NIOAsyncTestingEventLoop`` to use.
320
    ///   - channelInitializer: The initialization closure which will be run on the `EventLoop` before registration. This could be used to add handlers using `syncOperations`.
321
    public convenience init(
322
        loop: NIOAsyncTestingEventLoop = NIOAsyncTestingEventLoop(),
323
        channelInitializer: @escaping @Sendable (NIOAsyncTestingChannel) throws -> Void
324
0
    ) async throws {
325
0
        self.init(loop: loop)
326
0
        try await loop.submit {
327
0
            try channelInitializer(self)
328
0
        }.get()
329
0
        try await self.register()
330
0
    }
331
332
    /// Asynchronously closes the ``NIOAsyncTestingChannel``.
333
    ///
334
    /// Errors in the ``NIOAsyncTestingChannel`` can be consumed using ``throwIfErrorCaught()``.
335
    ///
336
    /// - Parameters:
337
    ///   - acceptAlreadyClosed: Whether ``finish()`` should throw if the ``NIOAsyncTestingChannel`` has been previously `close`d.
338
    /// - Returns: The ``LeftOverState`` of the ``NIOAsyncTestingChannel``. If all the inbound and outbound events have been
339
    ///            consumed (using ``readInbound(as:)`` / ``readOutbound(as:)``) and there are no pending outbound events (unflushed
340
    ///            writes) this will be ``LeftOverState/clean``. If there are any unconsumed inbound, outbound, or pending outbound
341
    ///            events, the ``NIOAsyncTestingChannel`` will returns those as ``LeftOverState/leftOvers(inbound:outbound:pendingOutbound:)``.
342
0
    public func finish(acceptAlreadyClosed: Bool) async throws -> LeftOverState {
343
0
        do {
344
0
            try await self.close().get()
345
0
        } catch let error as ChannelError {
346
0
            guard error == .alreadyClosed && acceptAlreadyClosed else {
347
0
                throw error
348
0
            }
349
0
        }
350
0
351
0
        // This can never actually throw.
352
0
        try! await self.testingEventLoop.executeInContext {
353
0
            self.testingEventLoop.drainScheduledTasksByRunningAllCurrentlyScheduledTasks()
354
0
        }
355
0
        await self.testingEventLoop.run()
356
0
        try await throwIfErrorCaught()
357
0
358
0
        // This can never actually throw.
359
0
        return try! await self.testingEventLoop.executeInContext {
360
0
            let c = self.channelcore!
361
0
            if c.outboundBuffer.isEmpty && c.inboundBuffer.isEmpty && c.pendingOutboundBuffer.isEmpty {
362
0
                return .clean
363
0
            } else {
364
0
                return .leftOvers(
365
0
                    inbound: c.inboundBuffer,
366
0
                    outbound: c.outboundBuffer,
367
0
                    pendingOutbound: c.pendingOutboundBuffer.map { $0.0 }
368
0
                )
369
0
            }
370
0
        }
371
0
    }
372
373
    /// Asynchronously closes the ``NIOAsyncTestingChannel``.
374
    ///
375
    /// This method will throw if the `Channel` hit any unconsumed errors or if the `close` fails. Errors in the
376
    /// ``NIOAsyncTestingChannel`` can be consumed using ``throwIfErrorCaught()``.
377
    ///
378
    /// - Returns: The ``LeftOverState`` of the ``NIOAsyncTestingChannel``. If all the inbound and outbound events have been
379
    ///            consumed (using ``readInbound(as:)`` / ``readOutbound(as:)``) and there are no pending outbound events (unflushed
380
    ///            writes) this will be ``LeftOverState/clean``. If there are any unconsumed inbound, outbound, or pending outbound
381
    ///            events, the ``NIOAsyncTestingChannel`` will returns those as ``LeftOverState/leftOvers(inbound:outbound:pendingOutbound:)``.
382
0
    public func finish() async throws -> LeftOverState {
383
0
        try await self.finish(acceptAlreadyClosed: false)
384
0
    }
385
386
    /// If available, this method reads one element of type `T` out of the ``NIOAsyncTestingChannel``'s outbound buffer. If the
387
    /// first element was of a different type than requested, ``WrongTypeError`` will be thrown, if there
388
    /// are no elements in the outbound buffer, `nil` will be returned.
389
    ///
390
    /// Data hits the ``NIOAsyncTestingChannel``'s outbound buffer when data was written using `write`, then `flush`ed, and
391
    /// then travelled the `ChannelPipeline` all the way to the front. For data to hit the outbound buffer, the very
392
    /// first `ChannelHandler` must have written and flushed it either explicitly (by calling
393
    /// `ChannelHandlerContext.write` and `flush`) or implicitly by not implementing `write`/`flush`.
394
    ///
395
    /// - Note: Outbound events travel the `ChannelPipeline` _back to front_.
396
    /// - Note: ``NIOAsyncTestingChannel/writeOutbound(_:)`` will `write` data through the `ChannelPipeline`, starting with last
397
    ///         `ChannelHandler`.
398
    @inlinable
399
0
    public func readOutbound<T: Sendable>(as type: T.Type = T.self) async throws -> T? {
400
0
        try await self.testingEventLoop.executeInContext {
401
0
            try self._readFromBuffer(buffer: &self.channelcore.outboundBuffer)
402
0
        }
403
0
    }
404
405
    /// This method is similar to ``NIOAsyncTestingChannel/readOutbound(as:)`` but will wait if the outbound buffer is empty.
406
    /// If available, this method reads one element of type `T` out of the ``NIOAsyncTestingChannel``'s outbound buffer. If the
407
    /// first element was of a different type than requested, ``WrongTypeError`` will be thrown. If the channel has
408
    /// already closed or closes before the next pending outbound write, `ChannelError.ioOnClosedChannel` will be
409
    /// thrown. If there are no elements in the outbound buffer, this method will wait until there is one, and return
410
    /// that element.
411
    ///
412
    /// Data hits the ``NIOAsyncTestingChannel``'s outbound buffer when data was written using `write`, then `flush`ed, and
413
    /// then travelled the `ChannelPipeline` all the way to the front. For data to hit the outbound buffer, the very
414
    /// first `ChannelHandler` must have written and flushed it either explicitly (by calling
415
    /// `ChannelHandlerContext.write` and `flush`) or implicitly by not implementing `write`/`flush`.
416
    ///
417
    /// - Note: Outbound events travel the `ChannelPipeline` _back to front_.
418
    /// - Note: ``NIOAsyncTestingChannel/writeOutbound(_:)`` will `write` data through the `ChannelPipeline`, starting with last
419
    ///         `ChannelHandler`.
420
0
    public func waitForOutboundWrite<T: Sendable>(as type: T.Type = T.self) async throws -> T {
421
0
        try await withCheckedThrowingContinuation { continuation in
422
0
            self.testingEventLoop.execute {
423
0
                do {
424
0
                    if let element: T = try self._readFromBuffer(buffer: &self.channelcore.outboundBuffer) {
425
0
                        continuation.resume(returning: element)
426
0
                        return
427
0
                    }
428
0
                    self.channelcore._enqueueOutboundBufferConsumer { element in
429
0
                        switch element {
430
0
                        case .success(let data):
431
0
                            continuation.resume(with: Result { try self._cast(data) })
432
0
                        case .failure(let failure):
433
0
                            continuation.resume(throwing: failure)
434
0
                        }
435
0
                    }
436
0
                } catch {
437
0
                    continuation.resume(throwing: error)
438
0
                }
439
0
            }
440
0
        }
441
0
    }
442
443
    /// If available, this method reads one element of type `T` out of the ``NIOAsyncTestingChannel``'s inbound buffer. If the
444
    /// first element was of a different type than requested, ``WrongTypeError`` will be thrown, if there
445
    /// are no elements in the outbound buffer, `nil` will be returned.
446
    ///
447
    /// Data hits the ``NIOAsyncTestingChannel``'s inbound buffer when data was send through the pipeline using `fireChannelRead`
448
    /// and then travelled the `ChannelPipeline` all the way to the back. For data to hit the inbound buffer, the
449
    /// last `ChannelHandler` must have send the event either explicitly (by calling
450
    /// `ChannelHandlerContext.fireChannelRead`) or implicitly by not implementing `channelRead`.
451
    ///
452
    /// - Note: ``NIOAsyncTestingChannel/writeInbound(_:)`` will fire data through the `ChannelPipeline` using `fireChannelRead`.
453
    @inlinable
454
0
    public func readInbound<T: Sendable>(as type: T.Type = T.self) async throws -> T? {
455
0
        try await self.testingEventLoop.executeInContext {
456
0
            try self._readFromBuffer(buffer: &self.channelcore.inboundBuffer)
457
0
        }
458
0
    }
459
460
    /// This method is similar to ``NIOAsyncTestingChannel/readInbound(as:)`` but will wait if the inbound buffer is empty.
461
    /// If available, this method reads one element of type `T` out of the ``NIOAsyncTestingChannel``'s inbound buffer. If the
462
    /// first element was of a different type than requested, ``WrongTypeError`` will be thrown. If the channel has
463
    /// already closed or closes before the next pending inbound write, `ChannelError.ioOnClosedChannel` will be thrown.
464
    /// If there are no elements in the inbound buffer, this method will wait until there is one, and return that
465
    /// element.
466
    ///
467
    /// Data hits the ``NIOAsyncTestingChannel``'s inbound buffer when data was send through the pipeline using `fireChannelRead`
468
    /// and then travelled the `ChannelPipeline` all the way to the back. For data to hit the inbound buffer, the
469
    /// last `ChannelHandler` must have send the event either explicitly (by calling
470
    /// `ChannelHandlerContext.fireChannelRead`) or implicitly by not implementing `channelRead`.
471
    ///
472
    /// - Note: ``NIOAsyncTestingChannel/writeInbound(_:)`` will fire data through the `ChannelPipeline` using `fireChannelRead`.
473
0
    public func waitForInboundWrite<T: Sendable>(as type: T.Type = T.self) async throws -> T {
474
0
        try await withCheckedThrowingContinuation { continuation in
475
0
            self.testingEventLoop.execute {
476
0
                do {
477
0
                    if let element: T = try self._readFromBuffer(buffer: &self.channelcore.inboundBuffer) {
478
0
                        continuation.resume(returning: element)
479
0
                        return
480
0
                    }
481
0
                    self.channelcore._enqueueInboundBufferConsumer { element in
482
0
                        switch element {
483
0
                        case .success(let data):
484
0
                            continuation.resume(with: Result { try self._cast(data) })
485
0
                        case .failure(let failure):
486
0
                            continuation.resume(throwing: failure)
487
0
                        }
488
0
                    }
489
0
                } catch {
490
0
                    continuation.resume(throwing: error)
491
0
                }
492
0
            }
493
0
        }
494
0
    }
495
496
    /// Sends an inbound `channelRead` event followed by a `channelReadComplete` event through the `ChannelPipeline`.
497
    ///
498
    /// The immediate effect being that the first `ChannelInboundHandler` will get its `channelRead` method called
499
    /// with the data you provide.
500
    ///
501
    /// - Parameters:
502
    ///    - data: The data to fire through the pipeline.
503
    /// - Returns: The state of the inbound buffer which contains all the events that travelled the `ChannelPipeline`
504
    //             all the way.
505
    @inlinable
506
0
    @discardableResult public func writeInbound<T: Sendable>(_ data: T) async throws -> BufferState {
507
0
        try await self.testingEventLoop.executeInContext {
508
0
            self.pipeline.fireChannelRead(data)
509
0
            self.pipeline.fireChannelReadComplete()
510
0
            try self._throwIfErrorCaught()
511
0
            return self.channelcore.inboundBuffer.isEmpty ? .empty : .full(self.channelcore.inboundBuffer)
512
0
        }
513
0
    }
514
515
    /// Sends an outbound `writeAndFlush` event through the `ChannelPipeline`.
516
    ///
517
    /// The immediate effect being that the first `ChannelOutboundHandler` will get its `write` method called
518
    /// with the data you provide. Note that the first `ChannelOutboundHandler` in the pipeline is the _last_ handler
519
    /// because outbound events travel the pipeline from back to front.
520
    ///
521
    /// - Parameters:
522
    ///    - data: The data to fire through the pipeline.
523
    /// - Returns: The state of the outbound buffer which contains all the events that travelled the `ChannelPipeline`
524
    //             all the way.
525
    @inlinable
526
0
    @discardableResult public func writeOutbound<T: Sendable>(_ data: T) async throws -> BufferState {
527
0
        try await self.writeAndFlush(data)
528
0
529
0
        return try await self.testingEventLoop.executeInContext {
530
0
            self.channelcore.outboundBuffer.isEmpty ? .empty : .full(self.channelcore.outboundBuffer)
531
0
        }
532
0
    }
533
534
    /// This method will throw the error that is stored in the ``NIOAsyncTestingChannel`` if any.
535
    ///
536
    /// The ``NIOAsyncTestingChannel`` will store an error if some error travels the `ChannelPipeline` all the way past its end.
537
0
    public func throwIfErrorCaught() async throws {
538
0
        try await self.testingEventLoop.executeInContext {
539
0
            try self._throwIfErrorCaught()
540
0
        }
541
0
    }
542
543
    @usableFromInline
544
0
    func _throwIfErrorCaught() throws {
545
0
        self.testingEventLoop.preconditionInEventLoop()
546
0
        if let error = self.channelcore.error {
547
0
            self.channelcore.error = nil
548
0
            throw error
549
0
        }
550
0
    }
551
552
    @inlinable
553
0
    func _readFromBuffer<T>(buffer: inout CircularBuffer<NIOAny>) throws -> T? {
554
0
        self.testingEventLoop.preconditionInEventLoop()
555
0
556
0
        if buffer.isEmpty {
557
0
            return nil
558
0
        }
559
0
        return try self._cast(buffer.removeFirst(), to: T.self)
560
0
    }
561
562
    @inlinable
563
0
    func _cast<T>(_ element: NIOAny, to: T.Type = T.self) throws -> T {
564
0
        guard let t = self._channelCore.tryUnwrapData(element, as: T.self) else {
565
0
            throw WrongTypeError(
566
0
                expected: T.self,
567
0
                actual: type(of: self._channelCore.tryUnwrapData(element, as: Any.self)!)
568
0
            )
569
0
        }
570
0
        return t
571
0
    }
572
573
    /// - see: `Channel.setOption`
574
    @inlinable
575
0
    public func setOption<Option: ChannelOption>(_ option: Option, value: Option.Value) -> EventLoopFuture<Void> {
576
0
        if self.eventLoop.inEventLoop {
577
0
            self.setOptionSync(option, value: value)
578
0
            return self.eventLoop.makeSucceededVoidFuture()
579
0
        } else {
580
0
            return self.eventLoop.submit { self.setOptionSync(option, value: value) }
581
0
        }
582
0
    }
583
584
    @inlinable
585
0
    internal func setOptionSync<Option: ChannelOption>(_ option: Option, value: Option.Value) {
586
0
        addOption(option, value: value)
587
0
588
0
        if option is ChannelOptions.Types.AllowRemoteHalfClosureOption {
589
0
            self.allowRemoteHalfClosure = value as! Bool
590
0
            return
591
0
        }
592
0
    }
593
594
    /// - see: `Channel.getOption`
595
    @inlinable
596
0
    public func getOption<Option: ChannelOption>(_ option: Option) -> EventLoopFuture<Option.Value> {
597
0
        if self.eventLoop.inEventLoop {
598
0
            return self.eventLoop.makeSucceededFuture(self.getOptionSync(option))
599
0
        } else {
600
0
            return self.eventLoop.submit { self.getOptionSync(option) }
601
0
        }
602
0
    }
603
604
    @inlinable
605
0
    internal func getOptionSync<Option: ChannelOption>(_ option: Option) -> Option.Value {
606
0
        if option is ChannelOptions.Types.AutoReadOption {
607
0
            return true as! Option.Value
608
0
        }
609
0
        if option is ChannelOptions.Types.AllowRemoteHalfClosureOption {
610
0
            return self.allowRemoteHalfClosure as! Option.Value
611
0
        }
612
0
        if option is ChannelOptions.Types.BufferedWritableBytesOption {
613
0
            let result = self.channelcore.pendingOutboundBuffer.reduce(0) { partialResult, dataAndPromise in
614
0
                let buffer = self.channelcore.unwrapData(dataAndPromise.0, as: ByteBuffer.self)
615
0
                return partialResult + buffer.readableBytes
616
0
            }
617
0
618
0
            return result as! Option.Value
619
0
        }
620
0
621
0
        guard let value = self.optionValue(for: option) else {
622
0
            fatalError("option \(option) not supported")
623
0
        }
624
0
625
0
        return value
626
0
    }
627
628
    @inlinable
629
0
    internal func optionValue<Option: ChannelOption>(for option: Option) -> Option.Value? {
630
0
        self.options.first(where: { $0.option is Option })?.value as? Option.Value
631
0
    }
632
633
    @inlinable
634
0
    internal func addOption<Option: ChannelOption>(_ option: Option, value: Option.Value) {
635
0
        // override the option if it exists
636
0
        self._stateLock.withLockedValue { state in
637
0
            var options = state.options
638
0
            let optionIndex = options.firstIndex(where: { $0.option is Option })
639
0
            if let optionIndex = optionIndex {
640
0
                options[optionIndex] = (option, value)
641
0
            } else {
642
0
                options.append((option, value))
643
0
            }
644
0
            state.options = options
645
0
        }
646
0
    }
647
648
    /// Fires the (outbound) `bind` event through the `ChannelPipeline`. If the event hits the ``NIOAsyncTestingChannel`` which
649
    /// happens when it travels the `ChannelPipeline` all the way to the front, this will also set the
650
    /// ``NIOAsyncTestingChannel``'s ``localAddress``.
651
    ///
652
    /// - Parameters:
653
    ///   - address: The address to fake-bind to.
654
    ///   - promise: The `EventLoopPromise` which will be fulfilled when the fake-bind operation has been done.
655
0
    public func bind(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
656
0
        let promise = promise ?? self.testingEventLoop.makePromise()
657
0
        promise.futureResult.whenSuccess {
658
0
            self.localAddress = address
659
0
        }
660
0
        if self.eventLoop.inEventLoop {
661
0
            self.pipeline.bind(to: address, promise: promise)
662
0
        } else {
663
0
            self.eventLoop.execute {
664
0
                self.pipeline.bind(to: address, promise: promise)
665
0
            }
666
0
        }
667
0
    }
668
669
    /// Fires the (outbound) `connect` event through the `ChannelPipeline`. If the event hits the ``NIOAsyncTestingChannel``
670
    /// which happens when it travels the `ChannelPipeline` all the way to the front, this will also set the
671
    /// ``NIOAsyncTestingChannel``'s ``remoteAddress``.
672
    ///
673
    /// - Parameters:
674
    ///   - address: The address to fake-bind to.
675
    ///   - promise: The `EventLoopPromise` which will be fulfilled when the fake-bind operation has been done.
676
0
    public func connect(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
677
0
        let promise = promise ?? self.testingEventLoop.makePromise()
678
0
        promise.futureResult.whenSuccess {
679
0
            self.remoteAddress = address
680
0
        }
681
0
        if self.eventLoop.inEventLoop {
682
0
            self.pipeline.connect(to: address, promise: promise)
683
0
        } else {
684
0
            self.eventLoop.execute {
685
0
                self.pipeline.connect(to: address, promise: promise)
686
0
            }
687
0
        }
688
0
    }
689
690
    public struct SynchronousOptions: NIOSynchronousChannelOptions {
691
        @usableFromInline
692
        internal let channel: NIOAsyncTestingChannel
693
694
0
        fileprivate init(channel: NIOAsyncTestingChannel) {
695
0
            self.channel = channel
696
0
        }
697
698
        @inlinable
699
0
        public func setOption<Option: ChannelOption>(_ option: Option, value: Option.Value) throws {
700
0
            self.channel.eventLoop.preconditionInEventLoop()
701
0
            self.channel.setOptionSync(option, value: value)
702
0
        }
703
704
        @inlinable
705
0
        public func getOption<Option: ChannelOption>(_ option: Option) throws -> Option.Value {
706
0
            self.channel.eventLoop.preconditionInEventLoop()
707
0
            return self.channel.getOptionSync(option)
708
0
        }
709
    }
710
711
0
    public final var syncOptions: NIOSynchronousChannelOptions? {
712
0
        SynchronousOptions(channel: self)
713
0
    }
714
}
715
716
// MARK: Unchecked sendable
717
//
718
// Both of these types are unchecked Sendable because strictly, they aren't. This is
719
// because they contain NIOAny, a non-Sendable type. In this instance, we tolerate the moving
720
// of this object across threads because in the overwhelming majority of cases the data types
721
// in a channel pipeline _are_ `Sendable`, and because these objects only carry NIOAnys in cases
722
// where the `Channel` itself no longer holds a reference to these objects.
723
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
724
extension NIOAsyncTestingChannel.LeftOverState: @unchecked Sendable {}
725
726
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
727
extension NIOAsyncTestingChannel.BufferState: @unchecked Sendable {}
728
729
// Synchronous options are never Sendable.
730
@available(*, unavailable)
731
extension NIOAsyncTestingChannel.SynchronousOptions: Sendable {}
732
733
#endif  // canImport(Dispatch)