/src/swift-nio/Sources/NIOCore/ChannelPipeline.swift
Line | Count | Source |
1 | | //===----------------------------------------------------------------------===// |
2 | | // |
3 | | // This source file is part of the SwiftNIO open source project |
4 | | // |
5 | | // Copyright (c) 2017-2024 Apple Inc. and the SwiftNIO project authors |
6 | | // Licensed under Apache License v2.0 |
7 | | // |
8 | | // See LICENSE.txt for license information |
9 | | // See CONTRIBUTORS.txt for the list of SwiftNIO project authors |
10 | | // |
11 | | // SPDX-License-Identifier: Apache-2.0 |
12 | | // |
13 | | //===----------------------------------------------------------------------===// |
14 | | |
15 | | /// A list of `ChannelHandler`s that handle or intercept inbound events and outbound operations of a |
16 | | /// `Channel`. `ChannelPipeline` implements an advanced form of the Intercepting Filter pattern |
17 | | /// to give a user full control over how an event is handled and how the `ChannelHandler`s in a pipeline |
18 | | /// interact with each other. |
19 | | /// |
20 | | /// # Creation of a pipeline |
21 | | /// |
22 | | /// Each `Channel` has its own `ChannelPipeline` and it is created automatically when a new `Channel` is created. |
23 | | /// |
24 | | /// # How an event flows in a pipeline |
25 | | /// |
26 | | /// The following diagram describes how I/O events are typically processed by `ChannelHandler`s in a `ChannelPipeline`. |
27 | | /// An I/O event is handled by either a `ChannelInboundHandler` or a `ChannelOutboundHandler` |
28 | | /// and is forwarded to the next handler in the `ChannelPipeline` by calling the event propagation methods defined in |
29 | | /// `ChannelHandlerContext`, such as `ChannelHandlerContext.fireChannelRead` and |
30 | | /// `ChannelHandlerContext.write`. |
31 | | /// |
32 | | /// ``` |
33 | | /// I/O Request |
34 | | /// via `Channel` or |
35 | | /// `ChannelHandlerContext` |
36 | | /// | |
37 | | /// +---------------------------------------------------+---------------+ |
38 | | /// | ChannelPipeline | | |
39 | | /// | TAIL \|/ | |
40 | | /// | +---------------------+ +-----------+----------+ | |
41 | | /// | | Inbound Handler N | | Outbound Handler 1 | | |
42 | | /// | +----------+----------+ +-----------+----------+ | |
43 | | /// | /|\ | | |
44 | | /// | | \|/ | |
45 | | /// | +----------+----------+ +-----------+----------+ | |
46 | | /// | | Inbound Handler N-1 | | Outbound Handler 2 | | |
47 | | /// | +----------+----------+ +-----------+----------+ | |
48 | | /// | /|\ . | |
49 | | /// | . . | |
50 | | /// | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()| |
51 | | /// | [ method call] [method call] | |
52 | | /// | . . | |
53 | | /// | . \|/ | |
54 | | /// | +----------+----------+ +-----------+----------+ | |
55 | | /// | | Inbound Handler 2 | | Outbound Handler M-1 | | |
56 | | /// | +----------+----------+ +-----------+----------+ | |
57 | | /// | /|\ | | |
58 | | /// | | \|/ | |
59 | | /// | +----------+----------+ +-----------+----------+ | |
60 | | /// | | Inbound Handler 1 | | Outbound Handler M | | |
61 | | /// | +----------+----------+ +-----------+----------+ | |
62 | | /// | /|\ HEAD | | |
63 | | /// +---------------+-----------------------------------+---------------+ |
64 | | /// | \|/ |
65 | | /// +---------------+-----------------------------------+---------------+ |
66 | | /// | | | | |
67 | | /// | [ Socket.read ] [ Socket.write ] | |
68 | | /// | | |
69 | | /// | SwiftNIO Internal I/O Threads (Transport Implementation) | |
70 | | /// +-------------------------------------------------------------------+ |
71 | | /// ``` |
72 | | /// |
73 | | /// An inbound event is handled by the inbound handlers in the head-to-tail direction as shown on the left side of the |
74 | | /// diagram. An inbound handler usually handles the inbound data generated by the I/O thread on the bottom of the |
75 | | /// diagram. The inbound data is often read from a remote peer via the actual input operation such as |
76 | | /// `Socket.read`. If an inbound event goes beyond the tail inbound handler, it is discarded |
77 | | /// silently, or logged if it needs your attention. |
78 | | /// |
79 | | /// An outbound event is handled by the outbound handlers in the tail-to-head direction as shown on the right side of the |
80 | | /// diagram. An outbound handler usually generates or transforms the outbound traffic such as write requests. |
81 | | /// If an outbound event goes beyond the head outbound handler, it is handled by an I/O thread associated with the |
82 | | /// `Channel`. The I/O thread often performs the actual output operation such as `Socket.write`. |
83 | | /// |
84 | | /// |
85 | | /// For example, let us assume that we created the following pipeline: |
86 | | /// |
87 | | /// ``` |
88 | | /// let p: ChannelPipeline = ... |
89 | | /// let future = p.addHandler(InboundHandlerA(), name: "1").flatMap { |
90 | | /// p.addHandler(InboundHandlerB(), name: "2") |
91 | | /// }.flatMap { |
92 | | /// p.addHandler(OutboundHandlerA(), name: "3") |
93 | | /// }.flatMap { |
94 | | /// p.addHandler(OutboundHandlerB(), name: "4") |
95 | | /// }.flatMap { |
96 | | /// p.addHandler(InboundOutboundHandlerX(), name: "5") |
97 | | /// } |
98 | | /// // Handle the future as well .... |
99 | | /// ``` |
100 | | /// |
101 | | /// In the example above, a class whose name starts with `Inbound` is an inbound handler. |
102 | | /// A class whose name starts with `Outbound` is an outbound handler. |
103 | | /// |
104 | | /// In the given example configuration, the handler evaluation order is 1, 2, 3, 4, 5 when an event goes inbound. |
105 | | /// When an event goes outbound, the order is 5, 4, 3, 2, 1. On top of this principle, `ChannelPipeline` skips |
106 | | /// the evaluation of certain handlers to shorten the stack depth: |
107 | | /// |
108 | | /// - 3 and 4 don't implement `ChannelInboundHandler`, and therefore the actual evaluation order of an inbound event will be: 1, 2, and 5. |
109 | | /// - 1 and 2 don't implement `ChannelOutboundHandler`, and therefore the actual evaluation order of an outbound event will be: 5, 4, and 3. |
110 | | /// - If 5 implements both `ChannelInboundHandler` and `ChannelOutboundHandler`, the evaluation order of an inbound and an outbound event could be 125 and 543 respectively. |
111 | | /// |
112 | | /// - Note: Handlers may choose not to propagate messages down the pipeline immediately. For example a handler may need to wait |
113 | | /// for additional data before sending a protocol event to the next handler in the pipeline. Due to this you can't assume that later handlers |
114 | | /// in the pipeline will receive the same number of events as were sent, or that events of different types will arrive in the same order. |
115 | | /// For example - a user event could overtake a data event if a handler is aggregating data events before propagating but immediately |
116 | | /// propagating user events. |
117 | | /// |
118 | | /// # Forwarding an event to the next handler |
119 | | /// |
120 | | /// As you might have noticed in the diagram above, a handler has to invoke the event propagation methods in |
121 | | /// `ChannelHandlerContext` to forward an event to its next handler. |
122 | | /// Those methods include: |
123 | | /// |
124 | | /// - Inbound event propagation methods defined in `ChannelInboundInvoker` |
125 | | /// - Outbound event propagation methods defined in `ChannelOutboundInvoker`. |
126 | | /// |
127 | | /// # Building a pipeline |
128 | | /// |
129 | | /// A user is supposed to have one or more `ChannelHandler`s in a `ChannelPipeline` to receive I/O events (e.g. read) and |
130 | | /// to request I/O operations (e.g. write and close). For example, a typical server will have the following handlers |
131 | | /// in each channel's pipeline, but your mileage may vary depending on the complexity and characteristics of the |
132 | | /// protocol and business logic: |
133 | | /// |
134 | | /// - Protocol Decoder - translates binary data (e.g. `ByteBuffer`) into a struct / class |
135 | | /// - Protocol Encoder - translates a struct / class into binary data (e.g. `ByteBuffer`) |
136 | | /// - Business Logic Handler - performs the actual business logic (e.g. database access) |
137 | | /// |
138 | | /// # Thread safety |
139 | | /// |
140 | | /// A `ChannelHandler` can be added or removed at any time because a `ChannelPipeline` is thread safe. |
141 | | public final class ChannelPipeline: ChannelInvoker { |
142 | | private var head: Optional<ChannelHandlerContext> |
143 | | private var tail: Optional<ChannelHandlerContext> |
144 | | |
145 | 33.8k | private var idx: Int = 0 |
146 | 33.8k | internal private(set) var destroyed: Bool = false |
147 | | |
148 | | /// The `EventLoop` that is used by the underlying `Channel`. |
149 | | public let eventLoop: EventLoop |
150 | | |
151 | | /// The `Channel` that this `ChannelPipeline` belongs to. |
152 | | /// |
153 | | /// - Note: This will be nil after the channel has closed |
154 | | private var _channel: Optional<Channel> |
155 | | |
156 | | /// The `Channel` that this `ChannelPipeline` belongs to. |
157 | | @usableFromInline |
158 | 30.3M | internal var channel: Channel { |
159 | 30.3M | self.eventLoop.assertInEventLoop() |
160 | 30.3M | assert(self._channel != nil || self.destroyed) |
161 | 30.3M | return self._channel ?? DeadChannel(pipeline: self) |
162 | 30.3M | } |
163 | | |
164 | | /// Add a `ChannelHandler` to the `ChannelPipeline`. |
165 | | /// |
166 | | /// - Parameters: |
167 | | /// - name: the name to use for the `ChannelHandler` when it's added. If none is specified it will generate a name. |
168 | | /// - handler: the `ChannelHandler` to add |
169 | | /// - position: The position in the `ChannelPipeline` to add `handler`. Defaults to `.last`. |
170 | | /// - Returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was added. |
171 | | @preconcurrency |
172 | | public func addHandler( |
173 | | _ handler: ChannelHandler & Sendable, |
174 | | name: String? = nil, |
175 | | position: ChannelPipeline.Position = .last |
176 | 13.1k | ) -> EventLoopFuture<Void> { |
177 | 13.1k | let future: EventLoopFuture<Void> |
178 | 13.1k | |
179 | 13.1k | if self.eventLoop.inEventLoop { |
180 | 13.1k | let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) |
181 | 13.1k | future = self.eventLoop.makeCompletedFuture( |
182 | 13.1k | self.addHandlerSync(handler, name: name, position: syncPosition) |
183 | 13.1k | ) |
184 | 13.1k | } else { |
185 | 0 | future = self.eventLoop.submit { |
186 | 0 | let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) |
187 | 0 | try self.addHandlerSync(handler, name: name, position: syncPosition).get() |
188 | 0 | } |
189 | 0 | } |
190 | 13.1k | |
191 | 13.1k | return future |
192 | 13.1k | } |
193 | | |
194 | | /// Synchronously add a `ChannelHandler` to the `ChannelPipeline`. |
195 | | /// |
196 | | /// May only be called from the event loop. |
197 | | /// |
198 | | /// - Parameters: |
199 | | /// - handler: the `ChannelHandler` to add |
200 | | /// - name: the name to use for the `ChannelHandler` when it's added. If none is specified a name will be generated. |
201 | | /// - position: The position in the `ChannelPipeline` to add `handler`. Defaults to `.last`. |
202 | | /// - Returns: the result of adding this handler - either success or failure with an error code if this could not be completed. |
203 | | fileprivate func addHandlerSync( |
204 | | _ handler: ChannelHandler, |
205 | | name: String? = nil, |
206 | | position: ChannelPipeline.SynchronousOperations.Position = .last |
207 | 13.1k | ) -> Result<Void, Error> { |
208 | 13.1k | self.eventLoop.assertInEventLoop() |
209 | 13.1k | |
210 | 13.1k | if self.destroyed { |
211 | 0 | return .failure(ChannelError._ioOnClosedChannel) |
212 | 13.1k | } |
213 | 13.1k | |
214 | 13.1k | switch position { |
215 | 13.1k | case .first: |
216 | 0 | return self.add0( |
217 | 0 | name: name, |
218 | 0 | handler: handler, |
219 | 0 | relativeContext: head!, |
220 | 0 | operation: self.add0(context:after:) Unexecuted instantiation: $s7NIOCore15ChannelPipelineC14addHandlerSync33_F5AC316541457BD146E3694279514AA3LL_4name8positions6ResultOyyts5Error_pGAA0bE0_p_SSSgAC21SynchronousOperationsV8PositionOtFyAA0bE7ContextC_AStYbcACYbcfu_ Unexecuted instantiation: $s7NIOCore15ChannelPipelineC14addHandlerSync33_F5AC316541457BD146E3694279514AA3LL_4name8positions6ResultOyyts5Error_pGAA0bE0_p_SSSgAC21SynchronousOperationsV8PositionOtFyAA0bE7ContextC_AStYbcACYbcfu_yAS_AStYbcfu0_ |
221 | 0 | ) |
222 | 13.1k | case .last: |
223 | 13.1k | return self.add0( |
224 | 13.1k | name: name, |
225 | 13.1k | handler: handler, |
226 | 13.1k | relativeContext: tail!, |
227 | 26.2k | operation: self.add0(context:before:) $s7NIOCore15ChannelPipelineC14addHandlerSync33_F5AC316541457BD146E3694279514AA3LL_4name8positions6ResultOyyts5Error_pGAA0bE0_p_SSSgAC21SynchronousOperationsV8PositionOtFyAA0bE7ContextC_AStYbcACYbcfu1_ Line | Count | Source | 227 | 13.1k | operation: self.add0(context:before:) |
$s7NIOCore15ChannelPipelineC14addHandlerSync33_F5AC316541457BD146E3694279514AA3LL_4name8positions6ResultOyyts5Error_pGAA0bE0_p_SSSgAC21SynchronousOperationsV8PositionOtFyAA0bE7ContextC_AStYbcACYbcfu1_yAS_AStYbcfu2_ Line | Count | Source | 227 | 13.1k | operation: self.add0(context:before:) |
|
228 | 13.1k | ) |
229 | 13.1k | case .before(let beforeHandler): |
230 | 0 | return self.add0( |
231 | 0 | name: name, |
232 | 0 | handler: handler, |
233 | 0 | relativeHandler: beforeHandler, |
234 | 0 | operation: self.add0(context:before:) Unexecuted instantiation: $s7NIOCore15ChannelPipelineC14addHandlerSync33_F5AC316541457BD146E3694279514AA3LL_4name8positions6ResultOyyts5Error_pGAA0bE0_p_SSSgAC21SynchronousOperationsV8PositionOtFyAA0bE7ContextC_AStYbcACYbcfu3_ Unexecuted instantiation: $s7NIOCore15ChannelPipelineC14addHandlerSync33_F5AC316541457BD146E3694279514AA3LL_4name8positions6ResultOyyts5Error_pGAA0bE0_p_SSSgAC21SynchronousOperationsV8PositionOtFyAA0bE7ContextC_AStYbcACYbcfu3_yAS_AStYbcfu4_ |
235 | 0 | ) |
236 | 13.1k | case .after(let afterHandler): |
237 | 0 | return self.add0( |
238 | 0 | name: name, |
239 | 0 | handler: handler, |
240 | 0 | relativeHandler: afterHandler, |
241 | 0 | operation: self.add0(context:after:) Unexecuted instantiation: $s7NIOCore15ChannelPipelineC14addHandlerSync33_F5AC316541457BD146E3694279514AA3LL_4name8positions6ResultOyyts5Error_pGAA0bE0_p_SSSgAC21SynchronousOperationsV8PositionOtFyAA0bE7ContextC_AStYbcACYbcfu5_ Unexecuted instantiation: $s7NIOCore15ChannelPipelineC14addHandlerSync33_F5AC316541457BD146E3694279514AA3LL_4name8positions6ResultOyyts5Error_pGAA0bE0_p_SSSgAC21SynchronousOperationsV8PositionOtFyAA0bE7ContextC_AStYbcACYbcfu5_yAS_AStYbcfu6_ |
242 | 0 | ) |
243 | 13.1k | } |
244 | 13.1k | } |
245 | | |
246 | | /// Synchronously add a `ChannelHandler` to the pipeline, relative to another `ChannelHandler`, |
247 | | /// where the insertion is done by a specific operation. |
248 | | /// |
249 | | /// May only be called from the event loop. |
250 | | /// |
251 | | /// This will search the pipeline for `relativeHandler` and, if it cannot find it, will return |
252 | | /// a failure containing `ChannelPipelineError.notFound`. |
253 | | /// |
254 | | /// - Parameters: |
255 | | /// - name: The name to use for the `ChannelHandler` when it's added. If none is specified, a name will be |
256 | | /// automatically generated. |
257 | | /// - handler: The `ChannelHandler` to add. |
258 | | /// - relativeHandler: The `ChannelHandler` already in the `ChannelPipeline` that `handler` will be |
259 | | /// inserted relative to. |
260 | | /// - operation: A callback that will insert `handler` relative to `relativeHandler`. |
261 | | /// - Returns: the result of adding this handler - either success or failure with an error code if this could not be completed. |
262 | | private func add0( |
263 | | name: String?, |
264 | | handler: ChannelHandler, |
265 | | relativeHandler: ChannelHandler, |
266 | | operation: (ChannelHandlerContext, ChannelHandlerContext) -> Void |
267 | 0 | ) -> Result<Void, Error> { |
268 | 0 | self.eventLoop.assertInEventLoop() |
269 | 0 | if self.destroyed { |
270 | 0 | return .failure(ChannelError._ioOnClosedChannel) |
271 | 0 | } |
272 | 0 |
|
273 | 0 | guard let context = self.contextForPredicate0({ $0.handler === relativeHandler }) else { |
274 | 0 | return .failure(ChannelPipelineError.notFound) |
275 | 0 | } |
276 | 0 |
|
277 | 0 | return self.add0(name: name, handler: handler, relativeContext: context, operation: operation) |
278 | 0 | } |
279 | | |
280 | | /// Synchronously add a `ChannelHandler` to the pipeline, relative to a `ChannelHandlerContext`, |
281 | | /// where the insertion is done by a specific operation. |
282 | | /// |
283 | | /// May only be called from the event loop. |
284 | | /// |
285 | | /// This method is more efficient than the one that takes a `relativeHandler` as it does not need to |
286 | | /// search the pipeline for the insertion point. It should be used whenever possible. |
287 | | /// |
288 | | /// - Parameters: |
289 | | /// - name: The name to use for the `ChannelHandler` when it's added. If none is specified, a name will be |
290 | | /// automatically generated. |
291 | | /// - handler: The `ChannelHandler` to add. |
292 | | /// - relativeContext: The `ChannelHandlerContext` already in the `ChannelPipeline` that `handler` will be |
293 | | /// inserted relative to. |
294 | | /// - operation: A callback that will insert `handler` relative to `relativeContext`. |
295 | | /// - Returns: the result of adding this handler - either success or failure with an error code if this could not be completed. |
296 | | private func add0( |
297 | | name: String?, |
298 | | handler: ChannelHandler, |
299 | | relativeContext: ChannelHandlerContext, |
300 | | operation: (ChannelHandlerContext, ChannelHandlerContext) -> Void |
301 | 13.1k | ) -> Result<Void, Error> { |
302 | 13.1k | self.eventLoop.assertInEventLoop() |
303 | 13.1k | |
304 | 13.1k | if self.destroyed { |
305 | 0 | return .failure(ChannelError._ioOnClosedChannel) |
306 | 13.1k | } |
307 | 13.1k | |
308 | 13.1k | let context = ChannelHandlerContext(name: name ?? nextName(), handler: handler, pipeline: self) |
309 | 13.1k | operation(context, relativeContext) |
310 | 13.1k | |
311 | 13.1k | context.invokeHandlerAdded() |
312 | 13.1k | return .success(()) |
313 | 13.1k | } |
314 | | |
315 | | /// Synchronously add a single new `ChannelHandlerContext` after one that currently exists in the |
316 | | /// pipeline. |
317 | | /// |
318 | | /// Must be called from within the event loop thread, as it synchronously manipulates the |
319 | | /// `ChannelHandlerContext`s on the `ChannelPipeline`. |
320 | | /// |
321 | | /// - Parameters: |
322 | | /// - new: The `ChannelHandlerContext` to add to the pipeline. |
323 | | /// - existing: The `ChannelHandlerContext` that `new` will be added after. |
324 | 0 | private func add0(context new: ChannelHandlerContext, after existing: ChannelHandlerContext) { |
325 | 0 | self.eventLoop.assertInEventLoop() |
326 | 0 |
|
327 | 0 | let next = existing.next |
328 | 0 | new.prev = existing |
329 | 0 | new.next = next |
330 | 0 | existing.next = new |
331 | 0 | next?.prev = new |
332 | 0 | } |
333 | | |
334 | | /// Synchronously add a single new `ChannelHandlerContext` before one that currently exists in the |
335 | | /// pipeline. |
336 | | /// |
337 | | /// Must be called from within the event loop thread, as it synchronously manipulates the |
338 | | /// `ChannelHandlerContext`s on the `ChannelPipeline`. |
339 | | /// |
340 | | /// - Parameters: |
341 | | /// - new: The `ChannelHandlerContext` to add to the pipeline. |
342 | | /// - existing: The `ChannelHandlerContext` that `new` will be added before. |
343 | 13.1k | private func add0(context new: ChannelHandlerContext, before existing: ChannelHandlerContext) { |
344 | 13.1k | self.eventLoop.assertInEventLoop() |
345 | 13.1k | |
346 | 13.1k | let prev = existing.prev |
347 | 13.1k | new.prev = prev |
348 | 13.1k | new.next = existing |
349 | 13.1k | existing.prev = new |
350 | 13.1k | prev?.next = new |
351 | 13.1k | } |
352 | | |
353 | | /// Remove a `ChannelHandler` from the `ChannelPipeline`. |
354 | | /// |
355 | | /// - Parameters: |
356 | | /// - handler: the `ChannelHandler` to remove. |
357 | | /// - Returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was removed. |
358 | | @preconcurrency |
359 | 0 | public func removeHandler(_ handler: RemovableChannelHandler & Sendable) -> EventLoopFuture<Void> { |
360 | 0 | let promise = self.eventLoop.makePromise(of: Void.self) |
361 | 0 | self.removeHandler(handler, promise: promise) |
362 | 0 | return promise.futureResult |
363 | 0 | } |
364 | | |
365 | | /// Remove a `ChannelHandler` from the `ChannelPipeline`. |
366 | | /// |
367 | | /// - Parameters: |
368 | | /// - name: the name that was used to add the `ChannelHandler` to the `ChannelPipeline` before. |
369 | | /// - Returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was removed. |
370 | 0 | public func removeHandler(name: String) -> EventLoopFuture<Void> { |
371 | 0 | let promise = self.eventLoop.makePromise(of: Void.self) |
372 | 0 | self.removeHandler(name: name, promise: promise) |
373 | 0 | return promise.futureResult |
374 | 0 | } |
375 | | |
376 | | /// Remove a `ChannelHandler` from the `ChannelPipeline`. |
377 | | /// |
378 | | /// - Parameters: |
379 | | /// - context: the `ChannelHandlerContext` that belongs to `ChannelHandler` that should be removed. |
380 | | /// - Returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was removed. |
381 | | @available( |
382 | | *, |
383 | | deprecated, |
384 | | message: "Use .syncOperations.removeHandler(context:) instead, this method is not Sendable-safe." |
385 | | ) |
386 | 0 | public func removeHandler(context: ChannelHandlerContext) -> EventLoopFuture<Void> { |
387 | 0 | let promise = self.eventLoop.makePromise(of: Void.self) |
388 | 0 | self.removeHandler(context: context, promise: promise) |
389 | 0 | return promise.futureResult |
390 | 0 | } |
391 | | |
392 | | /// Remove a `ChannelHandler` from the `ChannelPipeline`. |
393 | | /// |
394 | | /// - Parameters: |
395 | | /// - handler: the `ChannelHandler` to remove. |
396 | | /// - promise: An `EventLoopPromise` that will complete when the `ChannelHandler` is removed. |
397 | | @preconcurrency |
398 | 0 | public func removeHandler(_ handler: RemovableChannelHandler & Sendable, promise: EventLoopPromise<Void>?) { |
399 | 0 | @Sendable |
400 | 0 | func removeHandler0() { |
401 | 0 | self.syncOperations.removeHandler(handler, promise: promise) |
402 | 0 | } |
403 | 0 |
|
404 | 0 | if self.eventLoop.inEventLoop { |
405 | 0 | removeHandler0() |
406 | 0 | } else { |
407 | 0 | self.eventLoop.execute { |
408 | 0 | removeHandler0() |
409 | 0 | } |
410 | 0 | } |
411 | 0 | } |
412 | | |
413 | | /// Remove a `ChannelHandler` from the `ChannelPipeline`. |
414 | | /// |
415 | | /// - Parameters: |
416 | | /// - name: the name that was used to add the `ChannelHandler` to the `ChannelPipeline` before. |
417 | | /// - promise: An `EventLoopPromise` that will complete when the `ChannelHandler` is removed. |
418 | 0 | public func removeHandler(name: String, promise: EventLoopPromise<Void>?) { |
419 | 0 | @Sendable |
420 | 0 | func removeHandler0() { |
421 | 0 | self.syncOperations.removeHandler(name: name, promise: promise) |
422 | 0 | } |
423 | 0 |
|
424 | 0 | if self.eventLoop.inEventLoop { |
425 | 0 | removeHandler0() |
426 | 0 | } else { |
427 | 0 | self.eventLoop.execute { |
428 | 0 | removeHandler0() |
429 | 0 | } |
430 | 0 | } |
431 | 0 | } |
432 | | |
433 | | /// Remove a `ChannelHandler` from the `ChannelPipeline`. |
434 | | /// |
435 | | /// - Parameters: |
436 | | /// - context: the `ChannelHandlerContext` that belongs to `ChannelHandler` that should be removed. |
437 | | /// - promise: An `EventLoopPromise` that will complete when the `ChannelHandler` is removed. |
438 | | @available( |
439 | | *, |
440 | | deprecated, |
441 | | message: "Use .syncOperations.removeHandler(context:) instead, this method is not Sendable-safe." |
442 | | ) |
443 | 0 | public func removeHandler(context: ChannelHandlerContext, promise: EventLoopPromise<Void>?) { |
444 | 0 | let sendableView = context.sendableView |
445 | 0 |
|
446 | 0 | guard sendableView.channelHandlerIsRemovable else { |
447 | 0 | promise?.fail(ChannelError._unremovableHandler) |
448 | 0 | return |
449 | 0 | } |
450 | 0 |
|
451 | 0 | @Sendable |
452 | 0 | func removeHandler0() { |
453 | 0 | sendableView.wrappedValue.startUserTriggeredRemoval(promise: promise) |
454 | 0 | } |
455 | 0 |
|
456 | 0 | if self.eventLoop.inEventLoop { |
457 | 0 | removeHandler0() |
458 | 0 | } else { |
459 | 0 | self.eventLoop.execute { |
460 | 0 | removeHandler0() |
461 | 0 | } |
462 | 0 | } |
463 | 0 | } |
464 | | |
465 | | /// Returns the `ChannelHandlerContext` that belongs to a `ChannelHandler`. |
466 | | /// |
467 | | /// - Parameters: |
468 | | /// - handler: the `ChannelHandler` for which the `ChannelHandlerContext` should be returned |
469 | | /// - Returns: the `EventLoopFuture` which will be notified once the operation completes. |
470 | | @available( |
471 | | *, |
472 | | deprecated, |
473 | | message: "This method is not strict concurrency safe. Prefer .syncOperations.context(handler:)" |
474 | | ) |
475 | | @preconcurrency |
476 | 0 | public func context(handler: ChannelHandler & Sendable) -> EventLoopFuture<ChannelHandlerContext> { |
477 | 0 | let promise = self.eventLoop.makePromise(of: ChannelHandlerContext.self) |
478 | 0 |
|
479 | 0 | if self.eventLoop.inEventLoop { |
480 | 0 | promise.assumeIsolatedUnsafeUnchecked().completeWith(self.contextSync(handler: handler)) |
481 | 0 | } else { |
482 | 0 | self.eventLoop.execute { |
483 | 0 | promise.assumeIsolatedUnsafeUnchecked().completeWith(self.contextSync(handler: handler)) |
484 | 0 | } |
485 | 0 | } |
486 | 0 |
|
487 | 0 | return promise.futureResult |
488 | 0 | } |
489 | | |
490 | | /// Synchronously returns the `ChannelHandlerContext` that belongs to a `ChannelHandler`. |
491 | | /// |
492 | | /// - Important: This must be called on the `EventLoop`. |
493 | | /// - Parameters: |
494 | | /// - handler: the `ChannelHandler` for which the `ChannelHandlerContext` should be returned |
495 | | /// - Returns: the `ChannelHandlerContext` that belongs to the `ChannelHandler`, if one exists. |
496 | 0 | fileprivate func contextSync(handler: ChannelHandler) -> Result<ChannelHandlerContext, Error> { |
497 | 0 | self._contextSync({ $0.handler === handler }) |
498 | 0 | } |
499 | | |
500 | | /// Returns the `ChannelHandlerContext` that belongs to a `ChannelHandler`. |
501 | | /// |
502 | | /// - Parameters: |
503 | | /// - name: the name that was used to add the `ChannelHandler` to the `ChannelPipeline` before. |
504 | | /// - Returns: the `EventLoopFuture` which will be notified once the operation completes. |
505 | 0 | public func context(name: String) -> EventLoopFuture<ChannelHandlerContext> { |
506 | 0 | let promise = self.eventLoop.makePromise(of: ChannelHandlerContext.self) |
507 | 0 |
|
508 | 0 | if self.eventLoop.inEventLoop { |
509 | 0 | promise.assumeIsolatedUnsafeUnchecked().completeWith(self.contextSync(name: name)) |
510 | 0 | } else { |
511 | 0 | self.eventLoop.execute { |
512 | 0 | promise.assumeIsolatedUnsafeUnchecked().completeWith(self.contextSync(name: name)) |
513 | 0 | } |
514 | 0 | } |
515 | 0 |
|
516 | 0 | return promise.futureResult |
517 | 0 | } |
518 | | |
519 | | /// Synchronously finds and returns the `ChannelHandlerContext` that belongs to the |
520 | | /// `ChannelHandler` with the given name. |
521 | | /// |
522 | | /// - Important: This must be called on the `EventLoop`. |
523 | | /// - Parameter name: The name of the `ChannelHandler` to find. |
524 | | /// - Returns: the `ChannelHandlerContext` that belongs to the `ChannelHandler`, if one exists. |
525 | 0 | fileprivate func contextSync(name: String) -> Result<ChannelHandlerContext, Error> { |
526 | 0 | self._contextSync({ $0.name == name }) |
527 | 0 | } |
528 | | |
529 | | /// Returns the `ChannelHandlerContext` that belongs to a `ChannelHandler` of the given type. |
530 | | /// |
531 | | /// If multiple channel handlers of the same type are present in the pipeline, returns the context |
532 | | /// belonging to the first such handler. |
533 | | /// |
534 | | /// - Parameters: |
535 | | /// - handlerType: The type of the handler to search for. |
536 | | /// - Returns: the `EventLoopFuture` which will be notified once the operation completes. |
537 | | @inlinable |
538 | | @preconcurrency |
539 | | public func context<Handler: ChannelHandler & _NIOCoreSendableMetatype>( |
540 | | handlerType: Handler.Type |
541 | 0 | ) -> EventLoopFuture<ChannelHandlerContext> { |
542 | 0 | let promise = self.eventLoop.makePromise(of: ChannelHandlerContext.self) |
543 | 0 |
|
544 | 0 | if self.eventLoop.inEventLoop { |
545 | 0 | promise.assumeIsolatedUnsafeUnchecked().completeWith(self._contextSync(handlerType: handlerType)) |
546 | 0 | } else { |
547 | 0 | self.eventLoop.execute { |
548 | 0 | promise.assumeIsolatedUnsafeUnchecked().completeWith(self._contextSync(handlerType: handlerType)) |
549 | 0 | } |
550 | 0 | } |
551 | 0 |
|
552 | 0 | return promise.futureResult |
553 | 0 | } |
554 | | |
555 | | /// Returns if the ``ChannelHandler`` of the given type is contained in the pipeline. |
556 | | /// |
557 | | /// - Parameters: |
558 | | /// - type: The type of the handler. |
559 | | /// - Returns: An ``EventLoopFuture`` that is succeeded if a handler of the given type is contained in the pipeline. Otherwise |
560 | | /// the future will be failed with an error. |
561 | | @inlinable |
562 | | @preconcurrency |
563 | | public func containsHandler<Handler: ChannelHandler & _NIOCoreSendableMetatype>( |
564 | | type: Handler.Type |
565 | 0 | ) -> EventLoopFuture<Void> { |
566 | 0 | self.handler(type: type).map { _ in () } |
567 | 0 | } |
568 | | |
569 | | /// Returns whether a ``ChannelHandler`` with the given name is contained in the pipeline. |
570 | | /// |
571 | | /// - Parameters: |
572 | | /// - name: The name of the handler. |
573 | | /// - Returns: An ``EventLoopFuture`` that is succeeded if a handler with the given name is contained in the pipeline. Otherwise |
574 | | /// the future will be failed with an error. |
575 | | @inlinable |
576 | 0 | public func containsHandler(name: String) -> EventLoopFuture<Void> { |
577 | 0 | self.context(name: name).map { _ in () } |
578 | 0 | } |
579 | | |
580 | | /// Synchronously finds and returns the `ChannelHandlerContext` that belongs to the first |
581 | | /// `ChannelHandler` of the given type. |
582 | | /// |
583 | | /// - Important: This must be called on the `EventLoop`. |
584 | | /// - Parameter handlerType: The type of handler to search for. |
585 | | /// - Returns: the `ChannelHandlerContext` that belongs to the `ChannelHandler`, if one exists. |
586 | | @inlinable // should be fileprivate |
587 | | internal func _contextSync<Handler: ChannelHandler>( |
588 | | handlerType: Handler.Type |
589 | 0 | ) -> Result<ChannelHandlerContext, Error> { |
590 | 0 | self._contextSync({ $0.handler is Handler }) |
591 | 0 | } |
592 | | |
/// Synchronously searches the `ChannelPipeline` for the first `ChannelHandlerContext` accepted by `body`.
///
/// - Important: This must be called on the `EventLoop`.
/// - Parameter body: The predicate evaluated against each context.
/// - Returns: `.success` with the first accepted context, or `.failure(ChannelPipelineError.notFound)`.
@usableFromInline  // should be fileprivate
internal func _contextSync(_ body: (ChannelHandlerContext) -> Bool) -> Result<ChannelHandlerContext, Error> {
    self.eventLoop.assertInEventLoop()

    guard let match = self.contextForPredicate0(body) else {
        return .failure(ChannelPipelineError.notFound)
    }
    return .success(match)
}
605 | | |
/// Walks the pipeline from the context after `head` up to (but excluding) `tail` and returns the
/// first `ChannelHandlerContext` for which `body` returns `true`.
///
/// Head and tail are skipped because they are internal and should not be accessible by the user.
///
/// - Parameters:
///   - body: The predicate to execute per `ChannelHandlerContext` in the `ChannelPipeline`.
/// - Returns: The first matching `ChannelHandlerContext`, or `nil` if there is none.
private func contextForPredicate0(_ body: (ChannelHandlerContext) -> Bool) -> ChannelHandlerContext? {
    var cursor = self.head?.next
    while let candidate = cursor, candidate !== self.tail {
        guard !body(candidate) else {
            return candidate
        }
        cursor = candidate.next
    }

    return nil
}
624 | | |
/// Unlinks `context` from the `ChannelPipeline` directly, bypassing the `RemovableChannelHandler` API.
///
/// This must only be used to clear the pipeline on `Channel` tear down and as a result of the
/// `leavePipeline` call in the `RemovableChannelHandler` API.
///
/// - Parameters:
///   - context: The context to unlink.
///   - promise: Succeeded after the handler's removal callout has been invoked.
internal func removeHandlerFromPipeline(context: ChannelHandlerContext, promise: EventLoopPromise<Void>?) {
    self.eventLoop.assertInEventLoop()

    let successor = context.next
    let predecessor = context.prev

    // Splice the neighbours together so the list no longer routes through `context`.
    predecessor?.next = successor
    successor?.prev = predecessor

    context.invokeHandlerRemoved()
    promise?.succeed(())

    // The node's own links are only cleared after the callout, in case the user uses the
    // context from within it.
    context.next = nil
    context.prev = nil
}
648 | | |
/// Produces the next auto-generated `ChannelHandler` name (`"handler<idx>"`) and advances the counter.
private func nextName() -> String {
    self.eventLoop.assertInEventLoop()

    // The returned string is built from the current counter value; the increment runs afterwards.
    defer { idx += 1 }
    return "handler\(idx)"
}
657 | | |
/// Remove all the `ChannelHandler`s from the `ChannelPipeline` and destroy these.
///
/// This method must only be called from within the `EventLoop`. It should only be called from a `ChannelCore`
/// implementation. Once called, the `ChannelPipeline` is no longer active and cannot be used again.
func removeHandlers() {
    self.eventLoop.assertInEventLoop()

    if let head = self.head {
        // Each removal unlinks the context directly after `head`, so this loop drains the list
        // (including the tail context) until `head` has no successor left.
        while let context = head.next {
            self.removeHandlerFromPipeline(context: context, promise: nil)
        }
        // Use the already-bound `head` rather than force-unwrapping `self.head` again.
        self.removeHandlerFromPipeline(context: head, promise: nil)
    }
    self.head = nil
    self.tail = nil

    self.destroyed = true
    self._channel = nil
}
677 | | |
678 | | // Just delegate to the head and tail context |
/// Fires a channel-registered event into the pipeline, hopping to the event loop first if necessary.
public func fireChannelRegistered() {
    guard self.eventLoop.inEventLoop else {
        self.eventLoop.execute {
            self.fireChannelRegistered0()
        }
        return
    }
    self.fireChannelRegistered0()
}
688 | | |
/// Fires a channel-unregistered event into the pipeline, hopping to the event loop first if necessary.
public func fireChannelUnregistered() {
    guard self.eventLoop.inEventLoop else {
        self.eventLoop.execute {
            self.fireChannelUnregistered0()
        }
        return
    }
    self.fireChannelUnregistered0()
}
698 | | |
/// Fires a channel-inactive event into the pipeline, hopping to the event loop first if necessary.
public func fireChannelInactive() {
    guard self.eventLoop.inEventLoop else {
        self.eventLoop.execute {
            self.fireChannelInactive0()
        }
        return
    }
    self.fireChannelInactive0()
}
708 | | |
/// Fires a channel-active event into the pipeline, hopping to the event loop first if necessary.
public func fireChannelActive() {
    guard self.eventLoop.inEventLoop else {
        self.eventLoop.execute {
            self.fireChannelActive0()
        }
        return
    }
    self.fireChannelActive0()
}
718 | | |
/// Fires a channel-read event carrying `data` into the pipeline, hopping to the event loop first if necessary.
@available(
    *,
    deprecated,
    message: "NIOAny is not Sendable. Avoid wrapping the value in NIOAny to silence this warning."
)
public func fireChannelRead(_ data: NIOAny) {
    guard self.eventLoop.inEventLoop else {
        // This is unsafe, but necessary: the value is wrapped so it can be carried into the
        // closure executed on the event loop.
        let transfer = UnsafeTransfer(data)
        self.eventLoop.execute {
            self._fireChannelRead0(transfer.wrappedValue)
        }
        return
    }
    self._fireChannelRead0(data)
}
735 | | |
/// Fires a channel-read event carrying `data` (wrapped in `NIOAny`) into the pipeline, hopping to the
/// event loop first if necessary.
@inlinable
public func fireChannelRead<T: Sendable>(_ data: T) {
    guard self.eventLoop.inEventLoop else {
        self.eventLoop.execute {
            self._fireChannelRead0(NIOAny(data))
        }
        return
    }
    self._fireChannelRead0(NIOAny(data))
}
746 | | |
/// Fires a channel-read-complete event into the pipeline, hopping to the event loop first if necessary.
public func fireChannelReadComplete() {
    guard self.eventLoop.inEventLoop else {
        self.eventLoop.execute {
            self.fireChannelReadComplete0()
        }
        return
    }
    self.fireChannelReadComplete0()
}
756 | | |
/// Fires a writability-changed event into the pipeline, hopping to the event loop first if necessary.
public func fireChannelWritabilityChanged() {
    guard self.eventLoop.inEventLoop else {
        self.eventLoop.execute {
            self.fireChannelWritabilityChanged0()
        }
        return
    }
    self.fireChannelWritabilityChanged0()
}
766 | | |
/// Fires a user inbound event into the pipeline, hopping to the event loop first if necessary.
///
/// - Parameter event: The event to deliver.
@preconcurrency
public func fireUserInboundEventTriggered(_ event: Any & Sendable) {
    guard self.eventLoop.inEventLoop else {
        self.eventLoop.execute {
            self.fireUserInboundEventTriggered0(event)
        }
        return
    }
    self.fireUserInboundEventTriggered0(event)
}
777 | | |
/// Fires an error-caught event into the pipeline, hopping to the event loop first if necessary.
///
/// - Parameter error: The error to deliver.
public func fireErrorCaught(_ error: Error) {
    guard self.eventLoop.inEventLoop else {
        self.eventLoop.execute {
            self.fireErrorCaught0(error: error)
        }
        return
    }
    self.fireErrorCaught0(error: error)
}
787 | | |
/// Sends a close operation through the pipeline, hopping to the event loop first if necessary.
///
/// - Parameters:
///   - mode: What to close. Defaults to `.all`.
///   - promise: Passed along with the close operation.
public func close(mode: CloseMode = .all, promise: EventLoopPromise<Void>?) {
    guard self.eventLoop.inEventLoop else {
        self.eventLoop.execute {
            self.close0(mode: mode, promise: promise)
        }
        return
    }
    self.close0(mode: mode, promise: promise)
}
797 | | |
/// Sends a flush operation through the pipeline, hopping to the event loop first if necessary.
public func flush() {
    guard self.eventLoop.inEventLoop else {
        self.eventLoop.execute {
            self.flush0()
        }
        return
    }
    self.flush0()
}
807 | | |
/// Sends a read operation through the pipeline, hopping to the event loop first if necessary.
public func read() {
    guard self.eventLoop.inEventLoop else {
        self.eventLoop.execute {
            self.read0()
        }
        return
    }
    self.read0()
}
817 | | |
/// Sends a write of `data` through the pipeline, hopping to the event loop first if necessary.
@available(
    *,
    deprecated,
    message: "NIOAny is not Sendable. Avoid wrapping the value in NIOAny to silence this warning."
)
public func write(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
    guard self.eventLoop.inEventLoop else {
        // This is unsafe, but unavoidable: the value is wrapped so it can be carried into the
        // closure executed on the event loop.
        let transfer = UnsafeTransfer(data)
        self.eventLoop.execute {
            self._write0(transfer.wrappedValue, promise: promise)
        }
        return
    }
    self._write0(data, promise: promise)
}
834 | | |
/// Sends a write of `data` (wrapped in `NIOAny`) through the pipeline, hopping to the event loop
/// first if necessary.
@inlinable
public func write<T: Sendable>(_ data: T, promise: EventLoopPromise<Void>?) {
    guard self.eventLoop.inEventLoop else {
        self.eventLoop.execute {
            self._write0(NIOAny(data), promise: promise)
        }
        return
    }
    self._write0(NIOAny(data), promise: promise)
}
845 | | |
/// Sends a combined write-and-flush of `data` through the pipeline, hopping to the event loop first
/// if necessary.
@available(
    *,
    deprecated,
    message: "NIOAny is not Sendable. Avoid wrapping the value in NIOAny to silence this warning."
)
public func writeAndFlush(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
    guard self.eventLoop.inEventLoop else {
        // This is unsafe, but unavoidable: the value is wrapped so it can be carried into the
        // closure executed on the event loop.
        let transfer = UnsafeTransfer(data)
        self.eventLoop.execute {
            self._writeAndFlush0(transfer.wrappedValue, promise: promise)
        }
        return
    }
    self._writeAndFlush0(data, promise: promise)
}
862 | | |
/// Sends a combined write-and-flush of `data` (wrapped in `NIOAny`) through the pipeline, hopping to
/// the event loop first if necessary.
@inlinable
public func writeAndFlush<T: Sendable>(_ data: T, promise: EventLoopPromise<Void>?) {
    guard self.eventLoop.inEventLoop else {
        self.eventLoop.execute {
            self._writeAndFlush0(NIOAny(data), promise: promise)
        }
        return
    }
    self._writeAndFlush0(NIOAny(data), promise: promise)
}
873 | | |
/// Sends a bind operation for `address` through the pipeline, hopping to the event loop first if necessary.
public func bind(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
    guard self.eventLoop.inEventLoop else {
        self.eventLoop.execute {
            self.bind0(to: address, promise: promise)
        }
        return
    }
    self.bind0(to: address, promise: promise)
}
883 | | |
/// Sends a connect operation for `address` through the pipeline, hopping to the event loop first if necessary.
public func connect(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
    guard self.eventLoop.inEventLoop else {
        self.eventLoop.execute {
            self.connect0(to: address, promise: promise)
        }
        return
    }
    self.connect0(to: address, promise: promise)
}
893 | | |
/// Sends a register operation through the pipeline, hopping to the event loop first if necessary.
public func register(promise: EventLoopPromise<Void>?) {
    guard self.eventLoop.inEventLoop else {
        self.eventLoop.execute {
            self.register0(promise: promise)
        }
        return
    }
    self.register0(promise: promise)
}
903 | | |
/// Sends a user outbound event through the pipeline, hopping to the event loop first if necessary.
///
/// - Parameters:
///   - event: The event to deliver.
///   - promise: Passed along with the event.
@preconcurrency
public func triggerUserOutboundEvent(_ event: Any & Sendable, promise: EventLoopPromise<Void>?) {
    guard self.eventLoop.inEventLoop else {
        self.eventLoop.execute {
            self.triggerUserOutboundEvent0(event, promise: promise)
        }
        return
    }
    self.triggerUserOutboundEvent0(event, promise: promise)
}
914 | | |
915 | | // These methods are expected to only be called from within the EventLoop |
916 | | |
/// The context outbound operations start from: the one directly before `tail`, or `nil` when
/// there is no tail.
private var firstOutboundCtx: ChannelHandlerContext? {
    return self.tail?.prev
}
920 | | |
/// The context inbound events start from: the one directly after `head`, or `nil` when there is
/// no head.
private var firstInboundCtx: ChannelHandlerContext? {
    return self.head?.next
}
924 | | |
/// Starts a close at the first outbound context; fails `promise` with `ChannelError._alreadyClosed`
/// when there is no such context.
private func close0(mode: CloseMode, promise: EventLoopPromise<Void>?) {
    guard let outbound = self.firstOutboundCtx else {
        promise?.fail(ChannelError._alreadyClosed)
        return
    }
    outbound.invokeClose(mode: mode, promise: promise)
}
932 | | |
/// Starts a flush at the first outbound context; does nothing when there is none.
private func flush0() {
    self.firstOutboundCtx?.invokeFlush()
}
938 | | |
/// Starts a read at the first outbound context; does nothing when there is none.
private func read0() {
    self.firstOutboundCtx?.invokeRead()
}
944 | | |
/// Starts a write at the first outbound context; fails `promise` with
/// `ChannelError._ioOnClosedChannel` when there is no such context.
@usableFromInline func _write0(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
    guard let outbound = self.firstOutboundCtx else {
        promise?.fail(ChannelError._ioOnClosedChannel)
        return
    }
    outbound.invokeWrite(data, promise: promise)
}
952 | | |
/// Starts a write-and-flush at the first outbound context; fails `promise` with
/// `ChannelError._ioOnClosedChannel` when there is no such context.
@usableFromInline func _writeAndFlush0(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
    guard let outbound = self.firstOutboundCtx else {
        promise?.fail(ChannelError._ioOnClosedChannel)
        return
    }
    outbound.invokeWriteAndFlush(data, promise: promise)
}
960 | | |
/// Starts a bind at the first outbound context; fails `promise` with
/// `ChannelError._ioOnClosedChannel` when there is no such context.
private func bind0(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
    guard let outbound = self.firstOutboundCtx else {
        promise?.fail(ChannelError._ioOnClosedChannel)
        return
    }
    outbound.invokeBind(to: address, promise: promise)
}
968 | | |
/// Starts a connect at the first outbound context; fails `promise` with
/// `ChannelError._ioOnClosedChannel` when there is no such context.
private func connect0(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
    guard let outbound = self.firstOutboundCtx else {
        promise?.fail(ChannelError._ioOnClosedChannel)
        return
    }
    outbound.invokeConnect(to: address, promise: promise)
}
976 | | |
/// Starts a register at the first outbound context; fails `promise` with
/// `ChannelError._ioOnClosedChannel` when there is no such context.
private func register0(promise: EventLoopPromise<Void>?) {
    guard let outbound = self.firstOutboundCtx else {
        promise?.fail(ChannelError._ioOnClosedChannel)
        return
    }
    outbound.invokeRegister(promise: promise)
}
984 | | |
/// Starts a user outbound event at the first outbound context; fails `promise` with
/// `ChannelError._ioOnClosedChannel` when there is no such context.
private func triggerUserOutboundEvent0(_ event: Any, promise: EventLoopPromise<Void>?) {
    guard let outbound = self.firstOutboundCtx else {
        promise?.fail(ChannelError._ioOnClosedChannel)
        return
    }
    outbound.invokeTriggerUserOutboundEvent(event, promise: promise)
}
992 | | |
/// Delivers channel-registered to the first inbound context; does nothing when there is none.
private func fireChannelRegistered0() {
    self.firstInboundCtx?.invokeChannelRegistered()
}
998 | | |
/// Delivers channel-unregistered to the first inbound context; does nothing when there is none.
private func fireChannelUnregistered0() {
    self.firstInboundCtx?.invokeChannelUnregistered()
}
1004 | | |
/// Delivers channel-inactive to the first inbound context; does nothing when there is none.
private func fireChannelInactive0() {
    self.firstInboundCtx?.invokeChannelInactive()
}
1010 | | |
/// Delivers channel-active to the first inbound context; does nothing when there is none.
private func fireChannelActive0() {
    self.firstInboundCtx?.invokeChannelActive()
}
1016 | | |
/// Delivers a channel-read carrying `data` to the first inbound context; does nothing when there is none.
@usableFromInline func _fireChannelRead0(_ data: NIOAny) {
    self.firstInboundCtx?.invokeChannelRead(data)
}
1022 | | |
/// Delivers channel-read-complete to the first inbound context; does nothing when there is none.
private func fireChannelReadComplete0() {
    self.firstInboundCtx?.invokeChannelReadComplete()
}
1028 | | |
/// Delivers writability-changed to the first inbound context; does nothing when there is none.
private func fireChannelWritabilityChanged0() {
    self.firstInboundCtx?.invokeChannelWritabilityChanged()
}
1034 | | |
/// Delivers a user inbound event to the first inbound context; does nothing when there is none.
private func fireUserInboundEventTriggered0(_ event: Any) {
    self.firstInboundCtx?.invokeUserInboundEventTriggered(event)
}
1040 | | |
/// Delivers `error` to the first inbound context; does nothing when there is none.
///
/// Asserts that `error` is not `ChannelError.eof`.
private func fireErrorCaught0(error: Error) {
    assert((error as? ChannelError).map { $0 != .eof } ?? true)
    self.firstInboundCtx?.invokeErrorCaught(error)
}
1047 | | |
/// Whether the caller is currently running on this pipeline's `eventLoop`.
private var inEventLoop: Bool {
    return self.eventLoop.inEventLoop
}
1051 | | |
/// Create `ChannelPipeline` for a given `Channel`.
///
/// This initializer should never be called by the end-user directly: it is only intended for use with
/// custom `Channel` implementations. Users should always use `channel.pipeline` to access the
/// `ChannelPipeline` for a `Channel`.
///
/// The new pipeline contains only the head and tail contexts, linked to each other.
///
/// - Parameters:
///   - channel: The `Channel` this `ChannelPipeline` is created for.
public init(channel: Channel) {
    self._channel = channel
    self.eventLoop = channel.eventLoop
    // Both ends start out as `nil` so that `self` may be handed to the contexts created below.
    self.head = nil
    self.tail = nil

    let headContext = ChannelHandlerContext(
        name: HeadChannelHandler.name,
        handler: HeadChannelHandler.sharedInstance,
        pipeline: self
    )
    self.head = headContext

    let tailContext = ChannelHandlerContext(
        name: TailChannelHandler.name,
        handler: TailChannelHandler.sharedInstance,
        pipeline: self
    )
    self.tail = tailContext

    headContext.next = tailContext
    tailContext.prev = headContext
}
1077 | | } |
1078 | | |
// `@unchecked`: the compiler does not verify this conformance.
// NOTE(review): the entry points visible in this file either hop to `eventLoop` or assert that they
// are already on it — confirm that this covers every access to the pipeline's mutable state.
extension ChannelPipeline: @unchecked Sendable {}
1080 | | |
extension ChannelPipeline {
    /// Adds the provided channel handlers to the pipeline so that they end up in the order given.
    ///
    /// When called off the event loop, the work is submitted to it.
    ///
    /// - Parameters:
    ///   - handlers: The array of `ChannelHandler`s to be added.
    ///   - position: The position in the `ChannelPipeline` to add `handlers`. Defaults to `.last`.
    ///
    /// - Returns: A future that will be completed when all of the supplied `ChannelHandler`s were added.
    @preconcurrency
    public func addHandlers(
        _ handlers: [ChannelHandler & Sendable],
        position: ChannelPipeline.Position = .last
    ) -> EventLoopFuture<Void> {
        guard self.eventLoop.inEventLoop else {
            return self.eventLoop.submit {
                try self.addHandlersSync(handlers, position: position).get()
            }
        }

        return self.eventLoop.makeCompletedFuture(self.addHandlersSync(handlers, position: position))
    }

