Coverage Report

Created: 2026-03-12 06:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/swift-nio/Sources/NIOPosix/BaseSocketChannel.swift
Line
Count
Source
1
//===----------------------------------------------------------------------===//
2
//
3
// This source file is part of the SwiftNIO open source project
4
//
5
// Copyright (c) 2017-2021 Apple Inc. and the SwiftNIO project authors
6
// Licensed under Apache License v2.0
7
//
8
// See LICENSE.txt for license information
9
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
10
//
11
// SPDX-License-Identifier: Apache-2.0
12
//
13
//===----------------------------------------------------------------------===//
14
15
#if !os(WASI)
16
17
import Atomics
18
import NIOConcurrencyHelpers
19
import NIOCore
20
21
private struct SocketChannelLifecycleManager {
22
    // MARK: Types
23
    private enum State {
24
        case fresh
25
        case preRegistered  // register() has been run but the selector doesn't know about it yet
26
        case fullyRegistered  // fully registered, ie. the selector knows about it
27
        case preActivation  // activated, but we have not fired channelActivated yet
28
        case fullyActivated  // fully activated, i.e., the signal has been fired
29
        case closed
30
    }
31
32
    private enum Event {
33
        case beginActivation
34
        case finishActivation
35
        case beginRegistration
36
        case finishRegistration
37
        case close
38
    }
39
40
    // MARK: properties
41
    private let eventLoop: EventLoop
42
    // this is queried from the Channel, ie. must be thread-safe
43
    internal let isActiveAtomic: ManagedAtomic<Bool>
44
    // these are only to be accessed on the EventLoop
45
46
    // have we seen the `.readEOF` notification
47
    // note: this can be `false` on a deactivated channel, we might just have torn it down.
48
0
    var hasSeenEOFNotification: Bool = false
49
50
    // Should we support transition from `active` to `active`, used by datagram sockets.
51
    let supportsReconnect: Bool
52
53
0
    private var currentState: State = .fresh {
54
0
        didSet {
55
0
            self.eventLoop.assertInEventLoop()
56
0
            switch (oldValue, self.currentState) {
57
0
            case (_, .preActivation):
58
0
                self.isActiveAtomic.store(true, ordering: .relaxed)
59
0
            case (.preActivation, .fullyActivated):
60
0
                ()  // Stay active
61
0
            case (.preActivation, _), (.fullyActivated, _):
62
0
                self.isActiveAtomic.store(false, ordering: .relaxed)
63
0
            default:
64
0
                ()
65
0
            }
66
0
        }
67
    }
68
69
    // MARK: API
70
    // isActiveAtomic needs to be injected as it's accessed from arbitrary threads and `SocketChannelLifecycleManager` is usually held mutable
71
    internal init(
72
        eventLoop: EventLoop,
73
        isActiveAtomic: ManagedAtomic<Bool>,
74
        supportReconnect: Bool
75
0
    ) {
76
0
        self.eventLoop = eventLoop
77
0
        self.isActiveAtomic = isActiveAtomic
78
0
        self.supportsReconnect = supportReconnect
79
0
    }
80
81
    // this is called from Channel's deinit, so don't assert we're on the EventLoop!
82
0
    internal var canBeDestroyed: Bool {
83
0
        self.currentState == .closed
84
0
    }
85
86
    // We need to return a closure here; to avoid a potential allocation for that closure, this must be inlined.
87
    @inline(__always)
88
0
    internal mutating func beginRegistration() -> ((EventLoopPromise<Void>?, ChannelPipeline) -> Void) {
89
0
        self.moveState(event: .beginRegistration)
90
0
    }
91
92
    // We need to return a closure here; to avoid a potential allocation for that closure, this must be inlined.
93
    @inline(__always)
94
0
    internal mutating func finishRegistration() -> ((EventLoopPromise<Void>?, ChannelPipeline) -> Void) {
95
0
        self.moveState(event: .finishRegistration)
96
0
    }
97
98
    // We need to return a closure here; to avoid a potential allocation for that closure, this must be inlined.
99
    @inline(__always)
100
0
    internal mutating func close() -> ((EventLoopPromise<Void>?, ChannelPipeline) -> Void) {
101
0
        self.moveState(event: .close)
102
0
    }
103
104
    // We need to return a closure here; to avoid a potential allocation for that closure, this must be inlined.
105
    @inline(__always)
106
0
    internal mutating func beginActivation() -> ((EventLoopPromise<Void>?, ChannelPipeline) -> Void) {
107
0
        self.moveState(event: .beginActivation)
108
0
    }
109
110
    @inline(__always)
111
0
    internal mutating func finishActivation() -> ((EventLoopPromise<Void>?, ChannelPipeline) -> Void) {
112
0
        self.moveState(event: .finishActivation)
113
0
    }
114
115
    // MARK: private API
116
    // We need to return a closure here; to avoid a potential allocation for that closure, this must be inlined.
117
    @inline(__always)
118
0
    private mutating func moveState(event: Event) -> ((EventLoopPromise<Void>?, ChannelPipeline) -> Void) {
119
0
        self.eventLoop.assertInEventLoop()
120
0
121
0
        switch (self.currentState, event) {
122
0
        // origin: .fresh
123
0
        case (.fresh, .beginRegistration):
124
0
            self.currentState = .preRegistered
125
0
            return { promise, pipeline in
126
0
                promise?.succeed(())
127
0
                pipeline.syncOperations.fireChannelRegistered()
128
0
            }
129
0
130
0
        case (.fresh, .close):
131
0
            self.currentState = .closed
132
0
            return { (promise, _: ChannelPipeline) in
133
0
                promise?.succeed(())
134
0
            }
135
0
136
0
        // origin: .preRegistered
137
0
        case (.preRegistered, .finishRegistration):
138
0
            self.currentState = .fullyRegistered
139
0
            return { (promise, _: ChannelPipeline) in
140
0
                promise?.succeed(())
141
0
            }
142
0
143
0
        // origin: .fullyRegistered
144
0
        case (.fullyRegistered, .beginActivation):
145
0
            self.currentState = .preActivation
146
0
            return { promise, pipeline in
147
0
                promise?.succeed(())
148
0
            }
149
0
150
0
        // origin: .preRegistered || .fullyRegistered
151
0
        case (.preRegistered, .close), (.fullyRegistered, .close):
152
0
            self.currentState = .closed
153
0
            return { promise, pipeline in
154
0
                promise?.succeed(())
155
0
                pipeline.syncOperations.fireChannelUnregistered()
156
0
            }
157
0
158
0
        // origin: .preActivation
159
0
        case (.preActivation, .finishActivation):
160
0
            self.currentState = .fullyActivated
161
0
            return { promise, pipeline in
162
0
                assert(promise == nil)
163
0
                pipeline.syncOperations.fireChannelActive()
164
0
            }
165
0
166
0
        case (.preActivation, .close):
167
0
            self.currentState = .closed
168
0
            return { promise, pipeline in
169
0
                promise?.succeed(())
170
0
                // Don't fire channelInactive here.
171
0
                pipeline.syncOperations.fireChannelUnregistered()
172
0
            }
173
0
174
0
        case (.preActivation, .beginActivation) where self.supportsReconnect:
175
0
            return { promise, pipeline in
176
0
                promise?.succeed(())
177
0
            }
178
0
179
0
        // origin: .fullyActivated
180
0
        case (.fullyActivated, .close):
181
0
            self.currentState = .closed
182
0
            return { promise, pipeline in
183
0
                promise?.succeed(())
184
0
                pipeline.syncOperations.fireChannelInactive()
185
0
                pipeline.syncOperations.fireChannelUnregistered()
186
0
            }
187
0
188
0
        case (.fullyActivated, .beginActivation) where self.supportsReconnect:
189
0
            return { promise, pipeline in
190
0
                promise?.succeed(())
191
0
            }
192
0
193
0
        case (.fullyActivated, .finishActivation) where self.supportsReconnect:
194
0
            return { promise, pipeline in
195
0
                assert(promise == nil)
196
0
            }
197
0
198
0
        // bad transitions
199
0
        case (.fresh, .beginActivation),  // should go through .registered first
200
0
            (.fresh, .finishActivation),  // should go through .registered first
201
0
            (.preRegistered, .beginActivation),  // need to first be fully registered
202
0
            (.preRegistered, .finishActivation),  // need to first be fully registered
203
0
            (.preRegistered, .beginRegistration),  // already registered
204
0
            (.fullyRegistered, .beginRegistration),  // already registered
205
0
            (.fullyRegistered, .finishActivation),  // need to activate first
206
0
            (.preActivation, .beginActivation),  // already activated
207
0
            (.preActivation, .beginRegistration),  // already fully registered (and activated)
208
0
            (.preActivation, .finishRegistration),  // already fully registered (and activated)
209
0
            (.fullyActivated, .beginActivation),  // already activated (only permitted when supportsReconnect is true)
210
0
            (.fullyActivated, .beginRegistration),  // already fully registered
211
0
            (.fullyActivated, .finishRegistration),  // already fully registered
212
0
            (.fullyActivated, .finishActivation),  // already signaled
213
0
            (.fullyRegistered, .finishRegistration),  // already fully registered
214
0
            (.fresh, .finishRegistration),  // need to register lazily first
215
0
            (.closed, _):  // already closed
216
0
            self.badTransition(event: event)
217
0
        }
218
0
    }
219
220
0
    private func badTransition(event: Event) -> Never {
221
0
        preconditionFailure("illegal transition: state=\(self.currentState), event=\(event)")
222
0
    }
223
224
    // MARK: convenience properties
225
0
    internal var isActive: Bool {
226
0
        self.eventLoop.assertInEventLoop()
227
0
        switch self.currentState {
228
0
        case .preActivation, .fullyActivated:
229
0
            return true
230
0
        case .fresh, .preRegistered, .fullyRegistered, .closed:
231
0
            return false
232
0
        }
233
0
    }
234
235
0
    internal var isPreRegistered: Bool {
236
0
        self.eventLoop.assertInEventLoop()
237
0
        switch self.currentState {
238
0
        case .fresh, .closed:
239
0
            return false
240
0
        case .preRegistered, .fullyRegistered, .preActivation, .fullyActivated:
241
0
            return true
242
0
        }
243
0
    }
244
245
0
    internal var isRegisteredFully: Bool {
246
0
        self.eventLoop.assertInEventLoop()
247
0
        switch self.currentState {
248
0
        case .fresh, .closed, .preRegistered:
249
0
            return false
250
0
        case .fullyRegistered, .preActivation, .fullyActivated:
251
0
            return true
252
0
        }
253
0
    }
254
255
    /// Returns whether the underlying file descriptor is open. This property will always be true (even before registration)
256
    /// until the Channel is closed.
257
0
    internal var isOpen: Bool {
258
0
        self.eventLoop.assertInEventLoop()
259
0
        return self.currentState != .closed
260
0
    }
261
}
262
263
/// The base class for all socket-based channels in NIO.
264
///
265
/// There are many types of specialised socket-based channel in NIO. Each of these
266
/// has different logic regarding how exactly they read from and write to the network.
267
/// However, they share a great deal of common logic around the managing of their
268
/// file descriptors.
269
///
270
/// For this reason, `BaseSocketChannel` exists to provide a common core implementation of
271
/// the `SelectableChannel` protocol. It uses a number of private functions to provide hooks
272
/// for subclasses to implement the specific logic to handle their writes and reads.
273
class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, ChannelCore, @unchecked Sendable {
274
    typealias SelectableType = SocketType.SelectableType
275
276
    struct AddressCache {
277
        // deliberately lets because they must always be updated together (so forcing `init` is useful).
278
        let local: SocketAddress?
279
        let remote: SocketAddress?
280
281
0
        init(local: SocketAddress?, remote: SocketAddress?) {
282
0
            self.local = local
283
0
            self.remote = remote
284
0
        }
285
    }
286
287
    // MARK: - Stored Properties
288
    // MARK: Constants & atomics (accessible everywhere)
289
    public let parent: Channel?
290
    internal let socket: SocketType
291
    private let closePromise: EventLoopPromise<Void>
292
    internal let selectableEventLoop: SelectableEventLoop
293
0
    private let _offEventLoopLock = NIOLock()
294
0
    private let isActiveAtomic: ManagedAtomic<Bool> = .init(false)
295
    // just a thread-safe way of having something to print about the socket from any thread
296
    internal let socketDescription: String
297
298
    // MARK: Variables, on EventLoop thread only
299
0
    var readPending = false
300
    var pendingConnect: Optional<EventLoopPromise<Void>>
301
    var recvBufferPool: NIOPooledRecvBufferAllocator
302
0
    var maxMessagesPerRead: UInt = 4
303
0
    private var inFlushNow: Bool = false  // Guard against re-entrance of flushNow() method.
304
0
    private var autoRead: Bool = true
305
306
    // MARK: Variables that are really constant
307
    // this is really a constant (set in .init) but needs `self` to be constructed and
308
    // therefore a `var`. Do not change as this needs to be accessed from arbitrary threads
309
0
    private var _pipeline: ChannelPipeline! = nil
310
311
    // MARK: Special variables, please read comments.
312
    // For reads guarded by _either_ `self._offEventLoopLock` or the EL thread
313
    // Writes are guarded by _offEventLoopLock _and_ the EL thread.
314
    // PLEASE don't use these directly and use the non-underscored computed properties instead.
315
0
    private var _addressCache = AddressCache(local: nil, remote: nil)  // please use `self.addressesCached` instead
316
    private var _bufferAllocatorCache: ByteBufferAllocator  // please use `self.bufferAllocatorCached` instead.
317
318
    // MARK: - Computed properties
319
    // This is called from arbitrary threads.
320
    internal var addressesCached: AddressCache {
321
0
        get {
322
0
            if self.eventLoop.inEventLoop {
323
0
                return self._addressCache
324
0
            } else {
325
0
                return self._offEventLoopLock.withLock {
326
0
                    self._addressCache
327
0
                }
328
0
            }
329
0
        }
330
0
        set {
331
0
            self.eventLoop.preconditionInEventLoop()
332
0
            self._offEventLoopLock.withLock {
333
0
                self._addressCache = newValue
334
0
            }
335
0
        }
336
    }
337
338
    // This is called from arbitrary threads.
339
    private var bufferAllocatorCached: ByteBufferAllocator {
340
0
        get {
341
0
            if self.eventLoop.inEventLoop {
342
0
                return self._bufferAllocatorCache
343
0
            } else {
344
0
                return self._offEventLoopLock.withLock {
345
0
                    self._bufferAllocatorCache
346
0
                }
347
0
            }
348
0
        }
349
0
        set {
350
0
            self.eventLoop.preconditionInEventLoop()
351
0
            self._offEventLoopLock.withLock {
352
0
                self._bufferAllocatorCache = newValue
353
0
            }
354
0
        }
355
    }
356
357
    // We start with the invalid empty set of selector events we're interested in. This is to make sure we later on
358
    // (in `becomeFullyRegistered0`) seed the initial event correctly.
359
0
    internal var interestedEvent: SelectorEventSet = [] {
360
0
        didSet {
361
0
            assert(self.interestedEvent.contains(.reset), "impossible to unregister for reset")
362
0
        }
363
    }
364
365
    private var lifecycleManager: SocketChannelLifecycleManager {
366
0
        didSet {
367
0
            self.eventLoop.assertInEventLoop()
368
0
        }
369
    }
370
371
0
    private var bufferAllocator: ByteBufferAllocator = ByteBufferAllocator() {
372
0
        didSet {
373
0
            self.eventLoop.assertInEventLoop()
374
0
            self.bufferAllocatorCached = self.bufferAllocator
375
0
        }
376
    }
377
378
0
    public final var _channelCore: ChannelCore { self }
379
380
    // This is `Channel` API so must be thread-safe.
381
0
    public final var localAddress: SocketAddress? {
382
0
        self.addressesCached.local
383
0
    }
384
385
    // This is `Channel` API so must be thread-safe.
386
0
    public final var remoteAddress: SocketAddress? {
387
0
        self.addressesCached.remote
388
0
    }
389
390
    /// `false` if the whole `Channel` is closed and so no more IO operation can be done.
391
0
    var isOpen: Bool {
392
0
        self.eventLoop.assertInEventLoop()
393
0
        return self.lifecycleManager.isOpen
394
0
    }
395
396
0
    var isRegistered: Bool {
397
0
        self.eventLoop.assertInEventLoop()
398
0
        return self.lifecycleManager.isPreRegistered
399
0
    }
400
401
    // This is `Channel` API so must be thread-safe.
402
0
    public var isActive: Bool {
403
0
        self.isActiveAtomic.load(ordering: .relaxed)
404
0
    }
405
406
    // This is `Channel` API so must be thread-safe.
407
0
    public final var closeFuture: EventLoopFuture<Void> {
408
0
        self.closePromise.futureResult
409
0
    }
410
411
0
    public final var eventLoop: EventLoop {
412
0
        selectableEventLoop
413
0
    }
414
415
    // This is `Channel` API so must be thread-safe.
416
0
    public var isWritable: Bool {
417
0
        true
418
0
    }
419
420
    // This is `Channel` API so must be thread-safe.
421
0
    public final var allocator: ByteBufferAllocator {
422
0
        self.bufferAllocatorCached
423
0
    }
424
425
    // This is `Channel` API so must be thread-safe.
426
0
    public final var pipeline: ChannelPipeline {
427
0
        self._pipeline
428
0
    }
429
430
    // MARK: Methods to override in subclasses.
431
0
    func writeToSocket() throws -> OverallWriteResult {
432
0
        fatalError("must be overridden")
433
0
    }
434
435
    /// Read data from the underlying socket and dispatch it to the `ChannelPipeline`
436
    ///
437
    /// - Returns: `.some` if any data was read, `.none` otherwise.
438
0
    @discardableResult func readFromSocket() throws -> ReadResult {
439
0
        fatalError("this must be overridden by sub class")
440
0
    }
441
442
    // MARK: - Datatypes
443
444
    /// Indicates whether a selectable should be registered for IO notifications.
445
    enum IONotificationState {
446
        /// We should be registered for IO notifications.
447
        case register
448
449
        /// We should not be registered for IO notifications.
450
        case unregister
451
    }
452
453
    enum ReadResult {
454
        /// Nothing was read by the read operation.
455
        case none
456
457
        /// Some data was read by the read operation.
458
        case some
459
    }
460
461
    /// Returned by the `private func readable0()` to inform the caller about the current state of the underlying read stream.
462
    /// This is mostly useful when receiving `.readEOF` as we then need to drain the read stream fully (ie. until we receive EOF or error of course)
463
    private enum ReadStreamState: Equatable {
464
        /// Everything seems normal
465
        case normal(ReadResult)
466
467
        /// We saw EOF.
468
        case eof
469
470
        /// A read error was received.
471
        case error
472
    }
473
474
    /// Begin connection of the underlying socket.
475
    ///
476
    /// - Parameters:
477
    ///   - to: The `SocketAddress` to connect to.
478
    /// - Returns: `true` if the socket connected synchronously, `false` otherwise.
479
0
    func connectSocket(to address: SocketAddress) throws -> Bool {
480
0
        fatalError("this must be overridden by sub class")
481
0
    }
482
483
    /// Begin connection of the underlying socket.
484
    ///
485
    /// - Parameters:
486
    ///   - to: The `VsockAddress` to connect to.
487
    /// - Returns: `true` if the socket connected synchronously, `false` otherwise.
488
0
    func connectSocket(to address: VsockAddress) throws -> Bool {
489
0
        fatalError("this must be overridden by sub class")
490
0
    }
491
492
    enum ConnectTarget {
493
        case socketAddress(SocketAddress)
494
        case vsockAddress(VsockAddress)
495
    }
496
497
    /// Begin connection of the underlying socket.
498
    ///
499
    /// - Parameters:
500
    ///   - to: The target to connect to.
501
    /// - Returns: `true` if the socket connected synchronously, `false` otherwise.
502
0
    final func connectSocket(to target: ConnectTarget) throws -> Bool {
503
0
        switch target {
504
0
        case .socketAddress(let address):
505
0
            return try self.connectSocket(to: address)
506
0
        case .vsockAddress(let address):
507
0
            return try self.connectSocket(to: address)
508
0
        }
509
0
    }
510
511
    /// Make any state changes needed to complete the connection process.
512
0
    func finishConnectSocket() throws {
513
0
        fatalError("this must be overridden by sub class")
514
0
    }
515
516
    /// Returns if there are any flushed, pending writes to be sent over the network.
517
0
    func hasFlushedPendingWrites() -> Bool {
518
0
        fatalError("this must be overridden by sub class")
519
0
    }
520
521
    /// Buffer a write in preparation for a flush.
522
0
    func bufferPendingWrite(data: NIOAny, promise: EventLoopPromise<Void>?) {
523
0
        fatalError("this must be overridden by sub class")
524
0
    }
525
526
    /// Mark a flush point. This is called when flush is received, and instructs
527
    /// the implementation to record the flush.
528
0
    func markFlushPoint() {
529
0
        fatalError("this must be overridden by sub class")
530
0
    }
531
532
    /// Called when closing, to instruct the specific implementation to discard all pending
533
    /// writes.
534
0
    func cancelWritesOnClose(error: Error) {
535
0
        fatalError("this must be overridden by sub class")
536
0
    }
537
538
    // MARK: Common base socket logic.
539
    init(
540
        socket: SocketType,
541
        parent: Channel?,
542
        eventLoop: SelectableEventLoop,
543
        recvAllocator: RecvByteBufferAllocator,
544
        supportReconnect: Bool
545
0
    ) throws {
546
0
        self._bufferAllocatorCache = self.bufferAllocator
547
0
        self.socket = socket
548
0
        self.selectableEventLoop = eventLoop
549
0
        self.closePromise = eventLoop.makePromise()
550
0
        self.parent = parent
551
0
        self.recvBufferPool = .init(capacity: Int(self.maxMessagesPerRead), recvAllocator: recvAllocator)
552
0
        // As the socket may already be connected we should ensure we start with the correct addresses cached.
553
0
        self._addressCache = .init(local: try? socket.localAddress(), remote: try? socket.remoteAddress())
554
0
        self.lifecycleManager = SocketChannelLifecycleManager(
555
0
            eventLoop: eventLoop,
556
0
            isActiveAtomic: self.isActiveAtomic,
557
0
            supportReconnect: supportReconnect
558
0
        )
559
0
        self.socketDescription = socket.description
560
0
        self.pendingConnect = nil
561
0
        self._pipeline = ChannelPipeline(channel: self)
562
0
    }
563
564
0
    deinit {
565
0
        assert(
566
0
            self.lifecycleManager.canBeDestroyed,
567
0
            "leak of open Channel, state: \(String(describing: self.lifecycleManager))"
568
0
        )
569
0
    }
570
571
0
    public final func localAddress0() throws -> SocketAddress {
572
0
        self.eventLoop.assertInEventLoop()
573
0
        guard self.isOpen else {
574
0
            throw ChannelError._ioOnClosedChannel
575
0
        }
576
0
        return try self.socket.localAddress()
577
0
    }
578
579
0
    public final func remoteAddress0() throws -> SocketAddress {
580
0
        self.eventLoop.assertInEventLoop()
581
0
        guard self.isOpen else {
582
0
            throw ChannelError._ioOnClosedChannel
583
0
        }
584
0
        return try self.socket.remoteAddress()
585
0
    }
586
587
    /// Flush data to the underlying socket and return if this socket needs to be registered for write notifications.
588
    ///
589
    /// This method can be called re-entrantly but it will return immediately because the first call is responsible
590
    /// for sending all flushed writes, even the ones that are accumulated whilst `flushNow()` is running.
591
    ///
592
    /// - Returns: If this socket should be registered for write notifications. Ie. `IONotificationState.register` if
593
    ///            _not_ all data could be written, so notifications are necessary; and `IONotificationState.unregister`
594
    ///             if everything was written and we don't need to be notified about writability at the moment.
595
0
    func flushNow() -> IONotificationState {
596
0
        self.eventLoop.assertInEventLoop()
597
0
598
0
        // Guard against re-entry as data that will be put into `pendingWrites` will just be picked up by
599
0
        // `writeToSocket`.
600
0
        guard !self.inFlushNow else {
601
0
            return .unregister
602
0
        }
603
0
604
0
        assert(!self.inFlushNow)
605
0
        self.inFlushNow = true
606
0
        defer {
607
0
            self.inFlushNow = false
608
0
        }
609
0
610
0
        var newWriteRegistrationState: IONotificationState = .unregister
611
0
        while newWriteRegistrationState == .unregister && self.hasFlushedPendingWrites() && self.isOpen {
612
0
            let writeResult: OverallWriteResult
613
0
            do {
614
0
                assert(self.lifecycleManager.isActive)
615
0
                writeResult = try self.writeToSocket()
616
0
                if writeResult.writabilityChange {
617
0
                    // We went from not writable to writable.
618
0
                    self.pipeline.syncOperations.fireChannelWritabilityChanged()
619
0
                }
620
0
            } catch let err {
621
0
                // If there is a write error we should try to drain the inbound before closing the socket as there may be some data pending.
622
0
                // We ignore any error that is thrown as we will use the original err to close the channel and notify the user.
623
0
                if self.readIfNeeded0() {
624
0
                    assert(self.lifecycleManager.isActive)
625
0
626
0
                    // We need to continue reading until there is nothing more to be read from the socket as we will not have another chance to drain it.
627
0
                    var readAtLeastOnce = false
628
0
                    while let read = try? self.readFromSocket(), read == .some {
629
0
                        readAtLeastOnce = true
630
0
                    }
631
0
                    if readAtLeastOnce && self.lifecycleManager.isActive {
632
0
                        self.pipeline.fireChannelReadComplete()
633
0
                    }
634
0
                }
635
0
636
0
                self.close0(error: err, mode: .all, promise: nil)
637
0
638
0
                // we handled all writes
639
0
                return .unregister
640
0
            }
641
0
642
0
            switch writeResult.writeResult {
643
0
            case .couldNotWriteEverything:
644
0
                newWriteRegistrationState = .register
645
0
            case .writtenCompletely(let closeState):
646
0
                newWriteRegistrationState = .unregister
647
0
                switch closeState {
648
0
                case .open:
649
0
                    ()
650
0
                case .readyForClose:
651
0
                    self.close0(error: ChannelError.outputClosed, mode: .output, promise: nil)
652
0
                case .closed:
653
0
                    ()  // we can be flushed before becoming active
654
0
                }
655
0
            }
656
0
657
0
            if !self.isOpen || !self.hasFlushedPendingWrites() {
658
0
                // No further writes, unregister. We won't re-enter the loop as both of these would have to be true.
659
0
                newWriteRegistrationState = .unregister
660
0
            }
661
0
        }
662
0
663
0
        assert(
664
0
            (newWriteRegistrationState == .register && self.hasFlushedPendingWrites())
665
0
                || (newWriteRegistrationState == .unregister && !self.hasFlushedPendingWrites()),
666
0
            "illegal flushNow decision: \(newWriteRegistrationState) and \(self.hasFlushedPendingWrites())"
667
0
        )
668
0
        return newWriteRegistrationState
669
0
    }
670
671
0
    public final func setOption<Option: ChannelOption>(_ option: Option, value: Option.Value) -> EventLoopFuture<Void> {
672
0
        if eventLoop.inEventLoop {
673
0
            let promise = eventLoop.makePromise(of: Void.self)
674
0
            executeAndComplete(promise) { try self.setOption0(option, value: value) }
675
0
            return promise.futureResult
676
0
        } else {
677
0
            return eventLoop.submit { try self.setOption0(option, value: value) }
678
0
        }
679
0
    }
680
681
0
    func setOption0<Option: ChannelOption>(_ option: Option, value: Option.Value) throws {
682
0
        self.eventLoop.assertInEventLoop()
683
0
684
0
        guard isOpen else {
685
0
            throw ChannelError._ioOnClosedChannel
686
0
        }
687
0
688
0
        switch option {
689
0
        case let option as ChannelOptions.Types.SocketOption:
690
0
            try self.setSocketOption0(level: option.optionLevel, name: option.optionName, value: value)
691
0
        case _ as ChannelOptions.Types.AllocatorOption:
692
0
            bufferAllocator = value as! ByteBufferAllocator
693
0
        case _ as ChannelOptions.Types.RecvAllocatorOption:
694
0
            self.recvBufferPool.recvAllocator = value as! RecvByteBufferAllocator
695
0
        case _ as ChannelOptions.Types.AutoReadOption:
696
0
            let auto = value as! Bool
697
0
            let old = self.autoRead
698
0
            self.autoRead = auto
699
0
700
0
            // We only want to call read0() or pauseRead0() if we are already registered with the EventLoop; if not, this will be done automatically
701
0
            // once register0 is called. Besides this, we only need to do it when the value actually changes.
702
0
            if self.lifecycleManager.isPreRegistered && old != auto {
703
0
                if auto {
704
0
                    read0()
705
0
                } else {
706
0
                    pauseRead0()
707
0
                }
708
0
            }
709
0
        case _ as ChannelOptions.Types.MaxMessagesPerReadOption:
710
0
            self.maxMessagesPerRead = value as! UInt
711
0
            self.recvBufferPool.updateCapacity(to: Int(self.maxMessagesPerRead))
712
0
        default:
713
0
            fatalError("option \(option) not supported")
714
0
        }
715
0
    }
716
717
0
    /// Reads the current value of `option` and delivers it through a future.
    ///
    /// When the caller is already on this channel's event loop the option is read right away via
    /// `getOption0` and the result (or the thrown error) is wrapped in an already-completed future.
    /// Otherwise the read is submitted to the event loop.
    ///
    /// - Parameter option: The option to read.
    /// - Returns: A future carrying the option's value, or the error thrown by `getOption0`.
    public func getOption<Option: ChannelOption>(_ option: Option) -> EventLoopFuture<Option.Value> {
        guard self.eventLoop.inEventLoop else {
            // Off-loop caller: hop onto the event loop before touching channel state.
            return self.eventLoop.submit { try self.getOption0(option) }
        }

        do {
            let value = try self.getOption0(option)
            return self.eventLoop.makeSucceededFuture(value)
        } catch {
            return self.eventLoop.makeFailedFuture(error)
        }
    }
728
729
0
    /// Synchronously reads the value of `option`. Must be called on the event loop.
    ///
    /// Socket options are forwarded to `getSocketOption0`; the allocator, receive allocator, auto-read and
    /// max-messages-per-read options are answered from this channel's own properties. Any other option
    /// traps with `fatalError`.
    ///
    /// - Parameter option: The option to read.
    /// - Returns: The current value of the option.
    /// - Throws: `ChannelError._ioOnClosedChannel` if the channel is not open, or whatever
    ///   `getSocketOption0` throws.
    func getOption0<Option: ChannelOption>(_ option: Option) throws -> Option.Value {
        self.eventLoop.assertInEventLoop()

        if !self.isOpen {
            throw ChannelError._ioOnClosedChannel
        }

        switch option {
        case let socketOption as ChannelOptions.Types.SocketOption:
            return try self.getSocketOption0(level: socketOption.optionLevel, name: socketOption.optionName)
        case is ChannelOptions.Types.AllocatorOption:
            return self.bufferAllocator as! Option.Value
        case is ChannelOptions.Types.RecvAllocatorOption:
            return self.recvBufferPool.recvAllocator as! Option.Value
        case is ChannelOptions.Types.AutoReadOption:
            return self.autoRead as! Option.Value
        case is ChannelOptions.Types.MaxMessagesPerReadOption:
            return self.maxMessagesPerRead as! Option.Value
        default:
            fatalError("option \(option) not supported")
        }
    }
751
752
    /// Triggers a `ChannelPipeline.read()` if `autoRead` is enabled and no read is pending yet.
    ///
    /// Does nothing (and returns `false`) while the channel is not active.
    ///
    /// - Returns: `true` if `readPending` is `true` afterwards, `false` otherwise.
    @discardableResult func readIfNeeded0() -> Bool {
        self.eventLoop.assertInEventLoop()
        guard self.lifecycleManager.isActive else {
            return false
        }

        if !self.readPending, self.autoRead {
            self.pipeline.syncOperations.read()
        }
        return self.readPending
    }
766
767
    // Methods invoked from the HeadHandler of the ChannelPipeline
768
0
    /// Binds the socket to `address` and completes `promise` with the outcome.
    ///
    /// - Parameters:
    ///   - address: The address to bind to.
    ///   - promise: Failed with `ChannelError._ioOnClosedChannel` if the channel is not open; otherwise
    ///     completed by `executeAndComplete` with the result of the bind.
    public func bind0(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
        self.eventLoop.assertInEventLoop()

        if !self.isOpen {
            promise?.fail(ChannelError._ioOnClosedChannel)
            return
        }

        executeAndComplete(promise) {
            try self.socket.bind(to: address)
            // Only the local side is refreshed after a bind.
            self.updateCachedAddressesFromSocket(updateRemote: false)
        }
    }
781
782
0
    /// Buffers `data` as a pending write.
    ///
    /// - Parameters:
    ///   - data: The data to write.
    ///   - promise: Handed to `bufferPendingWrite` together with the data, or failed with
    ///     `ChannelError._ioOnClosedChannel` if the channel is not open.
    public final func write0(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
        self.eventLoop.assertInEventLoop()

        if self.isOpen {
            self.bufferPendingWrite(data: data, promise: promise)
        } else {
            // The channel is already closed: fail right away instead of queueing the write.
            promise?.fail(ChannelError._ioOnClosedChannel)
        }
    }
793
794
0
    /// Adds `.write` to the set of interested events, unless it is already part of it.
    private func registerForWritable() {
        self.eventLoop.assertInEventLoop()

        let current = self.interestedEvent
        if current.contains(.write) {
            // Already interested in write, nothing to do.
            return
        }
        self.safeReregister(interested: current.union(.write))
    }
803
804
0
    /// Removes `.write` from the set of interested events, if it is currently part of it.
    func unregisterForWritable() {
        self.eventLoop.assertInEventLoop()

        let current = self.interestedEvent
        if !current.contains(.write) {
            // We were not interested in write, nothing to do.
            return
        }
        self.safeReregister(interested: current.subtracting(.write))
    }
813
814
0
    /// Marks a flush point and, if possible, flushes straight away.
    ///
    /// A closed channel is ignored entirely. On an open but not yet active channel only the flush point is
    /// recorded. On an active channel with no write registration pending, `flushNow()` is attempted and the
    /// channel registers for writability when that call reports `.register`.
    public final func flush0() {
        self.eventLoop.assertInEventLoop()

        if !self.isOpen {
            return
        }

        self.markFlushPoint()

        if !self.lifecycleManager.isActive {
            return
        }

        // `flushNow()` is only attempted when we are not already waiting for writability.
        guard !self.isWritePending(), self.flushNow() == .register else {
            return
        }
        assert(self.lifecycleManager.isPreRegistered)
        self.registerForWritable()
    }
832
833
0
    /// Records that a read is pending and, once the channel is (pre-)registered, registers for readability.
    ///
    /// Does nothing on a channel that is not open.
    public func read0() {
        self.eventLoop.assertInEventLoop()

        if !self.isOpen {
            return
        }
        self.readPending = true

        guard self.lifecycleManager.isPreRegistered else {
            return
        }
        self.registerForReadable()
    }
845
846
0
    /// Stops listening for readability if the channel is (pre-)registered; otherwise does nothing.
    private final func pauseRead0() {
        self.eventLoop.assertInEventLoop()

        guard self.lifecycleManager.isPreRegistered else {
            return
        }
        self.unregisterForReadable()
    }
853
854
0
    /// Adds `.read` to the set of interested events.
    ///
    /// Skipped when an EOF notification has already been seen or when `.read` is already registered.
    private final func registerForReadable() {
        self.eventLoop.assertInEventLoop()
        assert(self.lifecycleManager.isRegisteredFully)

        if self.lifecycleManager.hasSeenEOFNotification {
            // An EOF notification was seen before, so there's no point in registering for reads.
            return
        }

        if self.interestedEvent.contains(.read) {
            return
        }

        self.safeReregister(interested: self.interestedEvent.union(.read))
    }
869
870
0
    /// Adds `.readEOF` to the set of interested events.
    ///
    /// Skipped when an EOF notification has already been seen or when `.readEOF` is already registered.
    private final func registerForReadEOF() {
        self.eventLoop.assertInEventLoop()
        assert(self.lifecycleManager.isRegisteredFully)

        if self.lifecycleManager.hasSeenEOFNotification {
            // An EOF notification was seen before, so there's no point in registering for `readEOF` again.
            return
        }

        if self.interestedEvent.contains(.readEOF) {
            return
        }

        self.safeReregister(interested: self.interestedEvent.union(.readEOF))
    }
885
886
0
    /// Removes `.read` from the set of interested events, if it is currently part of it.
    internal final func unregisterForReadable() {
        self.eventLoop.assertInEventLoop()
        assert(self.lifecycleManager.isRegisteredFully)

        if !self.interestedEvent.contains(.read) {
            return
        }

        self.safeReregister(interested: self.interestedEvent.subtracting(.read))
    }
896
897
    /// Closes this `BaseSocketChannel` and fulfills `promise` with the result of the _close_ operation.
    ///
    /// Unless closing the socket itself fails, `promise` is handed on to the lifecycle manager's close
    /// callouts regardless of `error`. A failure to deregister from the selector is reported through the
    /// pipeline as an error. `error` is used to fail outstanding writes (if any) and the pending connect
    /// promise if one is set.
    ///
    /// - Parameters:
    ///    - error: The error to fail the outstanding (if any) writes/connect with.
    ///    - mode: The close mode, must be `.all` for `BaseSocketChannel`.
    ///    - promise: The promise that gets notified about the result of the deregistration/close operations.
    public func close0(error: Error, mode: CloseMode, promise: EventLoopPromise<Void>?) {
        self.eventLoop.assertInEventLoop()

        if !self.isOpen {
            promise?.fail(ChannelError._alreadyClosed)
            return
        }

        if mode != .all {
            promise?.fail(ChannelError._operationUnsupported)
            return
        }

        // === BEGIN: No user callouts ===

        // Error callouts are collected here and only run once our state has been transitioned.
        var deferredErrorCallouts: [(ChannelPipeline) -> Void] = []

        self.interestedEvent = .reset
        do {
            try self.selectableEventLoop.deregister(channel: self)
        } catch let deregisterError {
            deferredErrorCallouts.append { pipeline in
                pipeline.syncOperations.fireErrorCaught(deregisterError)
            }
        }

        // The promise handed to the lifecycle callouts. It becomes `nil` when closing the socket fails:
        // in that case `promise` is failed by a deferred callout and must not be notified a second time.
        var lifecyclePromise: EventLoopPromise<Void>? = promise
        do {
            try self.socket.close()
        } catch let closeError {
            deferredErrorCallouts.append { (_: ChannelPipeline) in
                promise?.fail(closeError)
            }
            lifecyclePromise = nil
        }

        // Transition our internal state.
        let lifecycleCallouts = self.lifecycleManager.close()

        // === END: No user callouts (now that our state is reconciled, we can call out to user code.) ===

        // This must be the first callout as it transitions the PendingWritesManager into the closed state
        // and we assert elsewhere that the PendingWritesManager has the same idea of 'open' as we have here.
        self.cancelWritesOnClose(error: error)

        // This should be a no-op as we shouldn't have any.
        for errorCallout in deferredErrorCallouts {
            errorCallout(self.pipeline)
        }

        if let connectPromise = self.pendingConnect {
            self.pendingConnect = nil
            connectPromise.fail(error)
        }

        lifecycleCallouts(lifecyclePromise, self.pipeline)

        self.eventLoop.execute {
            // Executed in a delayed fashion as the user's code may still be traversing the pipeline.
            self.removeHandlers(pipeline: self.pipeline)

            self.closePromise.succeed(())

            // Now reset the addresses as we notified all handlers / futures.
            self.unsetCachedAddressesFromSocket()
        }
    }
975
976
0
    /// Begins registration of this channel.
    ///
    /// - Parameter promise: Failed with `ChannelError._ioOnClosedChannel` if the channel is not open, with
    ///   `ChannelError._inappropriateOperationForState` if registration has already begun, or with
    ///   `EventLoopError._shutdown` if the selectable event loop is no longer open (in which case the
    ///   channel is also closed). Otherwise it is passed to the lifecycle manager's registration callouts.
    public final func register0(promise: EventLoopPromise<Void>?) {
        self.eventLoop.assertInEventLoop()

        if !self.isOpen {
            promise?.fail(ChannelError._ioOnClosedChannel)
            return
        }

        if self.lifecycleManager.isPreRegistered {
            promise?.fail(ChannelError._inappropriateOperationForState)
            return
        }

        if !self.selectableEventLoop.isOpen {
            let shutdownError = EventLoopError._shutdown
            self.pipeline.syncOperations.fireErrorCaught(shutdownError)
            // `close0`'s promise is about the result of the `close` operation, ...
            self.close0(error: shutdownError, mode: .all, promise: nil)
            // ... therefore we need to fail the registration `promise` separately.
            promise?.fail(shutdownError)
            return
        }

        // We can't fully register yet as epoll would give us EPOLLHUP if bind/connect wasn't called yet.
        let registrationCallouts = self.lifecycleManager.beginRegistration()
        registrationCallouts(promise, self.pipeline)
    }
1002
1003
0
    /// Registers this channel, then fully registers it with the selector and activates it in one go.
    ///
    /// If the registration step fails, the channel is closed and the failure is cascaded to `promise`.
    ///
    /// - Parameter promise: Receives a registration failure, or is handed to `becomeActive0` once the
    ///   channel is fully registered.
    public final func registerAlreadyConfigured0(promise: EventLoopPromise<Void>?) {
        self.eventLoop.assertInEventLoop()
        assert(self.isOpen)
        assert(!self.lifecycleManager.isActive)

        let registerPromise = self.eventLoop.makePromise(of: Void.self)
        self.register0(promise: registerPromise)

        let registration = registerPromise.futureResult
        registration.whenFailure { _ in
            self.close(promise: nil)
        }
        registration.cascadeFailure(to: promise)

        guard self.lifecycleManager.isPreRegistered else {
            return
        }

        // We expect kqueue/epoll registration to always succeed which is basically true, except for errors
        // that should be fatal (EBADF, EFAULT, ESRCH, ENOMEM) and two 'table full' (EMFILE, ENFILE) error
        // kinds which we don't handle yet but might do in the future (#469).
        try! self.becomeFullyRegistered0()

        guard self.lifecycleManager.isRegisteredFully else {
            return
        }
        self.becomeActive0(promise: promise)
    }
1024
1025
0
    /// Handles a user outbound event.
    ///
    /// Only `VsockChannelEvents.ConnectToAddress` is handled here: it is turned into a connect to the
    /// event's vsock address. Every other event fails `promise` with `ChannelError._operationUnsupported`.
    ///
    /// - Parameters:
    ///   - event: The event that was triggered.
    ///   - promise: Completed by the connect attempt, or failed for unsupported events.
    public func triggerUserOutboundEvent0(_ event: Any, promise: EventLoopPromise<Void>?) {
        guard let connectEvent = event as? VsockChannelEvents.ConnectToAddress else {
            promise?.fail(ChannelError._operationUnsupported)
            return
        }
        self.connect0(to: .vsockAddress(connectEvent.address), promise: promise)
    }
1033
1034
    // Methods invoked from the EventLoop itself
1035
0
    /// Handles a writability notification: completes a pending connect (if any) and then flushes.
    ///
    /// When `flushNow()` reports `.unregister`, `finishWritable()` takes care of dropping the `.write`
    /// registration. When it reports `.register`, the existing registration is kept.
    public final func writable() {
        self.eventLoop.assertInEventLoop()
        assert(self.isOpen)

        // If we were connecting, that has finished.
        self.finishConnect()

        switch self.flushNow() {
        case .register:
            // Nothing to do: given that we just received `writable`, we're still registered for writable.
            assert(!self.isOpen || self.interestedEvent.contains(.write))
        case .unregister:
            // Everything was written or connect was complete, let's unregister from writable.
            self.finishWritable()
        }
    }
1050
1051
0
    /// Completes a pending connect, if there is one.
    ///
    /// On success the pending promise is cleared, the cached remote address is refreshed and the promise is
    /// handed to `becomeActive0`. On failure the channel is closed with the thrown error. Without a pending
    /// connect the channel is expected to be active already.
    private func finishConnect() {
        self.eventLoop.assertInEventLoop()
        assert(self.lifecycleManager.isPreRegistered)

        guard let connectPromise = self.pendingConnect else {
            assert(self.lifecycleManager.isActive)
            return
        }

        assert(!self.lifecycleManager.isActive)

        do {
            try self.finishConnectSocket()
        } catch let connectError {
            assert(!self.lifecycleManager.isActive)
            // close0 fails the connectPromise itself so no need to do it here.
            self.close0(error: connectError, mode: .all, promise: nil)
            return
        }

        // Now that this has succeeded, becomeActive0 will actually fulfill the promise.
        self.pendingConnect = nil
        // We already know what the local address is.
        self.updateCachedAddressesFromSocket(updateLocal: false, updateRemote: true)
        self.becomeActive0(promise: connectPromise)
    }
1075
1076
0
    /// Drops the `.write` registration once there is nothing left to flush; a no-op on a closed channel.
    private func finishWritable() {
        self.eventLoop.assertInEventLoop()

        guard self.isOpen else {
            return
        }
        assert(self.lifecycleManager.isPreRegistered)
        assert(!self.hasFlushedPendingWrites())
        self.unregisterForWritable()
    }
1085
1086
0
    /// Handles a write-EOF notification.
    ///
    /// This implementation never expects such a notification and traps with `fatalError`, including a
    /// description of the channel in the message.
    func writeEOF() {
1087
0
        fatalError("\(self) received writeEOF which is unexpected")
1088
0
    }
1089
1090
0
    /// Handles a `readEOF` notification.
    ///
    /// Records that the notification was seen, lets `readEOF0()` process it and finally asserts that neither
    /// `.read` nor `.readEOF` is part of the interested events any more. The notification must not have been
    /// seen before.
    func readEOF() {
        assert(self.lifecycleManager.hasSeenEOFNotification == false)
        self.lifecycleManager.hasSeenEOFNotification = true

        // We can't be not active but still registered here; this would mean that we got a notification
        // about a channel before we're ready to receive them.
        assert(
            self.lifecycleManager.isRegisteredFully,
            "illegal state: \(self): active: \(self.lifecycleManager.isActive), registered: \(self.lifecycleManager.isRegisteredFully)"
        )

        self.readEOF0()

        assert(self.interestedEvent.contains(.read) == false)
        assert(self.interestedEvent.contains(.readEOF) == false)
    }
1106
1107
0
    /// Unregisters from `.readEOF` and reads from the channel until EOF or an error is reached.
    ///
    /// Does nothing unless the channel is fully registered. Reading stops as soon as the channel is no
    /// longer active, `readable0()` reports `.eof`, or it reports `.error`. A read that returns neither
    /// bytes nor EOF is a precondition failure.
    final func readEOF0() {
        guard self.lifecycleManager.isRegisteredFully else {
            return
        }

        // We're unregistering from `readEOF` here as we want this to be one-shot. We're then synchronously
        // reading all input until the EOF that we're guaranteed to see. After that `readEOF` becomes
        // uninteresting and would anyway fire constantly.
        self.safeReregister(interested: self.interestedEvent.subtracting(.readEOF))

        while self.lifecycleManager.isActive {
            switch self.readable0() {
            case .normal(.some):
                // Normal; note that there is no guarantee we're still active (as the user might have
                // closed in a callout), which the loop condition re-checks.
                continue
            case .normal(.none):
                preconditionFailure("got .readEOF and read returned not reading any bytes, nor EOF.")
            case .eof:
                // On EOF we're done with our processing for `readEOF`. We could both be registered & active
                // (if our channel supports half-closure) or unregistered & inactive (if it doesn't).
                return
            case .error:
                // We should be unregistered and inactive now (as `readable0` would've called close).
                assert(!self.lifecycleManager.isActive)
                assert(!self.lifecycleManager.isPreRegistered)
                return
            }
        }
    }
1134
1135
    /// Handles a `reset` notification.
    ///
    /// This _needs_ to synchronously cause the fd to be unregistered because we cannot unregister from
    /// `reset`. In other words: failing to unregister the whole selector will cause NIO to spin at 100% CPU,
    /// constantly delivering the `reset` event.
    ///
    /// Remaining input is drained via `readEOF0()` first. If the socket is still open afterwards, the
    /// channel is closed with the socket's `SO_ERROR` value, with `ECONNRESET` when no error is set, or
    /// with the error thrown while querying the option.
    final func reset() {
        self.readEOF0()

        if self.socket.isOpen {
            assert(self.lifecycleManager.isPreRegistered)

            // If the socket is still registered (and therefore open), let's try to get the actual socket
            // error from the socket.
            let closeError: Error
            do {
                let socketErrno: Int32 = try self.socket.getOption(level: .socket, name: .so_error)
                if socketErrno != 0 {
                    // We have a socket error, let's forward it.
                    // This path will be executed on Linux (EPOLLERR) & Darwin (ev.fflags != 0) for
                    // stream sockets, and most (but not all) errors on datagram sockets.
                    closeError = IOError(errnoCode: socketErrno, reason: "connection reset (error set)")
                } else {
                    // We don't have a socket error, this must be a connection reset without an error then.
                    // This path should only be executed on Linux (EPOLLHUP, no EPOLLERR).
                    #if os(Linux)
                    let message: String = "connection reset (no error set)"
                    #else
                    let message: String =
                        "BUG IN SwiftNIO (possibly #572), please report! Connection reset (no error set)."
                    #endif
                    closeError = IOError(errnoCode: ECONNRESET, reason: message)
                }
            } catch {
                // Querying the socket failed; close with that error instead.
                closeError = error
            }
            self.close0(error: closeError, mode: .all, promise: nil)
        }
        assert(!self.lifecycleManager.isPreRegistered)
    }
1170
1171
0
    /// Handles a readability notification by reading from the socket via `readable0()`.
    ///
    /// Must not be delivered once a `.readEOF` notification has been seen.
    public final func readable() {
        assert(
            self.lifecycleManager.hasSeenEOFNotification == false,
            "got a read notification after having already seen .readEOF"
        )
        _ = self.readable0()
    }
1178
1179
    /// Reads from the socket once and reports what state the read stream is in afterwards.
    ///
    /// - On a successful read: fires `channelReadComplete` (if still active), triggers another read if
    ///   needed and returns `.normal` with the read result.
    /// - On `ChannelError.eof`: fires `channelReadComplete` while active. With remote half-closure allowed
    ///   only the input side is closed and `.eof` is returned; otherwise the error is handled like any
    ///   other read error, but without firing it through the pipeline.
    /// - On any other error: fires `errorCaught`. If `shouldCloseOnReadError` agrees, the channel is closed
    ///   and `.error` is returned; otherwise processing continues as if something had been read.
    ///
    /// On every exit path, the `.read` registration is dropped if the channel is still open and no read is
    /// pending.
    ///
    /// - Returns: The state of the read stream after this read.
    @discardableResult
    private final func readable0() -> ReadStreamState {
        self.eventLoop.assertInEventLoop()
        assert(self.lifecycleManager.isActive)

        defer {
            if self.isOpen, !self.readPending {
                self.unregisterForReadable()
            }
        }

        let outcome: ReadResult
        do {
            outcome = try self.readFromSocket()
        } catch let readError {
            let failureState: ReadStreamState
            // ChannelError.eof is not something we want to fire through the pipeline as it just means the
            // remote peer closed / shutdown the connection.
            if let channelError = readError as? ChannelError, channelError == ChannelError.eof {
                failureState = .eof

                if self.lifecycleManager.isActive {
                    // Directly call getOption0 as we are already on the EventLoop and so don't need to
                    // create an extra future.
                    //
                    // getOption0 can only fail if the channel is not active anymore but we assert further
                    // up that it is. If that's not the case this is a precondition failure and we would
                    // like to know.
                    let halfClosureAllowed = try! self.getOption0(.allowRemoteHalfClosure)

                    // For EOF, we always fire read complete.
                    self.pipeline.syncOperations.fireChannelReadComplete()

                    if halfClosureAllowed {
                        // If we want to allow half closure we will just mark the input side of the Channel
                        // as closed.
                        if self.shouldCloseOnReadError(readError) {
                            self.close0(error: readError, mode: .input, promise: nil)
                        }
                        self.readPending = false
                        return .eof
                    }
                }
            } else {
                failureState = .error
                self.pipeline.syncOperations.fireErrorCaught(readError)
            }

            guard !self.shouldCloseOnReadError(readError) else {
                self.close0(error: readError, mode: .all, promise: nil)
                return failureState
            }
            // This is non-fatal, so continue as normal.
            // This constitutes "some" as we did get at least an error from the socket.
            outcome = .some
        }

        // The assert below needs to be disabled for io_uring, as the io_uring backend does not have the
        // implicit synchronisation between modifications to the poll mask and the actual returned events on
        // the completion queue that kqueue and epoll have.
        //
        // For kqueue and epoll, once a modification of the poll mask has been issued, the next call to reap
        // events is sure not to include events which do not match the new poll mask. For this assert, that
        // means we are guaranteed to never receive a POLLIN notification unless there are bytes to read.
        //
        // A fully asynchronous backend like io_uring has no such synchronisation point: after submitting the
        // asynchronous poll mask change we may still reap pending replies for the old poll mask, and thus
        // receive a POLLIN even though the mask was modified, which would trigger the assert.
        //
        // The only way to avoid that race would be heavy handed synchronisation primitives like
        // IOSQE_IO_DRAIN (basically flushing all pending requests and waiting for a fake event result to
        // sync up), which would be awful for performance, so better skip the assert for io_uring instead.
        #if !SWIFTNIO_USE_IO_URING
        assert(outcome == .some)
        #endif
        if self.lifecycleManager.isActive {
            self.pipeline.syncOperations.fireChannelReadComplete()
        }
        self.readIfNeeded0()
        return .normal(outcome)
    }
1258
1259
    /// Decides whether the `Channel` should be closed because of an `Error` thrown by `readFromSocket`.
    ///
    /// This implementation treats every read error as fatal.
    ///
    /// - Parameters:
    ///   - err: The `Error` which was thrown by `readFromSocket`.
    /// - Returns: `true` if the `Channel` should be closed, `false` otherwise.
    func shouldCloseOnReadError(_ err: Error) -> Bool {
        return true
    }
1267
1268
    /// Handles an error reported by the selector.
1269
    ///
1270
    /// Default behaviour is to treat this as if it were a reset: `reset()` is invoked first.
    ///
    /// - Returns: Always `.fatal` in this implementation.
1271
0
    func error() -> ErrorResult {
1272
0
        self.reset()
1273
0
        return .fatal
1274
0
    }
1275
1276
0
    /// Refreshes the cached local and/or remote address.
    ///
    /// An address that is not refreshed keeps its currently cached value. A refreshed address becomes `nil`
    /// when looking it up throws.
    ///
    /// - Parameters:
    ///   - updateLocal: Whether to refresh the local address via `localAddress0()`.
    ///   - updateRemote: Whether to refresh the remote address via `remoteAddress0()`.
    internal final func updateCachedAddressesFromSocket(updateLocal: Bool = true, updateRemote: Bool = true) {
        self.eventLoop.assertInEventLoop()
        assert(updateLocal || updateRemote)

        let previous = self.addressesCached

        var local = previous.local
        if updateLocal {
            local = try? self.localAddress0()
        }

        var remote = previous.remote
        if updateRemote {
            remote = try? self.remoteAddress0()
        }

        self.addressesCached = AddressCache(local: local, remote: remote)
    }
1284
1285
0
    /// Clears both the cached local and the cached remote address.
    internal final func unsetCachedAddressesFromSocket() {
        self.eventLoop.assertInEventLoop()
        let cleared = AddressCache(local: nil, remote: nil)
        self.addressesCached = cleared
    }
1289
1290
0
    /// Connects to a `SocketAddress` by forwarding to the `ConnectTarget`-based `connect0`.
    ///
    /// - Parameters:
    ///   - address: The address to connect to.
    ///   - promise: Passed on unchanged to the `ConnectTarget`-based `connect0`.
    public final func connect0(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
        let target = ConnectTarget.socketAddress(address)
        self.connect0(to: target, promise: promise)
    }
1293
1294
0
    /// Connects the socket to `target`.
    ///
    /// If `connectSocket` reports that the connection is established, the cached addresses are refreshed
    /// and the channel becomes active. Otherwise the promise is parked as the pending connect, the channel
    /// is fully registered and it starts waiting for writability. Any error thrown along the way closes
    /// the channel.
    ///
    /// - Parameters:
    ///   - target: What to connect to.
    ///   - promise: Failed with `ChannelError._ioOnClosedChannel` if the channel is not open, with
    ///     `ChannelError._connectPending` if a connect is already pending, or with
    ///     `ChannelError._inappropriateOperationForState` if the channel is not registered yet.
    internal final func connect0(to target: ConnectTarget, promise: EventLoopPromise<Void>?) {
        self.eventLoop.assertInEventLoop()

        if !self.isOpen {
            promise?.fail(ChannelError._ioOnClosedChannel)
            return
        }

        if self.pendingConnect != nil {
            promise?.fail(ChannelError._connectPending)
            return
        }

        if !self.lifecycleManager.isPreRegistered {
            promise?.fail(ChannelError._inappropriateOperationForState)
            return
        }

        do {
            if try self.connectSocket(to: target) {
                self.updateCachedAddressesFromSocket()
                self.becomeActive0(promise: promise)
            } else {
                // We aren't connected, we'll get the remote address later.
                self.updateCachedAddressesFromSocket(updateLocal: true, updateRemote: false)
                // A pending connect always needs a promise; make one if the caller didn't supply any.
                self.pendingConnect = promise ?? self.eventLoop.makePromise()
                try self.becomeFullyRegistered0()
                self.registerForWritable()
            }
        } catch let connectError {
            assert(self.lifecycleManager.isPreRegistered)
            // We would like to have this assertion here, but we want to be able to go through this
            // code path in cases where connect() is being called on channels that are already active.
            //assert(!self.lifecycleManager.isActive)
            // We're going to set the promise as the pending connect promise, and let close0 fail it for us.
            self.pendingConnect = promise
            self.close0(error: connectError, mode: .all, promise: nil)
        }
    }
1337
1338
0
    /// Called with data that was read; this implementation intentionally ignores it.
    ///
    /// - Parameter data: The data that was read (unused here).
    public func channelRead0(_ data: NIOAny) {
1339
0
        // Do nothing by default
1340
0
        // note: we can't assert that we're active here as TailChannelHandler will call this on channelRead
1341
0
    }
1342
1343
0
    /// Called with an error that was caught; this implementation intentionally ignores it.
    ///
    /// - Parameter error: The error that was caught (unused here).
    public func errorCaught0(error: Error) {
1344
0
        // Do nothing
1345
0
    }
1346
1347
0
    /// Whether `.write` is currently part of the interested events, i.e. we are waiting for writability.
    ///
    /// - Returns: `true` if `interestedEvent` contains `.write`.
    private func isWritePending() -> Bool {
        return self.interestedEvent.contains(.write)
    }
1350
1351
0
    /// Changes the interested events to `interested` and re-registers with the selectable event loop.
    ///
    /// Nothing happens on a closed channel or when `interested` equals the current set. If re-registering
    /// throws, the error is fired through the pipeline and the channel is closed with it.
    ///
    /// - Parameter interested: The new set of events this channel is interested in.
    private final func safeReregister(interested: SelectorEventSet) {
        self.eventLoop.assertInEventLoop()
        assert(self.lifecycleManager.isRegisteredFully)

        if !self.isOpen {
            assert(self.interestedEvent == .reset, "interestedEvent=\(self.interestedEvent) even though we're closed")
            return
        }

        // No need to update (and so cause a syscall) if we are already registered with the correct events.
        guard interested != self.interestedEvent else {
            return
        }

        self.interestedEvent = interested
        do {
            try self.selectableEventLoop.reregister(channel: self)
        } catch let reregisterError {
            self.pipeline.syncOperations.fireErrorCaught(reregisterError)
            self.close0(error: reregisterError, mode: .all, promise: nil)
        }
    }
1371
1372
0
    /// Sets the interested events to `interested` and registers this channel with the selectable event loop.
    ///
    /// If registering throws, the error is fired through the pipeline, the channel is closed with it and
    /// the error is rethrown.
    ///
    /// - Parameter interested: The initial set of events this channel is interested in.
    /// - Throws: `ChannelError._ioOnClosedChannel` if the channel is not open, or the registration error.
    private func safeRegister(interested: SelectorEventSet) throws {
        self.eventLoop.assertInEventLoop()
        assert(!self.lifecycleManager.isRegisteredFully)

        if !self.isOpen {
            throw ChannelError._ioOnClosedChannel
        }

        self.interestedEvent = interested
        do {
            try self.selectableEventLoop.register(channel: self)
        } catch let registerError {
            self.pipeline.syncOperations.fireErrorCaught(registerError)
            self.close0(error: registerError, mode: .all, promise: nil)
            throw registerError
        }
    }
1389
1390
0
    /// Moves the channel from pre-registered to fully registered.
    ///
    /// Registers with the selector for `.reset` and `.error` only, then runs the lifecycle manager's
    /// finish-registration callouts.
    ///
    /// - Throws: Whatever `safeRegister` throws.
    final func becomeFullyRegistered0() throws {
        self.eventLoop.assertInEventLoop()
        assert(self.lifecycleManager.isPreRegistered)
        assert(!self.lifecycleManager.isRegisteredFully)

        // The initial set of interested events must not contain `.readEOF` because when connect doesn't
        // return synchronously, kevent might send us a `readEOF` because of the `writable` event that marks
        // the connect as completed.
        // See SocketChannelTest.testServerClosesTheConnectionImmediately for a regression test.
        let initialInterest: SelectorEventSet = [.reset, .error]
        try self.safeRegister(interested: initialInterest)

        let registrationCallouts = self.lifecycleManager.finishRegistration()
        registrationCallouts(nil, self.pipeline)
    }
1401
1402
0
    /// Activates the channel, fully registering it first if that has not happened yet.
    ///
    /// Activation happens in two steps (begin, then finish), each followed by a check that the channel was
    /// not closed by the callouts. Afterwards the channel registers for `readEOF`, flushes pending writes,
    /// adjusts its `.write` registration to the flush result and triggers a read if needed.
    ///
    /// - Parameter promise: Passed to the begin-activation callouts, or to `close0` if full registration
    ///   fails.
    final func becomeActive0(promise: EventLoopPromise<Void>?) {
        self.eventLoop.assertInEventLoop()
        assert(self.lifecycleManager.isPreRegistered)

        if !self.lifecycleManager.isRegisteredFully {
            do {
                try self.becomeFullyRegistered0()
                assert(self.lifecycleManager.isRegisteredFully)
            } catch let registrationError {
                self.close0(error: registrationError, mode: .all, promise: promise)
                return
            }
        }

        let beginCallouts = self.lifecycleManager.beginActivation()
        beginCallouts(promise, self.pipeline)
        if !self.lifecycleManager.isActive {
            // The channel got closed in the promise succeed closure.
            return
        }

        // If the channel wasn't closed, continue to fully activate.
        let finishCallouts = self.lifecycleManager.finishActivation()
        finishCallouts(nil, self.pipeline)
        if !self.lifecycleManager.isOpen {
            // In the user callout for `channelActive` the channel got closed.
            return
        }

        self.registerForReadEOF()

        // Flush any pending writes. If after the flush we're still open, make sure
        // our registration is appropriate.
        switch self.flushNow() {
        case .unregister:
            if self.lifecycleManager.isOpen, self.interestedEvent.contains(.write) {
                self.unregisterForWritable()
            }
        case .register:
            if self.lifecycleManager.isOpen, !self.interestedEvent.contains(.write) {
                self.registerForWritable()
            }
        }

        self.readIfNeeded0()
    }
1442
1443
0
    /// Placeholder for registering this channel with `selector` for the `interested` events.
    ///
    /// This implementation always traps with `fatalError("must override")`; the message indicates that a
    /// concrete channel type is expected to supply the real implementation.
    func register(selector: Selector<NIORegistration>, interested: SelectorEventSet) throws {
1444
0
        fatalError("must override")
1445
0
    }
1446
1447
0
    /// Placeholder for deregistering this channel from `selector` for the given close `mode`.
    ///
    /// This implementation always traps with `fatalError("must override")`; the message indicates that a
    /// concrete channel type is expected to supply the real implementation.
    func deregister(selector: Selector<NIORegistration>, mode: CloseMode) throws {
1448
0
        fatalError("must override")
1449
0
    }
1450
1451
0
    /// Placeholder for re-registering this channel with `selector` for the `interested` events.
    ///
    /// This implementation always traps with `fatalError("must override")`; the message indicates that a
    /// concrete channel type is expected to supply the real implementation.
    func reregister(selector: Selector<NIORegistration>, interested: SelectorEventSet) throws {
1452
0
        fatalError("must override")
1453
0
    }
1454
}
1455
1456
extension BaseSocketChannel {
    /// Synchronous access to this channel's options.
    ///
    /// Both operations forward directly to `setOption0` / `getOption0` on the wrapped channel.
    public struct SynchronousOptions: NIOSynchronousChannelOptions {
        /// The channel whose options are read and written.
        @usableFromInline  // should be private
        internal let _channel: BaseSocketChannel<SocketType>

