Coverage Report

Created: 2026-06-01 06:32

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/swift-nio/Sources/NIOCore/Channel.swift
Line
Count
Source
1
//===----------------------------------------------------------------------===//
2
//
3
// This source file is part of the SwiftNIO open source project
4
//
5
// Copyright (c) 2017-2021 Apple Inc. and the SwiftNIO project authors
6
// Licensed under Apache License v2.0
7
//
8
// See LICENSE.txt for license information
9
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
10
//
11
// SPDX-License-Identifier: Apache-2.0
12
//
13
//===----------------------------------------------------------------------===//
14
15
import NIOConcurrencyHelpers
16
17
/// The core `Channel` methods that are for internal use of the `Channel` implementation only.
18
///
19
/// - warning: If you are not implementing a custom `Channel` type, you should never call any of these.
20
///
21
/// - Note: All methods must be called from the `EventLoop` thread.
22
public protocol ChannelCore: AnyObject {
23
    /// Returns the local bound `SocketAddress`.
24
    func localAddress0() throws -> SocketAddress
25
26
    /// Return the connected `SocketAddress`.
27
    func remoteAddress0() throws -> SocketAddress
28
29
    /// Register with the `EventLoop` to receive I/O notifications.
30
    ///
31
    /// - Parameters:
32
    ///   - promise: The `EventLoopPromise` which should be notified once the operation completes, or nil if no notification should take place.
33
    func register0(promise: EventLoopPromise<Void>?)
34
35
    /// Register channel as already connected or bound socket.
36
    /// - Parameters:
37
    ///   - promise: The `EventLoopPromise` which should be notified once the operation completes, or nil if no notification should take place.
38
    func registerAlreadyConfigured0(promise: EventLoopPromise<Void>?)
39
40
    /// Bind to a `SocketAddress`.
41
    ///
42
    /// - Parameters:
43
    ///   - to: The `SocketAddress` to which we should bind the `Channel`.
44
    ///   - promise: The `EventLoopPromise` which should be notified once the operation completes, or nil if no notification should take place.
45
    func bind0(to: SocketAddress, promise: EventLoopPromise<Void>?)
46
47
    /// Connect to a `SocketAddress`.
48
    ///
49
    /// - Parameters:
50
    ///   - to: The `SocketAddress` to which we should connect the `Channel`.
51
    ///   - promise: The `EventLoopPromise` which should be notified once the operation completes, or nil if no notification should take place.
52
    func connect0(to: SocketAddress, promise: EventLoopPromise<Void>?)
53
54
    /// Write the given data to the outbound buffer.
55
    ///
56
    /// - Parameters:
57
    ///   - data: The data to write, wrapped in a `NIOAny`.
58
    ///   - promise: The `EventLoopPromise` which should be notified once the operation completes, or nil if no notification should take place.
59
    func write0(_ data: NIOAny, promise: EventLoopPromise<Void>?)
60
61
    /// Try to flush out all previous written messages that are pending.
62
    func flush0()
63
64
    /// Request that the `Channel` perform a read when data is ready.
65
    func read0()
66
67
    /// Close the `Channel`.
68
    ///
69
    /// - Parameters:
70
    ///   - error: The `Error` which will be used to fail any pending writes.
71
    ///   - mode: The `CloseMode` to apply.
72
    ///   - promise: The `EventLoopPromise` which should be notified once the operation completes, or nil if no notification should take place.
73
    func close0(error: Error, mode: CloseMode, promise: EventLoopPromise<Void>?)
74
75
    /// Trigger an outbound event.
76
    ///
77
    /// - Parameters:
78
    ///   - event: The triggered event.
79
    ///   - promise: The `EventLoopPromise` which should be notified once the operation completes, or nil if no notification should take place.
80
    func triggerUserOutboundEvent0(_ event: Any, promise: EventLoopPromise<Void>?)
81
82
    /// Called when data was read from the `Channel` but it was not consumed by any `ChannelInboundHandler` in the `ChannelPipeline`.
83
    ///
84
    /// - Parameters:
85
    ///   - data: The data that was read, wrapped in a `NIOAny`.
86
    func channelRead0(_ data: NIOAny)
87
88
    /// Called when an inbound error was encountered but was not consumed by any `ChannelInboundHandler` in the `ChannelPipeline`.
89
    ///
90
    /// - Parameters:
91
    ///   - error: The `Error` that was encountered.
92
    func errorCaught0(error: Error)
93
}
94
95
/// A `Channel` is easiest thought of as a network socket. But it can be anything that is capable of I/O operations such
96
/// as read, write, connect, and bind.
97
///
98
/// - Note: All operations on `Channel` are thread-safe.
99
///
100
/// In SwiftNIO, all I/O operations are asynchronous and hence all operations on `Channel` are asynchronous too. This means
101
/// that all I/O operations will return immediately, usually before the work has been completed. The `EventLoopPromise`s
102
/// passed to or returned by the operations are used to retrieve the result of an operation after it has completed.
103
///
104
/// A `Channel` owns its `ChannelPipeline` which handles all I/O events and requests associated with the `Channel`.
105
public protocol Channel: AnyObject, ChannelOutboundInvoker, _NIOPreconcurrencySendable {
106
    /// The `Channel`'s `ByteBuffer` allocator. This is _the only_ supported way of allocating `ByteBuffer`s to be used with this `Channel`.
107
    var allocator: ByteBufferAllocator { get }
108
109
    /// The `closeFuture` will fire when the `Channel` has been closed.
110
    ///
111
    /// - Important: This future should never be failed: it signals when the channel has been closed, and this action should not fail,
112
    ///              regardless of whether the close happened cleanly or not.
113
    ///              If you are interested in any errors thrown during `close` to diagnose any unclean channel closures, you
114
    ///              should instead use the future returned from ``ChannelOutboundInvoker/close(mode:file:line:)-7hlgf``
115
    ///              or pass a promise via ``ChannelOutboundInvoker/close(mode:promise:)``.
116
    var closeFuture: EventLoopFuture<Void> { get }
117
118
    /// The `ChannelPipeline` which handles all I/O events and requests associated with this `Channel`.
119
    var pipeline: ChannelPipeline { get }
120
121
    /// The local `SocketAddress`.
122
    var localAddress: SocketAddress? { get }
123
124
    /// The remote peer's `SocketAddress`.
125
    ///
126
    /// If we end up accepting an already-closed connection, the kernel can end up in a place
127
    /// where it has no remote address to give us. In this situation, `remoteAddress` will be
128
    /// `nil`. It can also be `nil` in cases where it isn't representable in SocketAddress, e.g. if
129
    /// we're talking over a vsock.
130
    var remoteAddress: SocketAddress? { get }
131
132
    /// `Channel`s are hierarchical and might have a parent `Channel`. `Channel` hierarchies are in use for certain
133
    /// protocols such as HTTP/2.
134
    var parent: Channel? { get }
135
136
    /// Set `option` to `value` on this `Channel`.
137
    func setOption<Option: ChannelOption>(_ option: Option, value: Option.Value) -> EventLoopFuture<Void>
138
139
    /// Get the value of `option` for this `Channel`.
140
    func getOption<Option: ChannelOption>(_ option: Option) -> EventLoopFuture<Option.Value>
141
142
    /// Returns if this `Channel` is currently writable.
143
    var isWritable: Bool { get }
144
145
    /// Returns if this `Channel` is currently active. Active is defined as the period of time after the
146
    /// `channelActive` and before `channelInactive` has fired. The main use for this is to know if `channelActive`
147
    /// or `channelInactive` can be expected next when `handlerAdded` was received.
148
    var isActive: Bool { get }
149
150
    /// Reach out to the `ChannelCore`.
151
    ///
152
    /// - warning: Unsafe, this is for use in NIO's core only.
153
    var _channelCore: ChannelCore { get }
154
155
    /// Returns a view of the `Channel` exposing synchronous versions of `setOption` and `getOption`.
156
    /// The default implementation returns `nil`, and `Channel` implementations must opt in to
157
    /// support this behavior.
158
    var syncOptions: NIOSynchronousChannelOptions? { get }
159
160
    /// Write data into the `Channel`, automatically wrapping with `NIOAny`.
161
    ///
162
    /// - seealso: `ChannelOutboundInvoker.write`.
163
    @preconcurrency
164
    func write<T: Sendable>(_ any: T) -> EventLoopFuture<Void>
165
166
    /// Write data into the `Channel`, automatically wrapping with `NIOAny`.
167
    ///
168
    /// - seealso: `ChannelOutboundInvoker.write`.
169
    @preconcurrency
170
    func write<T: Sendable>(_ any: T, promise: EventLoopPromise<Void>?)
171
172
    /// Write and flush data into the `Channel`, automatically wrapping with `NIOAny`.
173
    ///
174
    /// - seealso: `ChannelOutboundInvoker.writeAndFlush`.
175
    @preconcurrency
176
    func writeAndFlush<T: Sendable>(_ any: T) -> EventLoopFuture<Void>
177
178
    /// Write and flush data into the `Channel`, automatically wrapping with `NIOAny`.
179
    ///
180
    /// - seealso: `ChannelOutboundInvoker.writeAndFlush`.
181
    @preconcurrency
182
    func writeAndFlush<T: Sendable>(_ any: T, promise: EventLoopPromise<Void>?)
183
}
184
185
extension Channel {
186
    /// Default implementation: `NIOSynchronousChannelOptions` are not supported.
187
0
    public var syncOptions: NIOSynchronousChannelOptions? {
188
0
        nil
189
0
    }
190
}
191
192
public protocol NIOSynchronousChannelOptions {
193
    /// Set `option` to `value` on this `Channel`.
194
    ///
195
    /// - Important: Must be called on the `EventLoop` the `Channel` is running on.
196
    func setOption<Option: ChannelOption>(_ option: Option, value: Option.Value) throws
197
198
    /// Get the value of `option` for this `Channel`.
199
    ///
200
    /// - Important: Must be called on the `EventLoop` the `Channel` is running on.
201
    func getOption<Option: ChannelOption>(_ option: Option) throws -> Option.Value
202
}
203
204
/// Default implementations which will start on the head of the `ChannelPipeline`.
205
extension Channel {
206
207
0
    public func bind(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
208
0
        pipeline.bind(to: address, promise: promise)
209
0
    }
210
211
0
    public func connect(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
212
0
        pipeline.connect(to: address, promise: promise)
213
0
    }
214
215
    @available(
216
        *,
217
        deprecated,
218
        message: "NIOAny is not Sendable. Avoid wrapping the value in NIOAny to silence this warning."
219
    )
220
0
    public func write(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
221
0
        pipeline.write(data, promise: promise)
222
0
    }
223
224
0
    public func write<T: Sendable>(_ data: T, promise: EventLoopPromise<Void>?) {
225
0
        pipeline.write(data, promise: promise)
226
0
    }
227
228
0
    public func flush() {
229
0
        pipeline.flush()
230
0
    }
231
232
    @available(
233
        *,
234
        deprecated,
235
        message: "NIOAny is not Sendable. Avoid wrapping the value in NIOAny to silence this warning."
236
    )
237
0
    public func writeAndFlush(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
238
0
        pipeline.writeAndFlush(data, promise: promise)
239
0
    }
240
241
0
    public func writeAndFlush<T: Sendable>(_ data: T, promise: EventLoopPromise<Void>?) {
242
0
        pipeline.writeAndFlush(data, promise: promise)
243
0
    }
244
245
0
    public func read() {
246
0
        pipeline.read()
247
0
    }
248
249
254k
    public func close(mode: CloseMode = .all, promise: EventLoopPromise<Void>?) {
250
254k
        pipeline.close(mode: mode, promise: promise)
251
254k
    }
252
253
68.2k
    public func register(promise: EventLoopPromise<Void>?) {
254
68.2k
        pipeline.register(promise: promise)
255
68.2k
    }
256
257
0
    public func registerAlreadyConfigured0(promise: EventLoopPromise<Void>?) {
258
0
        promise?.fail(ChannelError._operationUnsupported)
259
0
    }
260
261
    @preconcurrency
262
0
    public func triggerUserOutboundEvent(_ event: Any & Sendable, promise: EventLoopPromise<Void>?) {
263
0
        pipeline.triggerUserOutboundEvent(event, promise: promise)
264
0
    }
265
}
266
267
/// Provides special extension to make writing data to the `Channel` easier by removing the need to wrap data in `NIOAny` manually.
268
extension Channel {
269
270
    /// Write data into the `Channel`.
271
    ///
272
    /// - seealso: `ChannelOutboundInvoker.write`.
273
    @preconcurrency
274
0
    public func write<T: Sendable>(_ any: T) -> EventLoopFuture<Void> {
275
0
        let promise = self.eventLoop.makePromise(of: Void.self)
276
0
        self.write(any, promise: promise)
277
0
        return promise.futureResult
278
0
    }
279
280
    /// Write and flush data into the `Channel`.
281
    ///
282
    /// - seealso: `ChannelOutboundInvoker.writeAndFlush`.
283
    @preconcurrency
284
0
    public func writeAndFlush<T: Sendable>(_ any: T) -> EventLoopFuture<Void> {
285
0
        let promise = self.eventLoop.makePromise(of: Void.self)
286
0
        self.writeAndFlush(any, promise: promise)
287
0
        return promise.futureResult
288
0
    }
289
}
290
291
extension ChannelCore {
292
    /// Unwraps the given `NIOAny` as a specific concrete type.
293
    ///
294
    /// This method is intended for use when writing custom `ChannelCore` implementations.
295
    /// This can safely be called in methods like `write0` to extract data from the `NIOAny`
296
    /// provided in those cases.
297
    ///
298
    /// Note that if the unwrap fails, this will cause a runtime trap. `ChannelCore`
299
    /// implementations should be concrete about what types they support writing. If multiple
300
    /// types are supported, consider using a tagged union to store the type information like
301
    /// NIO's own `IOData`, which will minimise the amount of runtime type checking.
302
    ///
303
    /// - Parameters:
304
    ///   - data: The `NIOAny` to unwrap.
305
    ///   - as: The type to extract from the `NIOAny`.
306
    /// - Returns: The content of the `NIOAny`.
307
    @inlinable
308
0
    public func unwrapData<T>(_ data: NIOAny, as: T.Type = T.self) -> T {
309
0
        data.forceAs()
310
0
    }
311
312
    /// Attempts to unwrap the given `NIOAny` as a specific concrete type.
313
    ///
314
    /// This method is intended for use when writing custom `ChannelCore` implementations.
315
    /// This can safely be called in methods like `write0` to extract data from the `NIOAny`
316
    /// provided in those cases.
317
    ///
318
    /// If the unwrap fails, this will return `nil`. `ChannelCore` implementations should almost
319
    /// always support only one runtime type, so in general they should avoid using this and prefer
320
    /// using `unwrapData` instead. This method exists for rare use-cases where tolerating type
321
    /// mismatches is acceptable.
322
    ///
323
    /// - Parameters:
324
    ///   - data: The `NIOAny` to unwrap.
325
    ///   - as: The type to extract from the `NIOAny`.
326
    /// - Returns: The content of the `NIOAny`, or `nil` if the type is incorrect.
327
    /// - warning: If you are implementing a `ChannelCore`, you should use `unwrapData` unless you
328
    ///     are doing something _extremely_ unusual.
329
    @inlinable
330
0
    public func tryUnwrapData<T>(_ data: NIOAny, as: T.Type = T.self) -> T? {
331
0
        data.tryAs()
332
0
    }
333
334
    /// Removes the `ChannelHandler`s from the `ChannelPipeline` belonging to `channel`, and
335
    /// closes that `ChannelPipeline`.
336
    ///
337
    /// This method is intended for use when writing custom `ChannelCore` implementations.
338
    /// This can be called from `close0` to tear down the `ChannelPipeline` when closure is
339
    /// complete.
340
    ///
341
    /// - Parameters:
342
    ///   - channel: The `Channel` whose `ChannelPipeline` will be closed.
343
    @available(*, deprecated, renamed: "removeHandlers(pipeline:)")
344
0
    public func removeHandlers(channel: Channel) {
345
0
        self.removeHandlers(pipeline: channel.pipeline)
346
0
    }
347
348
    /// Removes the `ChannelHandler`s from the `ChannelPipeline` `pipeline`, and
349
    /// closes that `ChannelPipeline`.
350
    ///
351
    /// This method is intended for use when writing custom `ChannelCore` implementations.
352
    /// This can be called from `close0` to tear down the `ChannelPipeline` when closure is
353
    /// complete.
354
    ///
355
    /// - Parameters:
356
    ///   - pipeline: The `ChannelPipeline` to be closed.
357
12.9k
    public func removeHandlers(pipeline: ChannelPipeline) {
358
12.9k
        pipeline.removeHandlers()
359
12.9k
    }
360
}
361
362
/// An error that can occur on `Channel` operations.
363
public enum ChannelError: Error {
364
    /// Tried to connect on a `Channel` that is already connecting.
365
    case connectPending
366
367
    /// Connect operation timed out
368
    case connectTimeout(TimeAmount)
369
370
    /// Unsupported operation triggered on a `Channel`. For example `connect` on a `ServerSocketChannel`.
371
    case operationUnsupported
372
373
    /// An I/O operation (e.g. read/write/flush) called on a channel that is already closed.
374
    case ioOnClosedChannel
375
376
    /// Close was called on a channel that is already closed.
377
    case alreadyClosed
378
379
    /// Output-side of the channel is closed.
380
    case outputClosed
381
382
    /// Input-side of the channel is closed.
383
    case inputClosed
384
385
    /// A read operation reached end-of-file. This usually means the remote peer closed the socket but it's still
386
    /// open locally.
387
    case eof
388
389
    /// A `DatagramChannel` `write` was made with a buffer that is larger than the MTU for the connection, and so the
390
    /// datagram was not written. Either shorten the datagram or manually fragment, and then try again.
391
    case writeMessageTooLarge
392
393
    /// A `DatagramChannel` `write` was made with an address that was not reachable and so could not be delivered.
394
    case writeHostUnreachable
395
396
    /// The local address of the `Channel` could not be determined.
397
    case unknownLocalAddress
398
399
    /// The address family of the multicast group was not valid for this `Channel`.
400
    case badMulticastGroupAddressFamily
401
402
    /// The address family of the provided multicast group join is not valid for this `Channel`.
403
    case badInterfaceAddressFamily
404
405
    /// An attempt was made to join a multicast group that does not correspond to a multicast
406
    /// address.
407
    case illegalMulticastAddress(SocketAddress)
408
409
    #if !os(WASI)
410
    /// Multicast is not supported on Interface
411
    @available(*, deprecated, renamed: "NIOMulticastNotSupportedError")
412
    case multicastNotSupported(NIONetworkInterface)
413
    #endif
414
415
    /// An operation that was inappropriate given the current `Channel` state was attempted.
416
    case inappropriateOperationForState
417
418
    /// An attempt was made to remove a ChannelHandler that is not removable.
419
    case unremovableHandler
420
}
421
422
extension ChannelError {
423
    // 'any Error' is unconditionally boxed, avoid allocating per use by statically boxing them.
424
    static let _alreadyClosed: any Error = ChannelError.alreadyClosed
425
    static let _inputClosed: any Error = ChannelError.inputClosed
426
    @usableFromInline
427
    static let _ioOnClosedChannel: any Error = ChannelError.ioOnClosedChannel
428
    static let _operationUnsupported: any Error = ChannelError.operationUnsupported
429
    static let _outputClosed: any Error = ChannelError.outputClosed
430
    static let _unremovableHandler: any Error = ChannelError.unremovableHandler
431
}
432
433
extension ChannelError: Equatable {}
434
435
extension ChannelError: CustomStringConvertible {
436
0
    public var description: String {
437
0
        switch self {
438
0
        case .connectPending:
439
0
            "Connect pending"
440
0
        case let .connectTimeout(value):
441
0
            "Connect timeout (\(value))"
442
0
        case .operationUnsupported:
443
0
            "Operation unsupported"
444
0
        case .ioOnClosedChannel:
445
0
            "I/O on closed channel"
446
0
        case .alreadyClosed:
447
0
            "Already closed"
448
0
        case .outputClosed:
449
0
            "Output closed"
450
0
        case .inputClosed:
451
0
            "Input closed"
452
0
        case .eof:
453
0
            "End of file"
454
0
        case .writeMessageTooLarge:
455
0
            "Write message too large"
456
0
        case .writeHostUnreachable:
457
0
            "Write host unreachable"
458
0
        case .unknownLocalAddress:
459
0
            "Unknown local address"
460
0
        case .badMulticastGroupAddressFamily:
461
0
            "Bad multicast group address family"
462
0
        case .badInterfaceAddressFamily:
463
0
            "Bad interface address family"
464
0
        case let .illegalMulticastAddress(address):
465
0
            "Illegal multicast address \(address)"
466
        #if !os(WASI)
467
0
        case let .multicastNotSupported(interface):
468
0
            "Multicast not supported on interface \(interface)"
469
        #endif
470
0
        case .inappropriateOperationForState:
471
0
            "Inappropriate operation for state"
472
0
        case .unremovableHandler:
473
0
            "Unremovable handler"
474
0
        }
475
0
    }
476
}
477
478
/// The removal of a `ChannelHandler` using `ChannelPipeline.removeHandler` has been attempted more than once.
479
public struct NIOAttemptedToRemoveHandlerMultipleTimesError: Error {}
480
481
public enum DatagramChannelError: Sendable {
482
    public struct WriteOnUnconnectedSocketWithoutAddress: Error {
483
0
        public init() {}
484
    }
485
486
    public struct WriteOnConnectedSocketWithInvalidAddress: Error {
487
        let envelopeRemoteAddress: SocketAddress
488
        let connectedRemoteAddress: SocketAddress
489
490
        public init(
491
            envelopeRemoteAddress: SocketAddress,
492
            connectedRemoteAddress: SocketAddress
493
0
        ) {
494
0
            self.envelopeRemoteAddress = envelopeRemoteAddress
495
0
            self.connectedRemoteAddress = connectedRemoteAddress
496
0
        }
497
    }
498
}
499
500
/// A `Channel`-related event that is passed through the `ChannelPipeline` to notify the user.
501
public enum ChannelEvent: Equatable, Sendable {
502
    /// `ChannelOptions.allowRemoteHalfClosure` is `true` and input portion of the `Channel` was closed.
503
    case inputClosed
504
    /// Output portion of the `Channel` was closed.
505
    case outputClosed
506
}
507
508
/// A `Channel` user event that is sent when the `Channel` has been asked to quiesce.
509
///
510
/// The action(s) that should be taken after receiving this event are both application and protocol dependent. If the
511
/// protocol supports a notion of requests and responses, it might make sense to stop accepting new requests but finish
512
/// processing the request currently in flight.
513
public struct ChannelShouldQuiesceEvent: Sendable {
514
0
    public init() {
515
0
    }
516
}