    /// Variadic convenience for `addHandlers(_:position:)` taking an array.
    ///
    /// - Parameters:
    ///   - handlers: One or more `ChannelHandler`s to be added.
    ///   - position: The position in the `ChannelPipeline` to add `handlers`. Defaults to `.last`.
    ///
    /// - Returns: A future that will be completed when all of the supplied `ChannelHandler`s were added.
    @preconcurrency
    public func addHandlers(
        _ handlers: (ChannelHandler & Sendable)...,
        position: ChannelPipeline.Position = .last
    ) -> EventLoopFuture<Void> {
        let collected: [ChannelHandler & Sendable] = handlers
        return self.addHandlers(collected, position: position)
    }

    /// Synchronously adds the provided `ChannelHandler`s to the pipeline.
    ///
    /// For `.first` and `.after` the handlers are added in reverse, because each one is inserted at
    /// the same anchor; for `.last` and `.before` they are added as given.
    ///
    /// - Important: Must be called on the `EventLoop`.
    /// - Parameters:
    ///   - handlers: The array of `ChannelHandler`s to add.
    ///   - position: The position in the `ChannelPipeline` to add the handlers.
    /// - Returns: A result representing whether the handlers were added or not.
    fileprivate func addHandlersSync(
        _ handlers: [ChannelHandler & Sendable],
        position: ChannelPipeline.Position
    ) -> Result<Void, Error> {
        let target = ChannelPipeline.SynchronousOperations.Position(position)
        switch target {
        case .last, .before:
            return self._addHandlersSync(handlers, position: target)
        case .first, .after:
            return self._addHandlersSync(handlers.reversed(), position: target)
        }
    }