        /// Wraps `channel`.
        @inlinable  // should be fileprivate
        internal init(_channel wrapped: BaseSocketChannel<SocketType>) {
            self._channel = wrapped
        }

        /// Sets `option` to `value` via the channel's `setOption0`.
        ///
        /// - Throws: Whatever `setOption0` throws.
        @inlinable
        public func setOption<Option: ChannelOption>(_ option: Option, value: Option.Value) throws {
            try self._channel.setOption0(option, value: value)
        }

        /// Reads `option` via the channel's `getOption0`.
        ///
        /// - Returns: The current value of the option.
        /// - Throws: Whatever `getOption0` throws.
        @inlinable
        public func getOption<Option: ChannelOption>(_ option: Option) throws -> Option.Value {
            let value = try self._channel.getOption0(option)
            return value
        }
    }

    /// A `SynchronousOptions` view onto this channel; never `nil` for this type.
    public final var syncOptions: NIOSynchronousChannelOptions? {
        return SynchronousOptions(_channel: self)
    }
}
1481
1482
/// Runs `body` and synchronously completes `promise` (if not `nil`) with its outcome.
///
/// `body` is always executed, even when `promise` is `nil`.
///
/// - Parameters:
///   - promise: Succeeded with the value returned by `body`, or failed with the error it throws.
///   - body: The work to perform.
func executeAndComplete<Value: Sendable>(_ promise: EventLoopPromise<Value>?, _ body: () throws -> Value) {
    switch Result(catching: body) {
    case .success(let value):
        promise?.succeed(value)
    case .failure(let failure):
        promise?.fail(failure)
    }
}
1491
#endif  // !os(WASI)