Coverage Report

Created: 2025-07-23 06:54

/src/swift-nio/Sources/NIOCore/ChannelHandlers.swift
Line
Count
Source (jump to first uncovered line)
1
//===----------------------------------------------------------------------===//
2
//
3
// This source file is part of the SwiftNIO open source project
4
//
5
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
6
// Licensed under Apache License v2.0
7
//
8
// See LICENSE.txt for license information
9
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
10
//
11
// SPDX-License-Identifier: Apache-2.0
12
//
13
//===----------------------------------------------------------------------===//
14
//  Contains ChannelHandler implementations which are generic and can be re-used easily.
15
//
16
//
17
18
/// A `ChannelHandler` that backs off calling accept on a `ServerChannel` after accept produced an `IOError`.
///
/// Such errors are often recoverable by reducing the rate at which accept is called, so instead of
/// passing every `read` straight through, reads are delayed until the deadline computed from the
/// configured `backoffProvider` has passed.
public final class AcceptBackoffHandler: ChannelDuplexHandler, RemovableChannelHandler {
    public typealias InboundIn = Channel
    public typealias OutboundIn = Channel

    /// Deadline before which no `read` is passed on; `nil` when no backoff is in effect.
    private var nextReadDeadlineNS: Optional<NIODeadline>
    /// Maps a caught `IOError` to the amount of time to back off, or `nil` for no backoff.
    private let backoffProvider: (IOError) -> TimeAmount?
    /// Whether a caught `IOError` is still forwarded through the pipeline.
    private let shouldForwardIOErrorCaught: Bool
    /// The currently pending delayed read, if any.
    private var scheduledRead: Optional<Scheduled<Void>>

    /// Default implementation used as `backoffProvider` which delays accept by 1 second.
    public static func defaultBackoffProvider(error: IOError) -> TimeAmount? {
        return TimeAmount.seconds(1)
    }

    /// Create a new instance that forwards caught `IOError`s.
    ///
    /// - Parameters:
    ///   - backoffProvider: returns a `TimeAmount` which will be the amount of time to wait before attempting another `read`.
    public init(backoffProvider: @escaping (IOError) -> TimeAmount? = AcceptBackoffHandler.defaultBackoffProvider) {
        self.shouldForwardIOErrorCaught = true
        self.backoffProvider = backoffProvider
        self.scheduledRead = nil
        self.nextReadDeadlineNS = nil
    }

    /// Create a new instance
    ///
    /// - Parameters:
    ///   - shouldForwardIOErrorCaught: A boolean indicating if a caught IOError should be forwarded.
    ///   - backoffProvider: returns a `TimeAmount` which will be the amount of time to wait before attempting another `read`.
    public init(
        shouldForwardIOErrorCaught: Bool,
        backoffProvider: @escaping (IOError) -> TimeAmount? = AcceptBackoffHandler.defaultBackoffProvider
    ) {
        self.shouldForwardIOErrorCaught = shouldForwardIOErrorCaught
        self.backoffProvider = backoffProvider
        self.scheduledRead = nil
        self.nextReadDeadlineNS = nil
    }

    public func read(context: ChannelHandlerContext) {
        // A delayed read is already pending; it will issue the read when it fires.
        if self.scheduledRead != nil {
            return
        }

        guard let deadline = self.nextReadDeadlineNS else {
            // No backoff in effect, pass the read straight through.
            context.read()
            return
        }

        if NIODeadline.now() < deadline {
            // Still inside the backoff window: defer the read until the deadline.
            self.scheduleRead(at: deadline, context: context)
        } else {
            // The backoff window has already elapsed, read right away.
            self.doRead(context)
        }
    }

    public func errorCaught(context: ChannelHandlerContext, error: Error) {
        // Only `IOError`s take part in the backoff logic; everything else is forwarded untouched.
        guard let ioError = error as? IOError else {
            context.fireErrorCaught(error)
            return
        }

        if let amount = self.backoffProvider(ioError) {
            let deadline: NIODeadline = .now() + amount
            self.nextReadDeadlineNS = deadline
            // A read that was already scheduled must be moved to the new deadline.
            if let pending = self.scheduledRead {
                pending.cancel()
                self.scheduleRead(at: deadline, context: context)
            }
        }

        if self.shouldForwardIOErrorCaught {
            context.fireErrorCaught(error)
        }
    }