    /// Synchronously adds the provided `ChannelHandler`s to the pipeline.
    ///
    /// This mirrors `addHandlersSync(_:position:)` for handlers that are not `Sendable`; it exists to
    /// avoid needing to rebox the array of existentials from any (ChannelHandler & Sendable) to any
    /// ChannelHandler.
    ///
    /// - Important: Must be called on the `EventLoop`.
    /// - Parameters:
    ///   - handlers: The array of `ChannelHandler`s to add.
    ///   - position: The position in the `ChannelPipeline` to add the handlers.
    /// - Returns: A result representing whether the handlers were added or not.
    fileprivate func addHandlersSyncNotSendable(
        _ handlers: [ChannelHandler],
        position: ChannelPipeline.SynchronousOperations.Position
    ) -> Result<Void, Error> {
        switch position {
        case .last, .before:
            return self._addHandlersSyncNotSendable(handlers, position: position)
        case .first, .after:
            return self._addHandlersSyncNotSendable(handlers.reversed(), position: position)
        }
    }

    /// Synchronously adds a sequence of `ChannelHandlers` to the pipeline at the given position,
    /// stopping at the first handler that fails to be added.
    ///
    /// - Important: Must be called on the `EventLoop`.
    /// - Parameters:
    ///   - handlers: A sequence of handlers to add.
    ///   - position: The position in the `ChannelPipeline` to add the handlers.
    /// - Returns: The first failure encountered, or success if every handler was added.
    private func _addHandlersSync<Handlers: Sequence>(
        _ handlers: Handlers,
        position: ChannelPipeline.SynchronousOperations.Position
    ) -> Result<Void, Error> where Handlers.Element == ChannelHandler & Sendable {
        self.eventLoop.assertInEventLoop()

        for handler in handlers {
            let outcome = self.addHandlerSync(handler, position: position)
            if case .failure = outcome {
                return outcome
            }
        }

        return .success(())
    }

