Coverage Report

Created: 2026-03-12 06:14

/src/swift-nio/Sources/NIOCore/ChannelPipeline.swift
Line
Count
Source
1
//===----------------------------------------------------------------------===//
2
//
3
// This source file is part of the SwiftNIO open source project
4
//
5
// Copyright (c) 2017-2024 Apple Inc. and the SwiftNIO project authors
6
// Licensed under Apache License v2.0
7
//
8
// See LICENSE.txt for license information
9
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
10
//
11
// SPDX-License-Identifier: Apache-2.0
12
//
13
//===----------------------------------------------------------------------===//
14
15
/// A list of `ChannelHandler`s that handle or intercept inbound events and outbound operations of a
16
/// `Channel`. `ChannelPipeline` implements an advanced form of the Intercepting Filter pattern
17
/// to give a user full control over how an event is handled and how the `ChannelHandler`s in a pipeline
18
/// interact with each other.
19
///
20
/// # Creation of a pipeline
21
///
22
/// Each `Channel` has its own `ChannelPipeline` and it is created automatically when a new `Channel` is created.
23
///
24
/// # How an event flows in a pipeline
25
///
26
/// The following diagram describes how I/O events are typically processed by `ChannelHandler`s in a `ChannelPipeline`.
27
/// An I/O event is handled by either a `ChannelInboundHandler` or a `ChannelOutboundHandler`
28
/// and is forwarded to the next handler in the `ChannelPipeline` by calling the event propagation methods defined in
29
/// `ChannelHandlerContext`, such as `ChannelHandlerContext.fireChannelRead` and
30
/// `ChannelHandlerContext.write`.
31
///
32
/// ```
33
///                                                    I/O Request
34
///                                                    via `Channel` or
35
///                                                    `ChannelHandlerContext`
36
///                                                      |
37
///  +---------------------------------------------------+---------------+
38
///  |                           ChannelPipeline         |               |
39
///  |                                TAIL              \|/              |
40
///  |    +---------------------+            +-----------+----------+    |
41
///  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
42
///  |    +----------+----------+            +-----------+----------+    |
43
///  |              /|\                                  |               |
44
///  |               |                                  \|/              |
45
///  |    +----------+----------+            +-----------+----------+    |
46
///  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
47
///  |    +----------+----------+            +-----------+----------+    |
48
///  |              /|\                                  .               |
49
///  |               .                                   .               |
50
///  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
51
///  |        [ method call]                       [method call]         |
52
///  |               .                                   .               |
53
///  |               .                                  \|/              |
54
///  |    +----------+----------+            +-----------+----------+    |
55
///  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
56
///  |    +----------+----------+            +-----------+----------+    |
57
///  |              /|\                                  |               |
58
///  |               |                                  \|/              |
59
///  |    +----------+----------+            +-----------+----------+    |
60
///  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
61
///  |    +----------+----------+            +-----------+----------+    |
62
///  |              /|\             HEAD                 |               |
63
///  +---------------+-----------------------------------+---------------+
64
///                  |                                  \|/
65
///  +---------------+-----------------------------------+---------------+
66
///  |               |                                   |               |
67
///  |       [ Socket.read ]                    [ Socket.write ]         |
68
///  |                                                                   |
69
///  |  SwiftNIO Internal I/O Threads (Transport Implementation)         |
70
///  +-------------------------------------------------------------------+
71
/// ```
72
///
73
/// An inbound event is handled by the inbound handlers in the head-to-tail direction as shown on the left side of the
74
/// diagram. An inbound handler usually handles the inbound data generated by the I/O thread on the bottom of the
75
/// diagram. The inbound data is often read from a remote peer via the actual input operation such as
76
/// `Socket.read`. If an inbound event goes beyond the tail inbound handler, it is discarded
77
/// silently, or logged if it needs your attention.
78
///
79
/// An outbound event is handled by the outbound handlers in the tail-to-head direction as shown on the right side of the
80
/// diagram. An outbound handler usually generates or transforms the outbound traffic such as write requests.
81
/// If an outbound event goes beyond the head outbound handler, it is handled by an I/O thread associated with the
82
/// `Channel`. The I/O thread often performs the actual output operation such as `Socket.write`.
83
///
84
///
85
/// For example, let us assume that we created the following pipeline:
86
///
87
/// ```
88
/// let p: ChannelPipeline = ...
89
/// let future = p.addHandler(InboundHandlerA(), name: "1").flatMap {
90
///   p.addHandler(InboundHandlerB(), name: "2")
91
/// }.flatMap {
92
///   p.addHandler(OutboundHandlerA(), name: "3")
93
/// }.flatMap {
94
///   p.addHandler(OutboundHandlerB(), name: "4")
95
/// }.flatMap {
96
///   p.addHandler(InboundOutboundHandlerX(), name: "5")
97
/// }
98
/// // Handle the future as well ....
99
/// ```
100
///
101
/// In the example above, a class whose name starts with `Inbound` is an inbound handler.
102
/// A class whose name starts with `Outbound` is an outbound handler.
103
///
104
/// In the given example configuration, the handler evaluation order is 1, 2, 3, 4, 5 when an event goes inbound.
105
/// When an event goes outbound, the order is 5, 4, 3, 2, 1.  On top of this principle, `ChannelPipeline` skips
106
/// the evaluation of certain handlers to shorten the stack depth:
107
///
108
/// - 3 and 4 don't implement `ChannelInboundHandler`, and therefore the actual evaluation order of an inbound event will be: 1, 2, and 5.
109
/// - 1 and 2 don't implement `ChannelOutboundHandler`, and therefore the actual evaluation order of an outbound event will be: 5, 4, and 3.
110
/// - If 5 implements both `ChannelInboundHandler` and `ChannelOutboundHandler`, the evaluation order of an inbound and an outbound event could be 1, 2, 5 and 5, 4, 3 respectively.
111
///
112
/// - Note: Handlers may choose not to propagate messages down the pipeline immediately.  For example a handler may need to wait
113
/// for additional data before sending a protocol event to the next handler in the pipeline.  Due to this you can't assume that later handlers
114
/// in the pipeline will receive the same number of events as were sent, or that events of different types will arrive in the same order.
115
/// For example, a user event could overtake a data event if a handler is aggregating data events before propagating them but immediately
116
/// propagating user events.
117
///
118
/// # Forwarding an event to the next handler
119
///
120
/// As you might have noticed in the diagram above, a handler has to invoke the event propagation methods in
121
/// `ChannelHandlerContext` to forward an event to its next handler.
122
/// Those methods include:
123
///
124
/// - Inbound event propagation methods defined in `ChannelInboundInvoker`
125
/// - Outbound event propagation methods defined in `ChannelOutboundInvoker`.
126
///
127
/// # Building a pipeline
128
///
129
/// A user is supposed to have one or more `ChannelHandler`s in a `ChannelPipeline` to receive I/O events (e.g. read) and
130
/// to request I/O operations (e.g. write and close).  For example, a typical server will have the following handlers
131
/// in each channel's pipeline, but your mileage may vary depending on the complexity and characteristics of the
132
/// protocol and business logic:
133
///
134
/// - Protocol Decoder - translates binary data (e.g. `ByteBuffer`) into a struct / class
135
/// - Protocol Encoder - translates a struct / class into binary data (e.g. `ByteBuffer`)
136
/// - Business Logic Handler - performs the actual business logic (e.g. database access)
137
///
138
/// # Thread safety
139
///
140
/// A `ChannelHandler` can be added or removed at any time because a `ChannelPipeline` is thread safe.
141
public final class ChannelPipeline: ChannelInvoker {
142
    private var head: Optional<ChannelHandlerContext>
143
    private var tail: Optional<ChannelHandlerContext>
144
145
33.8k
    private var idx: Int = 0
146
33.8k
    internal private(set) var destroyed: Bool = false
147
148
    /// The `EventLoop` that is used by the underlying `Channel`.
149
    public let eventLoop: EventLoop
150
151
    /// The `Channel` that this `ChannelPipeline` belongs to.
152
    ///
153
    /// - Note: This will be nil after the channel has closed
154
    private var _channel: Optional<Channel>
155
156
    /// The `Channel` that this `ChannelPipeline` belongs to.
157
    @usableFromInline
158
30.3M
    internal var channel: Channel {
159
30.3M
        self.eventLoop.assertInEventLoop()
160
30.3M
        assert(self._channel != nil || self.destroyed)
161
30.3M
        return self._channel ?? DeadChannel(pipeline: self)
162
30.3M
    }
163
164
    /// Add a `ChannelHandler` to the `ChannelPipeline`.
165
    ///
166
    /// - Parameters:
167
    ///   - name: the name to use for the `ChannelHandler` when it's added. If none is specified it will generate a name.
168
    ///   - handler: the `ChannelHandler` to add
169
    ///   - position: The position in the `ChannelPipeline` to add `handler`. Defaults to `.last`.
170
    /// - Returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was added.
171
    @preconcurrency
172
    public func addHandler(
173
        _ handler: ChannelHandler & Sendable,
174
        name: String? = nil,
175
        position: ChannelPipeline.Position = .last
176
13.1k
    ) -> EventLoopFuture<Void> {
177
13.1k
        let future: EventLoopFuture<Void>
178
13.1k
179
13.1k
        if self.eventLoop.inEventLoop {
180
13.1k
            let syncPosition = ChannelPipeline.SynchronousOperations.Position(position)
181
13.1k
            future = self.eventLoop.makeCompletedFuture(
182
13.1k
                self.addHandlerSync(handler, name: name, position: syncPosition)
183
13.1k
            )
184
13.1k
        } else {
185
0
            future = self.eventLoop.submit {
186
0
                let syncPosition = ChannelPipeline.SynchronousOperations.Position(position)
187
0
                try self.addHandlerSync(handler, name: name, position: syncPosition).get()
188
0
            }
189
0
        }
190
13.1k
191
13.1k
        return future
192
13.1k
    }
193
194
    /// Synchronously add a `ChannelHandler` to the `ChannelPipeline`.
195
    ///
196
    /// May only be called from the event loop.
197
    ///
198
    /// - Parameters:
199
    ///   - handler: the `ChannelHandler` to add
200
    ///   - name: the name to use for the `ChannelHandler` when it's added. If none is specified a name will be generated.
201
    ///   - position: The position in the `ChannelPipeline` to add `handler`. Defaults to `.last`.
202
    /// - Returns: the result of adding this handler - either success or failure with an error code if this could not be completed.
203
    fileprivate func addHandlerSync(
204
        _ handler: ChannelHandler,
205
        name: String? = nil,
206
        position: ChannelPipeline.SynchronousOperations.Position = .last
207
13.1k
    ) -> Result<Void, Error> {
208
13.1k
        self.eventLoop.assertInEventLoop()
209
13.1k
210
13.1k
        if self.destroyed {
211
0
            return .failure(ChannelError._ioOnClosedChannel)
212
13.1k
        }
213
13.1k
214
13.1k
        switch position {
215
13.1k
        case .first:
216
0
            return self.add0(
217
0
                name: name,
218
0
                handler: handler,
219
0
                relativeContext: head!,
220
0
                operation: self.add0(context:after:)
Unexecuted instantiation: $s7NIOCore15ChannelPipelineC14addHandlerSync33_F5AC316541457BD146E3694279514AA3LL_4name8positions6ResultOyyts5Error_pGAA0bE0_p_SSSgAC21SynchronousOperationsV8PositionOtFyAA0bE7ContextC_AStYbcACYbcfu_
Unexecuted instantiation: $s7NIOCore15ChannelPipelineC14addHandlerSync33_F5AC316541457BD146E3694279514AA3LL_4name8positions6ResultOyyts5Error_pGAA0bE0_p_SSSgAC21SynchronousOperationsV8PositionOtFyAA0bE7ContextC_AStYbcACYbcfu_yAS_AStYbcfu0_
221
0
            )
222
13.1k
        case .last:
223
13.1k
            return self.add0(
224
13.1k
                name: name,
225
13.1k
                handler: handler,
226
13.1k
                relativeContext: tail!,
227
26.2k
                operation: self.add0(context:before:)
$s7NIOCore15ChannelPipelineC14addHandlerSync33_F5AC316541457BD146E3694279514AA3LL_4name8positions6ResultOyyts5Error_pGAA0bE0_p_SSSgAC21SynchronousOperationsV8PositionOtFyAA0bE7ContextC_AStYbcACYbcfu1_
227
13.1k
                operation: self.add0(context:before:)
$s7NIOCore15ChannelPipelineC14addHandlerSync33_F5AC316541457BD146E3694279514AA3LL_4name8positions6ResultOyyts5Error_pGAA0bE0_p_SSSgAC21SynchronousOperationsV8PositionOtFyAA0bE7ContextC_AStYbcACYbcfu1_yAS_AStYbcfu2_
227
13.1k
                operation: self.add0(context:before:)
228
13.1k
            )
229
13.1k
        case .before(let beforeHandler):
230
0
            return self.add0(
231
0
                name: name,
232
0
                handler: handler,
233
0
                relativeHandler: beforeHandler,
234
0
                operation: self.add0(context:before:)
Unexecuted instantiation: $s7NIOCore15ChannelPipelineC14addHandlerSync33_F5AC316541457BD146E3694279514AA3LL_4name8positions6ResultOyyts5Error_pGAA0bE0_p_SSSgAC21SynchronousOperationsV8PositionOtFyAA0bE7ContextC_AStYbcACYbcfu3_
Unexecuted instantiation: $s7NIOCore15ChannelPipelineC14addHandlerSync33_F5AC316541457BD146E3694279514AA3LL_4name8positions6ResultOyyts5Error_pGAA0bE0_p_SSSgAC21SynchronousOperationsV8PositionOtFyAA0bE7ContextC_AStYbcACYbcfu3_yAS_AStYbcfu4_
235
0
            )
236
13.1k
        case .after(let afterHandler):
237
0
            return self.add0(
238
0
                name: name,
239
0
                handler: handler,
240
0
                relativeHandler: afterHandler,
241
0
                operation: self.add0(context:after:)
Unexecuted instantiation: $s7NIOCore15ChannelPipelineC14addHandlerSync33_F5AC316541457BD146E3694279514AA3LL_4name8positions6ResultOyyts5Error_pGAA0bE0_p_SSSgAC21SynchronousOperationsV8PositionOtFyAA0bE7ContextC_AStYbcACYbcfu5_
Unexecuted instantiation: $s7NIOCore15ChannelPipelineC14addHandlerSync33_F5AC316541457BD146E3694279514AA3LL_4name8positions6ResultOyyts5Error_pGAA0bE0_p_SSSgAC21SynchronousOperationsV8PositionOtFyAA0bE7ContextC_AStYbcACYbcfu5_yAS_AStYbcfu6_
242
0
            )
243
13.1k
        }
244
13.1k
    }
245
246
    /// Synchronously add a `ChannelHandler` to the pipeline, relative to another `ChannelHandler`,
247
    /// where the insertion is done by a specific operation.
248
    ///
249
    /// May only be called from the event loop.
250
    ///
251
    /// This will search the pipeline for `relativeHandler` and, if it cannot find it, will return
252
    /// a failure containing `ChannelPipelineError.notFound`.
253
    ///
254
    /// - Parameters:
255
    ///   - name: The name to use for the `ChannelHandler` when it's added. If none is specified, a name will be
256
    ///         automatically generated.
257
    ///   - handler: The `ChannelHandler` to add.
258
    ///   - relativeHandler: The `ChannelHandler` already in the `ChannelPipeline` that `handler` will be
259
    ///         inserted relative to.
260
    ///   - operation: A callback that will insert `handler` relative to `relativeHandler`.
261
    /// - Returns: the result of adding this handler - either success or failure with an error code if this could not be completed.
262
    private func add0(
263
        name: String?,
264
        handler: ChannelHandler,
265
        relativeHandler: ChannelHandler,
266
        operation: (ChannelHandlerContext, ChannelHandlerContext) -> Void
267
0
    ) -> Result<Void, Error> {
268
0
        self.eventLoop.assertInEventLoop()
269
0
        if self.destroyed {
270
0
            return .failure(ChannelError._ioOnClosedChannel)
271
0
        }
272
0
273
0
        guard let context = self.contextForPredicate0({ $0.handler === relativeHandler }) else {
274
0
            return .failure(ChannelPipelineError.notFound)
275
0
        }
276
0
277
0
        return self.add0(name: name, handler: handler, relativeContext: context, operation: operation)
278
0
    }
279
280
    /// Synchronously add a `ChannelHandler` to the pipeline, relative to a `ChannelHandlerContext`,
281
    /// where the insertion is done by a specific operation.
282
    ///
283
    /// May only be called from the event loop.
284
    ///
285
    /// This method is more efficient than the one that takes a `relativeHandler` as it does not need to
286
    /// search the pipeline for the insertion point. It should be used whenever possible.
287
    ///
288
    /// - Parameters:
289
    ///   - name: The name to use for the `ChannelHandler` when it's added. If none is specified, a name will be
290
    ///         automatically generated.
291
    ///   - handler: The `ChannelHandler` to add.
292
    ///   - relativeContext: The `ChannelHandlerContext` already in the `ChannelPipeline` that `handler` will be
293
    ///         inserted relative to.
294
    ///   - operation: A callback that will insert `handler` relative to `relativeContext`.
295
    /// - Returns: the result of adding this handler - either success or failure with an error code if this could not be completed.
296
    private func add0(
297
        name: String?,
298
        handler: ChannelHandler,
299
        relativeContext: ChannelHandlerContext,
300
        operation: (ChannelHandlerContext, ChannelHandlerContext) -> Void
301
13.1k
    ) -> Result<Void, Error> {
302
13.1k
        self.eventLoop.assertInEventLoop()
303
13.1k
304
13.1k
        if self.destroyed {
305
0
            return .failure(ChannelError._ioOnClosedChannel)
306
13.1k
        }
307
13.1k
308
13.1k
        let context = ChannelHandlerContext(name: name ?? nextName(), handler: handler, pipeline: self)
309
13.1k
        operation(context, relativeContext)
310
13.1k
311
13.1k
        context.invokeHandlerAdded()
312
13.1k
        return .success(())
313
13.1k
    }
314
315
    /// Synchronously add a single new `ChannelHandlerContext` after one that currently exists in the
316
    /// pipeline.
317
    ///
318
    /// Must be called from within the event loop thread, as it synchronously manipulates the
319
    /// `ChannelHandlerContext`s on the `ChannelPipeline`.
320
    ///
321
    /// - Parameters:
322
    ///   - new: The `ChannelHandlerContext` to add to the pipeline.
323
    ///   - existing: The `ChannelHandlerContext` that `new` will be added after.
324
0
    private func add0(context new: ChannelHandlerContext, after existing: ChannelHandlerContext) {
325
0
        self.eventLoop.assertInEventLoop()
326
0
327
0
        let next = existing.next
328
0
        new.prev = existing
329
0
        new.next = next
330
0
        existing.next = new
331
0
        next?.prev = new
332
0
    }
333
334
    /// Synchronously add a single new `ChannelHandlerContext` before one that currently exists in the
335
    /// pipeline.
336
    ///
337
    /// Must be called from within the event loop thread, as it synchronously manipulates the
338
    /// `ChannelHandlerContext`s on the `ChannelPipeline`.
339
    ///
340
    /// - Parameters:
341
    ///   - new: The `ChannelHandlerContext` to add to the pipeline.
342
    ///   - existing: The `ChannelHandlerContext` that `new` will be added before.
343
13.1k
    private func add0(context new: ChannelHandlerContext, before existing: ChannelHandlerContext) {
344
13.1k
        self.eventLoop.assertInEventLoop()
345
13.1k
346
13.1k
        let prev = existing.prev
347
13.1k
        new.prev = prev
348
13.1k
        new.next = existing
349
13.1k
        existing.prev = new
350
13.1k
        prev?.next = new
351
13.1k
    }
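
The two `add0(context:after:)` and `add0(context:before:)` helpers above are plain doubly-linked-list splices. The following is a minimal, standalone sketch of the same pointer updates; `Node`, `insert(_:before:)`, `insert(_:after:)` and `names(from:)` are hypothetical names used only for illustration and are not part of SwiftNIO. It assumes, as the listing suggests, that `.last` maps to "before tail" and `.first` maps to "after head".

```swift
// Hypothetical stand-in for ChannelHandlerContext: only the links matter here.
// The sketch keeps strong references in both directions and ignores teardown.
final class Node {
    let name: String
    var prev: Node?
    var next: Node?
    init(_ name: String) { self.name = name }
}

// Same four assignments as add0(context:before:): splice `new` in front of `existing`.
func insert(_ new: Node, before existing: Node) {
    let prev = existing.prev
    new.prev = prev
    new.next = existing
    existing.prev = new
    prev?.next = new
}

// Same four assignments as add0(context:after:): splice `new` behind `existing`.
func insert(_ new: Node, after existing: Node) {
    let next = existing.next
    new.prev = existing
    new.next = next
    existing.next = new
    next?.prev = new
}

// Walk the list from `head` and collect the node names in order.
func names(from head: Node) -> [String] {
    var result: [String] = []
    var current: Node? = head
    while let node = current {
        result.append(node.name)
        current = node.next
    }
    return result
}

// An empty "pipeline" consists of just the two sentinels.
let head = Node("head")
let tail = Node("tail")
head.next = tail
tail.prev = head

insert(Node("a"), before: tail)  // analogous to position .last
insert(Node("b"), before: tail)  // analogous to position .last
insert(Node("c"), after: head)   // analogous to position .first

let order = names(from: head)
```

Because the sentinels are always present, neither splice needs a special case for an empty list; the optional chaining on `prev?.next` and `next?.prev` only matters at the very ends.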
352
353
    /// Remove a `ChannelHandler` from the `ChannelPipeline`.
354
    ///
355
    /// - Parameters:
356
    ///   - handler: the `ChannelHandler` to remove.
357
    /// - Returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was removed.
358
    @preconcurrency
359
0
    public func removeHandler(_ handler: RemovableChannelHandler & Sendable) -> EventLoopFuture<Void> {
360
0
        let promise = self.eventLoop.makePromise(of: Void.self)
361
0
        self.removeHandler(handler, promise: promise)
362
0
        return promise.futureResult
363
0
    }
364
365
    /// Remove a `ChannelHandler` from the `ChannelPipeline`.
366
    ///
367
    /// - Parameters:
368
    ///   - name: the name that was used to add the `ChannelHandler` to the `ChannelPipeline` before.
369
    /// - Returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was removed.
370
0
    public func removeHandler(name: String) -> EventLoopFuture<Void> {
371
0
        let promise = self.eventLoop.makePromise(of: Void.self)
372
0
        self.removeHandler(name: name, promise: promise)
373
0
        return promise.futureResult
374
0
    }
375
376
    /// Remove a `ChannelHandler` from the `ChannelPipeline`.
377
    ///
378
    /// - Parameters:
379
    ///   - context: the `ChannelHandlerContext` that belongs to the `ChannelHandler` that should be removed.
380
    /// - Returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was removed.
381
    @available(
382
        *,
383
        deprecated,
384
        message: "Use .syncOperations.removeHandler(context:) instead, this method is not Sendable-safe."
385
    )
386
0
    public func removeHandler(context: ChannelHandlerContext) -> EventLoopFuture<Void> {
387
0
        let promise = self.eventLoop.makePromise(of: Void.self)
388
0
        self.removeHandler(context: context, promise: promise)
389
0
        return promise.futureResult
390
0
    }
391
392
    /// Remove a `ChannelHandler` from the `ChannelPipeline`.
393
    ///
394
    /// - Parameters:
395
    ///   - handler: the `ChannelHandler` to remove.
396
    ///   - promise: An `EventLoopPromise` that will complete when the `ChannelHandler` is removed.
397
    @preconcurrency
398
0
    public func removeHandler(_ handler: RemovableChannelHandler & Sendable, promise: EventLoopPromise<Void>?) {
399
0
        @Sendable
400
0
        func removeHandler0() {
401
0
            self.syncOperations.removeHandler(handler, promise: promise)
402
0
        }
403
0
404
0
        if self.eventLoop.inEventLoop {
405
0
            removeHandler0()
406
0
        } else {
407
0
            self.eventLoop.execute {
408
0
                removeHandler0()
409
0
            }
410
0
        }
411
0
    }
412
413
    /// Remove a `ChannelHandler` from the `ChannelPipeline`.
414
    ///
415
    /// - Parameters:
416
    ///   - name: the name that was used to add the `ChannelHandler` to the `ChannelPipeline` before.
417
    ///   - promise: An `EventLoopPromise` that will complete when the `ChannelHandler` is removed.
418
0
    public func removeHandler(name: String, promise: EventLoopPromise<Void>?) {
419
0
        @Sendable
420
0
        func removeHandler0() {
421
0
            self.syncOperations.removeHandler(name: name, promise: promise)
422
0
        }
423
0
424
0
        if self.eventLoop.inEventLoop {
425
0
            removeHandler0()
426
0
        } else {
427
0
            self.eventLoop.execute {
428
0
                removeHandler0()
429
0
            }
430
0
        }
431
0
    }
432
433
    /// Remove a `ChannelHandler` from the `ChannelPipeline`.
434
    ///
435
    /// - Parameters:
436
    ///   - context: the `ChannelHandlerContext` that belongs to the `ChannelHandler` that should be removed.
437
    ///   - promise: An `EventLoopPromise` that will complete when the `ChannelHandler` is removed.
438
    @available(
439
        *,
440
        deprecated,
441
        message: "Use .syncOperations.removeHandler(context:) instead, this method is not Sendable-safe."
442
    )
443
0
    public func removeHandler(context: ChannelHandlerContext, promise: EventLoopPromise<Void>?) {
444
0
        let sendableView = context.sendableView
445
0
446
0
        guard sendableView.channelHandlerIsRemovable else {
447
0
            promise?.fail(ChannelError._unremovableHandler)
448
0
            return
449
0
        }
450
0
451
0
        @Sendable
452
0
        func removeHandler0() {
453
0
            sendableView.wrappedValue.startUserTriggeredRemoval(promise: promise)
454
0
        }
455
0
456
0
        if self.eventLoop.inEventLoop {
457
0
            removeHandler0()
458
0
        } else {
459
0
            self.eventLoop.execute {
460
0
                removeHandler0()
461
0
            }
462
0
        }
463
0
    }
464
465
    /// Returns the `ChannelHandlerContext` that belongs to a `ChannelHandler`.
466
    ///
467
    /// - Parameters:
468
    ///   - handler: the `ChannelHandler` for which the `ChannelHandlerContext` should be returned
469
    /// - Returns: the `EventLoopFuture` which will be notified once the operation completes.
470
    @available(
471
        *,
472
        deprecated,
473
        message: "This method is not strict concurrency safe. Prefer .syncOperations.context(handler:)"
474
    )
475
    @preconcurrency
476
0
    public func context(handler: ChannelHandler & Sendable) -> EventLoopFuture<ChannelHandlerContext> {
477
0
        let promise = self.eventLoop.makePromise(of: ChannelHandlerContext.self)
478
0
479
0
        if self.eventLoop.inEventLoop {
480
0
            promise.assumeIsolatedUnsafeUnchecked().completeWith(self.contextSync(handler: handler))
481
0
        } else {
482
0
            self.eventLoop.execute {
483
0
                promise.assumeIsolatedUnsafeUnchecked().completeWith(self.contextSync(handler: handler))
484
0
            }
485
0
        }
486
0
487
0
        return promise.futureResult
488
0
    }
489
490
    /// Synchronously returns the `ChannelHandlerContext` that belongs to a `ChannelHandler`.
491
    ///
492
    /// - Important: This must be called on the `EventLoop`.
493
    /// - Parameters:
494
    ///   - handler: the `ChannelHandler` for which the `ChannelHandlerContext` should be returned
495
    /// - Returns: the `ChannelHandlerContext` that belongs to the `ChannelHandler`, if one exists.
496
0
    fileprivate func contextSync(handler: ChannelHandler) -> Result<ChannelHandlerContext, Error> {
497
0
        self._contextSync({ $0.handler === handler })
498
0
    }
499
500
    /// Returns the `ChannelHandlerContext` that belongs to a `ChannelHandler`.
501
    ///
502
    /// - Parameters:
503
    ///   - name: the name that was used to add the `ChannelHandler` to the `ChannelPipeline` before.
504
    /// - Returns: the `EventLoopFuture` which will be notified once the operation completes.
505
0
    public func context(name: String) -> EventLoopFuture<ChannelHandlerContext> {
506
0
        let promise = self.eventLoop.makePromise(of: ChannelHandlerContext.self)
507
0
508
0
        if self.eventLoop.inEventLoop {
509
0
            promise.assumeIsolatedUnsafeUnchecked().completeWith(self.contextSync(name: name))
510
0
        } else {
511
0
            self.eventLoop.execute {
512
0
                promise.assumeIsolatedUnsafeUnchecked().completeWith(self.contextSync(name: name))
513
0
            }
514
0
        }
515
0
516
0
        return promise.futureResult
517
0
    }
518
519
    /// Synchronously finds and returns the `ChannelHandlerContext` that belongs to the
520
    /// `ChannelHandler` with the given name.
521
    ///
522
    /// - Important: This must be called on the `EventLoop`.
523
    /// - Parameter name: The name of the `ChannelHandler` to find.
524
    /// - Returns: the `ChannelHandlerContext` that belongs to the `ChannelHandler`, if one exists.
525
0
    fileprivate func contextSync(name: String) -> Result<ChannelHandlerContext, Error> {
526
0
        self._contextSync({ $0.name == name })
527
0
    }
528
529
    /// Returns the `ChannelHandlerContext` that belongs to a `ChannelHandler` of the given type.
530
    ///
531
    /// If multiple channel handlers of the same type are present in the pipeline, returns the context
532
    /// belonging to the first such handler.
533
    ///
534
    /// - Parameters:
535
    ///   - handlerType: The type of the handler to search for.
536
    /// - Returns: the `EventLoopFuture` which will be notified once the operation completes.
537
    @inlinable
538
    @preconcurrency
539
    public func context<Handler: ChannelHandler & _NIOCoreSendableMetatype>(
540
        handlerType: Handler.Type
541
0
    ) -> EventLoopFuture<ChannelHandlerContext> {
542
0
        let promise = self.eventLoop.makePromise(of: ChannelHandlerContext.self)
543
0
544
0
        if self.eventLoop.inEventLoop {
545
0
            promise.assumeIsolatedUnsafeUnchecked().completeWith(self._contextSync(handlerType: handlerType))
546
0
        } else {
547
0
            self.eventLoop.execute {
548
0
                promise.assumeIsolatedUnsafeUnchecked().completeWith(self._contextSync(handlerType: handlerType))
549
0
            }
550
0
        }
551
0
552
0
        return promise.futureResult
553
0
    }
554
555
    /// Returns if the ``ChannelHandler`` of the given type is contained in the pipeline.
556
    ///
557
    /// - Parameters:
558
    ///   - type: The type of the handler.
559
    /// - Returns: An ``EventLoopFuture`` that is succeeded if a handler of the given type is contained in the pipeline. Otherwise
560
    /// the future will be failed with an error.
561
    @inlinable
562
    @preconcurrency
563
    public func containsHandler<Handler: ChannelHandler & _NIOCoreSendableMetatype>(
564
        type: Handler.Type
565
0
    ) -> EventLoopFuture<Void> {
566
0
        self.handler(type: type).map { _ in () }
567
0
    }
568
569
    /// Returns if the ``ChannelHandler`` with the given name is contained in the pipeline.
570
    ///
571
    /// - Parameters:
572
    ///   - name: The name of the handler.
573
    /// - Returns: An ``EventLoopFuture`` that is succeeded if a handler with the given name is contained in the pipeline. Otherwise
574
    /// the future will be failed with an error.
575
    @inlinable
576
0
    public func containsHandler(name: String) -> EventLoopFuture<Void> {
577
0
        self.context(name: name).map { _ in () }
578
0
    }
579
580
    /// Synchronously finds and returns the `ChannelHandlerContext` that belongs to the first
581
    /// `ChannelHandler` of the given type.
582
    ///
583
    /// - Important: This must be called on the `EventLoop`.
584
    /// - Parameter handlerType: The type of handler to search for.
585
    /// - Returns: the `ChannelHandlerContext` that belongs to the `ChannelHandler`, if one exists.
586
    @inlinable  // should be fileprivate
587
    internal func _contextSync<Handler: ChannelHandler>(
588
        handlerType: Handler.Type
589
0
    ) -> Result<ChannelHandlerContext, Error> {
590
0
        self._contextSync({ $0.handler is Handler })
591
0
    }
592
593
    /// Synchronously finds a `ChannelHandlerContext` in the `ChannelPipeline`.
594
    /// - Important: This must be called on the `EventLoop`.
595
    @usableFromInline  // should be fileprivate
596
0
    internal func _contextSync(_ body: (ChannelHandlerContext) -> Bool) -> Result<ChannelHandlerContext, Error> {
597
0
        self.eventLoop.assertInEventLoop()
598
0
599
0
        if let context = self.contextForPredicate0(body) {
600
0
            return .success(context)
601
0
        } else {
602
0
            return .failure(ChannelPipelineError.notFound)
603
0
        }
604
0
    }
605
606
    /// Returns a `ChannelHandlerContext` which matches.
607
    ///
608
    /// This skips head and tail (as these are internal and should not be accessible by the user).
609
    ///
610
    /// - Parameters:
611
    ///   - body: The predicate to execute per `ChannelHandlerContext` in the `ChannelPipeline`.
612
    /// - Returns: The first `ChannelHandlerContext` that matches or `nil` if none did.
613
0
    private func contextForPredicate0(_ body: (ChannelHandlerContext) -> Bool) -> ChannelHandlerContext? {
614
0
        var curCtx: ChannelHandlerContext? = self.head?.next
615
0
        while let context = curCtx, context !== self.tail {
616
0
            if body(context) {
617
0
                return context
618
0
            }
619
0
            curCtx = context.next
620
0
        }
621
0
622
0
        return nil
623
0
    }
624
625
    /// Remove a `ChannelHandlerContext` from the `ChannelPipeline` directly without going through the
626
    /// `RemovableChannelHandler` API. This must only be used to clear the pipeline on `Channel` tear down and
627
    /// as a result of the `leavePipeline` call in the `RemovableChannelHandler` API.
628
18.6k
    internal func removeHandlerFromPipeline(context: ChannelHandlerContext, promise: EventLoopPromise<Void>?) {
629
18.6k
        self.eventLoop.assertInEventLoop()
630
18.6k
631
18.6k
        let nextCtx = context.next
632
18.6k
        let prevCtx = context.prev
633
18.6k
634
18.6k
        if let prevCtx = prevCtx {
635
12.4k
            prevCtx.next = nextCtx
636
12.4k
        }
637
18.6k
        if let nextCtx = nextCtx {
638
6.21k
            nextCtx.prev = prevCtx
639
6.21k
        }
640
18.6k
641
18.6k
        context.invokeHandlerRemoved()
642
18.6k
        promise?.succeed(())
643
18.6k
644
18.6k
        // We need to keep the current node alive until after the callout in case the user uses the context.
645
18.6k
        context.next = nil
646
18.6k
        context.prev = nil
647
18.6k
    }
648
649
    /// Returns the next name to use for a `ChannelHandler`.
650
13.1k
    private func nextName() -> String {
651
13.1k
        self.eventLoop.assertInEventLoop()
652
13.1k
653
13.1k
        let name = "handler\(idx)"
654
13.1k
        idx += 1
655
13.1k
        return name
656
13.1k
    }
657
658
    /// Remove all the `ChannelHandler`s from the `ChannelPipeline` and destroy them.
659
    ///
660
    /// This method must only be called from within the `EventLoop`. It should only be called from a `ChannelCore`
661
    /// implementation. Once called, the `ChannelPipeline` is no longer active and cannot be used again.
662
6.21k
    func removeHandlers() {
663
6.21k
        self.eventLoop.assertInEventLoop()
664
6.21k
665
6.21k
        if let head = self.head {
666
12.4k
            while let context = head.next {
667
12.4k
                removeHandlerFromPipeline(context: context, promise: nil)
668
12.4k
            }
669
6.21k
            removeHandlerFromPipeline(context: self.head!, promise: nil)
670
6.21k
        }
671
6.21k
        self.head = nil
672
6.21k
        self.tail = nil
673
6.21k
674
6.21k
        self.destroyed = true
675
6.21k
        self._channel = nil
676
6.21k
    }
677
678
    // Just delegate to the head and tail context
679
0
    public func fireChannelRegistered() {
680
0
        if eventLoop.inEventLoop {
681
0
            fireChannelRegistered0()
682
0
        } else {
683
0
            eventLoop.execute {
684
0
                self.fireChannelRegistered0()
685
0
            }
686
0
        }
687
0
    }
688
689
0
    public func fireChannelUnregistered() {
690
0
        if eventLoop.inEventLoop {
691
0
            fireChannelUnregistered0()
692
0
        } else {
693
0
            eventLoop.execute {
694
0
                self.fireChannelUnregistered0()
695
0
            }
696
0
        }
697
0
    }
698
699
0
    public func fireChannelInactive() {
700
0
        if eventLoop.inEventLoop {
701
0
            fireChannelInactive0()
702
0
        } else {
703
0
            eventLoop.execute {
704
0
                self.fireChannelInactive0()
705
0
            }
706
0
        }
707
0
    }
708
709
0
    public func fireChannelActive() {
710
0
        if eventLoop.inEventLoop {
711
0
            fireChannelActive0()
712
0
        } else {
713
0
            eventLoop.execute {
714
0
                self.fireChannelActive0()
715
0
            }
716
0
        }
717
0
    }
718
719
    @available(
720
        *,
721
        deprecated,
722
        message: "NIOAny is not Sendable. Avoid wrapping the value in NIOAny to silence this warning."
723
    )
724
0
    public func fireChannelRead(_ data: NIOAny) {
725
0
        if eventLoop.inEventLoop {
726
0
            _fireChannelRead0(data)
727
0
        } else {
728
0
            // This is unsafe, but necessary.
729
0
            let unsafeTransfer = UnsafeTransfer(data)
730
0
            eventLoop.execute {
731
0
                self._fireChannelRead0(unsafeTransfer.wrappedValue)
732
0
            }
733
0
        }
734
0
    }
735
736
    @inlinable
737
0
    public func fireChannelRead<T: Sendable>(_ data: T) {
738
0
        if eventLoop.inEventLoop {
739
0
            _fireChannelRead0(NIOAny(data))
740
0
        } else {
741
0
            eventLoop.execute {
742
0
                self._fireChannelRead0(NIOAny(data))
743
0
            }
744
0
        }
745
0
    }
746
747
0
    public func fireChannelReadComplete() {
748
0
        if eventLoop.inEventLoop {
749
0
            fireChannelReadComplete0()
750
0
        } else {
751
0
            eventLoop.execute {
752
0
                self.fireChannelReadComplete0()
753
0
            }
754
0
        }
755
0
    }
756
757
0
    public func fireChannelWritabilityChanged() {
758
0
        if eventLoop.inEventLoop {
759
0
            fireChannelWritabilityChanged0()
760
0
        } else {
761
0
            eventLoop.execute {
762
0
                self.fireChannelWritabilityChanged0()
763
0
            }
764
0
        }
765
0
    }
766
767
    @preconcurrency
768
0
    public func fireUserInboundEventTriggered(_ event: Any & Sendable) {
769
0
        if eventLoop.inEventLoop {
770
0
            fireUserInboundEventTriggered0(event)
771
0
        } else {
772
0
            eventLoop.execute {
773
0
                self.fireUserInboundEventTriggered0(event)
774
0
            }
775
0
        }
776
0
    }
777
778
0
    public func fireErrorCaught(_ error: Error) {
779
0
        if eventLoop.inEventLoop {
780
0
            fireErrorCaught0(error: error)
781
0
        } else {
782
0
            eventLoop.execute {
783
0
                self.fireErrorCaught0(error: error)
784
0
            }
785
0
        }
786
0
    }
787
788
33.8k
    public func close(mode: CloseMode = .all, promise: EventLoopPromise<Void>?) {
789
33.8k
        if eventLoop.inEventLoop {
790
33.8k
            close0(mode: mode, promise: promise)
791
33.8k
        } else {
792
0
            eventLoop.execute {
793
0
                self.close0(mode: mode, promise: promise)
794
0
            }
795
0
        }
796
33.8k
    }
797
798
0
    public func flush() {
799
0
        if eventLoop.inEventLoop {
800
0
            flush0()
801
0
        } else {
802
0
            eventLoop.execute {
803
0
                self.flush0()
804
0
            }
805
0
        }
806
0
    }
807
808
0
    public func read() {
809
0
        if eventLoop.inEventLoop {
810
0
            read0()
811
0
        } else {
812
0
            eventLoop.execute {
813
0
                self.read0()
814
0
            }
815
0
        }
816
0
    }
817
818
    @available(
819
        *,
820
        deprecated,
821
        message: "NIOAny is not Sendable. Avoid wrapping the value in NIOAny to silence this warning."
822
    )
823
0
    public func write(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
824
0
        if eventLoop.inEventLoop {
825
0
            _write0(data, promise: promise)
826
0
        } else {
827
0
            // This is unsafe, but unavoidable.
828
0
            let unsafeTransfer = UnsafeTransfer(data)
829
0
            eventLoop.execute {
830
0
                self._write0(unsafeTransfer.wrappedValue, promise: promise)
831
0
            }
832
0
        }
833
0
    }
834
835
    @inlinable
836
0
    public func write<T: Sendable>(_ data: T, promise: EventLoopPromise<Void>?) {
837
0
        if eventLoop.inEventLoop {
838
0
            _write0(NIOAny(data), promise: promise)
839
0
        } else {
840
0
            eventLoop.execute {
841
0
                self._write0(NIOAny(data), promise: promise)
842
0
            }
843
0
        }
844
0
    }
845
846
    @available(
847
        *,
848
        deprecated,
849
        message: "NIOAny is not Sendable. Avoid wrapping the value in NIOAny to silence this warning."
850
    )
851
0
    public func writeAndFlush(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
852
0
        if eventLoop.inEventLoop {
853
0
            _writeAndFlush0(data, promise: promise)
854
0
        } else {
855
0
            // This is unsafe, but unavoidable.
856
0
            let unsafeTransfer = UnsafeTransfer(data)
857
0
            eventLoop.execute {
858
0
                self._writeAndFlush0(unsafeTransfer.wrappedValue, promise: promise)
859
0
            }
860
0
        }
861
0
    }
862
863
    @inlinable
864
0
    public func writeAndFlush<T: Sendable>(_ data: T, promise: EventLoopPromise<Void>?) {
865
0
        if eventLoop.inEventLoop {
866
0
            _writeAndFlush0(NIOAny(data), promise: promise)
867
0
        } else {
868
0
            eventLoop.execute {
869
0
                self._writeAndFlush0(NIOAny(data), promise: promise)
870
0
            }
871
0
        }
872
0
    }
873
874
0
    public func bind(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
875
0
        if eventLoop.inEventLoop {
876
0
            bind0(to: address, promise: promise)
877
0
        } else {
878
0
            eventLoop.execute {
879
0
                self.bind0(to: address, promise: promise)
880
0
            }
881
0
        }
882
0
    }
883
884
0
    public func connect(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
885
0
        if eventLoop.inEventLoop {
886
0
            connect0(to: address, promise: promise)
887
0
        } else {
888
0
            eventLoop.execute {
889
0
                self.connect0(to: address, promise: promise)
890
0
            }
891
0
        }
892
0
    }
893
894
33.8k
    public func register(promise: EventLoopPromise<Void>?) {
895
33.8k
        if eventLoop.inEventLoop {
896
33.8k
            register0(promise: promise)
897
33.8k
        } else {
898
0
            eventLoop.execute {
899
0
                self.register0(promise: promise)
900
0
            }
901
0
        }
902
33.8k
    }
903
904
    @preconcurrency
905
0
    public func triggerUserOutboundEvent(_ event: Any & Sendable, promise: EventLoopPromise<Void>?) {
906
0
        if eventLoop.inEventLoop {
907
0
            triggerUserOutboundEvent0(event, promise: promise)
908
0
        } else {
909
0
            eventLoop.execute {
910
0
                self.triggerUserOutboundEvent0(event, promise: promise)
911
0
            }
912
0
        }
913
0
    }
914
915
    // These methods are expected to only be called from within the EventLoop
916
917
26.2k
    private var firstOutboundCtx: ChannelHandlerContext? {
918
26.2k
        self.tail?.prev
919
26.2k
    }
920
921
65.6k
    private var firstInboundCtx: ChannelHandlerContext? {
922
65.6k
        self.head?.next
923
65.6k
    }
924
925
13.1k
    private func close0(mode: CloseMode, promise: EventLoopPromise<Void>?) {
926
13.1k
        if let firstOutboundCtx = firstOutboundCtx {
927
13.1k
            firstOutboundCtx.invokeClose(mode: mode, promise: promise)
928
13.1k
        } else {
929
0
            promise?.fail(ChannelError._alreadyClosed)
930
0
        }
931
13.1k
    }
932
933
0
    private func flush0() {
934
0
        if let firstOutboundCtx = firstOutboundCtx {
935
0
            firstOutboundCtx.invokeFlush()
936
0
        }
937
0
    }
938
939
0
    private func read0() {
940
0
        if let firstOutboundCtx = firstOutboundCtx {
941
0
            firstOutboundCtx.invokeRead()
942
0
        }
943
0
    }
944
945
0
    @usableFromInline func _write0(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
946
0
        if let firstOutboundCtx = firstOutboundCtx {
947
0
            firstOutboundCtx.invokeWrite(data, promise: promise)
948
0
        } else {
949
0
            promise?.fail(ChannelError._ioOnClosedChannel)
950
0
        }
951
0
    }
952
953
0
    @usableFromInline func _writeAndFlush0(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
954
0
        if let firstOutboundCtx = firstOutboundCtx {
955
0
            firstOutboundCtx.invokeWriteAndFlush(data, promise: promise)
956
0
        } else {
957
0
            promise?.fail(ChannelError._ioOnClosedChannel)
958
0
        }
959
0
    }
960
961
0
    private func bind0(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
962
0
        if let firstOutboundCtx = firstOutboundCtx {
963
0
            firstOutboundCtx.invokeBind(to: address, promise: promise)
964
0
        } else {
965
0
            promise?.fail(ChannelError._ioOnClosedChannel)
966
0
        }
967
0
    }
968
969
0
    private func connect0(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
970
0
        if let firstOutboundCtx = firstOutboundCtx {
971
0
            firstOutboundCtx.invokeConnect(to: address, promise: promise)
972
0
        } else {
973
0
            promise?.fail(ChannelError._ioOnClosedChannel)
974
0
        }
975
0
    }
976
977
13.1k
    private func register0(promise: EventLoopPromise<Void>?) {
978
13.1k
        if let firstOutboundCtx = firstOutboundCtx {
979
13.1k
            firstOutboundCtx.invokeRegister(promise: promise)
980
13.1k
        } else {
981
0
            promise?.fail(ChannelError._ioOnClosedChannel)
982
0
        }
983
13.1k
    }
984
985
0
    private func triggerUserOutboundEvent0(_ event: Any, promise: EventLoopPromise<Void>?) {
986
0
        if let firstOutboundCtx = firstOutboundCtx {
987
0
            firstOutboundCtx.invokeTriggerUserOutboundEvent(event, promise: promise)
988
0
        } else {
989
0
            promise?.fail(ChannelError._ioOnClosedChannel)
990
0
        }
991
0
    }
992
993
13.1k
    private func fireChannelRegistered0() {
994
13.1k
        if let firstInboundCtx = firstInboundCtx {
995
13.1k
            firstInboundCtx.invokeChannelRegistered()
996
13.1k
        }
997
13.1k
    }
998
999
13.1k
    private func fireChannelUnregistered0() {
1000
13.1k
        if let firstInboundCtx = firstInboundCtx {
1001
13.1k
            firstInboundCtx.invokeChannelUnregistered()
1002
13.1k
        }
1003
13.1k
    }
1004
1005
13.1k
    private func fireChannelInactive0() {
1006
13.1k
        if let firstInboundCtx = firstInboundCtx {
1007
13.1k
            firstInboundCtx.invokeChannelInactive()
1008
13.1k
        }
1009
13.1k
    }
1010
1011
0
    private func fireChannelActive0() {
1012
0
        if let firstInboundCtx = firstInboundCtx {
1013
0
            firstInboundCtx.invokeChannelActive()
1014
0
        }
1015
0
    }
1016
1017
13.1k
    @usableFromInline func _fireChannelRead0(_ data: NIOAny) {
1018
13.1k
        if let firstInboundCtx = firstInboundCtx {
1019
13.1k
            firstInboundCtx.invokeChannelRead(data)
1020
13.1k
        }
1021
13.1k
    }
1022
1023
13.1k
    private func fireChannelReadComplete0() {
1024
13.1k
        if let firstInboundCtx = firstInboundCtx {
1025
13.1k
            firstInboundCtx.invokeChannelReadComplete()
1026
13.1k
        }
1027
13.1k
    }
1028
1029
0
    private func fireChannelWritabilityChanged0() {
1030
0
        if let firstInboundCtx = firstInboundCtx {
1031
0
            firstInboundCtx.invokeChannelWritabilityChanged()
1032
0
        }
1033
0
    }
1034
1035
0
    private func fireUserInboundEventTriggered0(_ event: Any) {
1036
0
        if let firstInboundCtx = firstInboundCtx {
1037
0
            firstInboundCtx.invokeUserInboundEventTriggered(event)
1038
0
        }
1039
0
    }
1040
1041
0
    private func fireErrorCaught0(error: Error) {
1042
0
        assert((error as? ChannelError).map { $0 != .eof } ?? true)
1043
0
        if let firstInboundCtx = firstInboundCtx {
1044
0
            firstInboundCtx.invokeErrorCaught(error)
1045
0
        }
1046
0
    }
1047
1048
0
    private var inEventLoop: Bool {
1049
0
        eventLoop.inEventLoop
1050
0
    }
1051
1052
    /// Create a `ChannelPipeline` for a given `Channel`. This method should never be called by the end-user
1053
    /// directly: it is only intended for use with custom `Channel` implementations. Users should always use
1054
    /// `channel.pipeline` to access the `ChannelPipeline` for a `Channel`.
1055
    ///
1056
    /// - Parameters:
1057
    ///    - channel: The `Channel` this `ChannelPipeline` is created for.
1058
13.1k
    public init(channel: Channel) {
1059
13.1k
        self._channel = channel
1060
13.1k
        self.eventLoop = channel.eventLoop
1061
13.1k
        self.head = nil  // we need to initialise these to `nil` so we can use `self` in the lines below
1062
13.1k
        self.tail = nil  // we need to initialise these to `nil` so we can use `self` in the lines below
1063
13.1k
1064
13.1k
        self.head = ChannelHandlerContext(
1065
13.1k
            name: HeadChannelHandler.name,
1066
13.1k
            handler: HeadChannelHandler.sharedInstance,
1067
13.1k
            pipeline: self
1068
13.1k
        )
1069
13.1k
        self.tail = ChannelHandlerContext(
1070
13.1k
            name: TailChannelHandler.name,
1071
13.1k
            handler: TailChannelHandler.sharedInstance,
1072
13.1k
            pipeline: self
1073
13.1k
        )
1074
13.1k
        self.head?.next = self.tail
1075
13.1k
        self.tail?.prev = self.head
1076
13.1k
    }
1077
}
1078
1079
extension ChannelPipeline: @unchecked Sendable {}
1080
1081
extension ChannelPipeline {
1082
    /// Adds the provided channel handlers to the pipeline in the order given, taking account
1083
    /// of the behaviour of `ChannelHandler.add(first:)`.
1084
    ///
1085
    /// - Parameters:
1086
    ///   - handlers: The array of `ChannelHandler`s to be added.
1087
    ///   - position: The position in the `ChannelPipeline` to add `handlers`. Defaults to `.last`.
1088
    ///
1089
    /// - Returns: A future that will be completed when all of the supplied `ChannelHandler`s were added.
1090
    @preconcurrency
1091
    public func addHandlers(
1092
        _ handlers: [ChannelHandler & Sendable],
1093
        position: ChannelPipeline.Position = .last
1094
0
    ) -> EventLoopFuture<Void> {
1095
0
        let future: EventLoopFuture<Void>
1096
0
1097
0
        if self.eventLoop.inEventLoop {
1098
0
            future = self.eventLoop.makeCompletedFuture(self.addHandlersSync(handlers, position: position))
1099
0
        } else {
1100
0
            future = self.eventLoop.submit {
1101
0
                try self.addHandlersSync(handlers, position: position).get()
1102
0
            }
1103
0
        }
1104
0
1105
0
        return future
1106
0
    }
1107
1108
    /// Adds the provided channel handlers to the pipeline in the order given, taking account
1109
    /// of the behaviour of `ChannelHandler.add(first:)`.
1110
    ///
1111
    /// - Parameters:
1112
    ///   - handlers: One or more `ChannelHandler`s to be added.
1113
    ///   - position: The position in the `ChannelPipeline` to add `handlers`. Defaults to `.last`.
1114
    ///
1115
    /// - Returns: A future that will be completed when all of the supplied `ChannelHandler`s were added.
1116
    @preconcurrency
1117
    public func addHandlers(
1118
        _ handlers: (ChannelHandler & Sendable)...,
1119
        position: ChannelPipeline.Position = .last
1120
0
    ) -> EventLoopFuture<Void> {
1121
0
        self.addHandlers(handlers, position: position)
1122
0
    }
1123
1124
    /// Synchronously adds the provided `ChannelHandler`s to the pipeline in the order given, taking
1125
    /// account of the behaviour of `ChannelHandler.add(first:)`.
1126
    ///
1127
    /// - Important: Must be called on the `EventLoop`.
1128
    /// - Parameters:
1129
    ///   - handlers: The array of `ChannelHandler`s to add.
1130
    ///   - position: The position in the `ChannelPipeline` to add the handlers.
1131
    /// - Returns: A result representing whether the handlers were added or not.
1132
    fileprivate func addHandlersSync(
1133
        _ handlers: [ChannelHandler & Sendable],
1134
        position: ChannelPipeline.Position
1135
0
    ) -> Result<Void, Error> {
1136
0
        let syncPosition = ChannelPipeline.SynchronousOperations.Position(position)
1137
0
        switch syncPosition {
1138
0
        case .first, .after:
1139
0
            return self._addHandlersSync(handlers.reversed(), position: syncPosition)
1140
0
        case .last, .before:
1141
0
            return self._addHandlersSync(handlers, position: syncPosition)
1142
0
        }
1143
0
    }
1144
1145
    /// Synchronously adds the provided `ChannelHandler`s to the pipeline in the order given, taking
1146
    /// account of the behaviour of `ChannelHandler.add(first:)`.
1147
    ///
1148
    /// This duplicate of the above method exists to avoid needing to rebox the array of existentials
1149
    /// from any (ChannelHandler & Sendable) to any ChannelHandler.
1150
    ///
1151
    /// - Important: Must be called on the `EventLoop`.
1152
    /// - Parameters:
1153
    ///   - handlers: The array of `ChannelHandler`s to add.
1154
    ///   - position: The position in the `ChannelPipeline` to add the handlers.
1155
    /// - Returns: A result representing whether the handlers were added or not.
1156
    fileprivate func addHandlersSyncNotSendable(
1157
        _ handlers: [ChannelHandler],
1158
        position: ChannelPipeline.SynchronousOperations.Position
1159
13.1k
    ) -> Result<Void, Error> {
1160
13.1k
        switch position {
1161
13.1k
        case .first, .after:
1162
0
            return self._addHandlersSyncNotSendable(handlers.reversed(), position: position)
1163
13.1k
        case .last, .before:
1164
13.1k
            return self._addHandlersSyncNotSendable(handlers, position: position)
1165
13.1k
        }
1166
13.1k
    }
1167
1168
    /// Synchronously adds a sequence of `ChannelHandler`s to the pipeline at the given position.
1169
    ///
1170
    /// - Important: Must be called on the `EventLoop`.
1171
    /// - Parameters:
1172
    ///   - handlers: A sequence of handlers to add.
1173
    ///   - position: The position in the `ChannelPipeline` to add the handlers.
1174
    /// - Returns: A result representing whether the handlers were added or not.
1175
    private func _addHandlersSync<Handlers: Sequence>(
1176
        _ handlers: Handlers,
1177
        position: ChannelPipeline.SynchronousOperations.Position
1178
0
    ) -> Result<Void, Error> where Handlers.Element == ChannelHandler & Sendable {
1179
0
        self.eventLoop.assertInEventLoop()
1180
0
1181
0
        for handler in handlers {
1182
0
            let result = self.addHandlerSync(handler, position: position)
1183
0
            switch result {
1184
0
            case .success:
1185
0
                ()
1186
0
            case .failure:
1187
0
                return result
1188
0
            }
1189
0
        }
1190
0
1191
0
        return .success(())
1192
0
    }
1193
1194
    /// Synchronously adds a sequence of `ChannelHandler`s to the pipeline at the given position.
1195
    ///
1196
    /// This duplicate of the above method exists to avoid needing to rebox the array of existentials
1197
    /// from any (ChannelHandler & Sendable) to any ChannelHandler.
1198
    ///
1199
    /// - Important: Must be called on the `EventLoop`.
1200
    /// - Parameters:
1201
    ///   - handlers: A sequence of handlers to add.
1202
    ///   - position: The position in the `ChannelPipeline` to add the handlers.
1203
    /// - Returns: A result representing whether the handlers were added or not.
1204
    private func _addHandlersSyncNotSendable<Handlers: Sequence>(
1205
        _ handlers: Handlers,
1206
        position: ChannelPipeline.SynchronousOperations.Position
1207
13.1k
    ) -> Result<Void, Error> where Handlers.Element == ChannelHandler {
1208
13.1k
        self.eventLoop.assertInEventLoop()
1209
13.1k
1210
13.1k
        for handler in handlers {
1211
0
            let result = self.addHandlerSync(handler, position: position)
1212
0
            switch result {
1213
0
            case .success:
1214
0
                ()
1215
0
            case .failure:
1216
0
                return result
1217
0
            }
1218
13.1k
        }
1219
13.1k
1220
13.1k
        return .success(())
1221
13.1k
    }
1222
}
1223
1224
// MARK: - Synchronous View
1225
1226
extension ChannelPipeline {
1227
    /// A view of a `ChannelPipeline` which may be used to invoke synchronous operations.
1228
    ///
1229
    /// All functions **must** be called from the pipeline's event loop.
1230
    public struct SynchronousOperations {
1231
        @usableFromInline
1232
        internal let _pipeline: ChannelPipeline
1233
1234
10.6M
        fileprivate init(pipeline: ChannelPipeline) {
1235
10.6M
            self._pipeline = pipeline
1236
10.6M
        }
1237
1238
        /// The `EventLoop` of the `Channel` this synchronous operations view corresponds to.
1239
583k
        public var eventLoop: EventLoop {
1240
583k
            self._pipeline.eventLoop
1241
583k
        }
1242
1243
        /// Add a handler to the pipeline.
1244
        ///
1245
        /// - Important: This *must* be called on the event loop.
1246
        /// - Parameters:
1247
        ///   - handler: The handler to add.
1248
        ///   - name: The name to use for the `ChannelHandler` when it's added. If no name is specified, one will be generated.
1249
        ///   - position: The position in the `ChannelPipeline` to add `handler`. Defaults to `.last`.
1250
        public func addHandler(
1251
            _ handler: ChannelHandler,
1252
            name: String? = nil,
1253
            position: ChannelPipeline.SynchronousOperations.Position = .last
1254
0
        ) throws {
1255
0
            try self._pipeline.addHandlerSync(handler, name: name, position: position).get()
1256
0
        }
1257
1258
        /// Add a handler to the pipeline.
1259
        ///
1260
        /// - Important: This *must* be called on the event loop.
1261
        /// - Parameters:
1262
        ///   - handler: The handler to add.
1263
        ///   - name: The name to use for the `ChannelHandler` when it's added. If no name is specified, one will be generated.
1264
        ///   - position: The position in the `ChannelPipeline` to add `handler`. Defaults to `.last`.
1265
        @available(*, deprecated, message: "Use ChannelPipeline.SynchronousOperations.Position instead")
1266
        @_disfavoredOverload
1267
        public func addHandler(
1268
            _ handler: ChannelHandler,
1269
            name: String? = nil,
1270
            position: ChannelPipeline.Position = .last
1271
0
        ) throws {
1272
0
            let syncPosition = ChannelPipeline.SynchronousOperations.Position(position)
1273
0
            try self._pipeline.addHandlerSync(handler, name: name, position: syncPosition).get()
1274
0
        }
1275
1276
        /// Add an array of handlers to the pipeline.
1277
        ///
1278
        /// - Important: This *must* be called on the event loop.
1279
        /// - Parameters:
1280
        ///   - handlers: The handlers to add.
1281
        ///   - position: The position in the `ChannelPipeline` to add `handlers`. Defaults to `.last`.
1282
        public func addHandlers(
1283
            _ handlers: [ChannelHandler],
1284
            position: ChannelPipeline.SynchronousOperations.Position = .last
1285
13.1k
        ) throws {
1286
13.1k
            try self._pipeline.addHandlersSyncNotSendable(handlers, position: position).get()
1287
13.1k
        }
1288
1289
        /// Add an array of handlers to the pipeline.
1290
        ///
1291
        /// - Important: This *must* be called on the event loop.
1292
        /// - Parameters:
1293
        ///   - handlers: The handlers to add.
1294
        ///   - position: The position in the `ChannelPipeline` to add `handlers`. Defaults to `.last`.
1295
        @available(*, deprecated, message: "Use ChannelPipeline.SynchronousOperations.Position instead")
1296
        @_disfavoredOverload
1297
        public func addHandlers(
1298
            _ handlers: [ChannelHandler],
1299
            position: ChannelPipeline.Position = .last
1300
0
        ) throws {
1301
0
            let syncPosition = ChannelPipeline.SynchronousOperations.Position(position)
1302
0
            try self._pipeline.addHandlersSyncNotSendable(handlers, position: syncPosition).get()
1303
0
        }
1304
1305
        /// Add one or more handlers to the pipeline.
1306
        ///
1307
        /// - Important: This *must* be called on the event loop.
1308
        /// - Parameters:
1309
        ///   - handlers: The handlers to add.
1310
        ///   - position: The position in the `ChannelPipeline` to add `handlers`. Defaults to `.last`.
1311
        public func addHandlers(
1312
            _ handlers: ChannelHandler...,
1313
            position: ChannelPipeline.SynchronousOperations.Position = .last
1314
0
        ) throws {
1315
0
            try self._pipeline.addHandlersSyncNotSendable(handlers, position: position).get()
1316
0
        }
1317
1318
        /// Add one or more handlers to the pipeline.
1319
        ///
1320
        /// - Important: This *must* be called on the event loop.
1321
        /// - Parameters:
1322
        ///   - handlers: The handlers to add.
1323
        ///   - position: The position in the `ChannelPipeline` to add `handlers`. Defaults to `.last`.
1324
        @available(*, deprecated, message: "Use ChannelPipeline.SynchronousOperations.Position instead")
1325
        @_disfavoredOverload
1326
        public func addHandlers(
1327
            _ handlers: ChannelHandler...,
1328
            position: ChannelPipeline.Position = .last
1329
0
        ) throws {
1330
0
            let syncPosition = ChannelPipeline.SynchronousOperations.Position(position)
1331
0
            try self._pipeline.addHandlersSyncNotSendable(handlers, position: syncPosition).get()
1332
0
        }
1333
1334
        /// Remove a `ChannelHandler` from the `ChannelPipeline`.
1335
        ///
1336
        /// - Parameters:
1337
        ///   - handler: the `ChannelHandler` to remove.
1338
        /// - Returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was removed.
1339
0
        public func removeHandler(_ handler: RemovableChannelHandler) -> EventLoopFuture<Void> {
1340
0
            let promise = self.eventLoop.makePromise(of: Void.self)
1341
0
            self.removeHandler(handler, promise: promise)
1342
0
            return promise.futureResult
1343
0
        }
1344
1345
        /// Remove a ``ChannelHandler`` from the ``ChannelPipeline``.
1346
        ///
1347
        /// - Parameters:
1348
        ///   - handler: the ``ChannelHandler`` to remove.
1349
        ///   - promise: an ``EventLoopPromise`` to notify when the ``ChannelHandler`` has been removed.
1350
0
        public func removeHandler(_ handler: RemovableChannelHandler, promise: EventLoopPromise<Void>?) {
1351
0
            switch self._pipeline.contextSync(handler: handler) {
1352
0
            case .success(let context):
1353
0
                self.removeHandler(context: context, promise: promise)
1354
0
            case .failure(let error):
1355
0
                promise?.fail(error)
1356
0
            }
1357
0
        }
1358
1359
        /// Remove a `ChannelHandler` from the `ChannelPipeline`.
1360
        ///
1361
        /// - Parameters:
1362
        ///   - name: the name that was used to add the `ChannelHandler` to the `ChannelPipeline` before.
1363
        /// - Returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` has been removed.
1364
0
        public func removeHandler(name: String) -> EventLoopFuture<Void> {
1365
0
            let promise = self.eventLoop.makePromise(of: Void.self)
1366
0
            self.removeHandler(name: name, promise: promise)
1367
0
            return promise.futureResult
1368
0
        }
1369
1370
        /// Remove a ``ChannelHandler`` from the ``ChannelPipeline``.
1371
        ///
1372
        /// - Parameters:
1373
        ///   - name: the name that was used to add the `ChannelHandler` to the `ChannelPipeline` before.
1374
        ///   - promise: an ``EventLoopPromise`` to notify when the ``ChannelHandler`` has been removed.
1375
0
        public func removeHandler(name: String, promise: EventLoopPromise<Void>?) {
1376
0
            switch self._pipeline.contextSync(name: name) {
1377
0
            case .success(let context):
1378
0
                self.removeHandler(context: context, promise: promise)
1379
0
            case .failure(let error):
1380
0
                promise?.fail(error)
1381
0
            }
1382
0
        }
1383
1384
        /// Remove a `ChannelHandler` from the `ChannelPipeline`.
1385
        ///
1386
        /// - Parameters:
1387
        ///   - context: the `ChannelHandlerContext` that belongs to the `ChannelHandler` that should be removed.
1388
        /// - Returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` has been removed.
1389
0
        public func removeHandler(context: ChannelHandlerContext) -> EventLoopFuture<Void> {
1390
0
            let promise = self.eventLoop.makePromise(of: Void.self)
1391
0
            self.removeHandler(context: context, promise: promise)
1392
0
            return promise.futureResult
1393
0
        }
1394
1395
        /// Remove a `ChannelHandler` from the `ChannelPipeline`.
1396
        ///
1397
        /// - Parameters:
1398
        ///   - context: the `ChannelHandlerContext` that belongs to the `ChannelHandler` that should be removed.
1399
        ///   - promise: an ``EventLoopPromise`` to notify when the ``ChannelHandler`` has been removed.
1400
0
        public func removeHandler(context: ChannelHandlerContext, promise: EventLoopPromise<Void>?) {
1401
0
            if context.handler is RemovableChannelHandler {
1402
0
                context.startUserTriggeredRemoval(promise: promise)
1403
0
            } else {
1404
0
                promise?.fail(ChannelError.unremovableHandler)
1405
0
            }
1406
0
        }
1407
1408
        /// Returns the `ChannelHandlerContext` for the given handler instance, if that handler is in
1409
        /// the `ChannelPipeline`.
1410
        ///
1411
        /// - Important: This *must* be called on the event loop.
1412
        /// - Parameter handler: The handler belonging to the context to fetch.
1413
        /// - Returns: The `ChannelHandlerContext` associated with the handler.
1414
0
        public func context(handler: ChannelHandler) throws -> ChannelHandlerContext {
1415
0
            try self._pipeline._contextSync({ $0.handler === handler }).get()
1416
0
        }
1417
1418
        /// Returns the `ChannelHandlerContext` for the handler with the given name, if one exists.
1419
        ///
1420
        /// - Important: This *must* be called on the event loop.
1421
        /// - Parameter name: The name of the handler whose context is being fetched.
1422
        /// - Returns: The `ChannelHandlerContext` associated with the handler.
1423
0
        public func context(name: String) throws -> ChannelHandlerContext {
1424
0
            try self._pipeline.contextSync(name: name).get()
1425
0
        }
1426
1427
        /// Returns the `ChannelHandlerContext` for the handler of the given type, if one exists.
1428
        ///
1429
        /// - Important: This *must* be called on the event loop.
1430
        /// - Parameter handlerType: The type of the handler to search for.
1431
        /// - Returns: The `ChannelHandlerContext` associated with the handler.
1432
        @inlinable
1433
0
        public func context<Handler: ChannelHandler>(handlerType: Handler.Type) throws -> ChannelHandlerContext {
1434
0
            try self._pipeline._contextSync(handlerType: handlerType).get()
1435
0
        }
1436
1437
        /// Returns the `ChannelHandler` of the given type from the `ChannelPipeline`, if it exists.
1438
        ///
1439
        /// - Important: This *must* be called on the event loop.
1440
        /// - Returns: A `ChannelHandler` of the given type if one exists in the `ChannelPipeline`.
1441
        @inlinable
1442
0
        public func handler<Handler: ChannelHandler>(type _: Handler.Type) throws -> Handler {
1443
0
            try self._pipeline._handlerSync(type: Handler.self).get()
1444
0
        }
1445
1446
        /// Fires `channelRegistered` from the head to the tail.
1447
        ///
1448
        /// This method should typically only be called by `Channel` implementations directly.
1449
13.1k
        public func fireChannelRegistered() {
1450
13.1k
            self.eventLoop.assertInEventLoop()
1451
13.1k
            self._pipeline.fireChannelRegistered0()
1452
13.1k
        }
1453
1454
        /// Fires `channelUnregistered` from the head to the tail.
1455
        ///
1456
        /// This method should typically only be called by `Channel` implementations directly.
1457
13.1k
        public func fireChannelUnregistered() {
1458
13.1k
            self.eventLoop.assertInEventLoop()
1459
13.1k
            self._pipeline.fireChannelUnregistered0()
1460
13.1k
        }
1461
1462
        /// Fires `channelInactive` from the head to the tail.
1463
        ///
1464
        /// This method should typically only be called by `Channel` implementations directly.
1465
13.1k
        public func fireChannelInactive() {
1466
13.1k
            self.eventLoop.assertInEventLoop()
1467
13.1k
            self._pipeline.fireChannelInactive0()
1468
13.1k
        }
1469
1470
        /// Fires `channelActive` from the head to the tail.
1471
        ///
1472
        /// This method should typically only be called by `Channel` implementations directly.
1473
0
        public func fireChannelActive() {
1474
0
            self.eventLoop.assertInEventLoop()
1475
0
            self._pipeline.fireChannelActive0()
1476
0
        }
1477
1478
        /// Fires `channelRead` from the head to the tail.
1479
        ///
1480
        /// This method should typically only be called by `Channel` implementations directly.
1481
13.1k
        public func fireChannelRead(_ data: NIOAny) {
1482
13.1k
            self.eventLoop.assertInEventLoop()
1483
13.1k
            self._pipeline._fireChannelRead0(data)
1484
13.1k
        }
1485
1486
        /// Fires `channelReadComplete` from the head to the tail.
1487
        ///
1488
        /// This method should typically only be called by `Channel` implementations directly.
1489
13.1k
        public func fireChannelReadComplete() {
1490
13.1k
            self.eventLoop.assertInEventLoop()
1491
13.1k
            self._pipeline.fireChannelReadComplete0()
1492
13.1k
        }
1493
1494
        /// Fires `channelWritabilityChanged` from the head to the tail.
1495
        ///
1496
        /// This method should typically only be called by `Channel` implementations directly.
1497
0
        public func fireChannelWritabilityChanged() {
1498
0
            self.eventLoop.assertInEventLoop()
1499
0
            self._pipeline.fireChannelWritabilityChanged0()
1500
0
        }
1501
1502
        /// Fires `userInboundEventTriggered` from the head to the tail.
1503
        ///
1504
        /// This method should typically only be called by `Channel` implementations directly.
1505
0
        public func fireUserInboundEventTriggered(_ event: Any) {
1506
0
            self.eventLoop.assertInEventLoop()
1507
0
            self._pipeline.fireUserInboundEventTriggered0(event)
1508
0
        }
1509
1510
        /// Fires `errorCaught` from the head to the tail.
1511
        ///
1512
        /// This method should typically only be called by `Channel` implementations directly.
1513
0
        public func fireErrorCaught(_ error: Error) {
1514
0
            self.eventLoop.assertInEventLoop()
1515
0
            self._pipeline.fireErrorCaught0(error: error)
1516
0
        }
1517
1518
        /// Fires `close` from the tail to the head.
1519
        ///
1520
        /// This method should typically only be called by `Channel` implementations directly.
1521
0
        public func close(mode: CloseMode = .all, promise: EventLoopPromise<Void>?) {
1522
0
            self.eventLoop.assertInEventLoop()
1523
0
            self._pipeline.close0(mode: mode, promise: promise)
1524
0
        }
1525
1526
        /// Fires `flush` from the tail to the head.
1527
        ///
1528
        /// This method should typically only be called by `Channel` implementations directly.
1529
0
        public func flush() {
1530
0
            self.eventLoop.assertInEventLoop()
1531
0
            self._pipeline.flush0()
1532
0
        }
1533
1534
        /// Fires `read` from the tail to the head.
1535
        ///
1536
        /// This method should typically only be called by `Channel` implementations directly.
1537
0
        public func read() {
1538
0
            self.eventLoop.assertInEventLoop()
1539
0
            self._pipeline.read0()
1540
0
        }
1541
1542
        /// Fires `write` from the tail to the head.
1543
        ///
1544
        /// This method should typically only be called by `Channel` implementations directly.
1545
0
        public func write(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
1546
0
            self.eventLoop.assertInEventLoop()
1547
0
            self._pipeline._write0(data, promise: promise)
1548
0
        }
1549
1550
        /// Fires `write` from the tail to the head.
1551
        ///
1552
        /// This method should typically only be called by `Channel` implementations directly.
1553
0
        public func write(_ data: NIOAny) -> EventLoopFuture<Void> {
1554
0
            self.eventLoop.assertInEventLoop()
1555
0
            let promise = self.eventLoop.makePromise(of: Void.self)
1556
0
            self._pipeline._write0(data, promise: promise)
1557
0
            return promise.futureResult
1558
0
        }
1559
1560
        /// Fires `writeAndFlush` from the tail to the head.
1561
        ///
1562
        /// This method should typically only be called by `Channel` implementations directly.
1563
0
        public func writeAndFlush(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
1564
0
            self.eventLoop.assertInEventLoop()
1565
0
            self._pipeline._writeAndFlush0(data, promise: promise)
1566
0
        }
1567
1568
        /// Fires `writeAndFlush` from the tail to the head.
1569
        ///
1570
        /// This method should typically only be called by `Channel` implementations directly.
1571
0
        public func writeAndFlush(_ data: NIOAny) -> EventLoopFuture<Void> {
1572
0
            self.eventLoop.assertInEventLoop()
1573
0
            let promise = self.eventLoop.makePromise(of: Void.self)
1574
0
            self._pipeline._writeAndFlush0(data, promise: promise)
1575
0
            return promise.futureResult
1576
0
        }
1577
1578
        /// Fires `bind` from the tail to the head.
1579
        ///
1580
        /// This method should typically only be called by `Channel` implementations directly.
1581
0
        public func bind(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
1582
0
            self.eventLoop.assertInEventLoop()
1583
0
            self._pipeline.bind0(to: address, promise: promise)
1584
0
        }
1585
1586
        /// Fires `connect` from the tail to the head.
1587
        ///
1588
        /// This method should typically only be called by `Channel` implementations directly.
1589
0
        public func connect(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
1590
0
            self.eventLoop.assertInEventLoop()
1591
0
            self._pipeline.connect0(to: address, promise: promise)
1592
0
        }
1593
1594
        /// Fires `register` from the tail to the head.
1595
        ///
1596
        /// This method should typically only be called by `Channel` implementations directly.
1597
0
        public func register(promise: EventLoopPromise<Void>?) {
1598
0
            self.eventLoop.assertInEventLoop()
1599
0
            self._pipeline.register0(promise: promise)
1600
0
        }
1601
1602
        /// Fires `triggerUserOutboundEvent` from the tail to the head.
1603
        ///
1604
        /// This method should typically only be called by `Channel` implementations directly.
1605
0
        public func triggerUserOutboundEvent(_ event: Any, promise: EventLoopPromise<Void>?) {
1606
0
            self.eventLoop.assertInEventLoop()
1607
0
            self._pipeline.triggerUserOutboundEvent0(event, promise: promise)
1608
0
        }
1609
1610
        /// Provides scoped access to the underlying transport, if the channel supports it.
1611
        ///
1612
        /// This is an advanced API for reading or manipulating the underlying transport that backs a channel. Users
1613
        /// must not close the transport or invalidate any invariants that the channel relies upon for its operation.
1614
        ///
1615
        /// Not all channels support access to the underlying transport. If the channel does not support this API, the
1616
        /// closure is not called and this function immediately returns `nil`.
1617
        ///
1618
        /// Note that you must call this API with an appropriate closure, or otherwise explicitly specify the correct
1619
        /// transport type parameter, in order for the closure to be run. Calling this function such that the compiler
1620
        /// infers a type for the transport closure parameter that differs from the channel implementation will result
1621
        /// in the closure not being run and this function will return `nil`.
1622
        ///
1623
        /// For example, for socket-based channels that expose the underlying socket handle:
1624
        ///
1625
        /// ```swift
1626
        /// try channel.pipeline.syncOperations.withUnsafeTransportIfAvailable { transport in
1627
        ///     // This closure is called.
1628
        ///     transport == NIOBSDSocketHandle.invalid
1629
        /// }
1630
        ///
1631
        /// try channel.pipeline.syncOperations.withUnsafeTransportIfAvailable { (_: NIOBSDSocket.Handle) in
1632
        ///     // This closure is called.
1633
        ///     return
1634
        /// }
1635
        ///
1636
        /// try channel.pipeline.syncOperations.withUnsafeTransportIfAvailable(of: NIOBSDSocket.Handle.self) { _ in
1637
        ///     // This closure is called.
1638
        ///     return
1639
        /// }
1640
        ///
1641
        /// try channel.pipeline.syncOperations.withUnsafeTransportIfAvailable {
1642
        ///     // This closure is NOT called.
1643
        ///     return
1644
        /// }
1645
        ///
1646
        /// try channel.pipeline.syncOperations.withUnsafeTransportIfAvailable { (_: Any) in
1647
        ///     // This closure is NOT called.
1648
        ///     return
1649
        /// }
1650
        ///
1651
        /// try channel.pipeline.syncOperations.withUnsafeTransportIfAvailable(of: Any.self) { _ in
1652
        ///     // This closure is NOT called.
1653
        ///     return
1654
        /// }
1655
        /// ```
1656
        ///
1657
        /// - Parameters:
1658
        ///   - type: The expected transport type the channel makes available.
1659
        ///   - body: A closure that takes the underlying transport, if the channel supports this operation.
1660
        /// - Returns: The value returned by the closure, or `nil` if the channel does not expose its transport.
1661
        /// - Throws: If there was an error accessing the underlying transport, or an error was thrown by the closure.
1662
        @available(macOS 13, iOS 16, tvOS 16, watchOS 9, *)
1663
        @inlinable
1664
        public func withUnsafeTransportIfAvailable<Transport, Result>(
1665
            of type: Transport.Type = Transport.self,
1666
            _ body: (_ transport: Transport) throws -> Result
1667
0
        ) throws -> Result? {
1668
0
            self.eventLoop.assertInEventLoop()
1669
0
            guard let core = self._pipeline.channel._channelCore as? any NIOTransportAccessibleChannelCore<Transport>
1670
0
            else {
1671
0
                return nil
1672
0
            }
1673
0
            return try core.withUnsafeTransport(body)
1674
0
        }
1675
    }
1676
1677
    /// Returns a view of operations which can be performed synchronously on this pipeline. All
1678
    /// operations **must** be called on the event loop.
1679
10.6M
    public var syncOperations: SynchronousOperations {
1680
10.6M
        SynchronousOperations(pipeline: self)
1681
10.6M
    }
1682
}
1683
1684
@available(*, unavailable)
1685
extension ChannelPipeline.SynchronousOperations: Sendable {}
1686
1687
extension ChannelPipeline {
1688
    /// A `Position` within the `ChannelPipeline` used to insert handlers into the `ChannelPipeline`.
1689
    @preconcurrency
1690
    public enum Position: Sendable {
1691
        /// The first `ChannelHandler` -- the front of the `ChannelPipeline`.
1692
        case first
1693
1694
        /// The last `ChannelHandler` -- the back of the `ChannelPipeline`.
1695
        case last
1696
1697
        /// Before the given `ChannelHandler`.
1698
        case before(ChannelHandler & Sendable)
1699
1700
        /// After the given `ChannelHandler`.
1701
        case after(ChannelHandler & Sendable)
1702
    }
1703
}
1704
1705
extension ChannelPipeline.SynchronousOperations {
1706
    /// A `Position` within the `ChannelPipeline`'s `SynchronousOperations` used to insert non-sendable handlers
1707
    /// into the `ChannelPipeline` at a certain position.
1708
    public enum Position {
1709
        /// The first `ChannelHandler` -- the front of the `ChannelPipeline`.
1710
        case first
1711
1712
        /// The last `ChannelHandler` -- the back of the `ChannelPipeline`.
1713
        case last
1714
1715
        /// Before the given `ChannelHandler`.
1716
        case before(ChannelHandler)
1717
1718
        /// After the given `ChannelHandler`.
1719
        case after(ChannelHandler)
1720
1721
33.8k
        public init(_ position: ChannelPipeline.Position) {
1722
33.8k
            switch position {
1723
33.8k
            case .first:
1724
0
                self = .first
1725
33.8k
            case .last:
1726
33.8k
                self = .last
1727
33.8k
            case .before(let handler):
1728
0
                self = .before(handler)
1729
33.8k
            case .after(let handler):
1730
0
                self = .after(handler)
1731
33.8k
            }
1732
33.8k
        }
1733
    }
1734
}
1735
1736
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
1737
@available(*, unavailable)
1738
extension ChannelPipeline.SynchronousOperations.Position: Sendable {}
1739
1740
/// Special `ChannelHandler` that forwards all events to the `Channel.Unsafe` implementation.
1741
final class HeadChannelHandler: _ChannelOutboundHandler, Sendable {
1742
1743
    static let name = "head"
1744
    static let sharedInstance = HeadChannelHandler()
1745
1746
2
    private init() {}
1747
1748
6.21k
    func register(context: ChannelHandlerContext, promise: EventLoopPromise<Void>?) {
1749
6.21k
        context.channel._channelCore.register0(promise: promise)
1750
6.21k
    }
1751
1752
0
    func bind(context: ChannelHandlerContext, to address: SocketAddress, promise: EventLoopPromise<Void>?) {
1753
0
        context.channel._channelCore.bind0(to: address, promise: promise)
1754
0
    }
1755
1756
0
    func connect(context: ChannelHandlerContext, to address: SocketAddress, promise: EventLoopPromise<Void>?) {
1757
0
        context.channel._channelCore.connect0(to: address, promise: promise)
1758
0
    }
1759
1760
0
    func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
1761
0
        context.channel._channelCore.write0(data, promise: promise)
1762
0
    }
1763
1764
0
    func flush(context: ChannelHandlerContext) {
1765
0
        context.channel._channelCore.flush0()
1766
0
    }
1767
1768
6.21k
    func close(context: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise<Void>?) {
1769
6.21k
        context.channel._channelCore.close0(error: mode.error, mode: mode, promise: promise)
1770
6.21k
    }
1771
1772
0
    func read(context: ChannelHandlerContext) {
1773
0
        context.channel._channelCore.read0()
1774
0
    }
1775
1776
0
    func triggerUserOutboundEvent(context: ChannelHandlerContext, event: Any, promise: EventLoopPromise<Void>?) {
1777
0
        context.channel._channelCore.triggerUserOutboundEvent0(event, promise: promise)
1778
0
    }
1779
1780
}
1781
1782
extension CloseMode {
1783
    /// Returns the error with which to fail outstanding operations.
1784
13.1k
    fileprivate var error: any Error {
1785
13.1k
        switch self {
1786
13.1k
        case .all:
1787
13.1k
            return ChannelError._ioOnClosedChannel
1788
13.1k
        case .output:
1789
0
            return ChannelError._outputClosed
1790
13.1k
        case .input:
1791
0
            return ChannelError._inputClosed
1792
13.1k
        }
1793
13.1k
    }
1794
}
1795
1796
/// Special `ChannelInboundHandler` which will consume all inbound events.
1797
final class TailChannelHandler: _ChannelInboundHandler, Sendable {
1798
1799
    static let name = "tail"
1800
    static let sharedInstance = TailChannelHandler()
1801
1802
2
    private init() {}
1803
1804
6.21k
    func channelRegistered(context: ChannelHandlerContext) {
1805
6.21k
        // Discard
1806
6.21k
    }
1807
1808
6.21k
    func channelUnregistered(context: ChannelHandlerContext) {
1809
6.21k
        // Discard
1810
6.21k
    }
1811
1812
0
    func channelActive(context: ChannelHandlerContext) {
1813
0
        // Discard
1814
0
    }
1815
1816
6.21k
    func channelInactive(context: ChannelHandlerContext) {
1817
6.21k
        // Discard
1818
6.21k
    }
1819
1820
6.21k
    func channelReadComplete(context: ChannelHandlerContext) {
1821
6.21k
        // Discard
1822
6.21k
    }
1823
1824
0
    func channelWritabilityChanged(context: ChannelHandlerContext) {
1825
0
        // Discard
1826
0
    }
1827
1828
0
    func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
1829
0
        // Discard
1830
0
    }
1831
1832
5.74k
    func errorCaught(context: ChannelHandlerContext, error: Error) {
1833
5.74k
        context.channel._channelCore.errorCaught0(error: error)
1834
5.74k
    }
1835
1836
985k
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
1837
985k
        context.channel._channelCore.channelRead0(data)
1838
985k
    }
1839
}
1840
1841
/// `Error` that is used by the `ChannelPipeline` to inform the user of an error.
1842
public enum ChannelPipelineError: Error {
1843
    /// `ChannelHandler` was already removed.
1844
    case alreadyRemoved
1845
    /// `ChannelHandler` was not found.
1846
    case notFound
1847
}
1848
1849
/// Every `ChannelHandler` has -- when added to a `ChannelPipeline` -- a corresponding `ChannelHandlerContext` which is
1850
/// the way `ChannelHandler`s can interact with other `ChannelHandler`s in the pipeline.
1851
///
1852
/// Most `ChannelHandler`s need to send events through the `ChannelPipeline` which they do by calling the respective
1853
/// method on their `ChannelHandlerContext`. In fact all the `ChannelHandler` default implementations just forward
1854
/// the event using the `ChannelHandlerContext`.
1855
///
1856
/// Many events are instrumental for a `ChannelHandler`'s life-cycle and it is therefore very important to send them
1857
/// at the right point in time. Often, the right behaviour is to react to an event and then forward it to the next
1858
/// `ChannelHandler`.
1859
public final class ChannelHandlerContext: ChannelInvoker {
1860
    // visible for ChannelPipeline to modify
1861
    fileprivate var next: Optional<ChannelHandlerContext>
1862
    fileprivate var prev: Optional<ChannelHandlerContext>
1863
1864
    public let pipeline: ChannelPipeline
1865
1866
30.3M
    public var channel: Channel {
1867
30.3M
        self.pipeline.channel
1868
30.3M
    }
1869
1870
135k
    public var handler: ChannelHandler {
1871
135k
        self.inboundHandler ?? self.outboundHandler!
1872
135k
    }
1873
1874
0
    public var remoteAddress: SocketAddress? {
1875
0
        do {
1876
0
            // Fast-path access to the remoteAddress.
1877
0
            return try self.channel._channelCore.remoteAddress0()
1878
0
        } catch ChannelError.ioOnClosedChannel {
1879
0
            // Channel was closed already but we may still have the address cached so try to access it via the Channel
1880
0
            // so we are able to use it in channelInactive(...) / handlerRemoved(...) methods.
1881
0
            return self.channel.remoteAddress
1882
0
        } catch {
1883
0
            return nil
1884
0
        }
1885
0
    }
1886
1887
0
    public var localAddress: SocketAddress? {
1888
0
        do {
1889
0
            // Fast-path access to the localAddress.
1890
0
            return try self.channel._channelCore.localAddress0()
1891
0
        } catch ChannelError.ioOnClosedChannel {
1892
0
            // Channel was closed already but we may still have the address cached so try to access it via the Channel
1893
0
            // so we are able to use it in channelInactive(...) / handlerRemoved(...) methods.
1894
0
            return self.channel.localAddress
1895
0
        } catch {
1896
0
            return nil
1897
0
        }
1898
0
    }
1899
1900
232M
    public var eventLoop: EventLoop {
1901
232M
        self.pipeline.eventLoop
1902
232M
    }
1903
1904
    public let name: String
1905
    private let inboundHandler: _ChannelInboundHandler?
1906
    private let outboundHandler: _ChannelOutboundHandler?
1907
101k
    private var removeHandlerInvoked = false
1908
101k
    private var userTriggeredRemovalStarted = false
1909
1910
    // Only created from within ChannelPipeline
1911
39.3k
    fileprivate init(name: String, handler: ChannelHandler, pipeline: ChannelPipeline) {
1912
39.3k
        self.name = name
1913
39.3k
        self.pipeline = pipeline
1914
39.3k
        self.inboundHandler = handler as? _ChannelInboundHandler
1915
39.3k
        self.outboundHandler = handler as? _ChannelOutboundHandler
1916
39.3k
        self.next = nil
1917
39.3k
        self.prev = nil
1918
39.3k
        precondition(
1919
39.3k
            self.inboundHandler != nil || self.outboundHandler != nil,
1920
39.3k
            "ChannelHandlers need to either be inbound or outbound"
1921
39.3k
        )
1922
39.3k
    }
1923
1924
    /// Send a `channelRegistered` event to the next (inbound) `ChannelHandler` in the `ChannelPipeline`.
1925
    ///
1926
    /// - Note: For correct operation it is very important to forward any `channelRegistered` event using this method at the right point in time, that is usually when received.
1927
0
    public func fireChannelRegistered() {
1928
0
        self.next?.invokeChannelRegistered()
1929
0
    }
1930
1931
    /// Send a `channelUnregistered` event to the next (inbound) `ChannelHandler` in the `ChannelPipeline`.
1932
    ///
1933
    /// - Note: For correct operation it is very important to forward any `channelUnregistered` event using this method at the right point in time, that is usually when received.
1934
33.8k
    public func fireChannelUnregistered() {
1935
33.8k
        self.next?.invokeChannelUnregistered()
1936
33.8k
    }
1937
1938
    /// Send a `channelActive` event to the next (inbound) `ChannelHandler` in the `ChannelPipeline`.
1939
    ///
1940
    /// - Note: For correct operation it is very important to forward any `channelActive` event using this method at the right point in time, that is often when received.
1941
0
    public func fireChannelActive() {
1942
0
        self.next?.invokeChannelActive()
1943
0
    }
1944
1945
    /// Send a `channelInactive` event to the next (inbound) `ChannelHandler` in the `ChannelPipeline`.
1946
    ///
1947
    /// - Note: For correct operation it is very important to forward any `channelInactive` event using this method at the right point in time, that is often when received.
1948
178k
    public func fireChannelInactive() {
1949
178k
        self.next?.invokeChannelInactive()
1950
178k
    }
1951
1952
    /// Send data to the next inbound `ChannelHandler`. The data should be of type `ChannelInboundHandler.InboundOut`.
1953
46.1M
    public func fireChannelRead(_ data: NIOAny) {
1954
46.1M
        self.next?.invokeChannelRead(data)
1955
46.1M
    }
1956
1957
    /// Signal to the next `ChannelHandler` that a read burst has finished.
1958
116k
    public func fireChannelReadComplete() {
1959
116k
        self.next?.invokeChannelReadComplete()
1960
116k
    }
1961
1962
    /// Send a `writabilityChanged` event to the next (inbound) `ChannelHandler` in the `ChannelPipeline`.
1963
    ///
1964
    /// - Note: For correct operation it is very important to forward any `writabilityChanged` event using this method at the right point in time, that is usually when received.
1965
0
    public func fireChannelWritabilityChanged() {
1966
0
        self.next?.invokeChannelWritabilityChanged()
1967
0
    }
1968
1969
    /// Send an error to the next inbound `ChannelHandler`.
1970
247k
    public func fireErrorCaught(_ error: Error) {
1971
247k
        self.next?.invokeErrorCaught(error)
1972
247k
    }
1973
1974
    /// Send a user event to the next inbound `ChannelHandler`.
1975
    ///
1976
    /// This method exists for compatibility with ``ChannelInboundInvoker``.
1977
    @available(*, deprecated)
1978
    @_disfavoredOverload
1979
0
    public func fireUserInboundEventTriggered(_ event: Any & Sendable) {
1980
0
        self.next?.invokeUserInboundEventTriggered(event)
1981
0
    }
1982
1983
    /// Send a user event to the next inbound `ChannelHandler` when called on the event loop.
1984
0
    public func fireUserInboundEventTriggered(_ event: Any) {
1985
0
        self.next?.invokeUserInboundEventTriggered(event)
1986
0
    }
1987
1988
    /// Send a `register` event to the next (outbound) `ChannelHandler` in the `ChannelPipeline`.
1989
    ///
1990
    /// - Note: For correct operation it is very important to forward any `register` event using this method at the right point in time, that is usually when received.
1991
0
    public func register(promise: EventLoopPromise<Void>?) {
1992
0
        if let outboundNext = self.prev {
1993
0
            outboundNext.invokeRegister(promise: promise)
1994
0
        } else {
1995
0
            promise?.fail(ChannelError._ioOnClosedChannel)
1996
0
        }
1997
0
    }
1998
1999
    /// Send a `bind` event to the next outbound `ChannelHandler` in the `ChannelPipeline`.
2000
    /// When the `bind` event reaches the `HeadChannelHandler` a `ServerSocketChannel` will be bound.
2001
    ///
2002
    /// - Parameters:
2003
    ///   - address: The address to bind to.
2004
    ///   - promise: The promise fulfilled when the socket is bound or failed if it cannot be bound.
2005
0
    public func bind(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
2006
0
        if let outboundNext = self.prev {
2007
0
            outboundNext.invokeBind(to: address, promise: promise)
2008
0
        } else {
2009
0
            promise?.fail(ChannelError._ioOnClosedChannel)
2010
0
        }
2011
0
    }
2012
2013
    /// Send a `connect` event to the next outbound `ChannelHandler` in the `ChannelPipeline`.
2014
    /// When the `connect` event reaches the `HeadChannelHandler` a `SocketChannel` will be connected.
2015
    ///
2016
    /// - Parameters:
2017
    ///   - address: The address to connect to.
2018
    ///   - promise: The promise fulfilled when the socket is connected or failed if it cannot be connected.
2019
0
    public func connect(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
2020
0
        if let outboundNext = self.prev {
2021
0
            outboundNext.invokeConnect(to: address, promise: promise)
2022
0
        } else {
2023
0
            promise?.fail(ChannelError._ioOnClosedChannel)
2024
0
        }
2025
0
    }
2026
2027
    /// Send a `write` event to the next outbound `ChannelHandler` in the `ChannelPipeline`.
2028
    /// When the `write` event reaches the `HeadChannelHandler` the data will be enqueued to be written on the next
2029
    /// `flush` event.
2030
    ///
2031
    /// - Parameters:
2032
    ///   - data: The data to write, should be of type `ChannelOutboundHandler.OutboundOut`.
2033
    ///   - promise: The promise fulfilled when the data has been written or failed if it cannot be written.
2034
0
    public func write(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
2035
0
        if let outboundNext = self.prev {
2036
0
            outboundNext.invokeWrite(data, promise: promise)
2037
0
        } else {
2038
0
            promise?.fail(ChannelError._ioOnClosedChannel)
2039
0
        }
2040
0
    }
2041
2042
    /// Send a `flush` event to the next outbound `ChannelHandler` in the `ChannelPipeline`.
2043
    /// When the `flush` event reaches the `HeadChannelHandler`, an attempt will be made to write the previously
2044
    /// enqueued data to the socket.
2045
0
    public func flush() {
2046
0
        if let outboundNext = self.prev {
2047
0
            outboundNext.invokeFlush()
2048
0
        }
2049
0
    }
2050
2051
    /// Send a `write` event followed by a `flush` event to the next outbound `ChannelHandler` in the `ChannelPipeline`.
2052
    /// When the `write` event reaches the `HeadChannelHandler` the data will be enqueued to be written when the `flush`
2053
    /// also reaches the `HeadChannelHandler`.
2054
    ///
2055
    /// - Parameters:
2056
    ///   - data: The data to write, should be of type `ChannelOutboundHandler.OutboundOut`.
2057
    ///   - promise: The promise fulfilled when the data has been written and flushed, or failed if that could not be done.
2058
0
    public func writeAndFlush(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
2059
0
        if let outboundNext = self.prev {
2060
0
            outboundNext.invokeWriteAndFlush(data, promise: promise)
2061
0
        } else {
2062
0
            promise?.fail(ChannelError._ioOnClosedChannel)
2063
0
        }
2064
0
    }
2065
2066
    /// Send a `read` event to the next outbound `ChannelHandler` in the `ChannelPipeline`.
2067
    /// When the `read` event reaches the `HeadChannelHandler` the interest to read data will be signalled to the
2068
    /// `Selector`. This will subsequently -- when data becomes readable -- cause `channelRead` events containing the
2069
    /// data to be sent through the `ChannelPipeline`.
2070
0
    public func read() {
2071
0
        if let outboundNext = self.prev {
2072
0
            outboundNext.invokeRead()
2073
0
        }
2074
0
    }
2075
2076
    /// Send a `close` event to the next outbound `ChannelHandler` in the `ChannelPipeline`.
2077
    /// When the `close` event reaches the `HeadChannelHandler` the socket will be closed.
2078
    ///
2079
    /// - Parameters:
2080
    ///   - mode: The `CloseMode` to use.
2081
    ///   - promise: The promise fulfilled when the `Channel` has been closed or failed if the closing failed.
2082
0
    public func close(mode: CloseMode = .all, promise: EventLoopPromise<Void>?) {
2083
0
        if let outboundNext = self.prev {
2084
0
            outboundNext.invokeClose(mode: mode, promise: promise)
2085
0
        } else {
2086
0
            promise?.fail(ChannelError._alreadyClosed)
2087
0
        }
2088
0
    }
2089
2090
    /// Send a user event to the next outbound `ChannelHandler` in the `ChannelPipeline`.
2091
    ///
2092
    /// - Parameters:
2093
    ///   - event: The user event to send.
2094
    ///   - promise: The promise fulfilled when the user event has been sent or failed if it couldn't be sent.
2095
    @available(*, deprecated)
2096
    @_disfavoredOverload
2097
0
    public func triggerUserOutboundEvent(_ event: Any & Sendable, promise: EventLoopPromise<Void>?) {
2098
0
        self._triggerUserOutboundEvent(event, promise: promise)
2099
0
    }
2100
2101
    /// Send a user event to the next outbound `ChannelHandler` in the `ChannelPipeline`.
2102
    ///
2103
    /// - Parameters:
2104
    ///   - event: The user event to send.
2105
    ///   - promise: The promise fulfilled when the user event has been sent or failed if it couldn't be sent.
2106
0
    public func triggerUserOutboundEvent(_ event: Any, promise: EventLoopPromise<Void>?) {
2107
0
        self._triggerUserOutboundEvent(event, promise: promise)
2108
0
    }
2109
2110
0
    private func _triggerUserOutboundEvent(_ event: Any, promise: EventLoopPromise<Void>?) {
2111
0
        if let outboundNext = self.prev {
2112
0
            outboundNext.invokeTriggerUserOutboundEvent(event, promise: promise)
2113
0
        } else {
2114
0
            promise?.fail(ChannelError._ioOnClosedChannel)
2115
0
        }
2116
0
    }
2117
2118
13.1k
    fileprivate func invokeChannelRegistered() {
2119
13.1k
        self.eventLoop.assertInEventLoop()
2120
13.1k
2121
13.1k
        if let inboundHandler = self.inboundHandler {
2122
13.1k
            inboundHandler.channelRegistered(context: self)
2123
13.1k
        } else {
2124
0
            self.next?.invokeChannelRegistered()
2125
0
        }
2126
13.1k
    }
2127
2128
26.2k
    fileprivate func invokeChannelUnregistered() {
2129
26.2k
        self.eventLoop.assertInEventLoop()
2130
26.2k
2131
26.2k
        if let inboundHandler = self.inboundHandler {
2132
26.2k
            inboundHandler.channelUnregistered(context: self)
2133
26.2k
        } else {
2134
0
            self.next?.invokeChannelUnregistered()
2135
0
        }
2136
26.2k
    }
2137
2138
0
    fileprivate func invokeChannelActive() {
2139
0
        self.eventLoop.assertInEventLoop()
2140
0
2141
0
        if let inboundHandler = self.inboundHandler {
2142
0
            inboundHandler.channelActive(context: self)
2143
0
        } else {
2144
0
            self.next?.invokeChannelActive()
2145
0
        }
2146
0
    }
2147
2148
26.2k
    fileprivate func invokeChannelInactive() {
2149
26.2k
        self.eventLoop.assertInEventLoop()
2150
26.2k
2151
26.2k
        if let inboundHandler = self.inboundHandler {
2152
26.2k
            inboundHandler.channelInactive(context: self)
2153
26.2k
        } else {
2154
0
            self.next?.invokeChannelInactive()
2155
0
        }
2156
26.2k
    }
2157
2158
2.80M
    fileprivate func invokeChannelRead(_ data: NIOAny) {
2159
2.80M
        self.eventLoop.assertInEventLoop()
2160
2.80M
2161
2.80M
        if let inboundHandler = self.inboundHandler {
2162
2.80M
            inboundHandler.channelRead(context: self, data: data)
2163
2.80M
        } else {
2164
0
            self.next?.invokeChannelRead(data)
2165
0
        }
2166
2.80M
    }
2167
2168
26.2k
    fileprivate func invokeChannelReadComplete() {
2169
26.2k
        self.eventLoop.assertInEventLoop()
2170
26.2k
2171
26.2k
        if let inboundHandler = self.inboundHandler {
2172
26.2k
            inboundHandler.channelReadComplete(context: self)
2173
26.2k
        } else {
2174
0
            self.next?.invokeChannelReadComplete()
2175
0
        }
2176
26.2k
    }
2177
2178
0
    fileprivate func invokeChannelWritabilityChanged() {
2179
0
        self.eventLoop.assertInEventLoop()
2180
0
2181
0
        if let inboundHandler = self.inboundHandler {
2182
0
            inboundHandler.channelWritabilityChanged(context: self)
2183
0
        } else {
2184
0
            self.next?.invokeChannelWritabilityChanged()
2185
0
        }
2186
0
    }
2187
2188
12.4k
    fileprivate func invokeErrorCaught(_ error: Error) {
2189
12.4k
        self.eventLoop.assertInEventLoop()
2190
12.4k
2191
12.4k
        if let inboundHandler = self.inboundHandler {
2192
12.4k
            inboundHandler.errorCaught(context: self, error: error)
2193
12.4k
        } else {
2194
0
            self.next?.invokeErrorCaught(error)
2195
0
        }
2196
12.4k
    }
2197
2198
0
    fileprivate func invokeUserInboundEventTriggered(_ event: Any) {
2199
0
        self.eventLoop.assertInEventLoop()
2200
0
2201
0
        if let inboundHandler = self.inboundHandler {
2202
0
            inboundHandler.userInboundEventTriggered(context: self, event: event)
2203
0
        } else {
2204
0
            self.next?.invokeUserInboundEventTriggered(event)
2205
0
        }
2206
0
    }
2207
2208
13.1k
    fileprivate func invokeRegister(promise: EventLoopPromise<Void>?) {
2209
13.1k
        self.eventLoop.assertInEventLoop()
2210
13.1k
2211
13.1k
        if let outboundHandler = self.outboundHandler {
2212
13.1k
            outboundHandler.register(context: self, promise: promise)
2213
13.1k
        } else {
2214
0
            self.prev?.invokeRegister(promise: promise)
2215
0
        }
2216
13.1k
    }
2217
2218
0
    fileprivate func invokeBind(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
2219
0
        self.eventLoop.assertInEventLoop()
2220
0
2221
0
        if let outboundHandler = self.outboundHandler {
2222
0
            outboundHandler.bind(context: self, to: address, promise: promise)
2223
0
        } else {
2224
0
            self.prev?.invokeBind(to: address, promise: promise)
2225
0
        }
2226
0
    }
2227
2228
0
    fileprivate func invokeConnect(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
2229
0
        self.eventLoop.assertInEventLoop()
2230
0
2231
0
        if let outboundHandler = self.outboundHandler {
2232
0
            outboundHandler.connect(context: self, to: address, promise: promise)
2233
0
        } else {
2234
0
            self.prev?.invokeConnect(to: address, promise: promise)
2235
0
        }
2236
0
    }
2237
2238
0
    fileprivate func invokeWrite(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
2239
0
        self.eventLoop.assertInEventLoop()
2240
0
2241
0
        if let outboundHandler = self.outboundHandler {
2242
0
            outboundHandler.write(context: self, data: data, promise: promise)
2243
0
        } else {
2244
0
            self.prev?.invokeWrite(data, promise: promise)
2245
0
        }
2246
0
    }
2247
2248
0
    fileprivate func invokeFlush() {
2249
0
        self.eventLoop.assertInEventLoop()
2250
0
2251
0
        if let outboundHandler = self.outboundHandler {
2252
0
            outboundHandler.flush(context: self)
2253
0
        } else {
2254
0
            self.prev?.invokeFlush()
2255
0
        }
2256
0
    }
2257
2258
0
    fileprivate func invokeWriteAndFlush(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
2259
0
        self.eventLoop.assertInEventLoop()
2260
0
2261
0
        if let outboundHandler = self.outboundHandler {
2262
0
            outboundHandler.write(context: self, data: data, promise: promise)
2263
0
            outboundHandler.flush(context: self)
2264
0
        } else {
2265
0
            self.prev?.invokeWriteAndFlush(data, promise: promise)
2266
0
        }
2267
0
    }
2268
2269
0
    fileprivate func invokeRead() {
2270
0
        self.eventLoop.assertInEventLoop()
2271
0
2272
0
        if let outboundHandler = self.outboundHandler {
2273
0
            outboundHandler.read(context: self)
2274
0
        } else {
2275
0
            self.prev?.invokeRead()
2276
0
        }
2277
0
    }
2278
2279
26.2k
    fileprivate func invokeClose(mode: CloseMode, promise: EventLoopPromise<Void>?) {
2280
26.2k
        self.eventLoop.assertInEventLoop()
2281
26.2k
2282
26.2k
        if let outboundHandler = self.outboundHandler {
2283
13.1k
            outboundHandler.close(context: self, mode: mode, promise: promise)
2284
13.1k
        } else {
2285
13.1k
            self.prev?.invokeClose(mode: mode, promise: promise)
2286
13.1k
        }
2287
26.2k
    }
2288
2289
0
    fileprivate func invokeTriggerUserOutboundEvent(_ event: Any, promise: EventLoopPromise<Void>?) {
2290
0
        self.eventLoop.assertInEventLoop()
2291
0
2292
0
        if let outboundHandler = self.outboundHandler {
2293
0
            outboundHandler.triggerUserOutboundEvent(context: self, event: event, promise: promise)
2294
0
        } else {
2295
0
            self.prev?.invokeTriggerUserOutboundEvent(event, promise: promise)
2296
0
        }
2297
0
    }
2298
2299
13.1k
    fileprivate func invokeHandlerAdded() {
2300
13.1k
        self.eventLoop.assertInEventLoop()
2301
13.1k
2302
13.1k
        handler.handlerAdded(context: self)
2303
13.1k
    }
2304
2305
39.3k
    fileprivate func invokeHandlerRemoved() {
2306
39.3k
        self.eventLoop.assertInEventLoop()
2307
39.3k
        guard !self.removeHandlerInvoked else {
2308
0
            return
2309
39.3k
        }
2310
39.3k
        self.removeHandlerInvoked = true
2311
39.3k
2312
39.3k
        handler.handlerRemoved(context: self)
2313
39.3k
    }
2314
}
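
The `fire…`, outbound, and `invoke…` methods above share one dispatch pattern: inbound events travel along `next`, outbound operations travel along `prev`, and a context whose handler does not handle that direction passes the call on. The following is a minimal, self-contained sketch of that pattern only. Every type in it (`MiniContext`, `MiniInboundHandler`, `MiniOutboundHandler`, and the three handlers) is hypothetical and is not SwiftNIO API; promises, event-loop assertions, and `NIOAny` are deliberately left out.

```swift
// Hypothetical miniature model of the dispatch pattern; not SwiftNIO API.
protocol MiniInboundHandler: AnyObject {
    func channelRead(context: MiniContext, data: String)
}

protocol MiniOutboundHandler: AnyObject {
    func write(context: MiniContext, data: String)
}

final class MiniContext {
    let handler: AnyObject
    var next: MiniContext?        // towards the tail (inbound direction)
    weak var prev: MiniContext?   // towards the head (outbound direction)

    init(handler: AnyObject) {
        self.handler = handler
    }

    // Called by a handler to forward an inbound event.
    func fireChannelRead(_ data: String) {
        self.next?.invokeChannelRead(data)
    }

    // Called by a handler to forward an outbound operation.
    func write(_ data: String) {
        self.prev?.invokeWrite(data)
    }

    // Deliver to this context's handler if it handles inbound events,
    // otherwise skip ahead to the next context.
    func invokeChannelRead(_ data: String) {
        if let inbound = self.handler as? MiniInboundHandler {
            inbound.channelRead(context: self, data: data)
        } else {
            self.next?.invokeChannelRead(data)
        }
    }

    // Deliver to this context's handler if it handles outbound operations,
    // otherwise skip back to the previous context.
    func invokeWrite(_ data: String) {
        if let outbound = self.handler as? MiniOutboundHandler {
            outbound.write(context: self, data: data)
        } else {
            self.prev?.invokeWrite(data)
        }
    }
}

// Outbound-only handler that records what reaches the head.
final class HeadSink: MiniOutboundHandler {
    var written: [String] = []
    func write(context: MiniContext, data: String) {
        self.written.append(data)
    }
}

// Inbound-only handler that transforms the data and forwards it.
final class Uppercaser: MiniInboundHandler {
    func channelRead(context: MiniContext, data: String) {
        context.fireChannelRead(data.uppercased())
    }
}

// Inbound-only handler that records what reaches the tail.
final class TailSink: MiniInboundHandler {
    var received: [String] = []
    func channelRead(context: MiniContext, data: String) {
        self.received.append(data)
    }
}

// Wire up head <-> middle <-> tail.
let headSink = HeadSink()
let tailSink = TailSink()
let head = MiniContext(handler: headSink)
let middle = MiniContext(handler: Uppercaser())
let tail = MiniContext(handler: tailSink)
head.next = middle
middle.prev = head
middle.next = tail
tail.prev = middle

head.invokeChannelRead("hi")  // head is outbound-only, so the event is passed on
tail.write("bye")             // middle is inbound-only, so the write is passed on
```

In this sketch the inbound event ends up in `tailSink.received` after passing through `Uppercaser`, and the write ends up in `headSink.written` untouched, because the inbound-only middle handler is skipped on the way back to the head.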
2315
2316
// This extension "un-deprecates" some parts of the ChannelInvoker API for
2317
// ChannelHandlerContext specifically. These methods were not sound elsewhere,
2318
// but they're fine here.
2319
extension ChannelHandlerContext {
2320
    /// Write data to the remote peer.
2321
    ///
2322
    /// Be aware that to be sure that data is really written to the remote peer you need to call `flush` or use `writeAndFlush`.
2323
    /// Calling `write` multiple times and then `flush` may allow the `Channel` to `write` multiple data objects to the remote peer with one syscall.
2324
    ///
2325
    /// - Parameters:
2326
    ///   - data: the data to write
2327
    ///   - file: The file this function was called in, for debugging purposes.
2328
    ///   - line: The line this function was called on, for debugging purposes.
2329
    /// - Returns: the future which will be notified once the operation completes.
2330
0
    public func write(_ data: NIOAny, file: StaticString = #fileID, line: UInt = #line) -> EventLoopFuture<Void> {
2331
0
        let promise = self.eventLoop.makePromise(of: Void.self, file: file, line: line)
2332
0
        self.write(data, promise: promise)
2333
0
        return promise.futureResult
2334
0
    }
2335
2336
    /// Shortcut for calling `write` and `flush`.
2337
    ///
2338
    /// - Parameters:
2339
    ///   - data: the data to write
2340
    ///   - file: The file this function was called in, for debugging purposes.
2341
    ///   - line: The line this function was called on, for debugging purposes.
2342
    /// - Returns: the future which will be notified once the `write` operation completes.
2343
    public func writeAndFlush(_ data: NIOAny, file: StaticString = #fileID, line: UInt = #line) -> EventLoopFuture<Void>
2344
0
    {
2345
0
        let promise = self.eventLoop.makePromise(of: Void.self, file: file, line: line)
2346
0
        self.writeAndFlush(data, promise: promise)
2347
0
        return promise.futureResult
2348
0
    }
2349
2350
    /// Returns this `ChannelHandlerContext` as a `NIOLoopBound`, bound to `self.eventLoop`.
2351
    ///
2352
    /// This is a shorthand for `NIOLoopBound(self, eventLoop: self.eventLoop)`.
2353
    ///
2354
    /// Being able to capture `ChannelHandlerContext`s in `EventLoopFuture` callbacks is important in SwiftNIO programs.
2355
    /// Of course, this is not always safe because the `EventLoopFuture` callbacks may run on other threads. SwiftNIO
2356
    /// programmers therefore always had to manually arrange for those callbacks to run on the correct `EventLoop`
2357
    /// (i.e. `context.eventLoop`) which then made that construction safe.
2358
    ///
2359
    /// Newer Swift versions contain a static feature to automatically detect data races which, of course, cannot know
2360
    /// which ``EventLoop`` an ``EventLoopFuture`` callback runs on, because that is only determined _dynamically_. ``NIOLoopBound`` can be used
2361
    /// to prove to the compiler that this is safe and, in case it is not, ``NIOLoopBound`` will trap at runtime. The
2362
    /// correct behaviour is therefore enforced dynamically.
2363
0
    public var loopBound: NIOLoopBound<ChannelHandlerContext> {
2364
0
        NIOLoopBound(self, eventLoop: self.eventLoop)
2365
0
    }
2366
}
2367
2368
@available(*, unavailable)
2369
extension ChannelHandlerContext: Sendable {}
2370
2371
extension ChannelHandlerContext {
2372
    /// A `RemovalToken` is handed to a `RemovableChannelHandler` when its `removeHandler` function is invoked. A
2373
    /// `RemovableChannelHandler` is then required to remove itself from the `ChannelPipeline`. The removal process
2374
    /// is finalized by handing the `RemovalToken` to the `ChannelHandlerContext.leavePipeline` function.
2375
    public struct RemovalToken: Sendable {
2376
        internal let promise: EventLoopPromise<Void>?
2377
    }
2378
2379
    /// Synchronously remove the `ChannelHandler` with the given `ChannelHandlerContext`.
2380
    ///
2381
    /// - Note: This function must only be used from a `RemovableChannelHandler` to remove itself. Calling this method
2382
    ///         on any other `ChannelHandlerContext` leads to undefined behaviour.
2383
    ///
2384
    /// - Parameters:
2385
    ///    - removalToken: The removal token received from `RemovableChannelHandler.removeHandler`
2386
0
    public func leavePipeline(removalToken: RemovalToken) {
2387
0
        self.eventLoop.preconditionInEventLoop()
2388
0
        self.pipeline.removeHandlerFromPipeline(context: self, promise: removalToken.promise)
2389
0
    }
2390
2391
0
    internal func startUserTriggeredRemoval(promise: EventLoopPromise<Void>?) {
2392
0
        self.eventLoop.assertInEventLoop()
2393
0
        guard !self.userTriggeredRemovalStarted else {
2394
0
            promise?.fail(NIOAttemptedToRemoveHandlerMultipleTimesError())
2395
0
            return
2396
0
        }
2397
0
        self.userTriggeredRemovalStarted = true
2398
0
        (self.handler as! RemovableChannelHandler).removeHandler(
2399
0
            context: self,
2400
0
            removalToken: .init(promise: promise)
2401
0
        )
2402
0
    }
2403
}
2404
2405
extension ChannelHandlerContext {
2406
0
    var sendableView: SendableView {
2407
0
        SendableView(wrapping: self)
2408
0
    }
2409
2410
    /// A wrapper over ``ChannelHandlerContext`` that allows access to the thread-safe API
2411
    /// surface on the type.
2412
    ///
2413
    /// Very little of ``ChannelHandlerContext`` is thread-safe, but in a rare few places
2414
    /// there are things we can access. This type makes those available.
2415
    struct SendableView: @unchecked Sendable {
2416
        private let context: ChannelHandlerContext
2417
2418
0
        fileprivate init(wrapping context: ChannelHandlerContext) {
2419
0
            self.context = context
2420
0
        }
2421
2422
        /// Whether the ``ChannelHandler`` associated with this context conforms to
2423
        /// ``RemovableChannelHandler``.
2424
0
        var channelHandlerIsRemovable: Bool {
2425
0
            // `context.handler` is not mutable, and set at construction, so this access is
2426
0
            // acceptable. The protocol conformance check is also safe.
2427
0
            self.context.handler is RemovableChannelHandler
2428
0
        }
2429
2430
        /// Grabs the underlying ``ChannelHandlerContext``. May only be called on the
2431
        /// event loop.
2432
0
        var wrappedValue: ChannelHandlerContext {
2433
0
            // The event loop lookup here is also thread-safe, so we can grab the value out
2434
0
            // and use it.
2435
0
            self.context.eventLoop.preconditionInEventLoop()
2436
0
            return self.context
2437
0
        }
2438
    }
2439
}
2440
2441
extension ChannelPipeline: CustomDebugStringConvertible {
2442
0
    public var debugDescription: String {
2443
0
        // This method forms output in the following format:
2444
0
        //
2445
0
        // ChannelPipeline[0x0000000000000000]:
2446
0
        //                      [I] ↓↑ [O]
2447
0
        //  <incoming handler type> ↓↑                         [<name>]
2448
0
        //                          ↓↑ <outgoing handler type> [<name>]
2449
0
        //    <duplex handler type> ↓↑ <duplex handler type>   [<name>]
2450
0
        //
2451
0
        var desc = ["ChannelPipeline[\(ObjectIdentifier(self))]:"]
2452
0
        let debugInfos = self.collectHandlerDebugInfos()
2453
0
        let maxIncomingTypeNameCount =
2454
0
            debugInfos.filter { $0.isIncoming }
2455
0
            .map { $0.typeName.count }
2456
0
            .max() ?? 0
2457
0
        let maxOutgoingTypeNameCount =
2458
0
            debugInfos.filter { $0.isOutgoing }
2459
0
            .map { $0.typeName.count }
2460
0
            .max() ?? 0
2461
0
2462
0
        func whitespace(count: Int) -> String {
2463
0
            String(repeating: " ", count: count)
2464
0
        }
2465
0
2466
0
        if debugInfos.isEmpty {
2467
0
            desc.append(" <no handlers>")
2468
0
        } else {
2469
0
            desc.append(whitespace(count: maxIncomingTypeNameCount - 2) + "[I] ↓↑ [O]")
2470
0
            for debugInfo in debugInfos {
2471
0
                var line = [String]()
2472
0
                line.append(" ")
2473
0
                if debugInfo.isIncoming {
2474
0
                    line.append(whitespace(count: maxIncomingTypeNameCount - debugInfo.typeName.count))
2475
0
                    line.append(debugInfo.typeName)
2476
0
                } else {
2477
0
                    line.append(whitespace(count: maxIncomingTypeNameCount))
2478
0
                }
2479
0
                line.append(" ↓↑ ")
2480
0
                if debugInfo.isOutgoing {
2481
0
                    line.append(debugInfo.typeName)
2482
0
                    line.append(whitespace(count: maxOutgoingTypeNameCount - debugInfo.typeName.count))
2483
0
                } else {
2484
0
                    line.append(whitespace(count: maxOutgoingTypeNameCount))
2485
0
                }
2486
0
                line.append(" ")
2487
0
                line.append("[\(debugInfo.name)]")
2488
0
                desc.append(line.joined())
2489
0
            }
2490
0
        }
2491
0
2492
0
        return desc.joined(separator: "\n")
2493
0
    }
2494
2495
    /// Returns the first `ChannelHandler` of the given type.
2496
    ///
2497
    /// - Parameters:
2498
    ///   - type: the type of `ChannelHandler` to return.
2499
    @inlinable
2500
    @preconcurrency
2501
    public func handler<Handler: ChannelHandler & _NIOCoreSendableMetatype>(
2502
        type _: Handler.Type
2503
0
    ) -> EventLoopFuture<Handler> {
2504
0
        self.context(handlerType: Handler.self).map { context in
2505
0
            guard let typedContext = context.handler as? Handler else {
2506
0
                preconditionFailure(
2507
0
                    "Expected channel handler of type \(Handler.self), got \(type(of: context.handler)) instead."
2508
0
                )
2509
0
            }
2510
0
2511
0
            return typedContext
2512
0
        }
2513
0
    }
2514
2515
    /// Synchronously finds and returns the first `ChannelHandler` of the given type.
2516
    ///
2517
    /// - Important: This must be called on the `EventLoop`.
2518
    /// - Parameters:
2519
    ///   - type: the type of `ChannelHandler` to return.
2520
    @inlinable  // should be fileprivate
2521
0
    internal func _handlerSync<Handler: ChannelHandler>(type _: Handler.Type) -> Result<Handler, Error> {
2522
0
        self._contextSync(handlerType: Handler.self).map { context in
2523
0
            guard let typedContext = context.handler as? Handler else {
2524
0
                preconditionFailure(
2525
0
                    "Expected channel handler of type \(Handler.self), got \(type(of: context.handler)) instead."
2526
0
                )
2527
0
            }
2528
0
            return typedContext
2529
0
        }
2530
0
    }
2531
2532
    private struct ChannelHandlerDebugInfo {
2533
        let handler: ChannelHandler
2534
        let name: String
2535
0
        var isIncoming: Bool {
2536
0
            self.handler is _ChannelInboundHandler
2537
0
        }
2538
0
        var isOutgoing: Bool {
2539
0
            self.handler is _ChannelOutboundHandler
2540
0
        }
2541
0
        var typeName: String {
2542
0
            "\(type(of: self.handler))"
2543
0
        }
2544
    }
2545
2546
0
    private func collectHandlerDebugInfos() -> [ChannelHandlerDebugInfo] {
2547
0
        var handlers = [ChannelHandlerDebugInfo]()
2548
0
        var node = self.head?.next
2549
0
        while let context = node, context !== self.tail {
2550
0
            handlers.append(.init(handler: context.handler, name: context.name))
2551
0
            node = context.next
2552
0
        }
2553
0
        return handlers
2554
0
    }
2555
}
2556
2557
extension ChannelPipeline {
2558
    private enum BufferingDirection: Equatable {
2559
        case inbound
2560
        case outbound
2561
    }
2562
2563
    /// Retrieve the total number of bytes buffered for outbound.
2564
0
    public func outboundBufferedBytes() -> EventLoopFuture<Int> {
2565
0
        let future: EventLoopFuture<Int>
2566
0
2567
0
        if self.eventLoop.inEventLoop {
2568
0
            future = self.eventLoop.makeSucceededFuture(countAllBufferedBytes(direction: .outbound))
2569
0
        } else {
2570
0
            future = self.eventLoop.submit {
2571
0
                self.countAllBufferedBytes(direction: .outbound)
2572
0
            }
2573
0
        }
2574
0
2575
0
        return future
2576
0
    }
2577
2578
    /// Retrieve the total number of bytes buffered for inbound.
2579
0
    public func inboundBufferedBytes() -> EventLoopFuture<Int> {
2580
0
        let future: EventLoopFuture<Int>
2581
0
2582
0
        if self.eventLoop.inEventLoop {
2583
0
            future = self.eventLoop.makeSucceededFuture(countAllBufferedBytes(direction: .inbound))
2584
0
        } else {
2585
0
            future = self.eventLoop.submit {
2586
0
                self.countAllBufferedBytes(direction: .inbound)
2587
0
            }
2588
0
        }
2589
0
2590
0
        return future
2591
0
    }
2592
2593
0
    private static func countBufferedBytes(context: ChannelHandlerContext, direction: BufferingDirection) -> Int? {
2594
0
        switch direction {
2595
0
        case .inbound:
2596
0
            guard let handler = context.handler as? NIOInboundByteBufferingChannelHandler else {
2597
0
                return nil
2598
0
            }
2599
0
            return handler.inboundBufferedBytes
2600
0
        case .outbound:
2601
0
            guard let handler = context.handler as? NIOOutboundByteBufferingChannelHandler else {
2602
0
                return nil
2603
0
            }
2604
0
            return handler.outboundBufferedBytes
2605
0
        }
2606
0
2607
0
    }
2608
2609
0
    private func countAllBufferedBytes(direction: BufferingDirection) -> Int {
2610
0
        self.eventLoop.assertInEventLoop()
2611
0
        var total = 0
2612
0
        var current = self.head?.next
2613
0
        switch direction {
2614
0
        case .inbound:
2615
0
            while let c = current, c !== self.tail {
2616
0
                if let inboundHandler = c.handler as? NIOInboundByteBufferingChannelHandler {
2617
0
                    total += inboundHandler.inboundBufferedBytes
2618
0
                }
2619
0
                current = current?.next
2620
0
            }
2621
0
        case .outbound:
2622
0
            while let c = current, c !== self.tail {
2623
0
                if let outboundHandler = c.handler as? NIOOutboundByteBufferingChannelHandler {
2624
0
                    total += outboundHandler.outboundBufferedBytes
2625
0
                }
2626
0
                current = current?.next
2627
0
            }
2628
0
        }
2629
0
2630
0
        return total
2631
0
    }
2632
}
2633
2634
extension ChannelPipeline.SynchronousOperations {
2635
    /// Retrieve the total number of bytes buffered for outbound.
2636
    ///
2637
    /// - Important: This *must* be called on the event loop.
2638
0
    public func outboundBufferedBytes() -> Int {
2639
0
        self.eventLoop.assertInEventLoop()
2640
0
        return self._pipeline.countAllBufferedBytes(direction: .outbound)
2641
0
    }
2642
2643
    /// Retrieve the number of outbound bytes buffered in the `ChannelHandler` associated with the given `ChannelHandlerContext`.
2644
    ///
2645
    /// - Parameters:
2646
    ///   - context: the `ChannelHandlerContext` from which the outbound buffered bytes of the `ChannelHandler` will be retrieved.
2647
    /// - Important: This *must* be called on the event loop.
2648
    ///
2649
    /// - Returns: The number of bytes currently buffered in the `ChannelHandler` referenced by the `context` parameter.
2650
    ///            If the `ChannelHandler` in the given `ChannelHandlerContext` does not conform to
2651
    ///            `NIOOutboundByteBufferingChannelHandler`, this method will return `nil`.
2652
0
    public func outboundBufferedBytes(in context: ChannelHandlerContext) -> Int? {
2653
0
        self.eventLoop.assertInEventLoop()
2654
0
        return ChannelPipeline.countBufferedBytes(context: context, direction: .outbound)
2655
0
    }
2656
2657
    /// Retrieve the total number of bytes buffered for inbound.
2658
    ///
2659
    /// - Important: This *must* be called on the event loop.
2660
0
    public func inboundBufferedBytes() -> Int {
2661
0
        self.eventLoop.assertInEventLoop()
2662
0
        return self._pipeline.countAllBufferedBytes(direction: .inbound)
2663
0
    }
2664
2665
    /// Retrieve the number of inbound bytes buffered in the `ChannelHandler` associated with the given `ChannelHandlerContext`.
2666
    ///
2667
    /// - Parameters:
2668
    ///   - context: the `ChannelHandlerContext` from which the inbound buffered bytes of the `ChannelHandler` will be retrieved.
2669
    /// - Important: This *must* be called on the event loop.
2670
    ///
2671
    /// - Returns: The number of bytes currently buffered in the `ChannelHandler` referenced by the `context` parameter.
2672
    ///            If the `ChannelHandler` in the given `ChannelHandlerContext` does not conform to
2673
    ///            `NIOInboundByteBufferingChannelHandler`, this method will return `nil`.
2674
0
    public func inboundBufferedBytes(in context: ChannelHandlerContext) -> Int? {
2675
0
        self.eventLoop.assertInEventLoop()
2676
0
        return ChannelPipeline.countBufferedBytes(context: context, direction: .inbound)
2677
0
    }
2678
}