    public func channelInactive(context: ChannelHandlerContext) {
        // Drop any pending delayed read and the backoff state before forwarding the event.
        self.scheduledRead?.cancel()
        self.scheduledRead = nil
        self.nextReadDeadlineNS = nil
        context.fireChannelInactive()
    }

    public func handlerRemoved(context: ChannelHandlerContext) {
        if let pending = self.scheduledRead {
            // The delayed read is cancelled, so issue the read directly; otherwise we may never read again.
            pending.cancel()
            self.scheduledRead = nil
            context.read()
        }
        self.nextReadDeadlineNS = nil
    }

    /// Schedules `doRead` on the context's event loop at `deadline` and remembers the scheduled task.
    private func scheduleRead(at deadline: NIODeadline, context: ChannelHandlerContext) {
        let loop = context.eventLoop.assumeIsolatedUnsafeUnchecked()
        self.scheduledRead = loop.scheduleTask(deadline: deadline) {
            self.doRead(context)
        }
    }

    /// Clears the backoff state and issues the read.
    private func doRead(_ context: ChannelHandlerContext) {
        self.scheduledRead = nil
        self.nextReadDeadlineNS = nil
        context.read()
    }
}

// `AcceptBackoffHandler` is explicitly marked as not `Sendable`.
@available(*, unavailable)
extension AcceptBackoffHandler: Sendable {}
132
133
/// `ChannelHandler` implementation which enforces back-pressure: it stops reading from the remote peer
/// while the channel is not writable and resumes reading once the channel becomes writable again.
public final class BackPressureHandler: ChannelDuplexHandler, RemovableChannelHandler {
    public typealias OutboundIn = NIOAny
    public typealias InboundIn = ByteBuffer
    public typealias InboundOut = ByteBuffer
    public typealias OutboundOut = ByteBuffer

    /// `true` when a `read` was held back because the channel was not writable.
    private var pendingRead = false
    /// Last writability state observed in `channelWritabilityChanged`.
    private var writable: Bool = true

    public init() {}

    public func read(context: ChannelHandlerContext) {
        guard self.writable else {
            // Not writable: remember that a read was requested and hold it back.
            self.pendingRead = true
            return
        }
        context.read()
    }

    public func channelWritabilityChanged(context: ChannelHandlerContext) {
        let isWritable = context.channel.isWritable
        self.writable = isWritable

        if isWritable {
            // Writable again: release a read that was held back, if any.
            self.performPendingRead(context: context)
        } else {
            context.flush()
        }

        // Propagate the event as the user may still want to do something based on it.
        context.fireChannelWritabilityChanged()
    }

    public func handlerRemoved(context: ChannelHandlerContext) {
        // Release a held-back read on removal.
        self.performPendingRead(context: context)
    }

    /// Issues the held-back read, if there is one, and clears the pending flag.
    private func performPendingRead(context: ChannelHandlerContext) {
        guard self.pendingRead else {
            return
        }
        self.pendingRead = false
        context.read()
    }
}

// `BackPressureHandler` is explicitly marked as not `Sendable`.
@available(*, unavailable)
extension BackPressureHandler: Sendable {}
180
181
/// Triggers an `IdleStateEvent` when a `Channel` has not performed a read, a write, or either operation for a while.
///
/// Up to three independent timers are maintained, one per configured timeout. Each timer is a scheduled
/// task that, when it fires, either reports idleness through `fireUserInboundEventTriggered` and re-arms
/// itself for a full timeout, or re-arms itself for the time remaining since the last activity.
public final class IdleStateHandler: ChannelDuplexHandler, RemovableChannelHandler {
    public typealias InboundIn = NIOAny
    public typealias InboundOut = NIOAny
    public typealias OutboundIn = NIOAny
    public typealias OutboundOut = NIOAny

    /// A user event triggered by `IdleStateHandler` when a `Channel` is idle.
    public enum IdleStateEvent: Sendable {
        /// Will be triggered when no write was performed for the specified amount of time
        case write
        /// Will be triggered when no read was performed for the specified amount of time
        case read
        /// Will be triggered when neither read nor write was performed for the specified amount of time
        case all
    }

    /// Idle time without a read after which `IdleStateEvent.read` is fired; `nil` disables the timer.
    public let readTimeout: TimeAmount?
    /// Idle time without a completed write after which `IdleStateEvent.write` is fired; `nil` disables the timer.
    public let writeTimeout: TimeAmount?
    /// Idle time without read or write after which `IdleStateEvent.all` is fired; `nil` disables the timer.
    public let allTimeout: TimeAmount?