    /// Synchronously adds a sequence of `ChannelHandlers` to the pipeline at the given position,
    /// stopping at the first handler that fails to be added.
    ///
    /// This mirrors `_addHandlersSync(_:position:)` for handlers that are not `Sendable`; it exists to
    /// avoid needing to rebox the array of existentials from any (ChannelHandler & Sendable) to any
    /// ChannelHandler.
    ///
    /// - Important: Must be called on the `EventLoop`.
    /// - Parameters:
    ///   - handlers: A sequence of handlers to add.
    ///   - position: The position in the `ChannelPipeline` to add the handlers.
    /// - Returns: The first failure encountered, or success if every handler was added.
    private func _addHandlersSyncNotSendable<Handlers: Sequence>(
        _ handlers: Handlers,
        position: ChannelPipeline.SynchronousOperations.Position
    ) -> Result<Void, Error> where Handlers.Element == ChannelHandler {
        self.eventLoop.assertInEventLoop()

        for handler in handlers {
            let outcome = self.addHandlerSync(handler, position: position)
            if case .failure = outcome {
                return outcome
            }
        }

        return .success(())
    }
}
1223 | | |
1224 | | // MARK: - Synchronous View |
1225 | | |
1226 | | extension ChannelPipeline { |
1227 | | /// A view of a `ChannelPipeline` which may be used to invoke synchronous operations. |
1228 | | /// |
1229 | | /// All functions **must** be called from the pipeline's event loop. |
1230 | | public struct SynchronousOperations { |
1231 | | @usableFromInline |
1232 | | internal let _pipeline: ChannelPipeline |
1233 | | |
/// Creates a synchronous-operations view over `pipeline`.
fileprivate init(pipeline: ChannelPipeline) {
    _pipeline = pipeline
}
1237 | | |
/// The `EventLoop` of the pipeline this synchronous operations view wraps.
public var eventLoop: EventLoop {
    return self._pipeline.eventLoop
}
1242 | | |
/// Adds a single handler to the pipeline.
///
/// - Important: This *must* be called on the event loop.
/// - Parameters:
///   - handler: The handler to add.
///   - name: The name to use for the `ChannelHandler` when it's added. If no name is specified then one will be generated.
///   - position: The position in the `ChannelPipeline` to add `handler`. Defaults to `.last`.
/// - Throws: The error reported by the pipeline if the handler could not be added.
public func addHandler(
    _ handler: ChannelHandler,
    name: String? = nil,
    position: ChannelPipeline.SynchronousOperations.Position = .last
) throws {
    let outcome = self._pipeline.addHandlerSync(handler, name: name, position: position)
    try outcome.get()
}
1257 | | |
/// Adds a single handler to the pipeline, converting the `ChannelPipeline.Position` to its
/// synchronous counterpart first.
///
/// - Important: This *must* be called on the event loop.
/// - Parameters:
///   - handler: The handler to add.
///   - name: The name to use for the `ChannelHandler` when it's added. If no name is specified then one will be generated.
///   - position: The position in the `ChannelPipeline` to add `handler`. Defaults to `.last`.
/// - Throws: The error reported by the pipeline if the handler could not be added.
@available(*, deprecated, message: "Use ChannelPipeline.SynchronousOperations.Position instead")
@_disfavoredOverload
public func addHandler(
    _ handler: ChannelHandler,
    name: String? = nil,
    position: ChannelPipeline.Position = .last
) throws {
    let outcome = self._pipeline.addHandlerSync(
        handler,
        name: name,
        position: ChannelPipeline.SynchronousOperations.Position(position)
    )
    try outcome.get()
}
1275 | | |
/// Adds every handler in `handlers` to the pipeline.
///
/// - Important: This *must* be called on the event loop.
/// - Parameters:
///   - handlers: The handlers to add.
///   - position: The position in the `ChannelPipeline` to add `handlers`. Defaults to `.last`.
/// - Throws: The error reported by the pipeline if a handler could not be added.
public func addHandlers(
    _ handlers: [ChannelHandler],
    position: ChannelPipeline.SynchronousOperations.Position = .last
) throws {
    let outcome = self._pipeline.addHandlersSyncNotSendable(handlers, position: position)
    try outcome.get()
}
1288 | | |
/// Adds every handler in `handlers` to the pipeline, converting the `ChannelPipeline.Position` to
/// its synchronous counterpart first.
///
/// - Important: This *must* be called on the event loop.
/// - Parameters:
///   - handlers: The handlers to add.
///   - position: The position in the `ChannelPipeline` to add `handlers`. Defaults to `.last`.
/// - Throws: The error reported by the pipeline if a handler could not be added.
@available(*, deprecated, message: "Use ChannelPipeline.SynchronousOperations.Position instead")
@_disfavoredOverload
public func addHandlers(
    _ handlers: [ChannelHandler],
    position: ChannelPipeline.Position = .last
) throws {
    let outcome = self._pipeline.addHandlersSyncNotSendable(
        handlers,
        position: ChannelPipeline.SynchronousOperations.Position(position)
    )
    try outcome.get()
}
1304 | | |
/// Adds one or more handlers, passed variadically, to the pipeline.
///
/// - Important: This *must* be called on the event loop.
/// - Parameters:
///   - handlers: The handlers to add.
///   - position: The position in the `ChannelPipeline` to add `handlers`. Defaults to `.last`.
/// - Throws: The error reported by the pipeline if a handler could not be added.
public func addHandlers(
    _ handlers: ChannelHandler...,
    position: ChannelPipeline.SynchronousOperations.Position = .last
) throws {
    let outcome = self._pipeline.addHandlersSyncNotSendable(handlers, position: position)
    try outcome.get()
}
1317 | | |
/// Adds one or more handlers, passed variadically, to the pipeline, converting the
/// `ChannelPipeline.Position` to its synchronous counterpart first.
///
/// - Important: This *must* be called on the event loop.
/// - Parameters:
///   - handlers: The handlers to add.
///   - position: The position in the `ChannelPipeline` to add `handlers`. Defaults to `.last`.
/// - Throws: The error reported by the pipeline if a handler could not be added.
@available(*, deprecated, message: "Use ChannelPipeline.SynchronousOperations.Position instead")
@_disfavoredOverload
public func addHandlers(
    _ handlers: ChannelHandler...,
    position: ChannelPipeline.Position = .last
) throws {
    let outcome = self._pipeline.addHandlersSyncNotSendable(
        handlers,
        position: ChannelPipeline.SynchronousOperations.Position(position)
    )
    try outcome.get()
}
1333 | | |
/// Removes `handler` from the `ChannelPipeline`.
///
/// - Parameters:
///   - handler: the `ChannelHandler` to remove.
/// - Returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was removed.
public func removeHandler(_ handler: RemovableChannelHandler) -> EventLoopFuture<Void> {
    let removal = self.eventLoop.makePromise(of: Void.self)
    // Delegate to the promise-taking overload and hand back the promise's future.
    self.removeHandler(handler, promise: removal)
    return removal.futureResult
}
1344 | | |
/// Removes `handler` from the ``ChannelPipeline``.
///
/// If no context can be found for `handler`, `promise` is failed with the lookup error.
///
/// - Parameters:
///   - handler: the ``ChannelHandler`` to remove.
///   - promise: an ``EventLoopPromise`` to notify when the ``ChannelHandler`` was removed.
public func removeHandler(_ handler: RemovableChannelHandler, promise: EventLoopPromise<Void>?) {
    do {
        let context = try self._pipeline.contextSync(handler: handler).get()
        self.removeHandler(context: context, promise: promise)
    } catch {
        promise?.fail(error)
    }
}
1358 | | |
/// Removes the `ChannelHandler` registered under `name` from the `ChannelPipeline`.
///
/// - Parameters:
///   - name: the name that was used to add the `ChannelHandler` to the `ChannelPipeline` before.
/// - Returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was removed.
public func removeHandler(name: String) -> EventLoopFuture<Void> {
    let removal = self.eventLoop.makePromise(of: Void.self)
    // Delegate to the promise-taking overload and hand back the promise's future.
    self.removeHandler(name: name, promise: removal)
    return removal.futureResult
}
1369 | | |
1370 | | /// Remove a ``ChannelHandler`` from the ``ChannelPipeline``. |
1371 | | /// |
1372 | | /// - Parameters: |
1373 | | /// - name: the name that was used to add the `ChannelHandler` to the `ChannelPipeline` before. |
1374 | | /// - promise: an ``EventLoopPromise`` to notify when the ``ChannelHandler`` was removed. |
public func removeHandler(name: String, promise: EventLoopPromise<Void>?) {
    let lookup = self._pipeline.contextSync(name: name)
    do {
        // Found a context registered under `name`: removal is delegated to the context-based overload.
        let context = try lookup.get()
        self.removeHandler(context: context, promise: promise)
    } catch {
        // The lookup failed; forward that failure to the promise, if one was supplied.
        promise?.fail(error)
    }
}
1383 | | |
1384 | | /// Remove a `ChannelHandler` from the `ChannelPipeline`. |
1385 | | /// |
1386 | | /// - Parameters: |
1387 | | /// - context: the `ChannelHandlerContext` that belongs to `ChannelHandler` that should be removed. |
1388 | | /// - Returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was removed. |
public func removeHandler(context: ChannelHandlerContext) -> EventLoopFuture<Void> {
    // Create the promise here and let the promise-based overload do the actual work.
    let removalPromise = self.eventLoop.makePromise(of: Void.self)
    let removalFuture = removalPromise.futureResult
    self.removeHandler(context: context, promise: removalPromise)
    return removalFuture
}
1394 | | |
1395 | | /// Remove a `ChannelHandler` from the `ChannelPipeline`. |
1396 | | /// |
1397 | | /// - Parameters: |
1398 | | /// - context: the `ChannelHandlerContext` that belongs to `ChannelHandler` that should be removed. |
1399 | | /// - promise: an ``EventLoopPromise`` to notify when the ``ChannelHandler`` was removed. |
public func removeHandler(context: ChannelHandlerContext, promise: EventLoopPromise<Void>?) {
    // Only handlers that conform to `RemovableChannelHandler` may be removed on request.
    guard context.handler is RemovableChannelHandler else {
        promise?.fail(ChannelError.unremovableHandler)
        return
    }
    context.startUserTriggeredRemoval(promise: promise)
}
1407 | | |
1408 | | /// Returns the `ChannelHandlerContext` for the given handler instance, if that handler |
1409 | | /// is in the `ChannelPipeline`. |
1410 | | /// |
1411 | | /// - Important: This *must* be called on the event loop. |
1412 | | /// - Parameter handler: The handler belonging to the context to fetch. |
1413 | | /// - Returns: The `ChannelHandlerContext` associated with the handler. |
public func context(handler: ChannelHandler) throws -> ChannelHandlerContext {
    // Match by object identity (`===`), not by type or name.
    let lookup = self._pipeline._contextSync({ candidate in candidate.handler === handler })
    return try lookup.get()
}
1417 | | |
1418 | | /// Returns the `ChannelHandlerContext` for the handler with the given name, if one exists. |
1419 | | /// |
1420 | | /// - Important: This *must* be called on the event loop. |
1421 | | /// - Parameter name: The name of the handler whose context is being fetched. |
1422 | | /// - Returns: The `ChannelHandlerContext` associated with the handler. |
public func context(name: String) throws -> ChannelHandlerContext {
    let lookup = self._pipeline.contextSync(name: name)
    // `get()` throws the lookup's error if no context was found.
    return try lookup.get()
}
1426 | | |
1427 | | /// Returns the `ChannelHandlerContext` for the handler of given type, if one exists. |
1428 | | /// |
1429 | | /// - Important: This *must* be called on the event loop. |
1430 | | /// - Parameter handlerType: The type of the handler to search for. |
1431 | | /// - Returns: The `ChannelHandlerContext` associated with the handler. |
@inlinable
public func context<Handler: ChannelHandler>(handlerType: Handler.Type) throws -> ChannelHandlerContext {
    let lookup = self._pipeline._contextSync(handlerType: handlerType)
    // `get()` throws the lookup's error if no context was found.
    return try lookup.get()
}
1436 | | |
1437 | | /// Returns the `ChannelHandler` of the given type from the `ChannelPipeline`, if it exists. |
1438 | | /// |
1439 | | /// - Important: This *must* be called on the event loop. |
1440 | | /// - Returns: A `ChannelHandler` of the given type if one exists in the `ChannelPipeline`. |
@inlinable
public func handler<Handler: ChannelHandler>(type _: Handler.Type) throws -> Handler {
    // The metatype argument only drives generic inference; the lookup uses `Handler.self`.
    let lookup = self._pipeline._handlerSync(type: Handler.self)
    return try lookup.get()
}
1445 | | |
1446 | | /// Fires `channelRegistered` from the head to the tail. |
1447 | | /// |
1448 | | /// This method should typically only be called by `Channel` implementations directly. |
public func fireChannelRegistered() {
    // Check that the caller is on this view's event loop before touching the pipeline.
    self.eventLoop.assertInEventLoop()
    let pipeline = self._pipeline
    pipeline.fireChannelRegistered0()
}
1453 | | |
1454 | | /// Fires `channelUnregistered` from the head to the tail. |
1455 | | /// |
1456 | | /// This method should typically only be called by `Channel` implementations directly. |
public func fireChannelUnregistered() {
    // Check that the caller is on this view's event loop before touching the pipeline.
    self.eventLoop.assertInEventLoop()
    let pipeline = self._pipeline
    pipeline.fireChannelUnregistered0()
}
1461 | | |
1462 | | /// Fires `channelInactive` from the head to the tail. |
1463 | | /// |
1464 | | /// This method should typically only be called by `Channel` implementations directly. |
public func fireChannelInactive() {
    // Check that the caller is on this view's event loop before touching the pipeline.
    self.eventLoop.assertInEventLoop()
    let pipeline = self._pipeline
    pipeline.fireChannelInactive0()
}
1469 | | |
1470 | | /// Fires `channelActive` from the head to the tail. |
1471 | | /// |
1472 | | /// This method should typically only be called by `Channel` implementations directly. |
public func fireChannelActive() {
    // Check that the caller is on this view's event loop before touching the pipeline.
    self.eventLoop.assertInEventLoop()
    let pipeline = self._pipeline
    pipeline.fireChannelActive0()
}
1477 | | |
1478 | | /// Fires `channelRead` from the head to the tail. |
1479 | | /// |
1480 | | /// This method should typically only be called by `Channel` implementations directly. |
public func fireChannelRead(_ data: NIOAny) {
    // Check that the caller is on this view's event loop before touching the pipeline.
    self.eventLoop.assertInEventLoop()
    let pipeline = self._pipeline
    pipeline._fireChannelRead0(data)
}
1485 | | |
1486 | | /// Fires `channelReadComplete` from the head to the tail. |
1487 | | /// |
1488 | | /// This method should typically only be called by `Channel` implementations directly. |
public func fireChannelReadComplete() {
    // Check that the caller is on this view's event loop before touching the pipeline.
    self.eventLoop.assertInEventLoop()
    let pipeline = self._pipeline
    pipeline.fireChannelReadComplete0()
}
1493 | | |
1494 | | /// Fires `channelWritabilityChanged` from the head to the tail. |
1495 | | /// |
1496 | | /// This method should typically only be called by `Channel` implementations directly. |
public func fireChannelWritabilityChanged() {
    // Check that the caller is on this view's event loop before touching the pipeline.
    self.eventLoop.assertInEventLoop()
    let pipeline = self._pipeline
    pipeline.fireChannelWritabilityChanged0()
}
1501 | | |
1502 | | /// Fires `userInboundEventTriggered` from the head to the tail. |
1503 | | /// |
1504 | | /// This method should typically only be called by `Channel` implementations directly. |
public func fireUserInboundEventTriggered(_ event: Any) {
    // Check that the caller is on this view's event loop before touching the pipeline.
    self.eventLoop.assertInEventLoop()
    let pipeline = self._pipeline
    pipeline.fireUserInboundEventTriggered0(event)
}
1509 | | |
1510 | | /// Fires `errorCaught` from the head to the tail. |
1511 | | /// |
1512 | | /// This method should typically only be called by `Channel` implementations directly. |
public func fireErrorCaught(_ error: Error) {
    // Check that the caller is on this view's event loop before touching the pipeline.
    self.eventLoop.assertInEventLoop()
    let pipeline = self._pipeline
    pipeline.fireErrorCaught0(error: error)
}
1517 | | |
1518 | | /// Fires `close` from the tail to the head. |
1519 | | /// |
1520 | | /// This method should typically only be called by `Channel` implementations directly. |
public func close(mode: CloseMode = .all, promise: EventLoopPromise<Void>?) {
    // Check that the caller is on this view's event loop before touching the pipeline.
    self.eventLoop.assertInEventLoop()
    let pipeline = self._pipeline
    pipeline.close0(mode: mode, promise: promise)
}
1525 | | |
1526 | | /// Fires `flush` from the tail to the head. |
1527 | | /// |
1528 | | /// This method should typically only be called by `Channel` implementations directly. |
public func flush() {
    // Check that the caller is on this view's event loop before touching the pipeline.
    self.eventLoop.assertInEventLoop()
    let pipeline = self._pipeline
    pipeline.flush0()
}
1533 | | |
1534 | | /// Fires `read` from the tail to the head. |
1535 | | /// |
1536 | | /// This method should typically only be called by `Channel` implementations directly. |
public func read() {
    // Check that the caller is on this view's event loop before touching the pipeline.
    self.eventLoop.assertInEventLoop()
    let pipeline = self._pipeline
    pipeline.read0()
}
1541 | | |
1542 | | /// Fires `write` from the tail to the head. |
1543 | | /// |
1544 | | /// This method should typically only be called by `Channel` implementations directly. |
public func write(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
    // Check that the caller is on this view's event loop before touching the pipeline.
    self.eventLoop.assertInEventLoop()
    let pipeline = self._pipeline
    pipeline._write0(data, promise: promise)
}
1549 | | |
1550 | | /// Fires `write` from the tail to the head. |
1551 | | /// |
1552 | | /// This method should typically only be called by `Channel` implementations directly. |
public func write(_ data: NIOAny) -> EventLoopFuture<Void> {
    let loop = self.eventLoop
    // Check that the caller is on this view's event loop before touching the pipeline.
    loop.assertInEventLoop()
    // The returned future is backed by a promise handed to the write.
    let writePromise = loop.makePromise(of: Void.self)
    self._pipeline._write0(data, promise: writePromise)
    return writePromise.futureResult
}
1559 | | |
1560 | | /// Fires `writeAndFlush` from the tail to the head. |
1561 | | /// |
1562 | | /// This method should typically only be called by `Channel` implementations directly. |
public func writeAndFlush(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
    // Check that the caller is on this view's event loop before touching the pipeline.
    self.eventLoop.assertInEventLoop()
    let pipeline = self._pipeline
    pipeline._writeAndFlush0(data, promise: promise)
}
1567 | | |
1568 | | /// Fires `writeAndFlush` from the tail to the head. |
1569 | | /// |
1570 | | /// This method should typically only be called by `Channel` implementations directly. |
public func writeAndFlush(_ data: NIOAny) -> EventLoopFuture<Void> {
    let loop = self.eventLoop
    // Check that the caller is on this view's event loop before touching the pipeline.
    loop.assertInEventLoop()
    // The returned future is backed by a promise handed to the write-and-flush.
    let writePromise = loop.makePromise(of: Void.self)
    self._pipeline._writeAndFlush0(data, promise: writePromise)
    return writePromise.futureResult
}
1577 | | |
1578 | | /// Fires `bind` from the tail to the head. |
1579 | | /// |
1580 | | /// This method should typically only be called by `Channel` implementations directly. |
public func bind(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
    // Check that the caller is on this view's event loop before touching the pipeline.
    self.eventLoop.assertInEventLoop()
    let pipeline = self._pipeline
    pipeline.bind0(to: address, promise: promise)
}
1585 | | |
1586 | | /// Fires `connect` from the tail to the head. |
1587 | | /// |
1588 | | /// This method should typically only be called by `Channel` implementations directly. |
public func connect(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
    // Check that the caller is on this view's event loop before touching the pipeline.
    self.eventLoop.assertInEventLoop()
    let pipeline = self._pipeline
    pipeline.connect0(to: address, promise: promise)
}
1593 | | |
1594 | | /// Fires `register` from the tail to the head. |
1595 | | /// |
1596 | | /// This method should typically only be called by `Channel` implementations directly. |
public func register(promise: EventLoopPromise<Void>?) {
    // Check that the caller is on this view's event loop before touching the pipeline.
    self.eventLoop.assertInEventLoop()
    let pipeline = self._pipeline
    pipeline.register0(promise: promise)
}
1601 | | |
1602 | | /// Fires `triggerUserOutboundEvent` from the tail to the head. |
1603 | | /// |
1604 | | /// This method should typically only be called by `Channel` implementations directly. |
public func triggerUserOutboundEvent(_ event: Any, promise: EventLoopPromise<Void>?) {
    // Check that the caller is on this view's event loop before touching the pipeline.
    self.eventLoop.assertInEventLoop()
    let pipeline = self._pipeline
    pipeline.triggerUserOutboundEvent0(event, promise: promise)
}
1609 | | |
1610 | | /// Provides scoped access to the underlying transport, if the channel supports it. |
1611 | | /// |
1612 | | /// This is an advanced API for reading or manipulating the underlying transport that backs a channel. Users |
1613 | | /// must not close the transport or invalidate any invariants that the channel relies upon for its operation. |
1614 | | /// |
1615 | | /// Not all channels support access to the underlying channel. If the channel does not support this API, the |
1616 | | /// closure is not called and this function immediately returns `nil`. |
1617 | | /// |
1618 | | /// Note that you must call this API with an appropriate closure, or otherwise explicitly specify the correct |
1619 | | /// transport type parameter, in order for the closure to be run. Calling this function such that the compiler |
1620 | | /// infers a type for the transport closure parameter that differs from the channel implementation will result |
1621 | | /// in the closure not being run and this function will return `nil`. |
1622 | | /// |
1623 | | /// For example, for socket-based channels, that expose the underlying socket handle: |
1624 | | /// |
1625 | | /// ```swift |
1626 | | /// try channel.pipeline.syncOperations.withUnsafeTransportIfAvailable { transport in |
1627 | | /// // This closure is called. |
1628 | | /// transport == NIOBSDSocketHandle.invalid |
1629 | | /// } |
1630 | | /// |
1631 | | /// try channel.pipeline.syncOperations.withUnsafeTransportIfAvailable { (_: NIOBSDSocket.Handle) in |
1632 | | /// // This closure is called. |
1633 | | /// return |
1634 | | /// } |
1635 | | /// |
1636 | | /// try channel.pipeline.syncOperations.withUnsafeTransportIfAvailable(of: NIOBSDSocket.Handle.self) { _ in |
1637 | | /// // This closure is called. |
1638 | | /// return |
1639 | | /// } |
1640 | | /// |
1641 | | /// try channel.pipeline.syncOperations.withUnsafeTransportIfAvailable { |
1642 | | /// // This closure is NOT called. |
1643 | | /// return |
1644 | | /// } |
1645 | | /// |
1646 | | /// try channel.pipeline.syncOperations.withUnsafeTransportIfAvailable { (_: Any) in |
1647 | | /// // This closure is NOT called. |
1648 | | /// return |
1649 | | /// } |
1650 | | /// |
1651 | | /// try channel.pipeline.syncOperations.withUnsafeTransportIfAvailable(of: Any.self) { _ in |
1652 | | /// // This closure is NOT called. |
1653 | | /// return |
1654 | | /// } |
1655 | | /// ``` |
1656 | | /// |
1657 | | /// - Parameters: |
1658 | | /// - type: The expected transport type the channel makes available. |
1659 | | /// - body: A closure that takes the underlying transport, if the channel supports this operation. |
1660 | | /// - Returns: The value returned by the closure, or `nil` if the channel does not expose its transport. |
1661 | | /// - Throws: If there was an error accessing the underlying transport, or an error was thrown by the closure. |
@available(macOS 13, iOS 16, tvOS 16, watchOS 9, *)
@inlinable
public func withUnsafeTransportIfAvailable<Transport, Result>(
    of type: Transport.Type = Transport.self,
    _ body: (_ transport: Transport) throws -> Result
) throws -> Result? {
    self.eventLoop.assertInEventLoop()
    let channelCore = self._pipeline.channel._channelCore
    // The cast only succeeds when the core exposes a transport of exactly the requested
    // `Transport` type; otherwise `body` is never invoked.
    if let accessibleCore = channelCore as? any NIOTransportAccessibleChannelCore<Transport> {
        return try accessibleCore.withUnsafeTransport(body)
    }
    return nil
}
1675 | | } |
1676 | | |
1677 | | /// Returns a view of operations which can be performed synchronously on this pipeline. All |
1678 | | /// operations **must** be called on the event loop. |
public var syncOperations: SynchronousOperations {
    // A fresh view wrapping this pipeline is built on every access.
    return .init(pipeline: self)
}
1682 | | } |
1683 | | |
// The `Sendable` conformance is declared as unavailable, which explicitly marks
// `SynchronousOperations` as a type that must not be treated as `Sendable`.
@available(*, unavailable)
extension ChannelPipeline.SynchronousOperations: Sendable {}
1686 | | |
extension ChannelPipeline {
    /// A `Position` within the `ChannelPipeline` used to insert handlers into the `ChannelPipeline`.
    ///
    /// This enum is `Sendable`; the handlers carried by `before` and `after` must therefore be
    /// `Sendable` as well.
    @preconcurrency
    public enum Position: Sendable {
        /// The first `ChannelHandler` -- the front of the `ChannelPipeline`.
        case first

