/src/swift-nio/Sources/NIOEmbedded/AsyncTestingChannel.swift
Line | Count | Source |
1 | | //===----------------------------------------------------------------------===// |
2 | | // |
3 | | // This source file is part of the SwiftNIO open source project |
4 | | // |
5 | | // Copyright (c) 2017-2022 Apple Inc. and the SwiftNIO project authors |
6 | | // Licensed under Apache License v2.0 |
7 | | // |
8 | | // See LICENSE.txt for license information |
9 | | // See CONTRIBUTORS.txt for the list of SwiftNIO project authors |
10 | | // |
11 | | // SPDX-License-Identifier: Apache-2.0 |
12 | | // |
13 | | //===----------------------------------------------------------------------===// |
14 | | |
15 | | #if canImport(Dispatch) |
16 | | import NIOConcurrencyHelpers |
17 | | import NIOCore |
18 | | |
19 | | /// A `Channel` with fine-grained control for testing. |
20 | | /// |
21 | | /// ``NIOAsyncTestingChannel`` is a `Channel` implementation that does no |
22 | | /// actual IO but that does have proper eventing mechanism, albeit one that users can |
23 | | /// control. The prime use-case for ``NIOAsyncTestingChannel`` is in unit tests when you |
24 | | /// want to feed the inbound events and check the outbound events manually. |
25 | | /// |
26 | | /// Please remember to call ``finish()`` when you are no longer using this |
27 | | /// ``NIOAsyncTestingChannel``. |
28 | | /// |
29 | | /// To feed events through an ``NIOAsyncTestingChannel``'s `ChannelPipeline` use |
30 | | /// ``NIOAsyncTestingChannel/writeInbound(_:)`` which accepts data of any type. It will then |
31 | | /// forward that data through the `ChannelPipeline` and the subsequent |
32 | | /// `ChannelInboundHandler` will receive it through the usual `channelRead` |
33 | | /// event. The user is responsible for making sure the first |
34 | | /// `ChannelInboundHandler` expects data of that type. |
35 | | /// |
36 | | /// Unlike in a regular `ChannelPipeline`, it is expected that the test code will act |
37 | | /// as the "network layer", using ``readOutbound(as:)`` to observe the data that the |
38 | | /// `Channel` has "written" to the network, and using ``writeInbound(_:)`` to simulate |
39 | | /// receiving data from the network. There are also facilities to make it a bit easier |
40 | | /// to handle the logic for `write` and `flush` (using ``writeOutbound(_:)``), and to |
41 | | /// extract data that passed the whole way along the channel in `channelRead` (using |
42 | | /// ``readInbound(as:)``). Below is a diagram showing the layout of a `ChannelPipeline` |
43 | | /// inside a ``NIOAsyncTestingChannel``, including the functions that can be used to |
44 | | /// inject and extract data at each end. |
45 | | /// |
46 | | /// ``` |
47 | | /// |
48 | | /// Extract data Inject data |
49 | | /// using readInbound() using writeOutbound() |
50 | | /// ▲ | |
51 | | /// +---------------+-----------------------------------+---------------+ |
52 | | /// | | ChannelPipeline | | |
53 | | /// | | TAIL ▼ | |
54 | | /// | +---------------------+ +-----------+----------+ | |
55 | | /// | | Inbound Handler N | | Outbound Handler 1 | | |
56 | | /// | +----------+----------+ +-----------+----------+ | |
57 | | /// | ▲ | | |
58 | | /// | | ▼ | |
59 | | /// | +----------+----------+ +-----------+----------+ | |
60 | | /// | | Inbound Handler N-1 | | Outbound Handler 2 | | |
61 | | /// | +----------+----------+ +-----------+----------+ | |
62 | | /// | ▲ . | |
63 | | /// | . . | |
64 | | /// | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()| |
65 | | /// | [ method call] [method call] | |
66 | | /// | . . | |
67 | | /// | . ▼ | |
68 | | /// | +----------+----------+ +-----------+----------+ | |
69 | | /// | | Inbound Handler 2 | | Outbound Handler M-1 | | |
70 | | /// | +----------+----------+ +-----------+----------+ | |
71 | | /// | ▲ | | |
72 | | /// | | ▼ | |
73 | | /// | +----------+----------+ +-----------+----------+ | |
74 | | /// | | Inbound Handler 1 | | Outbound Handler M | | |
75 | | /// | +----------+----------+ +-----------+----------+ | |
76 | | /// | ▲ HEAD | | |
77 | | /// +---------------+-----------------------------------+---------------+ |
78 | | /// | ▼ |
79 | | /// Inject data Extract data |
80 | | /// using writeInbound() using readOutbound() |
81 | | /// ``` |
82 | | /// |
83 | | /// - Note: ``NIOAsyncTestingChannel`` is currently only compatible with |
84 | | /// ``NIOAsyncTestingEventLoop``s and cannot be used with `SelectableEventLoop`s from |
85 | | /// for example `MultiThreadedEventLoopGroup`. |
86 | | @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) |
87 | | public final class NIOAsyncTestingChannel: Channel { |
/// ``LeftOverState`` describes whether any inbound, outbound, or pending outbound events were still sitting in the
/// ``NIOAsyncTestingChannel`` (i.e. had not been consumed) at the time ``finish()`` was called.
///
/// ``NIOAsyncTestingChannel`` is mostly used in unit tests, where you normally want to consume all inbound and
/// outbound data to verify it is what you expect. ``finish()`` therefore reports either ``LeftOverState/clean``
/// (nothing left) or ``LeftOverState/leftOvers(inbound:outbound:pendingOutbound:)``.
public enum LeftOverState {
    /// No inbound, outbound, or pending outbound data was left on ``NIOAsyncTestingChannel/finish()``.
    case clean

    /// Inbound, outbound, or pending outbound data was left on ``NIOAsyncTestingChannel/finish()``.
    case leftOvers(inbound: CircularBuffer<NIOAny>, outbound: CircularBuffer<NIOAny>, pendingOutbound: [NIOAny])

    /// `true` if the ``NIOAsyncTestingChannel`` was `clean` on ``NIOAsyncTestingChannel/finish()``, i.e. there was
    /// no unconsumed inbound, outbound, or pending outbound data left on the `Channel`.
    public var isClean: Bool {
        switch self {
        case .clean:
            return true
        case .leftOvers:
            return false
        }
    }

    /// `true` if there was unconsumed inbound, outbound, or pending outbound data left on the `Channel` when it
    /// was `finish`ed.
    public var hasLeftOvers: Bool {
        switch self {
        case .clean:
            return false
        case .leftOvers:
            return true
        }
    }
}
117 | | |
/// ``BufferState`` represents the state of either the inbound or the outbound ``NIOAsyncTestingChannel`` buffer.
///
/// These buffers contain data that travelled the `ChannelPipeline` all the way through.
///
/// If the last `ChannelHandler` explicitly (by calling `fireChannelRead`) or implicitly (by not implementing
/// `channelRead`) sends inbound data into the end of the ``NIOAsyncTestingChannel``, it is held in the
/// ``NIOAsyncTestingChannel``'s inbound buffer. The same applies to `write` on the outbound side. The state of the
/// respective buffer is returned from ``writeInbound(_:)``/``writeOutbound(_:)`` as a ``BufferState``.
public enum BufferState {
    /// The buffer is empty.
    case empty

    /// The buffer is non-empty; the associated value carries the buffered elements.
    case full(CircularBuffer<NIOAny>)

    /// Returns `true` if the buffer was empty.
    public var isEmpty: Bool {
        switch self {
        case .empty:
            return true
        case .full:
            return false
        }
    }

    /// Returns `true` if the buffer was non-empty.
    public var isFull: Bool {
        switch self {
        case .empty:
            return false
        case .full:
            return true
        }
    }
}
147 | | |
/// ``WrongTypeError`` is thrown when ``readInbound(as:)`` or ``readOutbound(as:)`` is asked for a certain type but
/// the first item in the respective buffer is of a different type.
public struct WrongTypeError: Error, Equatable {
    /// The type that was requested.
    public let expected: Any.Type

    /// The type of the element that was actually first in the buffer.
    public let actual: Any.Type

    /// Creates an error recording the requested type and the type that was found instead.
    ///
    /// - Parameters:
    ///   - expected: The type that was requested.
    ///   - actual: The type of the element that was found.
    public init(expected: Any.Type, actual: Any.Type) {
        self.expected = expected
        self.actual = actual
    }

    /// Two errors are equal when both their `expected` and their `actual` metatypes match.
    public static func == (lhs: WrongTypeError, rhs: WrongTypeError) -> Bool {
        guard lhs.expected == rhs.expected else {
            return false
        }
        return lhs.actual == rhs.actual
    }
}
166 | | |
/// Returns `true` if the ``NIOAsyncTestingChannel`` is 'active'.
///
/// An active ``NIOAsyncTestingChannel`` can be closed by calling `close` or ``finish()`` on it.
///
/// - Note: An ``NIOAsyncTestingChannel`` starts _inactive_ and can be activated, for example by calling `connect`.
public var isActive: Bool {
    self.channelcore.isActive
}

/// Forwards to the channel core's `allowRemoteHalfClosure` flag.
///
/// - see: `ChannelOptions.Types.AllowRemoteHalfClosureOption`
public var allowRemoteHalfClosure: Bool {
    get {
        self.channelcore.allowRemoteHalfClosure
    }
    set(allowed) {
        self.channelcore.allowRemoteHalfClosure = allowed
    }
}

/// The future of the channel core's close promise.
///
/// - see: `Channel.closeFuture`
public var closeFuture: EventLoopFuture<Void> {
    let closePromise = self.channelcore.closePromise
    return closePromise.futureResult
}
186 | | |
/// The `ByteBufferAllocator` of this channel; a fresh default-constructed allocator per channel instance.
///
/// - see: `Channel.allocator`
public let allocator: ByteBufferAllocator = ByteBufferAllocator()

/// The event loop of this channel, i.e. ``testingEventLoop`` exposed as an `EventLoop`.
///
/// - see: `Channel.eventLoop`
public var eventLoop: EventLoop {
    self.testingEventLoop
}

/// Returns the ``NIOAsyncTestingEventLoop`` that this ``NIOAsyncTestingChannel`` uses. This will return the same instance as
/// ``NIOAsyncTestingChannel/eventLoop`` but as the concrete ``NIOAsyncTestingEventLoop`` rather than as `EventLoop` existential.
public let testingEventLoop: NIOAsyncTestingEventLoop

/// `nil` because ``NIOAsyncTestingChannel``s don't have parents.
public let parent: Channel? = nil
201 | | |
// These two variables are only written once, from a single thread, and never written again, so they're _technically_ thread-safe. Most methods cannot safely
// be used from multiple threads, but `isActive`, `isOpen`, `eventLoop`, and `closeFuture` can all safely be used from any thread. Just.
// `EmbeddedChannelCore`'s localAddress and remoteAddress fields are protected by a lock so they are safe to access.
//
// Both are implicitly-unwrapped optionals because they are assigned inside `init(loop:)`, where the pipeline is
// created from `self` and the core is created from that pipeline.
@usableFromInline
nonisolated(unsafe) var channelcore: EmbeddedChannelCore!
nonisolated(unsafe) private var _pipeline: ChannelPipeline!
208 | | |
/// The mutable state that is kept behind ``_stateLock``.
@usableFromInline
internal struct State: Sendable {
    /// Backing storage for `isWritable`.
    var isWritable: Bool

    /// Backing storage for `options`: the option/value pairs recorded by `addOption`.
    @usableFromInline
    var options: [(option: any ChannelOption, value: any Sendable)]
}

/// Guards any of the getters/setters that can be accessed from any thread.
///
/// Starts out writable and with no options recorded.
@usableFromInline
internal let _stateLock = NIOLockedValueBox(
    State(isWritable: true, options: [])
)
222 | | |
/// The `EmbeddedChannelCore` backing this channel, exposed as a `ChannelCore`.
///
/// - see: `Channel._channelCore`
public var _channelCore: ChannelCore {
    return self.channelcore
}

/// The `ChannelPipeline` that was created for this channel during initialization.
///
/// - see: `Channel.pipeline`
public var pipeline: ChannelPipeline {
    return self._pipeline
}
232 | | |
/// Whether the channel reports itself as writable. Reads and writes go through the state lock.
///
/// - see: `Channel.isWritable`
public var isWritable: Bool {
    get {
        self._stateLock.withLockedValue { state in
            state.isWritable
        }
    }
    set(writable) {
        self._stateLock.withLockedValue { state in
            state.isWritable = writable
        }
    }
}

/// The local address, stored on the channel core.
///
/// - see: `Channel.localAddress`
public var localAddress: SocketAddress? {
    get {
        self.channelcore.localAddress
    }
    set(address) {
        self.channelcore.localAddress = address
    }
}

/// The remote address, stored on the channel core.
///
/// - see: `Channel.remoteAddress`
public var remoteAddress: SocketAddress? {
    get {
        self.channelcore.remoteAddress
    }
    set(address) {
        self.channelcore.remoteAddress = address
    }
}

/// The `ChannelOption`s set on this channel, read as a snapshot under the state lock.
/// - see: `NIOAsyncTestingChannel.setOption`
public var options: [(option: any ChannelOption, value: any Sendable)] {
    self._stateLock.withLockedValue { state in
        state.options
    }
}
/// Create a new instance.
///
/// Stores the event loop, then creates the `ChannelPipeline` for this channel and the `EmbeddedChannelCore`
/// driving that pipeline.
///
/// - Note: NOTE(review): earlier documentation stated that this initializer registers the channel on the
///   ``NIOAsyncTestingEventLoop``, but the body below contains no explicit `register()` call (only the
///   `async` initializer taking a `channelInitializer` does). Confirm whether registration happens elsewhere.
///
/// - Parameters:
///   - loop: The ``NIOAsyncTestingEventLoop`` to use.
public init(loop: NIOAsyncTestingEventLoop = NIOAsyncTestingEventLoop()) {
    // `testingEventLoop` must be set first: `self.eventLoop` (used below) reads it.
    self.testingEventLoop = loop
    self._pipeline = ChannelPipeline(channel: self)
    self.channelcore = EmbeddedChannelCore(pipeline: self._pipeline, eventLoop: self.eventLoop)
}
281 | | |
/// Create a new instance with a single `ChannelHandler` in its pipeline.
///
/// This is a convenience for the `handlers:` initializer with a one-element array; during creation the channel
/// will automatically also register itself on the ``NIOAsyncTestingEventLoop``.
///
/// - Parameters:
///   - handler: The `ChannelHandler` to add to the `ChannelPipeline` before register.
///   - loop: The ``NIOAsyncTestingEventLoop`` to use.
@preconcurrency
public convenience init(
    handler: ChannelHandler & Sendable,
    loop: NIOAsyncTestingEventLoop = NIOAsyncTestingEventLoop()
) async {
    let handlers: [ChannelHandler & Sendable] = [handler]
    await self.init(handlers: handlers, loop: loop)
}
296 | | |
/// Create a new instance with the given `ChannelHandler`s in its pipeline.
///
/// During creation it will automatically also register itself on the ``NIOAsyncTestingEventLoop``.
///
/// - Note: This initializer cannot throw; it uses `try!`, so a failure while adding the handlers or registering
///   traps.
///
/// - Parameters:
///   - handlers: The `ChannelHandler`s to add to the `ChannelPipeline` before register.
///   - loop: The ``NIOAsyncTestingEventLoop`` to use.
@preconcurrency
public convenience init(
    handlers: [ChannelHandler & Sendable],
    loop: NIOAsyncTestingEventLoop = NIOAsyncTestingEventLoop()
) async {
    try! await self.init(
        loop: loop,
        channelInitializer: { channel in
            // Runs as the channel initializer, i.e. before the channel is registered.
            try channel.pipeline.syncOperations.addHandlers(handlers)
        }
    )
}
313 | | |
/// Create a new instance, run a custom initializer on it, and register it.
///
/// During creation it will automatically also register itself on the ``NIOAsyncTestingEventLoop``.
///
/// - Parameters:
///   - loop: The ``NIOAsyncTestingEventLoop`` to use.
///   - channelInitializer: The initialization closure which will be run on the `EventLoop` before registration. This could be used to add handlers using `syncOperations`.
/// - Throws: Any error thrown by `channelInitializer` or by registering the channel.
public convenience init(
    loop: NIOAsyncTestingEventLoop = NIOAsyncTestingEventLoop(),
    channelInitializer: @escaping @Sendable (NIOAsyncTestingChannel) throws -> Void
) async throws {
    self.init(loop: loop)

    // Run the user's initializer on the event loop and wait for it to finish before registering.
    let initialized = loop.submit {
        try channelInitializer(self)
    }
    try await initialized.get()

    try await self.register()
}
331 | | |
/// Asynchronously closes the ``NIOAsyncTestingChannel``.
///
/// After closing, all currently scheduled tasks on the event loop are drained and the loop is run once more,
/// then any error stored in the channel is thrown. Errors in the ``NIOAsyncTestingChannel`` can be consumed
/// using ``throwIfErrorCaught()``.
///
/// - Parameters:
///   - acceptAlreadyClosed: Whether ``finish()`` should throw if the ``NIOAsyncTestingChannel`` has been previously `close`d.
///     When `true`, a `ChannelError.alreadyClosed` from `close` is tolerated.
/// - Returns: The ``LeftOverState`` of the ``NIOAsyncTestingChannel``. If all the inbound and outbound events have been
///            consumed (using ``readInbound(as:)`` / ``readOutbound(as:)``) and there are no pending outbound events (unflushed
///            writes) this will be ``LeftOverState/clean``. If there are any unconsumed inbound, outbound, or pending outbound
///            events, the ``NIOAsyncTestingChannel`` will return those as ``LeftOverState/leftOvers(inbound:outbound:pendingOutbound:)``.
public func finish(acceptAlreadyClosed: Bool) async throws -> LeftOverState {
    do {
        try await self.close().get()
    } catch let channelError as ChannelError where channelError == .alreadyClosed && acceptAlreadyClosed {
        // The caller asked us to tolerate an already-closed channel; every other error propagates.
    }

    // This can never actually throw.
    try! await self.testingEventLoop.executeInContext {
        self.testingEventLoop.drainScheduledTasksByRunningAllCurrentlyScheduledTasks()
    }
    await self.testingEventLoop.run()
    try await self.throwIfErrorCaught()

    // This can never actually throw.
    return try! await self.testingEventLoop.executeInContext {
        let core = self.channelcore!
        let nothingLeft =
            core.outboundBuffer.isEmpty
            && core.inboundBuffer.isEmpty
            && core.pendingOutboundBuffer.isEmpty
        guard !nothingLeft else {
            return LeftOverState.clean
        }
        return LeftOverState.leftOvers(
            inbound: core.inboundBuffer,
            outbound: core.outboundBuffer,
            pendingOutbound: core.pendingOutboundBuffer.map { $0.0 }
        )
    }
}
372 | | |
/// Asynchronously closes the ``NIOAsyncTestingChannel``, treating an already-closed channel as an error.
///
/// This method will throw if the `Channel` hit any unconsumed errors or if the `close` fails. Errors in the
/// ``NIOAsyncTestingChannel`` can be consumed using ``throwIfErrorCaught()``.
///
/// - Returns: The ``LeftOverState`` of the ``NIOAsyncTestingChannel``. If all the inbound and outbound events have been
///            consumed (using ``readInbound(as:)`` / ``readOutbound(as:)``) and there are no pending outbound events (unflushed
///            writes) this will be ``LeftOverState/clean``. If there are any unconsumed inbound, outbound, or pending outbound
///            events, the ``NIOAsyncTestingChannel`` will return those as ``LeftOverState/leftOvers(inbound:outbound:pendingOutbound:)``.
public func finish() async throws -> LeftOverState {
    let leftOvers = try await self.finish(acceptAlreadyClosed: false)
    return leftOvers
}
385 | | |
/// Reads one element of type `T` out of the ``NIOAsyncTestingChannel``'s outbound buffer, if one is available.
///
/// If the first element is of a different type than requested, ``WrongTypeError`` is thrown. If there are no
/// elements in the outbound buffer, `nil` is returned.
///
/// Data hits the ``NIOAsyncTestingChannel``'s outbound buffer when data was written using `write`, then `flush`ed, and
/// then travelled the `ChannelPipeline` all the way to the front. For data to hit the outbound buffer, the very
/// first `ChannelHandler` must have written and flushed it either explicitly (by calling
/// `ChannelHandlerContext.write` and `flush`) or implicitly by not implementing `write`/`flush`.
///
/// - Note: Outbound events travel the `ChannelPipeline` _back to front_.
/// - Note: ``NIOAsyncTestingChannel/writeOutbound(_:)`` will `write` data through the `ChannelPipeline`, starting with last
///         `ChannelHandler`.
@inlinable
public func readOutbound<T: Sendable>(as type: T.Type = T.self) async throws -> T? {
    try await self.testingEventLoop.executeInContext {
        let element: T? = try self._readFromBuffer(buffer: &self.channelcore.outboundBuffer)
        return element
    }
}
404 | | |
/// Like ``NIOAsyncTestingChannel/readOutbound(as:)``, but waits if the outbound buffer is empty.
///
/// If an element is already buffered it is returned right away. Otherwise this method waits until there is one
/// and returns that element. If the element is of a different type than requested, ``WrongTypeError`` is
/// thrown. If the channel has already closed or closes before the next pending outbound write,
/// `ChannelError.ioOnClosedChannel` will be thrown.
///
/// Data hits the ``NIOAsyncTestingChannel``'s outbound buffer when data was written using `write`, then `flush`ed, and
/// then travelled the `ChannelPipeline` all the way to the front. For data to hit the outbound buffer, the very
/// first `ChannelHandler` must have written and flushed it either explicitly (by calling
/// `ChannelHandlerContext.write` and `flush`) or implicitly by not implementing `write`/`flush`.
///
/// - Note: Outbound events travel the `ChannelPipeline` _back to front_.
/// - Note: ``NIOAsyncTestingChannel/writeOutbound(_:)`` will `write` data through the `ChannelPipeline`, starting with last
///         `ChannelHandler`.
public func waitForOutboundWrite<T: Sendable>(as type: T.Type = T.self) async throws -> T {
    try await withCheckedThrowingContinuation { continuation in
        self.testingEventLoop.execute {
            // First try to satisfy the request from what is already buffered.
            let buffered: T?
            do {
                buffered = try self._readFromBuffer(buffer: &self.channelcore.outboundBuffer)
            } catch {
                continuation.resume(throwing: error)
                return
            }

            if let buffered = buffered {
                continuation.resume(returning: buffered)
                return
            }

            // Nothing buffered yet: hand the continuation to the channel core's outbound consumer queue.
            self.channelcore._enqueueOutboundBufferConsumer { outcome in
                switch outcome {
                case .failure(let error):
                    continuation.resume(throwing: error)
                case .success(let data):
                    continuation.resume(with: Result { try self._cast(data) })
                }
            }
        }
    }
}
442 | | |
/// Reads one element of type `T` out of the ``NIOAsyncTestingChannel``'s inbound buffer, if one is available.
///
/// If the first element is of a different type than requested, ``WrongTypeError`` is thrown. If there are no
/// elements in the inbound buffer, `nil` is returned.
///
/// Data hits the ``NIOAsyncTestingChannel``'s inbound buffer when data was sent through the pipeline using `fireChannelRead`
/// and then travelled the `ChannelPipeline` all the way to the back. For data to hit the inbound buffer, the
/// last `ChannelHandler` must have sent the event either explicitly (by calling
/// `ChannelHandlerContext.fireChannelRead`) or implicitly by not implementing `channelRead`.
///
/// - Note: ``NIOAsyncTestingChannel/writeInbound(_:)`` will fire data through the `ChannelPipeline` using `fireChannelRead`.
@inlinable
public func readInbound<T: Sendable>(as type: T.Type = T.self) async throws -> T? {
    try await self.testingEventLoop.executeInContext {
        let element: T? = try self._readFromBuffer(buffer: &self.channelcore.inboundBuffer)
        return element
    }
}
459 | | |
/// Like ``NIOAsyncTestingChannel/readInbound(as:)``, but waits if the inbound buffer is empty.
///
/// If an element is already buffered it is returned right away. Otherwise this method waits until there is one
/// and returns that element. If the element is of a different type than requested, ``WrongTypeError`` is
/// thrown. If the channel has already closed or closes before the next pending inbound write,
/// `ChannelError.ioOnClosedChannel` will be thrown.
///
/// Data hits the ``NIOAsyncTestingChannel``'s inbound buffer when data was sent through the pipeline using `fireChannelRead`
/// and then travelled the `ChannelPipeline` all the way to the back. For data to hit the inbound buffer, the
/// last `ChannelHandler` must have sent the event either explicitly (by calling
/// `ChannelHandlerContext.fireChannelRead`) or implicitly by not implementing `channelRead`.
///
/// - Note: ``NIOAsyncTestingChannel/writeInbound(_:)`` will fire data through the `ChannelPipeline` using `fireChannelRead`.
public func waitForInboundWrite<T: Sendable>(as type: T.Type = T.self) async throws -> T {
    try await withCheckedThrowingContinuation { continuation in
        self.testingEventLoop.execute {
            // First try to satisfy the request from what is already buffered.
            let buffered: T?
            do {
                buffered = try self._readFromBuffer(buffer: &self.channelcore.inboundBuffer)
            } catch {
                continuation.resume(throwing: error)
                return
            }

            if let buffered = buffered {
                continuation.resume(returning: buffered)
                return
            }

            // Nothing buffered yet: hand the continuation to the channel core's inbound consumer queue.
            self.channelcore._enqueueInboundBufferConsumer { outcome in
                switch outcome {
                case .failure(let error):
                    continuation.resume(throwing: error)
                case .success(let data):
                    continuation.resume(with: Result { try self._cast(data) })
                }
            }
        }
    }
}
495 | | |
/// Sends an inbound `channelRead` event followed by a `channelReadComplete` event through the `ChannelPipeline`.
///
/// The immediate effect is that the first `ChannelInboundHandler` gets its `channelRead` method called with the
/// data you provide. Any error stored in the channel afterwards is thrown.
///
/// - Parameters:
///   - data: The data to fire through the pipeline.
/// - Returns: The state of the inbound buffer which contains all the events that travelled the `ChannelPipeline`
///            all the way.
@inlinable
@discardableResult public func writeInbound<T: Sendable>(_ data: T) async throws -> BufferState {
    try await self.testingEventLoop.executeInContext {
        let pipeline = self.pipeline
        pipeline.fireChannelRead(data)
        pipeline.fireChannelReadComplete()
        try self._throwIfErrorCaught()

        let inbound = self.channelcore.inboundBuffer
        if inbound.isEmpty {
            return BufferState.empty
        }
        return BufferState.full(inbound)
    }
}
514 | | |
/// Sends an outbound `writeAndFlush` event through the `ChannelPipeline`.
///
/// The immediate effect is that the first `ChannelOutboundHandler` gets its `write` method called with the data
/// you provide. Note that the first `ChannelOutboundHandler` in the pipeline is the _last_ handler because
/// outbound events travel the pipeline from back to front.
///
/// - Parameters:
///   - data: The data to fire through the pipeline.
/// - Returns: The state of the outbound buffer which contains all the events that travelled the `ChannelPipeline`
///            all the way.
@inlinable
@discardableResult public func writeOutbound<T: Sendable>(_ data: T) async throws -> BufferState {
    try await self.writeAndFlush(data)

    return try await self.testingEventLoop.executeInContext {
        let outbound = self.channelcore.outboundBuffer
        if outbound.isEmpty {
            return BufferState.empty
        }
        return BufferState.full(outbound)
    }
}
533 | | |
/// Throws the error that is stored in the ``NIOAsyncTestingChannel``, if any.
///
/// The ``NIOAsyncTestingChannel`` will store an error if some error travels the `ChannelPipeline` all the way past its end.
/// The check itself is performed on the channel's event loop.
public func throwIfErrorCaught() async throws {
    let loop = self.testingEventLoop
    try await loop.executeInContext {
        try self._throwIfErrorCaught()
    }
}
542 | | |
/// Event-loop-only variant of ``throwIfErrorCaught()``.
///
/// If the channel core currently holds an error, the error is cleared from the core and then thrown, so each
/// stored error is reported once. Must be called on the testing event loop.
@usableFromInline
func _throwIfErrorCaught() throws {
    self.testingEventLoop.preconditionInEventLoop()
    guard let storedError = self.channelcore.error else {
        return
    }
    self.channelcore.error = nil
    throw storedError
}
551 | | |
/// Removes the first element from `buffer` and converts it to `T`.
///
/// Must be called on the testing event loop.
///
/// - Parameters:
///   - buffer: The buffer to take the first element from; it is mutated when non-empty.
/// - Returns: The first element as `T`, or `nil` if `buffer` is empty.
/// - Throws: ``WrongTypeError`` if the first element is not a `T`. The element has already been removed from
///   `buffer` at that point.
@inlinable
func _readFromBuffer<T>(buffer: inout CircularBuffer<NIOAny>) throws -> T? {
    self.testingEventLoop.preconditionInEventLoop()

    guard !buffer.isEmpty else {
        return nil
    }
    let head = buffer.removeFirst()
    return try self._cast(head, to: T.self)
}
561 | | |
/// Unwraps `element` as a `T`.
///
/// - Parameters:
///   - element: The wrapped value to unwrap.
///   - to: The requested type; defaults to the inferred `T`.
/// - Returns: The unwrapped value.
/// - Throws: ``WrongTypeError`` carrying the requested type and the dynamic type of the wrapped value if the
///   element does not hold a `T`.
@inlinable
func _cast<T>(_ element: NIOAny, to: T.Type = T.self) throws -> T {
    if let typed = self._channelCore.tryUnwrapData(element, as: T.self) {
        return typed
    }
    // Unwrap as `Any` to find out what the element actually holds for the error report.
    let payload = self._channelCore.tryUnwrapData(element, as: Any.self)!
    throw WrongTypeError(
        expected: T.self,
        actual: type(of: payload)
    )
}
572 | | |
/// Records `value` for `option` on this channel.
///
/// When called on the event loop the option is applied immediately; otherwise the work is submitted to the
/// event loop.
///
/// - see: `Channel.setOption`
@inlinable
public func setOption<Option: ChannelOption>(_ option: Option, value: Option.Value) -> EventLoopFuture<Void> {
    guard self.eventLoop.inEventLoop else {
        return self.eventLoop.submit { self.setOptionSync(option, value: value) }
    }
    self.setOptionSync(option, value: value)
    return self.eventLoop.makeSucceededVoidFuture()
}
583 | | |
/// Synchronously records `value` for `option`.
///
/// Every option is stored in the channel's option list. `AllowRemoteHalfClosureOption` is additionally applied
/// to ``allowRemoteHalfClosure``.
@inlinable
internal func setOptionSync<Option: ChannelOption>(_ option: Option, value: Option.Value) {
    self.addOption(option, value: value)

    if option is ChannelOptions.Types.AllowRemoteHalfClosureOption {
        self.allowRemoteHalfClosure = value as! Bool
    }
}
593 | | |
/// Looks up the value for `option`.
///
/// When called on the event loop the value is read immediately; otherwise the read is submitted to the event
/// loop.
///
/// - see: `Channel.getOption`
@inlinable
public func getOption<Option: ChannelOption>(_ option: Option) -> EventLoopFuture<Option.Value> {
    guard self.eventLoop.inEventLoop else {
        return self.eventLoop.submit { self.getOptionSync(option) }
    }
    let value = self.getOptionSync(option)
    return self.eventLoop.makeSucceededFuture(value)
}
603 | | |
/// Synchronously looks up the value for `option`.
///
/// - `AutoReadOption` always reports `true`.
/// - `AllowRemoteHalfClosureOption` reports ``allowRemoteHalfClosure``.
/// - `BufferedWritableBytesOption` reports the sum of the readable bytes of all pending outbound writes, each
///   of which is unwrapped as a `ByteBuffer`.
/// - Any other option is looked up among the options previously recorded; if none was recorded this traps
///   with `fatalError`.
@inlinable
internal func getOptionSync<Option: ChannelOption>(_ option: Option) -> Option.Value {
    switch option {
    case is ChannelOptions.Types.AutoReadOption:
        return true as! Option.Value

    case is ChannelOptions.Types.AllowRemoteHalfClosureOption:
        return self.allowRemoteHalfClosure as! Option.Value

    case is ChannelOptions.Types.BufferedWritableBytesOption:
        var bufferedBytes = 0
        for pendingWrite in self.channelcore.pendingOutboundBuffer {
            let buffer = self.channelcore.unwrapData(pendingWrite.0, as: ByteBuffer.self)
            bufferedBytes += buffer.readableBytes
        }
        return bufferedBytes as! Option.Value

    default:
        guard let stored = self.optionValue(for: option) else {
            fatalError("option \(option) not supported")
        }
        return stored
    }
}
627 | | |
/// Returns the value recorded for the first stored option of type `Option`, or `nil` if there is no such
/// entry or its value is not an `Option.Value`.
@inlinable
internal func optionValue<Option: ChannelOption>(for option: Option) -> Option.Value? {
    let entry = self.options.first { $0.option is Option }
    return entry?.value as? Option.Value
}
632 | | |
/// Records `value` for `option` in the channel's option list, under the state lock.
///
/// If an entry for an option of the same type already exists it is overwritten in place; otherwise a new entry
/// is appended.
@inlinable
internal func addOption<Option: ChannelOption>(_ option: Option, value: Option.Value) {
    self._stateLock.withLockedValue { state in
        if let existingIndex = state.options.firstIndex(where: { $0.option is Option }) {
            // override the option if it exists
            state.options[existingIndex] = (option, value)
        } else {
            state.options.append((option, value))
        }
    }
}
647 | | |
/// Sends the (outbound) `bind` event through the `ChannelPipeline`. When the event travels the
/// `ChannelPipeline` all the way to the front and hits the ``NIOAsyncTestingChannel``, the
/// ``NIOAsyncTestingChannel``'s ``localAddress`` is set as well: it is assigned `address` once the
/// promise used for the operation succeeds.
///
/// - Parameters:
///   - address: The address to fake-bind to.
///   - promise: The `EventLoopPromise` which will be fulfilled when the fake-bind operation has been done.
///     If `nil`, a promise is created from the testing event loop.
public func bind(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
    let bindPromise = promise ?? self.testingEventLoop.makePromise()
    // Record the address only if the bind actually succeeds.
    bindPromise.futureResult.whenSuccess {
        self.localAddress = address
    }

    // The pipeline must be driven from the event loop; hop over if we are not on it.
    guard self.eventLoop.inEventLoop else {
        self.eventLoop.execute {
            self.pipeline.bind(to: address, promise: bindPromise)
        }
        return
    }
    self.pipeline.bind(to: address, promise: bindPromise)
}
668 | | |
/// Sends the (outbound) `connect` event through the `ChannelPipeline`. When the event travels the
/// `ChannelPipeline` all the way to the front and hits the ``NIOAsyncTestingChannel``, the
/// ``NIOAsyncTestingChannel``'s ``remoteAddress`` is set as well: it is assigned `address` once the
/// promise used for the operation succeeds.
///
/// - Parameters:
///   - address: The address to fake-connect to.
///   - promise: The `EventLoopPromise` which will be fulfilled when the fake-connect operation has been done.
///     If `nil`, a promise is created from the testing event loop.
public func connect(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
    let connectPromise = promise ?? self.testingEventLoop.makePromise()
    // Record the address only if the connect actually succeeds.
    connectPromise.futureResult.whenSuccess {
        self.remoteAddress = address
    }

    // The pipeline must be driven from the event loop; hop over if we are not on it.
    guard self.eventLoop.inEventLoop else {
        self.eventLoop.execute {
            self.pipeline.connect(to: address, promise: connectPromise)
        }
        return
    }
    self.pipeline.connect(to: address, promise: connectPromise)
}
689 | | |
/// Synchronous access to the options of a ``NIOAsyncTestingChannel``.
///
/// Both accessors first assert that they are running on the channel's event loop and then
/// forward to the channel's synchronous option implementation.
public struct SynchronousOptions: NIOSynchronousChannelOptions {
    /// The channel whose options are read and written.
    @usableFromInline
    internal let channel: NIOAsyncTestingChannel

    /// Creates the options view for `channel`.
    fileprivate init(channel: NIOAsyncTestingChannel) {
        self.channel = channel
    }

    /// Reads the value of `option` from the channel. Must be called on the channel's event loop.
    @inlinable
    public func getOption<Option: ChannelOption>(_ option: Option) throws -> Option.Value {
        let target = self.channel
        target.eventLoop.preconditionInEventLoop()
        return target.getOptionSync(option)
    }

    /// Sets `option` to `value` on the channel. Must be called on the channel's event loop.
    @inlinable
    public func setOption<Option: ChannelOption>(_ option: Option, value: Option.Value) throws {
        let target = self.channel
        target.eventLoop.preconditionInEventLoop()
        target.setOptionSync(option, value: value)
    }
}
710 | | |
/// A synchronous view of this channel's options. A fresh ``SynchronousOptions`` wrapping this
/// channel is created on every access, so the result is never `nil`.
public final var syncOptions: NIOSynchronousChannelOptions? {
    let synchronousView = SynchronousOptions(channel: self)
    return synchronousView
}
714 | | } |
715 | | |
// MARK: Unchecked sendable
//
// `LeftOverState` and `BufferState` are declared `@unchecked Sendable` because, strictly
// speaking, they are not `Sendable`: both contain `NIOAny`, which is a non-Sendable type.
// We tolerate moving these objects across threads for two reasons: in the overwhelming
// majority of cases the data types in a channel pipeline _are_ `Sendable`, and these objects
// only carry `NIOAny`s in cases where the `Channel` itself no longer holds a reference to them.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension NIOAsyncTestingChannel.LeftOverState: @unchecked Sendable {}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension NIOAsyncTestingChannel.BufferState: @unchecked Sendable {}

// Synchronous options are never Sendable; the unavailable conformance below states that
// explicitly.
@available(*, unavailable)
extension NIOAsyncTestingChannel.SynchronousOptions: Sendable {}
732 | | |
733 | | #endif // canImport(Dispatch) |