    /// `true` between a `channelRead` and the following `channelReadComplete`.
    private var reading = false
    private var lastReadTime: NIODeadline = .distantPast
    private var lastWriteCompleteTime: NIODeadline = .distantPast
    private var scheduledReaderTask: Optional<Scheduled<Void>>
    private var scheduledWriterTask: Optional<Scheduled<Void>>
    private var scheduledAllTask: Optional<Scheduled<Void>>

    /// Create a new instance.
    ///
    /// - Parameters:
    ///   - readTimeout: idle time without a read before `IdleStateEvent.read` is fired, or `nil` to disable.
    ///   - writeTimeout: idle time without a completed write before `IdleStateEvent.write` is fired, or `nil` to disable.
    ///   - allTimeout: idle time without read or write before `IdleStateEvent.all` is fired, or `nil` to disable.
    public init(readTimeout: TimeAmount? = nil, writeTimeout: TimeAmount? = nil, allTimeout: TimeAmount? = nil) {
        self.readTimeout = readTimeout
        self.writeTimeout = writeTimeout
        self.allTimeout = allTimeout
        self.scheduledAllTask = nil
        self.scheduledReaderTask = nil
        self.scheduledWriterTask = nil
    }

    public func handlerAdded(context: ChannelHandlerContext) {
        // If the channel is already active no `channelActive` will arrive, so start the timers now.
        if context.channel.isActive {
            initIdleTasks(context)
        }
    }

    public func handlerRemoved(context: ChannelHandlerContext) {
        cancelIdleTasks(context)
    }

    public func channelActive(context: ChannelHandlerContext) {
        initIdleTasks(context)
        context.fireChannelActive()
    }