        /// The last `ChannelHandler` -- the back of the `ChannelPipeline`.
        case last

        /// Before the given `ChannelHandler`.
        case before(ChannelHandler & Sendable)

        /// After the given `ChannelHandler`.
        case after(ChannelHandler & Sendable)
    }
}
1704 | | |
extension ChannelPipeline.SynchronousOperations {
    /// A position in the `ChannelPipeline` for use with `SynchronousOperations`, which allows
    /// non-sendable handlers to be inserted relative to other handlers.
    public enum Position {
        /// The front of the `ChannelPipeline` -- the first `ChannelHandler`.
        case first

        /// The back of the `ChannelPipeline` -- the last `ChannelHandler`.
        case last

        /// Immediately before the given `ChannelHandler`.
        case before(ChannelHandler)

        /// Immediately after the given `ChannelHandler`.
        case after(ChannelHandler)

        /// Creates the equivalent position from a `ChannelPipeline.Position`.
        ///
        /// - Parameter position: The `ChannelPipeline.Position` to convert.
        public init(_ position: ChannelPipeline.Position) {
            // Every case maps one-to-one onto the case with the same name.
            switch position {
            case .first: self = .first
            case .last: self = .last
            case let .before(anchor): self = .before(anchor)
            case let .after(anchor): self = .after(anchor)
            }
        }
    }
}
1735 | | |
// The `Sendable` conformance is declared as unavailable, which explicitly marks this
// `Position` (whose payloads are plain `ChannelHandler`s) as not `Sendable`.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@available(*, unavailable)
extension ChannelPipeline.SynchronousOperations.Position: Sendable {}
1739 | | |
1740 | | /// Special `ChannelHandler` that forwards all events to the `Channel.Unsafe` implementation. |
final class HeadChannelHandler: _ChannelOutboundHandler, Sendable {

    static let name = "head"
    static let sharedInstance = HeadChannelHandler()

    // Only `sharedInstance` is ever constructed.
    private init() {}

    // Each outbound operation below is handed to the matching `*0` method on the
    // channel's `_channelCore`.

    func register(context: ChannelHandlerContext, promise: EventLoopPromise<Void>?) {
        let core = context.channel._channelCore
        core.register0(promise: promise)
    }

    func bind(context: ChannelHandlerContext, to address: SocketAddress, promise: EventLoopPromise<Void>?) {
        let core = context.channel._channelCore
        core.bind0(to: address, promise: promise)
    }

    func connect(context: ChannelHandlerContext, to address: SocketAddress, promise: EventLoopPromise<Void>?) {
        let core = context.channel._channelCore
        core.connect0(to: address, promise: promise)
    }

    func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
        let core = context.channel._channelCore
        core.write0(data, promise: promise)
    }

    func flush(context: ChannelHandlerContext) {
        let core = context.channel._channelCore
        core.flush0()
    }

    func close(context: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise<Void>?) {
        let core = context.channel._channelCore
        // The error passed along is derived from the close mode.
        core.close0(error: mode.error, mode: mode, promise: promise)
    }

    func read(context: ChannelHandlerContext) {
        let core = context.channel._channelCore
        core.read0()
    }

    func triggerUserOutboundEvent(context: ChannelHandlerContext, event: Any, promise: EventLoopPromise<Void>?) {
        let core = context.channel._channelCore
        core.triggerUserOutboundEvent0(event, promise: promise)
    }

}
1781 | | |
extension CloseMode {
    /// The error with which outstanding operations are failed when closing with this mode.
    fileprivate var error: any Error {
        let failure: any Error
        switch self {
        case .all:
            failure = ChannelError._ioOnClosedChannel
        case .output:
            failure = ChannelError._outputClosed
        case .input:
            failure = ChannelError._inputClosed
        }
        return failure
    }
}
1795 | | |
1796 | | /// Special `ChannelInboundHandler` which will consume all inbound events. |
final class TailChannelHandler: _ChannelInboundHandler, Sendable {

    static let name = "tail"
    static let sharedInstance = TailChannelHandler()

    // Only `sharedInstance` is ever constructed.
    private init() {}

    func channelRegistered(context: ChannelHandlerContext) {
        // Intentionally empty: the event is discarded here.
    }

    func channelUnregistered(context: ChannelHandlerContext) {
        // Intentionally empty: the event is discarded here.
    }

    func channelActive(context: ChannelHandlerContext) {
        // Intentionally empty: the event is discarded here.
    }

    func channelInactive(context: ChannelHandlerContext) {
        // Intentionally empty: the event is discarded here.
    }

    func channelReadComplete(context: ChannelHandlerContext) {
        // Intentionally empty: the event is discarded here.
    }

    func channelWritabilityChanged(context: ChannelHandlerContext) {
        // Intentionally empty: the event is discarded here.
    }