    public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        // Read activity is only tracked when a timer that depends on it is configured.
        if readTimeout != nil || allTimeout != nil {
            reading = true
        }
        context.fireChannelRead(data)
    }

    public func channelReadComplete(context: ChannelHandlerContext) {
        if (readTimeout != nil || allTimeout != nil) && reading {
            lastReadTime = .now()
            reading = false
        }
        context.fireChannelReadComplete()
    }

    public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
        // Write activity is only tracked when a timer that depends on it is configured.
        if writeTimeout == nil && allTimeout == nil {
            context.write(data, promise: promise)
            return
        }

        // Record the completion time of the write (success or failure) on the channel's event loop.
        let writePromise = promise ?? context.eventLoop.makePromise()
        writePromise.futureResult.hop(
            to: context.eventLoop
        ).assumeIsolatedUnsafeUnchecked().whenComplete { (_: Result<Void, Error>) in
            self.lastWriteCompleteTime = .now()
        }
        context.write(data, promise: writePromise)
    }

    /// Timers only re-arm themselves while the channel is active.
    private func shouldReschedule(_ context: ChannelHandlerContext) -> Bool {
        context.channel.isActive
    }

    /// Builds the body of the read-idle timer; the returned closure re-arms itself via `scheduledReaderTask`.
    private func makeReadTimeoutTask(_ context: ChannelHandlerContext, _ timeout: TimeAmount) -> (() -> Void) {
        {
            guard self.shouldReschedule(context) else {
                return
            }

            if self.reading {
                // A read is in progress, so the reader is not idle - check again after a full timeout.
                self.scheduledReaderTask = context.eventLoop.assumeIsolatedUnsafeUnchecked().scheduleTask(
                    in: timeout,
                    self.makeReadTimeoutTask(context, timeout)
                )
                return
            }

            let diff = NIODeadline.now() - self.lastReadTime
            if diff >= timeout {
                // Reader is idle - set a new timeout and trigger an event through the pipeline
                self.scheduledReaderTask = context.eventLoop.assumeIsolatedUnsafeUnchecked().scheduleTask(
                    in: timeout,
                    self.makeReadTimeoutTask(context, timeout)
                )

                context.fireUserInboundEventTriggered(IdleStateEvent.read)
            } else {
                // Read occurred before the timeout - set a new timeout with shorter delay.
                self.scheduledReaderTask = context.eventLoop.assumeIsolatedUnsafeUnchecked().scheduleTask(
                    deadline: self.lastReadTime + timeout,
                    self.makeReadTimeoutTask(context, timeout)
                )
            }
        }
    }

    /// Builds the body of the write-idle timer; the returned closure re-arms itself via `scheduledWriterTask`.
    private func makeWriteTimeoutTask(_ context: ChannelHandlerContext, _ timeout: TimeAmount) -> (() -> Void) {
        {
            guard self.shouldReschedule(context) else {
                return
            }

            let lastWriteTime = self.lastWriteCompleteTime
            let diff = NIODeadline.now() - lastWriteTime

            if diff >= timeout {
                // Writer is idle - set a new timeout and trigger an event through the pipeline
                self.scheduledWriterTask = context.eventLoop.assumeIsolatedUnsafeUnchecked().scheduleTask(
                    in: timeout,
                    self.makeWriteTimeoutTask(context, timeout)
                )

                context.fireUserInboundEventTriggered(IdleStateEvent.write)
            } else {
                // Write occurred before the timeout - set a new timeout with shorter delay.
                self.scheduledWriterTask = context.eventLoop.assumeIsolatedUnsafeUnchecked().scheduleTask(
                    deadline: lastWriteTime + timeout,
                    self.makeWriteTimeoutTask(context, timeout)
                )
            }
        }
    }

    /// Builds the body of the all-idle timer; the returned closure re-arms itself via `scheduledAllTask`.
    ///
    /// The re-armed task is stored in `scheduledAllTask` (not `scheduledReaderTask`) so that the read-idle
    /// and all-idle timers never overwrite each other's handle and `cancelIdleTasks` cancels both of them.
    private func makeAllTimeoutTask(_ context: ChannelHandlerContext, _ timeout: TimeAmount) -> (() -> Void) {
        {
            guard self.shouldReschedule(context) else {
                return
            }

            if self.reading {
                // A read is in progress, so the channel is not idle - check again after a full timeout.
                self.scheduledAllTask = context.eventLoop.assumeIsolatedUnsafeUnchecked().scheduleTask(
                    in: timeout,
                    self.makeAllTimeoutTask(context, timeout)
                )
                return
            }

            // The most recent activity of either kind decides whether the channel is idle.
            let latestLast = max(self.lastReadTime, self.lastWriteCompleteTime)

            let diff = NIODeadline.now() - latestLast
            if diff >= timeout {
                // Channel is idle in both directions - set a new timeout and trigger an event through the pipeline
                self.scheduledAllTask = context.eventLoop.assumeIsolatedUnsafeUnchecked().scheduleTask(
                    in: timeout,
                    self.makeAllTimeoutTask(context, timeout)
                )

                context.fireUserInboundEventTriggered(IdleStateEvent.all)
            } else {
                // Read or write occurred before the timeout - set a new timeout with shorter delay.
                self.scheduledAllTask = context.eventLoop.assumeIsolatedUnsafeUnchecked().scheduleTask(
                    deadline: latestLast + timeout,
                    self.makeAllTimeoutTask(context, timeout)
                )
            }
        }
    }

    /// Schedules the task produced by `body` after `amount`, or returns `nil` when `amount` is `nil`
    /// (i.e. the corresponding timeout is not configured).
    private func schedule(
        _ context: ChannelHandlerContext,
        _ amount: TimeAmount?,
        _ body: @escaping (ChannelHandlerContext, TimeAmount) -> (() -> Void)
    ) -> Scheduled<Void>? {
        guard let timeout = amount else {
            return nil
        }
        return context.eventLoop.assumeIsolatedUnsafeUnchecked().scheduleTask(in: timeout, body(context, timeout))
    }

    /// Resets the activity timestamps to now and arms every configured timer.
    private func initIdleTasks(_ context: ChannelHandlerContext) {
        let now = NIODeadline.now()
        lastReadTime = now
        lastWriteCompleteTime = now
        scheduledReaderTask = schedule(context, readTimeout, makeReadTimeoutTask)
        scheduledWriterTask = schedule(context, writeTimeout, makeWriteTimeoutTask)
        scheduledAllTask = schedule(context, allTimeout, makeAllTimeoutTask)
    }

    /// Cancels every armed timer and forgets its handle.
    private func cancelIdleTasks(_ context: ChannelHandlerContext) {
        scheduledReaderTask?.cancel()
        scheduledWriterTask?.cancel()
        scheduledAllTask?.cancel()
        scheduledReaderTask = nil
        scheduledWriterTask = nil
        scheduledAllTask = nil
    }
}

// `IdleStateHandler` is explicitly marked as not `Sendable`.
@available(*, unavailable)
extension IdleStateHandler: Sendable {}