    func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
        // Intentionally empty: the event is discarded here.
    }

    func errorCaught(context: ChannelHandlerContext, error: Error) {
        // Errors are not discarded; they are handed to the channel core.
        let core = context.channel._channelCore
        core.errorCaught0(error: error)
    }

    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        // Reads are not discarded; they are handed to the channel core.
        let core = context.channel._channelCore
        core.channelRead0(data)
    }
}
1840 | | |
1841 | | /// `Error` that is used by the `ChannelPipeline` to inform the user of an error. |
public enum ChannelPipelineError: Error {
    /// The `ChannelHandler` had already been removed.
    case alreadyRemoved
    /// The requested `ChannelHandler` could not be found.
    case notFound
}
1848 | | |
1849 | | /// Every `ChannelHandler` has -- when added to a `ChannelPipeline` -- a corresponding `ChannelHandlerContext` which is |
1850 | | /// the way `ChannelHandler`s can interact with other `ChannelHandler`s in the pipeline. |
1851 | | /// |
1852 | | /// Most `ChannelHandler`s need to send events through the `ChannelPipeline` which they do by calling the respective |
1853 | | /// method on their `ChannelHandlerContext`. In fact all the `ChannelHandler` default implementations just forward |
1854 | | /// the event using the `ChannelHandlerContext`. |
1855 | | /// |
1856 | | /// Many events are instrumental for a `ChannelHandler`'s life-cycle and it is therefore very important to send them |
1857 | | /// at the right point in time. Often, the right behaviour is to react to an event and then forward it to the next |
1858 | | /// `ChannelHandler`. |
1859 | | public final class ChannelHandlerContext: ChannelInvoker { |
1860 | | // visible for ChannelPipeline to modify |
1861 | | fileprivate var next: Optional<ChannelHandlerContext> |
1862 | | fileprivate var prev: Optional<ChannelHandlerContext> |
1863 | | |
1864 | | public let pipeline: ChannelPipeline |
1865 | | |
public var channel: Channel {
    // The context has no channel of its own; it exposes the pipeline's channel.
    return self.pipeline.channel
}
1869 | | |
public var handler: ChannelHandler {
    if let inbound = self.inboundHandler {
        return inbound
    }
    // `init` enforces that at least one of the two handler references is non-nil,
    // so the outbound handler must be set whenever the inbound one is not.
    return self.outboundHandler!
}
1873 | | |
public var remoteAddress: SocketAddress? {
    let core = self.channel._channelCore
    do {
        // Fast path: ask the channel core directly.
        return try core.remoteAddress0()
    } catch ChannelError.ioOnClosedChannel {
        // The channel is already closed, but the `Channel` may still have the address cached.
        // Going through it keeps the address usable from channelInactive(...) / handlerRemoved(...).
        return self.channel.remoteAddress
    } catch {
        // Any other failure means no address is available.
        return nil
    }
}
1886 | | |
public var localAddress: SocketAddress? {
    let core = self.channel._channelCore
    do {
        // Fast path: ask the channel core directly.
        return try core.localAddress0()
    } catch ChannelError.ioOnClosedChannel {
        // The channel is already closed, but the `Channel` may still have the address cached.
        // Going through it keeps the address usable from channelInactive(...) / handlerRemoved(...).
        return self.channel.localAddress
    } catch {
        // Any other failure means no address is available.
        return nil
    }
}
1899 | | |
public var eventLoop: EventLoop {
    // The context has no event loop of its own; it exposes the pipeline's event loop.
    return self.pipeline.eventLoop
}
1903 | | |
1904 | | public let name: String |
1905 | | private let inboundHandler: _ChannelInboundHandler? |
1906 | | private let outboundHandler: _ChannelOutboundHandler? |
1907 | 101k | private var removeHandlerInvoked = false |
1908 | 101k | private var userTriggeredRemovalStarted = false |
1909 | | |
1910 | | // Only created from within ChannelPipeline |
fileprivate init(name: String, handler: ChannelHandler, pipeline: ChannelPipeline) {
    // A handler may be an inbound handler, an outbound handler, or both -- but never neither.
    let inbound = handler as? _ChannelInboundHandler
    let outbound = handler as? _ChannelOutboundHandler
    precondition(
        !(inbound == nil && outbound == nil),
        "ChannelHandlers need to either be inbound or outbound"
    )

    self.name = name
    self.pipeline = pipeline
    self.inboundHandler = inbound
    self.outboundHandler = outbound
    // A new context starts out unlinked from any neighbours.
    self.next = nil
    self.prev = nil
}
1923 | | |
1924 | | /// Send a `channelRegistered` event to the next (inbound) `ChannelHandler` in the `ChannelPipeline`. |
1925 | | /// |
1926 | | /// - Note: For correct operation it is very important to forward any `channelRegistered` event using this method at the right point in time, that is usually when received. |
public func fireChannelRegistered() {
    // With no next context there is nobody to notify, so the event ends here.
    guard let inboundNext = self.next else { return }
    inboundNext.invokeChannelRegistered()
}
1930 | | |
1931 | | /// Send a `channelUnregistered` event to the next (inbound) `ChannelHandler` in the `ChannelPipeline`. |
1932 | | /// |
1933 | | /// - Note: For correct operation it is very important to forward any `channelUnregistered` event using this method at the right point in time, that is usually when received. |
public func fireChannelUnregistered() {
    // With no next context there is nobody to notify, so the event ends here.
    guard let inboundNext = self.next else { return }
    inboundNext.invokeChannelUnregistered()
}
1937 | | |
1938 | | /// Send a `channelActive` event to the next (inbound) `ChannelHandler` in the `ChannelPipeline`. |
1939 | | /// |
1940 | | /// - Note: For correct operation it is very important to forward any `channelActive` event using this method at the right point in time, that is often when received. |
public func fireChannelActive() {
    // With no next context there is nobody to notify, so the event ends here.
    guard let inboundNext = self.next else { return }
    inboundNext.invokeChannelActive()
}
1944 | | |
1945 | | /// Send a `channelInactive` event to the next (inbound) `ChannelHandler` in the `ChannelPipeline`. |
1946 | | /// |
1947 | | /// - Note: For correct operation it is very important to forward any `channelInactive` event using this method at the right point in time, that is often when received. |
public func fireChannelInactive() {
    // With no next context there is nobody to notify, so the event ends here.
    guard let inboundNext = self.next else { return }
    inboundNext.invokeChannelInactive()
}
1951 | | |
1952 | | /// Send data to the next inbound `ChannelHandler`. The data should be of type `ChannelInboundHandler.InboundOut`. |
public func fireChannelRead(_ data: NIOAny) {
    // With no next context there is nobody to receive the data, so it is dropped here.
    guard let inboundNext = self.next else { return }
    inboundNext.invokeChannelRead(data)
}
1956 | | |
1957 | | /// Signal to the next `ChannelHandler` that a read burst has finished. |
public func fireChannelReadComplete() {
    // With no next context there is nobody to notify, so the event ends here.
    guard let inboundNext = self.next else { return }
    inboundNext.invokeChannelReadComplete()
}
1961 | | |
1962 | | /// Send a `writabilityChanged` event to the next (inbound) `ChannelHandler` in the `ChannelPipeline`. |
1963 | | /// |
1964 | | /// - Note: For correct operation it is very important to forward any `writabilityChanged` event using this method at the right point in time, that is usually when received. |
public func fireChannelWritabilityChanged() {
    // With no next context there is nobody to notify, so the event ends here.
    guard let inboundNext = self.next else { return }
    inboundNext.invokeChannelWritabilityChanged()
}
1968 | | |
1969 | | /// Send an error to the next inbound `ChannelHandler`. |
public func fireErrorCaught(_ error: Error) {
    // With no next context there is nobody to receive the error, so it is dropped here.
    guard let inboundNext = self.next else { return }
    inboundNext.invokeErrorCaught(error)
}
1973 | | |
1974 | | /// Send a user event to the next inbound `ChannelHandler`. |
1975 | | /// |
1976 | | /// This method exists for compatibility with ``ChannelInboundInvoker``. |
@available(*, deprecated)
@_disfavoredOverload
public func fireUserInboundEventTriggered(_ event: Any & Sendable) {
    // With no next context there is nobody to receive the event, so it is dropped here.
    guard let inboundNext = self.next else { return }
    inboundNext.invokeUserInboundEventTriggered(event)
}
1982 | | |
1983 | | /// Send a user event to the next inbound `ChannelHandler` from on the event loop. |
public func fireUserInboundEventTriggered(_ event: Any) {
    // With no next context there is nobody to receive the event, so it is dropped here.
    guard let inboundNext = self.next else { return }
    inboundNext.invokeUserInboundEventTriggered(event)
}
1987 | | |
1988 | | /// Send a `register` event to the next (outbound) `ChannelHandler` in the `ChannelPipeline`. |
1989 | | /// |
1990 | | /// - Note: For correct operation it is very important to forward any `register` event using this method at the right point in time, that is usually when received. |
public func register(promise: EventLoopPromise<Void>?) {
    // Outbound operations travel towards `prev`. Without one, the operation cannot be
    // forwarded and the promise is failed instead.
    guard let outboundNext = self.prev else {
        promise?.fail(ChannelError._ioOnClosedChannel)
        return
    }
    outboundNext.invokeRegister(promise: promise)
}
1998 | | |
1999 | | /// Send a `bind` event to the next outbound `ChannelHandler` in the `ChannelPipeline`. |
2000 | | /// When the `bind` event reaches the `HeadChannelHandler` a `ServerSocketChannel` will be bound. |
2001 | | /// |
2002 | | /// - Parameters: |
2003 | | /// - address: The address to bind to. |
2004 | | /// - promise: The promise fulfilled when the socket is bound or failed if it cannot be bound. |
public func bind(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
    // Outbound operations travel towards `prev`. Without one, the operation cannot be
    // forwarded and the promise is failed instead.
    guard let outboundNext = self.prev else {
        promise?.fail(ChannelError._ioOnClosedChannel)
        return
    }
    outboundNext.invokeBind(to: address, promise: promise)
}
2012 | | |
2013 | | /// Send a `connect` event to the next outbound `ChannelHandler` in the `ChannelPipeline`. |
2014 | | /// When the `connect` event reaches the `HeadChannelHandler` a `SocketChannel` will be connected. |
2015 | | /// |
2016 | | /// - Parameters: |
2017 | | /// - address: The address to connect to. |
2018 | | /// - promise: The promise fulfilled when the socket is connected or failed if it cannot be connected. |
public func connect(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
    // Outbound operations travel towards `prev`. Without one, the operation cannot be
    // forwarded and the promise is failed instead.
    guard let outboundNext = self.prev else {
        promise?.fail(ChannelError._ioOnClosedChannel)
        return
    }
    outboundNext.invokeConnect(to: address, promise: promise)
}
2026 | | |
/// Send a `write` event to the next outbound `ChannelHandler` in the `ChannelPipeline`.
/// When the `write` event reaches the `HeadChannelHandler` the data will be enqueued to be written on the next
/// `flush` event.
///
/// If this context has no previous context (`self.prev` is `nil`), the data is not forwarded and
/// `promise` is failed with `ChannelError._ioOnClosedChannel`.
///
/// - Parameters:
///   - data: The data to write, should be of type `ChannelOutboundHandler.OutboundOut`.
///   - promise: The promise fulfilled when the data has been written or failed if it cannot be written.
public func write(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
    guard let target = self.prev else {
        // No previous context to hand the data to: report the failure to the caller.
        promise?.fail(ChannelError._ioOnClosedChannel)
        return
    }
    target.invokeWrite(data, promise: promise)
}
2041 | | |
/// Send a `flush` event to the next outbound `ChannelHandler` in the `ChannelPipeline`.
/// When the `flush` event reaches the `HeadChannelHandler` the data previously enqueued will be attempted to be
/// written to the socket.
///
/// If this context has no previous context (`self.prev` is `nil`), this is a no-op.
public func flush() {
    // There is no promise to fail here, so a missing previous context is silently ignored.
    self.prev?.invokeFlush()
}
2050 | | |
/// Send a `write` event followed by a `flush` event to the next outbound `ChannelHandler` in the `ChannelPipeline`.
/// When the `write` event reaches the `HeadChannelHandler` the data will be enqueued to be written when the `flush`
/// also reaches the `HeadChannelHandler`.
///
/// If this context has no previous context (`self.prev` is `nil`), nothing is forwarded and
/// `promise` is failed with `ChannelError._ioOnClosedChannel`.
///
/// - Parameters:
///   - data: The data to write, should be of type `ChannelOutboundHandler.OutboundOut`.
///   - promise: The promise fulfilled when the data has been written and flushed, or failed if that was not possible.
public func writeAndFlush(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
    guard let target = self.prev else {
        // No previous context to hand the data to: report the failure to the caller.
        promise?.fail(ChannelError._ioOnClosedChannel)
        return
    }
    target.invokeWriteAndFlush(data, promise: promise)
}
2065 | | |
/// Send a `read` event to the next outbound `ChannelHandler` in the `ChannelPipeline`.
/// When the `read` event reaches the `HeadChannelHandler` the interest to read data will be signalled to the
/// `Selector`. This will subsequently -- when data becomes readable -- cause `channelRead` events containing the
/// data being sent through the `ChannelPipeline`.
///
/// If this context has no previous context (`self.prev` is `nil`), this is a no-op.
public func read() {
    // There is no promise to fail here, so a missing previous context is silently ignored.
    self.prev?.invokeRead()
}
2075 | | |
/// Send a `close` event to the next outbound `ChannelHandler` in the `ChannelPipeline`.
/// When the `close` event reaches the `HeadChannelHandler` the socket will be closed.
///
/// If this context has no previous context (`self.prev` is `nil`), the event is not forwarded and
/// `promise` is failed with `ChannelError._alreadyClosed`.
///
/// - Parameters:
///   - mode: The `CloseMode` to use. Defaults to `.all`.
///   - promise: The promise fulfilled when the `Channel` has been closed or failed if the closing failed.
public func close(mode: CloseMode = .all, promise: EventLoopPromise<Void>?) {
    guard let target = self.prev else {
        // Unlike the other outbound operations, close reports "already closed" in this case.
        promise?.fail(ChannelError._alreadyClosed)
        return
    }
    target.invokeClose(mode: mode, promise: promise)
}
2089 | | |
/// Send a user event to the next outbound `ChannelHandler` in the `ChannelPipeline`.
///
/// This overload is deprecated and disfavored during overload resolution. It accepts
/// `Any & Sendable` events and forwards them unchanged to `_triggerUserOutboundEvent(_:promise:)`.
///
/// - Parameters:
///   - event: The user event to send.
///   - promise: The promise fulfilled when the user event has been sent or failed if it couldn't be sent.
@available(*, deprecated)
@_disfavoredOverload
public func triggerUserOutboundEvent(_ event: Any & Sendable, promise: EventLoopPromise<Void>?) {
    self._triggerUserOutboundEvent(event, promise: promise)
}
2100 | | |
/// Send a user event to the next outbound `ChannelHandler` in the `ChannelPipeline`.
///
/// The event is forwarded unchanged to `_triggerUserOutboundEvent(_:promise:)`, which performs
/// the actual dispatch.
///
/// - Parameters:
///   - event: The user event to send.
///   - promise: The promise fulfilled when the user event has been sent or failed if it couldn't be sent.
public func triggerUserOutboundEvent(_ event: Any, promise: EventLoopPromise<Void>?) {
    self._triggerUserOutboundEvent(event, promise: promise)
}
2109 | | |
/// Shared implementation behind both `triggerUserOutboundEvent` overloads.
///
/// Hands `event` to the previous context in the pipeline. If there is no previous context
/// (`self.prev` is `nil`), `promise` is failed with `ChannelError._ioOnClosedChannel`.
private func _triggerUserOutboundEvent(_ event: Any, promise: EventLoopPromise<Void>?) {
    guard let target = self.prev else {
        promise?.fail(ChannelError._ioOnClosedChannel)
        return
    }
    target.invokeTriggerUserOutboundEvent(event, promise: promise)
}
2117 | | |
/// Delivers a `channelRegistered` event to this context.
///
/// Asserts that the caller is on the event loop. If this context has an inbound handler it
/// receives the event; otherwise the event is passed on to the next context, if there is one.
fileprivate func invokeChannelRegistered() {
    self.eventLoop.assertInEventLoop()

    guard let inbound = self.inboundHandler else {
        // No inbound handler here: skip ahead to the next context.
        self.next?.invokeChannelRegistered()
        return
    }
    inbound.channelRegistered(context: self)
}
2127 | | |
/// Delivers a `channelUnregistered` event to this context.
///
/// Asserts that the caller is on the event loop. If this context has an inbound handler it
/// receives the event; otherwise the event is passed on to the next context, if there is one.
fileprivate func invokeChannelUnregistered() {
    self.eventLoop.assertInEventLoop()

    guard let inbound = self.inboundHandler else {
        // No inbound handler here: skip ahead to the next context.
        self.next?.invokeChannelUnregistered()
        return
    }
    inbound.channelUnregistered(context: self)
}
2137 | | |
/// Delivers a `channelActive` event to this context.
///
/// Asserts that the caller is on the event loop. If this context has an inbound handler it
/// receives the event; otherwise the event is passed on to the next context, if there is one.
fileprivate func invokeChannelActive() {
    self.eventLoop.assertInEventLoop()

    guard let inbound = self.inboundHandler else {
        // No inbound handler here: skip ahead to the next context.
        self.next?.invokeChannelActive()
        return
    }
    inbound.channelActive(context: self)
}
2147 | | |
/// Delivers a `channelInactive` event to this context.
///
/// Asserts that the caller is on the event loop. If this context has an inbound handler it
/// receives the event; otherwise the event is passed on to the next context, if there is one.
fileprivate func invokeChannelInactive() {
    self.eventLoop.assertInEventLoop()

    guard let inbound = self.inboundHandler else {
        // No inbound handler here: skip ahead to the next context.
        self.next?.invokeChannelInactive()
        return
    }
    inbound.channelInactive(context: self)
}
2157 | | |
/// Delivers a `channelRead` event carrying `data` to this context.
///
/// Asserts that the caller is on the event loop. If this context has an inbound handler it
/// receives the data; otherwise the data is passed on to the next context, if there is one.
///
/// - Parameters:
///   - data: The inbound payload to deliver.
fileprivate func invokeChannelRead(_ data: NIOAny) {
    self.eventLoop.assertInEventLoop()

    guard let inbound = self.inboundHandler else {
        // No inbound handler here: skip ahead to the next context.
        self.next?.invokeChannelRead(data)
        return
    }
    inbound.channelRead(context: self, data: data)
}
2167 | | |
/// Delivers a `channelReadComplete` event to this context.
///
/// Asserts that the caller is on the event loop. If this context has an inbound handler it
/// receives the event; otherwise the event is passed on to the next context, if there is one.
fileprivate func invokeChannelReadComplete() {
    self.eventLoop.assertInEventLoop()

    guard let inbound = self.inboundHandler else {
        // No inbound handler here: skip ahead to the next context.
        self.next?.invokeChannelReadComplete()
        return
    }
    inbound.channelReadComplete(context: self)
}
2177 | | |
/// Delivers a `channelWritabilityChanged` event to this context.
///
/// Asserts that the caller is on the event loop. If this context has an inbound handler it
/// receives the event; otherwise the event is passed on to the next context, if there is one.
fileprivate func invokeChannelWritabilityChanged() {
    self.eventLoop.assertInEventLoop()

    guard let inbound = self.inboundHandler else {
        // No inbound handler here: skip ahead to the next context.
        self.next?.invokeChannelWritabilityChanged()
        return
    }
    inbound.channelWritabilityChanged(context: self)
}
2187 | | |
/// Delivers an `errorCaught` event carrying `error` to this context.
///
/// Asserts that the caller is on the event loop. If this context has an inbound handler it
/// receives the error; otherwise the error is passed on to the next context, if there is one.
///
/// - Parameters:
///   - error: The error to deliver.
fileprivate func invokeErrorCaught(_ error: Error) {
    self.eventLoop.assertInEventLoop()

    guard let inbound = self.inboundHandler else {
        // No inbound handler here: skip ahead to the next context.
        self.next?.invokeErrorCaught(error)
        return
    }
    inbound.errorCaught(context: self, error: error)
}
2197 | | |
/// Delivers a user-defined inbound event to this context.
///
/// Asserts that the caller is on the event loop. If this context has an inbound handler it
/// receives the event; otherwise the event is passed on to the next context, if there is one.
///
/// - Parameters:
///   - event: The user event to deliver.
fileprivate func invokeUserInboundEventTriggered(_ event: Any) {
    self.eventLoop.assertInEventLoop()

    guard let inbound = self.inboundHandler else {
        // No inbound handler here: skip ahead to the next context.
        self.next?.invokeUserInboundEventTriggered(event)
        return
    }
    inbound.userInboundEventTriggered(context: self, event: event)
}
2207 | | |
/// Delivers a `register` operation to this context.
///
/// Asserts that the caller is on the event loop. If this context has an outbound handler it
/// receives the operation; otherwise the operation is passed on to the previous context, if there is one.
///
/// - Parameters:
///   - promise: The promise handed along with the operation.
fileprivate func invokeRegister(promise: EventLoopPromise<Void>?) {
    self.eventLoop.assertInEventLoop()

    guard let outbound = self.outboundHandler else {
        // No outbound handler here: skip back to the previous context.
        self.prev?.invokeRegister(promise: promise)
        return
    }
    outbound.register(context: self, promise: promise)
}
2217 | | |
/// Delivers a `bind` operation to this context.
///
/// Asserts that the caller is on the event loop. If this context has an outbound handler it
/// receives the operation; otherwise the operation is passed on to the previous context, if there is one.
///
/// - Parameters:
///   - address: The address to bind to.
///   - promise: The promise handed along with the operation.
fileprivate func invokeBind(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
    self.eventLoop.assertInEventLoop()

    guard let outbound = self.outboundHandler else {
        // No outbound handler here: skip back to the previous context.
        self.prev?.invokeBind(to: address, promise: promise)
        return
    }
    outbound.bind(context: self, to: address, promise: promise)
}
2227 | | |
/// Delivers a `connect` operation to this context.
///
/// Asserts that the caller is on the event loop. If this context has an outbound handler it
/// receives the operation; otherwise the operation is passed on to the previous context, if there is one.
///
/// - Parameters:
///   - address: The address to connect to.
///   - promise: The promise handed along with the operation.
fileprivate func invokeConnect(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
    self.eventLoop.assertInEventLoop()

    guard let outbound = self.outboundHandler else {
        // No outbound handler here: skip back to the previous context.
        self.prev?.invokeConnect(to: address, promise: promise)
        return
    }
    outbound.connect(context: self, to: address, promise: promise)
}
2237 | | |
/// Delivers a `write` operation carrying `data` to this context.
///
/// Asserts that the caller is on the event loop. If this context has an outbound handler it
/// receives the data; otherwise the data is passed on to the previous context, if there is one.
///
/// - Parameters:
///   - data: The outbound payload to deliver.
///   - promise: The promise handed along with the operation.
fileprivate func invokeWrite(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
    self.eventLoop.assertInEventLoop()

    guard let outbound = self.outboundHandler else {
        // No outbound handler here: skip back to the previous context.
        self.prev?.invokeWrite(data, promise: promise)
        return
    }
    outbound.write(context: self, data: data, promise: promise)
}
2247 | | |
/// Delivers a `flush` operation to this context.
///
/// Asserts that the caller is on the event loop. If this context has an outbound handler it
/// receives the operation; otherwise the operation is passed on to the previous context, if there is one.
fileprivate func invokeFlush() {
    self.eventLoop.assertInEventLoop()

    guard let outbound = self.outboundHandler else {
        // No outbound handler here: skip back to the previous context.
        self.prev?.invokeFlush()
        return
    }
    outbound.flush(context: self)
}
2257 | | |
/// Delivers a combined write-and-flush operation carrying `data` to this context.
///
/// Asserts that the caller is on the event loop. If this context has an outbound handler, the
/// handler's `write` is called followed by its `flush`; otherwise the combined operation is
/// passed on to the previous context, if there is one.
///
/// - Parameters:
///   - data: The outbound payload to deliver.
///   - promise: The promise handed to the handler's `write`.
fileprivate func invokeWriteAndFlush(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
    self.eventLoop.assertInEventLoop()

    guard let outbound = self.outboundHandler else {
        // No outbound handler here: skip back to the previous context.
        self.prev?.invokeWriteAndFlush(data, promise: promise)
        return
    }
    // The handler sees two separate calls: first the write, then the flush.
    outbound.write(context: self, data: data, promise: promise)
    outbound.flush(context: self)
}
2268 | | |
/// Delivers a `read` operation to this context.
///
/// Asserts that the caller is on the event loop. If this context has an outbound handler it
/// receives the operation; otherwise the operation is passed on to the previous context, if there is one.
fileprivate func invokeRead() {
    self.eventLoop.assertInEventLoop()

    guard let outbound = self.outboundHandler else {
        // No outbound handler here: skip back to the previous context.
        self.prev?.invokeRead()
        return
    }
    outbound.read(context: self)
}
2278 | | |
/// Delivers a `close` operation to this context.
///
/// Asserts that the caller is on the event loop. If this context has an outbound handler it
/// receives the operation; otherwise the operation is passed on to the previous context, if there is one.
///
/// - Parameters:
///   - mode: The `CloseMode` to use.
///   - promise: The promise handed along with the operation.
fileprivate func invokeClose(mode: CloseMode, promise: EventLoopPromise<Void>?) {
    self.eventLoop.assertInEventLoop()

    guard let outbound = self.outboundHandler else {
        // No outbound handler here: skip back to the previous context.
        self.prev?.invokeClose(mode: mode, promise: promise)
        return
    }
    outbound.close(context: self, mode: mode, promise: promise)
}
2288 | | |
/// Delivers a user-defined outbound event to this context.
///
/// Asserts that the caller is on the event loop. If this context has an outbound handler it
/// receives the event; otherwise the event is passed on to the previous context, if there is one.
///
/// - Parameters:
///   - event: The user event to deliver.
///   - promise: The promise handed along with the event.
fileprivate func invokeTriggerUserOutboundEvent(_ event: Any, promise: EventLoopPromise<Void>?) {
    self.eventLoop.assertInEventLoop()

    guard let outbound = self.outboundHandler else {
        // No outbound handler here: skip back to the previous context.
        self.prev?.invokeTriggerUserOutboundEvent(event, promise: promise)
        return
    }
    outbound.triggerUserOutboundEvent(context: self, event: event, promise: promise)
}
2298 | | |
/// Notifies this context's handler that it has been added, by calling `handlerAdded(context:)` on it.
///
/// Asserts that the caller is on the event loop.
fileprivate func invokeHandlerAdded() {
    self.eventLoop.assertInEventLoop()
    self.handler.handlerAdded(context: self)
}
2304 | | |
/// Notifies this context's handler that it has been removed, by calling `handlerRemoved(context:)` on it.
///
/// Asserts that the caller is on the event loop. The notification is delivered at most once per
/// context: `removeHandlerInvoked` records the first call and every later call returns immediately.
fileprivate func invokeHandlerRemoved() {
    self.eventLoop.assertInEventLoop()

    if self.removeHandlerInvoked {
        // Already notified once; never deliver `handlerRemoved` a second time.
        return
    }
    self.removeHandlerInvoked = true

    self.handler.handlerRemoved(context: self)
}
2314 | | } |
2315 | | |
// This extension "un-deprecates" some parts of the ChannelInvoker API for
// ChannelHandlerContext specifically. These methods were not sound elsewhere,
// but they're fine here.
extension ChannelHandlerContext {
    /// Write data to the remote peer.
    ///
    /// Be aware that to be sure that data is really written to the remote peer you need to call `flush` or use
    /// `writeAndFlush`. Calling `write` multiple times and then `flush` may allow the `Channel` to `write` multiple
    /// data objects to the remote peer with one syscall.
    ///
    /// A fresh promise is created on `self.eventLoop` and handed to `write(_:promise:)`.
    ///
    /// - Parameters:
    ///   - data: the data to write
    ///   - file: The file this function was called in, for debugging purposes.
    ///   - line: The line this function was called on, for debugging purposes.
    /// - Returns: the future which will be notified once the operation completes.
    public func write(_ data: NIOAny, file: StaticString = #fileID, line: UInt = #line) -> EventLoopFuture<Void> {
        let completion = self.eventLoop.makePromise(of: Void.self, file: file, line: line)
        let result = completion.futureResult
        self.write(data, promise: completion)
        return result
    }

    /// Shortcut for calling `write` and `flush`.
    ///
    /// A fresh promise is created on `self.eventLoop` and handed to `writeAndFlush(_:promise:)`.
    ///
    /// - Parameters:
    ///   - data: the data to write
    ///   - file: The file this function was called in, for debugging purposes.
    ///   - line: The line this function was called on, for debugging purposes.
    /// - Returns: the future which will be notified once the `write` operation completes.
    public func writeAndFlush(
        _ data: NIOAny,
        file: StaticString = #fileID,
        line: UInt = #line
    ) -> EventLoopFuture<Void> {
        let completion = self.eventLoop.makePromise(of: Void.self, file: file, line: line)
        let result = completion.futureResult
        self.writeAndFlush(data, promise: completion)
        return result
    }

    /// Returns this `ChannelHandlerContext` as a `NIOLoopBound`, bound to `self.eventLoop`.
    ///
    /// This is a shorthand for `NIOLoopBound(self, eventLoop: self.eventLoop)`.
    ///
    /// Being able to capture `ChannelHandlerContext`s in `EventLoopFuture` callbacks is important in SwiftNIO
    /// programs. Of course, this is not always safe because the `EventLoopFuture` callbacks may run on other
    /// threads. SwiftNIO programmers therefore always had to manually arrange for those callbacks to run on the
    /// correct `EventLoop` (i.e. `context.eventLoop`), which then made that construction safe.
    ///
    /// Newer Swift versions contain a static feature to automatically detect data races. That feature cannot know
    /// which ``EventLoop`` an ``EventLoopFuture`` callback is running on, because that is only known _dynamically_.
    /// ``NIOLoopBound`` can be used to prove to the compiler that this is safe and, in case it is not,
    /// ``NIOLoopBound`` will trap at runtime. The correct behaviour is therefore enforced dynamically.
    public var loopBound: NIOLoopBound<ChannelHandlerContext> {
        return NIOLoopBound(self, eventLoop: self.eventLoop)
    }
}
2367 | | |
// `ChannelHandlerContext` is explicitly declared as not `Sendable`: the conformance below is marked
// unavailable, so the type cannot be used where a `Sendable` value is required.
@available(*, unavailable)
extension ChannelHandlerContext: Sendable {}
2370 | | |
extension ChannelHandlerContext {
    /// A `RemovalToken` is handed to a `RemovableChannelHandler` when its `removeHandler` function is invoked. A
    /// `RemovableChannelHandler` is then required to remove itself from the `ChannelPipeline`. The removal process
    /// is finalized by handing the `RemovalToken` to the `ChannelHandlerContext.leavePipeline` function.
    public struct RemovalToken: Sendable {
        /// The promise passed to `startUserTriggeredRemoval(promise:)`, carried along so that
        /// `leavePipeline(removalToken:)` can hand it to the pipeline.
        internal let promise: EventLoopPromise<Void>?
    }

    /// Synchronously remove the `ChannelHandler` with the given `ChannelHandlerContext`.
    ///
    /// The calling thread must be on this context's event loop; this is enforced with a precondition.
    ///
    /// - Note: This function must only be used from a `RemovableChannelHandler` to remove itself. Calling this method
    ///         on any other `ChannelHandlerContext` leads to undefined behaviour.
    ///
    /// - Parameters:
    ///   - removalToken: The removal token received from `RemovableChannelHandler.removeHandler`
    public func leavePipeline(removalToken: RemovalToken) {
        self.eventLoop.preconditionInEventLoop()
        let completion = removalToken.promise
        self.pipeline.removeHandlerFromPipeline(context: self, promise: completion)
    }

    /// Starts a user-triggered removal by asking the handler to remove itself.
    ///
    /// Asserts that the caller is on the event loop. Only the first call starts a removal; any later
    /// call fails `promise` with `NIOAttemptedToRemoveHandlerMultipleTimesError` and does nothing else.
    ///
    /// - Parameters:
    ///   - promise: The promise wrapped into the `RemovalToken` given to the handler.
    internal func startUserTriggeredRemoval(promise: EventLoopPromise<Void>?) {
        self.eventLoop.assertInEventLoop()

        if self.userTriggeredRemovalStarted {
            promise?.fail(NIOAttemptedToRemoveHandlerMultipleTimesError())
            return
        }
        self.userTriggeredRemovalStarted = true

        // The forced cast traps if the handler does not conform to `RemovableChannelHandler`.
        let removable = self.handler as! RemovableChannelHandler
        let token = RemovalToken(promise: promise)
        removable.removeHandler(context: self, removalToken: token)
    }
}
2404 | | |
extension ChannelHandlerContext {
    /// A `SendableView` wrapping this context.
    var sendableView: SendableView {
        return SendableView(wrapping: self)
    }

    /// A wrapper over ``ChannelHandlerContext`` that allows access to the thread-safe API
    /// surface on the type.
    ///
    /// Very little of ``ChannelHandlerContext`` is thread-safe, but in a rare few places
    /// there are things we can access. This type makes those available.
    struct SendableView: @unchecked Sendable {
        private let wrapped: ChannelHandlerContext

        fileprivate init(wrapping context: ChannelHandlerContext) {
            self.wrapped = context
        }

        /// Whether the ``ChannelHandler`` associated with this context conforms to
        /// ``RemovableChannelHandler``.
        var channelHandlerIsRemovable: Bool {
            // `context.handler` is not mutable, and set at construction, so this access is
            // acceptable. The protocol conformance check is also safe.
            let handler = self.wrapped.handler
            return handler is RemovableChannelHandler
        }

        /// Grabs the underlying ``ChannelHandlerContext``. May only be called on the
        /// event loop.
        var wrappedValue: ChannelHandlerContext {
            // The event loop lookup here is also thread-safe, so we can grab the value out
            // and use it.
            let context = self.wrapped
            context.eventLoop.preconditionInEventLoop()
            return context
        }
    }
}
2440 | | |
extension ChannelPipeline: CustomDebugStringConvertible {
    /// A multi-line, human-readable listing of the handlers in this pipeline.
    ///
    /// The first line identifies the pipeline. It is followed either by a `<no handlers>` marker or by a
    /// header and one line per handler, showing the handler's type name in the incoming and/or outgoing
    /// column and its name in brackets.
    public var debugDescription: String {
        // This method forms output in the following format:
        //
        // ChannelPipeline[0x0000000000000000]:
        //                      [I] ↓↑ [O]
        //  <incoming handler type> ↓↑                         [<name>]
        //                          ↓↑ <outgoing handler type> [<name>]
        //    <duplex handler type> ↓↑ <duplex handler type>   [<name>]
        //
        var desc = ["ChannelPipeline[\(ObjectIdentifier(self))]:"]
        let debugInfos = self.collectHandlerDebugInfos()
        let maxIncomingTypeNameCount =
            debugInfos.filter { $0.isIncoming }
            .map { $0.typeName.count }
            .max() ?? 0
        let maxOutgoingTypeNameCount =
            debugInfos.filter { $0.isOutgoing }
            .map { $0.typeName.count }
            .max() ?? 0

        // The "[I]" header reaches two characters into the incoming column, so that column must be at
        // least two characters wide. Without this floor, a pipeline with no incoming handlers (or with
        // only one-character incoming type names) would request a negative amount of padding, and
        // `String(repeating:count:)` traps on a negative count.
        let incomingColumnWidth = Swift.max(maxIncomingTypeNameCount, 2)

        func whitespace(count: Int) -> String {
            String(repeating: " ", count: count)
        }

        if debugInfos.isEmpty {
            desc.append(" <no handlers>")
        } else {
            desc.append(whitespace(count: incomingColumnWidth - 2) + "[I] ↓↑ [O]")
            for debugInfo in debugInfos {
                var line = [String]()
                line.append(" ")
                if debugInfo.isIncoming {
                    // Right-align the type name within the incoming column.
                    line.append(whitespace(count: incomingColumnWidth - debugInfo.typeName.count))
                    line.append(debugInfo.typeName)
                } else {
                    line.append(whitespace(count: incomingColumnWidth))
                }
                line.append(" ↓↑ ")
                if debugInfo.isOutgoing {
                    // Left-align the type name within the outgoing column.
                    line.append(debugInfo.typeName)
                    line.append(whitespace(count: maxOutgoingTypeNameCount - debugInfo.typeName.count))
                } else {
                    line.append(whitespace(count: maxOutgoingTypeNameCount))
                }
                line.append(" ")
                line.append("[\(debugInfo.name)]")
                desc.append(line.joined())
            }
        }

        return desc.joined(separator: "\n")
    }

    /// Returns the first `ChannelHandler` of the given type.
    ///
    /// - Parameters:
    ///   - type: the type of `ChannelHandler` to return.
    /// - Returns: A future that carries the handler once its context has been looked up. The lookup is
    ///   done by `context(handlerType:)`; a handler of an unexpected type in that context is treated as
    ///   a programmer error and traps.
    @inlinable
    @preconcurrency
    public func handler<Handler: ChannelHandler & _NIOCoreSendableMetatype>(
        type _: Handler.Type
    ) -> EventLoopFuture<Handler> {
        self.context(handlerType: Handler.self).map { context in
            guard let typedHandler = context.handler as? Handler else {
                preconditionFailure(
                    "Expected channel handler of type \(Handler.self), got \(type(of: context.handler)) instead."
                )
            }

            return typedHandler
        }
    }

    /// Synchronously finds and returns the first `ChannelHandler` of the given type.
    ///
    /// - Important: This must be called on the `EventLoop`.
    /// - Parameters:
    ///   - type: the type of `ChannelHandler` to return.
    /// - Returns: The handler on success, or the error produced by `_contextSync(handlerType:)`.
    @inlinable  // should be fileprivate
    internal func _handlerSync<Handler: ChannelHandler>(type _: Handler.Type) -> Result<Handler, Error> {
        self._contextSync(handlerType: Handler.self).map { context in
            guard let typedHandler = context.handler as? Handler else {
                preconditionFailure(
                    "Expected channel handler of type \(Handler.self), got \(type(of: context.handler)) instead."
                )
            }
            return typedHandler
        }
    }

    /// The per-handler information needed to render one line of `debugDescription`.
    private struct ChannelHandlerDebugInfo {
        let handler: ChannelHandler
        let name: String
        /// Whether the handler conforms to `_ChannelInboundHandler`.
        var isIncoming: Bool {
            self.handler is _ChannelInboundHandler
        }
        /// Whether the handler conforms to `_ChannelOutboundHandler`.
        var isOutgoing: Bool {
            self.handler is _ChannelOutboundHandler
        }
        /// The name of the handler's dynamic type.
        var typeName: String {
            "\(type(of: self.handler))"
        }
    }

    /// Walks the contexts strictly between `head` and `tail` and collects one debug info per handler.
    private func collectHandlerDebugInfos() -> [ChannelHandlerDebugInfo] {
        var handlers = [ChannelHandlerDebugInfo]()
        var node = self.head?.next
        while let context = node, context !== self.tail {
            handlers.append(.init(handler: context.handler, name: context.name))
            node = context.next
        }
        return handlers
    }
}
2556 | | |
extension ChannelPipeline {
    /// The direction whose buffered bytes are being counted.
    private enum BufferingDirection: Equatable {
        case inbound
        case outbound
    }

    /// Retrieve the total number of bytes buffered for outbound.
    ///
    /// When called on the event loop the count is computed immediately and returned in an already
    /// succeeded future; otherwise the counting is submitted to the event loop.
    public func outboundBufferedBytes() -> EventLoopFuture<Int> {
        guard self.eventLoop.inEventLoop else {
            return self.eventLoop.submit {
                self.countAllBufferedBytes(direction: .outbound)
            }
        }
        return self.eventLoop.makeSucceededFuture(self.countAllBufferedBytes(direction: .outbound))
    }

    /// Retrieve the total number of bytes buffered for inbound.
    ///
    /// When called on the event loop the count is computed immediately and returned in an already
    /// succeeded future; otherwise the counting is submitted to the event loop.
    public func inboundBufferedBytes() -> EventLoopFuture<Int> {
        guard self.eventLoop.inEventLoop else {
            return self.eventLoop.submit {
                self.countAllBufferedBytes(direction: .inbound)
            }
        }
        return self.eventLoop.makeSucceededFuture(self.countAllBufferedBytes(direction: .inbound))
    }

    /// Returns the number of bytes buffered in `direction` by the handler of `context`, or `nil` if
    /// that handler does not conform to the byte-buffering protocol for that direction.
    private static func countBufferedBytes(context: ChannelHandlerContext, direction: BufferingDirection) -> Int? {
        switch direction {
        case .inbound:
            return (context.handler as? NIOInboundByteBufferingChannelHandler)?.inboundBufferedBytes
        case .outbound:
            return (context.handler as? NIOOutboundByteBufferingChannelHandler)?.outboundBufferedBytes
        }
    }

    /// Sums the bytes buffered in `direction` over every context strictly between `head` and `tail`.
    ///
    /// Asserts that the caller is on the event loop. Handlers that do not buffer in the requested
    /// direction contribute nothing to the total.
    private func countAllBufferedBytes(direction: BufferingDirection) -> Int {
        self.eventLoop.assertInEventLoop()

        var sum = 0
        var cursor = self.head?.next
        while let context = cursor, context !== self.tail {
            sum += ChannelPipeline.countBufferedBytes(context: context, direction: direction) ?? 0
            cursor = context.next
        }
        return sum
    }
}
2633 | | |
extension ChannelPipeline.SynchronousOperations {
    /// Retrieve the total number of bytes buffered for outbound.
    ///
    /// - Important: This *must* be called on the event loop.
    public func outboundBufferedBytes() -> Int {
        self.eventLoop.assertInEventLoop()
        let pipeline = self._pipeline
        return pipeline.countAllBufferedBytes(direction: .outbound)
    }

    /// Retrieve the number of outbound bytes buffered in the `ChannelHandler` associated with the given
    /// `ChannelHandlerContext`.
    ///
    /// - Parameters:
    ///   - context: the `ChannelHandlerContext` from which the outbound buffered bytes of the `ChannelHandler`
    ///     will be retrieved.
    /// - Important: This *must* be called on the event loop.
    ///
    /// - Returns: The number of bytes currently buffered in the `ChannelHandler` referenced by the `context`
    ///            parameter. If that `ChannelHandler` does not conform to
    ///            `NIOOutboundByteBufferingChannelHandler`, this method will return `nil`.
    public func outboundBufferedBytes(in context: ChannelHandlerContext) -> Int? {
        self.eventLoop.assertInEventLoop()
        let buffered = ChannelPipeline.countBufferedBytes(context: context, direction: .outbound)
        return buffered
    }

    /// Retrieve the total number of bytes buffered for inbound.
    ///
    /// - Important: This *must* be called on the event loop.
    public func inboundBufferedBytes() -> Int {
        self.eventLoop.assertInEventLoop()
        let pipeline = self._pipeline
        return pipeline.countAllBufferedBytes(direction: .inbound)
    }

    /// Retrieve the number of inbound bytes buffered in the `ChannelHandler` associated with the given
    /// `ChannelHandlerContext`.
    ///
    /// - Parameters:
    ///   - context: the `ChannelHandlerContext` from which the inbound buffered bytes of the `ChannelHandler`
    ///     will be retrieved.
    /// - Important: This *must* be called on the event loop.
    ///
    /// - Returns: The number of bytes currently buffered in the `ChannelHandler` referenced by the `context`
    ///            parameter. If that `ChannelHandler` does not conform to
    ///            `NIOInboundByteBufferingChannelHandler`, this method will return `nil`.
    public func inboundBufferedBytes(in context: ChannelHandlerContext) -> Int? {
        self.eventLoop.assertInEventLoop()
        let buffered = ChannelPipeline.countBufferedBytes(context: context, direction: .inbound)
        return buffered
    }
}