Coverage Report

Created: 2025-09-08 06:42

/src/swift-nio/Sources/NIOPosix/BaseSocketChannel.swift
Line
Count
Source (jump to first uncovered line)
1
//===----------------------------------------------------------------------===//
2
//
3
// This source file is part of the SwiftNIO open source project
4
//
5
// Copyright (c) 2017-2021 Apple Inc. and the SwiftNIO project authors
6
// Licensed under Apache License v2.0
7
//
8
// See LICENSE.txt for license information
9
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
10
//
11
// SPDX-License-Identifier: Apache-2.0
12
//
13
//===----------------------------------------------------------------------===//
14
15
import Atomics
16
import NIOConcurrencyHelpers
17
import NIOCore
18
19
private struct SocketChannelLifecycleManager {
    // MARK: Types

    /// The lifecycle states a channel can be in.
    private enum State {
        case fresh
        /// `register()` has been run but the selector doesn't know about the channel yet.
        case preRegistered
        /// Fully registered, i.e. the selector knows about the channel.
        case fullyRegistered
        case activated
        case closed
    }

    /// The inputs that drive the state machine in `moveState(event:)`.
    private enum Event {
        case activate
        case beginRegistration
        case finishRegistration
        case close
    }

    // MARK: Properties

    private let eventLoop: EventLoop

    /// Mirrors "is the current state `.activated`?". Queried from the Channel, so it must be thread-safe.
    internal let isActiveAtomic: ManagedAtomic<Bool>

    // Everything below is only to be accessed on the EventLoop.

    /// Whether the `.readEOF` notification has been seen.
    /// Note: this can be `false` on a deactivated channel, we might just have torn it down.
    var hasSeenEOFNotification: Bool = false

    /// Whether an `active` -> `active` transition is supported (used by datagram sockets).
    let supportsReconnect: Bool

    /// The current state. Every write keeps `isActiveAtomic` in sync with it.
    private var currentState: State = .fresh {
        didSet {
            self.eventLoop.assertInEventLoop()
            if self.currentState == .activated {
                // Anything -> activated.
                self.isActiveAtomic.store(true, ordering: .relaxed)
            } else if oldValue == .activated {
                // activated -> anything that is not activated.
                self.isActiveAtomic.store(false, ordering: .relaxed)
            }
        }
    }

    // MARK: API

    /// Creates a manager in the `.fresh` state.
    ///
    /// `isActiveAtomic` is injected because it is accessed from arbitrary threads while
    /// `SocketChannelLifecycleManager` itself is usually held mutable.
    internal init(
        eventLoop: EventLoop,
        isActiveAtomic: ManagedAtomic<Bool>,
        supportReconnect: Bool
    ) {
        self.supportsReconnect = supportReconnect
        self.isActiveAtomic = isActiveAtomic
        self.eventLoop = eventLoop
    }

    /// `true` once the state is `.closed`.
    ///
    /// This is called from Channel's deinit, so it deliberately does not assert that we're on the EventLoop.
    internal var canBeDestroyed: Bool {
        return self.currentState == .closed
    }

    // Each of the four transition functions below returns a closure. To not suffer from a potential
    // allocation for that closure, they must be inlined.

    /// Feeds `.beginRegistration` into the state machine and returns the follow-up work.
    @inline(__always)
    internal mutating func beginRegistration() -> ((EventLoopPromise<Void>?, ChannelPipeline) -> Void) {
        return self.moveState(event: .beginRegistration)
    }

    /// Feeds `.finishRegistration` into the state machine and returns the follow-up work.
    @inline(__always)
    internal mutating func finishRegistration() -> ((EventLoopPromise<Void>?, ChannelPipeline) -> Void) {
        return self.moveState(event: .finishRegistration)
    }

    /// Feeds `.close` into the state machine and returns the follow-up work.
    @inline(__always)
    internal mutating func close() -> ((EventLoopPromise<Void>?, ChannelPipeline) -> Void) {
        return self.moveState(event: .close)
    }

    /// Feeds `.activate` into the state machine and returns the follow-up work.
    @inline(__always)
    internal mutating func activate() -> ((EventLoopPromise<Void>?, ChannelPipeline) -> Void) {
        return self.moveState(event: .activate)
    }

    // MARK: Private API

    /// Applies `event` to the current state.
    ///
    /// For a legal transition the new state is stored and a closure is returned that succeeds the
    /// given promise and fires the matching pipeline events. An illegal transition ends in
    /// `badTransition(event:)`, which never returns.
    ///
    /// Like its callers, this returns a closure and must be inlined to not suffer from a potential allocation.
    @inline(__always)
    private mutating func moveState(event: Event) -> ((EventLoopPromise<Void>?, ChannelPipeline) -> Void) {
        self.eventLoop.assertInEventLoop()

        switch self.currentState {
        case .fresh:
            switch event {
            case .beginRegistration:
                self.currentState = .preRegistered
                return { promise, pipeline in
                    promise?.succeed(())
                    pipeline.syncOperations.fireChannelRegistered()
                }
            case .close:
                self.currentState = .closed
                return { promise, _ in
                    promise?.succeed(())
                }
            case .activate,  // should go through .registered first
                .finishRegistration:  // need to register lazily first
                self.badTransition(event: event)
            }

        case .preRegistered:
            switch event {
            case .finishRegistration:
                self.currentState = .fullyRegistered
                return { promise, _ in
                    promise?.succeed(())
                }
            case .close:
                self.currentState = .closed
                return { promise, pipeline in
                    promise?.succeed(())
                    pipeline.syncOperations.fireChannelUnregistered()
                }
            case .activate,  // need to first be fully registered
                .beginRegistration:  // already registered
                self.badTransition(event: event)
            }

        case .fullyRegistered:
            switch event {
            case .activate:
                self.currentState = .activated
                return { promise, pipeline in
                    promise?.succeed(())
                    pipeline.syncOperations.fireChannelActive()
                }
            case .close:
                self.currentState = .closed
                return { promise, pipeline in
                    promise?.succeed(())
                    pipeline.syncOperations.fireChannelUnregistered()
                }
            case .beginRegistration,  // already registered
                .finishRegistration:  // already fully registered
                self.badTransition(event: event)
            }

        case .activated:
            switch event {
            case .close:
                self.currentState = .closed
                return { promise, pipeline in
                    promise?.succeed(())
                    pipeline.syncOperations.fireChannelInactive()
                    pipeline.syncOperations.fireChannelUnregistered()
                }
            case .activate where self.supportsReconnect:
                // Re-activation is allowed here: the state stays `.activated` and no event is fired.
                return { promise, _ in
                    promise?.succeed(())
                }
            case .activate,  // already activated
                .beginRegistration,  // already fully registered (and activated)
                .finishRegistration:  // already fully registered (and activated)
                self.badTransition(event: event)
            }

        case .closed:
            // already closed
            self.badTransition(event: event)
        }
    }

    /// Traps with a message naming the current state and the offending event.
    private func badTransition(event: Event) -> Never {
        preconditionFailure("illegal transition: state=\(self.currentState), event=\(event)")
    }

    // MARK: Convenience properties

    /// `true` only in the `.activated` state.
    internal var isActive: Bool {
        self.eventLoop.assertInEventLoop()
        return self.currentState == .activated
    }

    /// `true` from `.preRegistered` onwards, until the channel is closed.
    internal var isPreRegistered: Bool {
        self.eventLoop.assertInEventLoop()
        switch self.currentState {
        case .preRegistered, .fullyRegistered, .activated:
            return true
        case .fresh, .closed:
            return false
        }
    }

    /// `true` in the `.fullyRegistered` and `.activated` states.
    internal var isRegisteredFully: Bool {
        self.eventLoop.assertInEventLoop()
        switch self.currentState {
        case .fullyRegistered, .activated:
            return true
        case .fresh, .preRegistered, .closed:
            return false
        }
    }

    /// Returns whether the underlying file descriptor is open. This property will always be true
    /// (even before registration) until the Channel is closed.
    internal var isOpen: Bool {
        self.eventLoop.assertInEventLoop()
        return self.currentState != .closed
    }
}
215
216
/// The base class for all socket-based channels in NIO.
217
///
218
/// There are many types of specialised socket-based channel in NIO. Each of these
219
/// has different logic regarding how exactly they read from and write to the network.
220
/// However, they share a great deal of common logic around the managing of their
221
/// file descriptors.
222
///
223
/// For this reason, `BaseSocketChannel` exists to provide a common core implementation of
224
/// the `SelectableChannel` protocol. It uses a number of private functions to provide hooks
225
/// for subclasses to implement the specific logic to handle their writes and reads.
226
class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, ChannelCore, @unchecked Sendable {
227
    typealias SelectableType = SocketType.SelectableType
228
229
    /// An immutable snapshot of a channel's local and remote address.
    struct AddressCache {
        // Both fields are deliberately `let`s: they must always be updated together,
        // so forcing callers through `init` is useful.
        let local: SocketAddress?
        let remote: SocketAddress?

        /// Creates a snapshot from the given pair of addresses.
        init(local: SocketAddress?, remote: SocketAddress?) {
            self.remote = remote
            self.local = local
        }
    }
239
240
    // MARK: - Stored Properties
241
    // MARK: Constants & atomics (accessible everywhere)
242
    public let parent: Channel?
243
    internal let socket: SocketType
244
    private let closePromise: EventLoopPromise<Void>
245
    internal let selectableEventLoop: SelectableEventLoop
246
0
    private let _offEventLoopLock = NIOLock()
247
0
    private let isActiveAtomic: ManagedAtomic<Bool> = .init(false)
248
    // just a thread-safe way of having something to print about the socket from any thread
249
    internal let socketDescription: String
250
251
    // MARK: Variables, on EventLoop thread only
252
0
    var readPending = false
253
    var pendingConnect: Optional<EventLoopPromise<Void>>
254
    var recvBufferPool: NIOPooledRecvBufferAllocator
255
0
    var maxMessagesPerRead: UInt = 4
256
0
    private var inFlushNow: Bool = false  // Guard against re-entrance of flushNow() method.
257
0
    private var autoRead: Bool = true
258
259
    // MARK: Variables that are really constant
260
    // this is really a constant (set in .init) but needs `self` to be constructed and
261
    // therefore a `var`. Do not change as this needs to be accessed from arbitrary threads
262
0
    private var _pipeline: ChannelPipeline! = nil
263
264
    // MARK: Special variables, please read comments.
265
    // For reads guarded by _either_ `self._offEventLoopLock` or the EL thread
266
    // Writes are guarded by _offEventLoopLock _and_ the EL thread.
267
    // PLEASE don't use these directly and use the non-underscored computed properties instead.
268
0
    private var _addressCache = AddressCache(local: nil, remote: nil)  // please use `self.addressesCached` instead
269
    private var _bufferAllocatorCache: ByteBufferAllocator  // please use `self.bufferAllocatorCached` instead.
270
271
    // MARK: - Computed properties
272
    /// Accessor for `_addressCache`. The getter may be called from arbitrary threads.
    ///
    /// Reads on the event loop go straight to the stored value; reads from anywhere else take
    /// `_offEventLoopLock`. Writes must happen on the event loop and also take the lock.
    internal var addressesCached: AddressCache {
        get {
            guard !self.eventLoop.inEventLoop else {
                return self._addressCache
            }
            return self._offEventLoopLock.withLock { self._addressCache }
        }
        set {
            self.eventLoop.preconditionInEventLoop()
            self._offEventLoopLock.withLock { self._addressCache = newValue }
        }
    }
290
291
    /// Accessor for `_bufferAllocatorCache`. The getter may be called from arbitrary threads.
    ///
    /// Reads on the event loop go straight to the stored value; reads from anywhere else take
    /// `_offEventLoopLock`. Writes must happen on the event loop and also take the lock.
    private var bufferAllocatorCached: ByteBufferAllocator {
        get {
            guard !self.eventLoop.inEventLoop else {
                return self._bufferAllocatorCache
            }
            return self._offEventLoopLock.withLock { self._bufferAllocatorCache }
        }
        set {
            self.eventLoop.preconditionInEventLoop()
            self._offEventLoopLock.withLock { self._bufferAllocatorCache = newValue }
        }
    }
309
310
    /// The selector events this channel is currently interested in.
    ///
    /// The initial value is the (invalid) empty set, to make sure the initial event is seeded
    /// correctly later on (in `becomeFullyRegistered0`). Every later value must contain `.reset`.
    internal var interestedEvent: SelectorEventSet = [] {
        didSet {
            assert(
                self.interestedEvent.contains(.reset),
                "impossible to unregister for reset"
            )
        }
    }
317
318
    /// The channel's lifecycle state machine. Any mutation is asserted to happen on the event loop.
    private var lifecycleManager: SocketChannelLifecycleManager {
        didSet { self.eventLoop.assertInEventLoop() }
    }
323
324
0
    /// The event-loop-side buffer allocator. Each change is mirrored into `bufferAllocatorCached`,
    /// which is what readers on other threads go through.
    private var bufferAllocator: ByteBufferAllocator = ByteBufferAllocator() {
        didSet {
            self.eventLoop.assertInEventLoop()
            let updated = self.bufferAllocator
            self.bufferAllocatorCached = updated
        }
    }
330
331
0
    /// This channel acts as its own `ChannelCore`.
    public final var _channelCore: ChannelCore {
        return self
    }
332
333
    /// The cached local address. This is `Channel` API so it must be thread-safe,
    /// which is why it goes through `addressesCached`.
    public final var localAddress: SocketAddress? {
        let snapshot = self.addressesCached
        return snapshot.local
    }
337
338
    /// The cached remote address. This is `Channel` API so it must be thread-safe,
    /// which is why it goes through `addressesCached`.
    public final var remoteAddress: SocketAddress? {
        let snapshot = self.addressesCached
        return snapshot.remote
    }
342
343
    /// Whether the channel is still open, as reported by the lifecycle manager.
    /// `false` means the whole `Channel` is closed and no more IO operations can be done.
    /// Event loop only.
    var isOpen: Bool {
        get {
            self.eventLoop.assertInEventLoop()
            return self.lifecycleManager.isOpen
        }
    }
348
349
0
    /// Whether registration has at least begun, i.e. the lifecycle manager reports
    /// `isPreRegistered`. Event loop only.
    var isRegistered: Bool {
        get {
            self.eventLoop.assertInEventLoop()
            return self.lifecycleManager.isPreRegistered
        }
    }
353
354
    /// Whether the channel is active. This is `Channel` API so it must be thread-safe;
    /// the value is read from the atomic shared with the lifecycle manager.
    public var isActive: Bool {
        return self.isActiveAtomic.load(ordering: .relaxed)
    }
358
359
    /// The future belonging to the channel's close promise.
    /// This is `Channel` API so it must be thread-safe.
    public final var closeFuture: EventLoopFuture<Void> {
        return self.closePromise.futureResult
    }
363
364
0
    /// The channel's event loop, i.e. `selectableEventLoop` seen as a plain `EventLoop`.
    public final var eventLoop: EventLoop {
        return self.selectableEventLoop
    }
367
368
    /// Always `true` in this base implementation. The property is not `final`, so subclasses may
    /// provide their own answer. This is `Channel` API so it must be thread-safe.
    public var isWritable: Bool {
        return true
    }
372
373
    /// The channel's buffer allocator. This is `Channel` API so it must be thread-safe,
    /// which is why it goes through `bufferAllocatorCached`.
    public final var allocator: ByteBufferAllocator {
        return self.bufferAllocatorCached
    }
377
378
    /// The channel's pipeline, stored in `_pipeline`.
    /// This is `Channel` API so it must be thread-safe.
    public final var pipeline: ChannelPipeline {
        return self._pipeline
    }
382
383
    // MARK: Methods to override in subclasses.
384
0
    /// Subclass hook: write pending data to the underlying socket.
    ///
    /// The base implementation traps unconditionally; subclasses must override it.
    func writeToSocket() throws -> OverallWriteResult {
        fatalError(
            "must be overridden"
        )
    }
387
388
    /// Subclass hook: read data from the underlying socket and dispatch it to the `ChannelPipeline`.
    ///
    /// The base implementation traps unconditionally; subclasses must override it.
    ///
    /// - Returns: `.some` if any data was read, `.none` otherwise.
    @discardableResult
    func readFromSocket() throws -> ReadResult {
        fatalError(
            "this must be overridden by sub class"
        )
    }
394
395
    // MARK: - Datatypes
396
397
    /// Indicates whether a selectable should be registered for IO notifications or not.
398
    enum IONotificationState {
399
        /// We should be registered for IO notifications.
400
        case register
401
402
        /// We should not be registered for IO notifications.
403
        case unregister
404
    }
405
406
    /// The outcome of a single read operation, as returned by `readFromSocket()`.
    enum ReadResult {
407
        /// Nothing was read by the read operation.
408
        case none
409
410
        /// Some data was read by the read operation.
411
        case some
412
    }
413
414
    /// Returned by the `private func readable0()` to inform the caller about the current state of the underlying read stream.
415
    /// This is mostly useful when receiving `.readEOF`, as we then need to drain the read stream fully (i.e. until we receive EOF or an error).
416
    private enum ReadStreamState: Equatable {
417
        /// Everything seems normal; carries the result of the read.
418
        case normal(ReadResult)
419
420
        /// We saw EOF.
421
        case eof
422
423
        /// A read error was received.
424
        case error
425
    }
426
427
    /// Subclass hook: begin connecting the underlying socket to a `SocketAddress`.
    ///
    /// The base implementation traps unconditionally; subclasses must override it.
    ///
    /// - Parameters:
    ///   - to: The `SocketAddress` to connect to.
    /// - Returns: `true` if the socket connected synchronously, `false` otherwise.
    func connectSocket(to address: SocketAddress) throws -> Bool {
        fatalError(
            "this must be overridden by sub class"
        )
    }
435
436
    /// Subclass hook: begin connecting the underlying socket to a `VsockAddress`.
    ///
    /// The base implementation traps unconditionally; subclasses must override it.
    ///
    /// - Parameters:
    ///   - to: The `VsockAddress` to connect to.
    /// - Returns: `true` if the socket connected synchronously, `false` otherwise.
    func connectSocket(to address: VsockAddress) throws -> Bool {
        fatalError(
            "this must be overridden by sub class"
        )
    }
444
445
    /// The kinds of address `connectSocket(to:)` can be asked to connect to.
    enum ConnectTarget {
446
        /// Connect to a `SocketAddress`.
        case socketAddress(SocketAddress)
447
        /// Connect to a `VsockAddress`.
        case vsockAddress(VsockAddress)
448
    }
449
450
    /// Begin connecting the underlying socket by forwarding to the overload that matches
    /// the kind of address wrapped in `target`.
    ///
    /// - Parameters:
    ///   - to: The target to connect to.
    /// - Returns: `true` if the socket connected synchronously, `false` otherwise.
    final func connectSocket(to target: ConnectTarget) throws -> Bool {
        let connectedSynchronously: Bool
        switch target {
        case .vsockAddress(let vsockAddress):
            connectedSynchronously = try self.connectSocket(to: vsockAddress)
        case .socketAddress(let socketAddress):
            connectedSynchronously = try self.connectSocket(to: socketAddress)
        }
        return connectedSynchronously
    }
463
464
    /// Subclass hook: make any state changes needed to complete the connection process.
    ///
    /// The base implementation traps unconditionally; subclasses must override it.
    func finishConnectSocket() throws {
        fatalError(
            "this must be overridden by sub class"
        )
    }
468
469
    /// Subclass hook: reports whether there are any flushed, pending writes to be sent over the network.
    ///
    /// The base implementation traps unconditionally; subclasses must override it.
    func hasFlushedPendingWrites() -> Bool {
        fatalError(
            "this must be overridden by sub class"
        )
    }
473
474
    /// Subclass hook: buffer a write in preparation for a flush.
    ///
    /// The base implementation traps unconditionally; subclasses must override it.
    func bufferPendingWrite(data: NIOAny, promise: EventLoopPromise<Void>?) {
        fatalError(
            "this must be overridden by sub class"
        )
    }
478
479
    /// Subclass hook: mark a flush point. Called when a flush is received, instructing the
    /// implementation to record it.
    ///
    /// The base implementation traps unconditionally; subclasses must override it.
    func markFlushPoint() {
        fatalError(
            "this must be overridden by sub class"
        )
    }
484
485
    /// Subclass hook: called when closing, to instruct the specific implementation to discard
    /// all pending writes.
    ///
    /// The base implementation traps unconditionally; subclasses must override it.
    func cancelWritesOnClose(error: Error) {
        fatalError(
            "this must be overridden by sub class"
        )
    }
490
491
    // MARK: Common base socket logic.
492
    /// Creates the channel around an existing socket.
    ///
    /// - Parameters:
    ///   - socket: The socket this channel wraps.
    ///   - parent: The parent channel, if any.
    ///   - eventLoop: The event loop the channel is bound to.
    ///   - recvAllocator: The allocator handed to the receive buffer pool.
    ///   - supportReconnect: Forwarded to the lifecycle manager; allows `active` -> `active` transitions.
    init(
        socket: SocketType,
        parent: Channel?,
        eventLoop: SelectableEventLoop,
        recvAllocator: RecvByteBufferAllocator,
        supportReconnect: Bool
    ) throws {
        // Seed the off-event-loop cache from the default allocator.
        self._bufferAllocatorCache = self.bufferAllocator

        self.parent = parent
        self.socket = socket
        self.selectableEventLoop = eventLoop
        self.closePromise = eventLoop.makePromise(of: Void.self)
        self.recvBufferPool = NIOPooledRecvBufferAllocator(
            capacity: Int(self.maxMessagesPerRead),
            recvAllocator: recvAllocator
        )

        // As the socket may already be connected we should ensure we start with the correct addresses cached.
        // Failures to look up either address are deliberately mapped to `nil`.
        let knownLocal = try? socket.localAddress()
        let knownRemote = try? socket.remoteAddress()
        self._addressCache = AddressCache(local: knownLocal, remote: knownRemote)

        self.lifecycleManager = SocketChannelLifecycleManager(
            eventLoop: eventLoop,
            isActiveAtomic: self.isActiveAtomic,
            supportReconnect: supportReconnect
        )
        self.socketDescription = socket.description
        self.pendingConnect = nil

        // The pipeline needs `self`, so it has to be created last.
        self._pipeline = ChannelPipeline(channel: self)
    }
516
517
0
    /// Asserts that the channel reached its closed state before being deallocated.
    deinit {
        let manager = self.lifecycleManager
        assert(
            manager.canBeDestroyed,
            "leak of open Channel, state: \(String(describing: manager))"
        )
    }
523
524
0
    /// Asks the socket for its local address. Event loop only.
    ///
    /// - Throws: `ChannelError._ioOnClosedChannel` if the channel is no longer open, or
    ///   whatever error the socket lookup throws.
    public final func localAddress0() throws -> SocketAddress {
        self.eventLoop.assertInEventLoop()
        if !self.isOpen {
            throw ChannelError._ioOnClosedChannel
        }
        let address = try self.socket.localAddress()
        return address
    }
531
532
0
    /// Asks the socket for its remote address. Event loop only.
    ///
    /// - Throws: `ChannelError._ioOnClosedChannel` if the channel is no longer open, or
    ///   whatever error the socket lookup throws.
    public final func remoteAddress0() throws -> SocketAddress {
        self.eventLoop.assertInEventLoop()
        if !self.isOpen {
            throw ChannelError._ioOnClosedChannel
        }
        let address = try self.socket.remoteAddress()
        return address
    }
539
540
    /// Flush data to the underlying socket and report whether this socket needs to be registered
    /// for write notifications.
    ///
    /// This method can be called re-entrantly, but a nested call returns immediately: the outermost
    /// call is responsible for sending all flushed writes, even the ones that are accumulated
    /// whilst `flushNow()` is running.
    ///
    /// - Returns: `IONotificationState.register` if _not_ all data could be written, so write
    ///            notifications are necessary; `IONotificationState.unregister` if everything was
    ///            written and we don't need to be notified about writability at the moment.
    func flushNow() -> IONotificationState {
        self.eventLoop.assertInEventLoop()

        // Guard against re-entry: data that gets put into `pendingWrites` in the meantime will just
        // be picked up by `writeToSocket` in the outer call.
        if self.inFlushNow {
            return .unregister
        }

        assert(!self.inFlushNow)
        self.inFlushNow = true
        defer { self.inFlushNow = false }

        var decision: IONotificationState = .unregister
        while decision == .unregister && self.hasFlushedPendingWrites() && self.isOpen {
            let outcome: OverallWriteResult
            do {
                assert(self.lifecycleManager.isActive)
                outcome = try self.writeToSocket()
                if outcome.writabilityChange {
                    // We went from not writable to writable.
                    self.pipeline.syncOperations.fireChannelWritabilityChanged()
                }
            } catch let writeError {
                // If there is a write error we should try to drain the inbound side before closing the
                // socket, as there may be some data pending. Any error thrown while draining is ignored:
                // the original write error is what closes the channel and is reported to the user.
                if self.readIfNeeded0() {
                    assert(self.lifecycleManager.isActive)

                    // Keep reading until nothing more can be read from the socket; we will not get
                    // another chance to drain it.
                    var didRead = false
                    while let result = try? self.readFromSocket(), result == .some {
                        didRead = true
                    }
                    if didRead && self.lifecycleManager.isActive {
                        self.pipeline.fireChannelReadComplete()
                    }
                }

                self.close0(error: writeError, mode: .all, promise: nil)

                // we handled all writes
                return .unregister
            }

            switch outcome.writeResult {
            case .writtenCompletely(let closeState):
                decision = .unregister
                // Only `.readyForClose` needs action. `.open` needs none, and `.closed` is fine too:
                // we can be flushed before becoming active.
                if case .readyForClose = closeState {
                    self.close0(error: ChannelError.outputClosed, mode: .output, promise: nil)
                }
            case .couldNotWriteEverything:
                decision = .register
            }

            if !self.isOpen || !self.hasFlushedPendingWrites() {
                // No further writes, unregister. We won't re-enter the loop as both of these would have to be true.
                decision = .unregister
            }
        }

        assert(
            (decision == .register && self.hasFlushedPendingWrites())
                || (decision == .unregister && !self.hasFlushedPendingWrites()),
            "illegal flushNow decision: \(decision) and \(self.hasFlushedPendingWrites())"
        )
        return decision
    }
623
624
0
    /// Sets a channel option from any thread.
    ///
    /// On the event loop the option is applied right away and the returned future carries the
    /// result; from any other thread the work is submitted to the event loop.
    public final func setOption<Option: ChannelOption>(_ option: Option, value: Option.Value) -> EventLoopFuture<Void> {
        guard eventLoop.inEventLoop else {
            return eventLoop.submit { try self.setOption0(option, value: value) }
        }

        let promise = eventLoop.makePromise(of: Void.self)
        executeAndComplete(promise) {
            try self.setOption0(option, value: value)
        }
        return promise.futureResult
    }
633
634
0
    /// Applies a channel option. Event loop only.
    ///
    /// Handles socket options, the allocator, the receive allocator, auto-read and
    /// max-messages-per-read; any other option traps.
    ///
    /// - Throws: `ChannelError._ioOnClosedChannel` if the channel is closed, or whatever
    ///   `setSocketOption0` throws.
    func setOption0<Option: ChannelOption>(_ option: Option, value: Option.Value) throws {
        self.eventLoop.assertInEventLoop()

        if !self.isOpen {
            throw ChannelError._ioOnClosedChannel
        }

        switch option {
        case let socketOption as ChannelOptions.Types.SocketOption:
            try self.setSocketOption0(
                level: socketOption.optionLevel,
                name: socketOption.optionName,
                value: value
            )

        case is ChannelOptions.Types.AllocatorOption:
            self.bufferAllocator = value as! ByteBufferAllocator

        case is ChannelOptions.Types.RecvAllocatorOption:
            self.recvBufferPool.recvAllocator = value as! RecvByteBufferAllocator

        case is ChannelOptions.Types.AutoReadOption:
            let requested = value as! Bool
            let valueChanged = self.autoRead != requested
            self.autoRead = requested

            // We only want to call read0() or pauseRead0() if we already registered to the EventLoop;
            // if not, this will be done automatically once register0 is called. Besides this, we also
            // only need to do it when the value actually changes.
            if self.lifecycleManager.isPreRegistered && valueChanged {
                if requested {
                    self.read0()
                } else {
                    self.pauseRead0()
                }
            }

        case is ChannelOptions.Types.MaxMessagesPerReadOption:
            self.maxMessagesPerRead = value as! UInt
            self.recvBufferPool.updateCapacity(to: Int(self.maxMessagesPerRead))

        default:
            fatalError("option \(option) not supported")
        }
    }
669
670
0
    /// Reads a channel option from any thread.
    ///
    /// On the event loop the option is read right away and wrapped in an already-completed
    /// future; from any other thread the work is submitted to the event loop.
    public func getOption<Option: ChannelOption>(_ option: Option) -> EventLoopFuture<Option.Value> {
        guard eventLoop.inEventLoop else {
            return self.eventLoop.submit { try self.getOption0(option) }
        }

        do {
            let current = try self.getOption0(option)
            return self.eventLoop.makeSucceededFuture(current)
        } catch {
            return self.eventLoop.makeFailedFuture(error)
        }
    }
681
682
0
    /// Reads a channel option. Event loop only.
    ///
    /// Handles socket options, the allocator, the receive allocator, auto-read and
    /// max-messages-per-read; any other option traps.
    ///
    /// - Throws: `ChannelError._ioOnClosedChannel` if the channel is closed, or whatever
    ///   `getSocketOption0` throws.
    func getOption0<Option: ChannelOption>(_ option: Option) throws -> Option.Value {
        self.eventLoop.assertInEventLoop()

        if !self.isOpen {
            throw ChannelError._ioOnClosedChannel
        }

        switch option {
        case let socketOption as ChannelOptions.Types.SocketOption:
            return try self.getSocketOption0(level: socketOption.optionLevel, name: socketOption.optionName)
        case is ChannelOptions.Types.AllocatorOption:
            return self.bufferAllocator as! Option.Value
        case is ChannelOptions.Types.RecvAllocatorOption:
            return self.recvBufferPool.recvAllocator as! Option.Value
        case is ChannelOptions.Types.AutoReadOption:
            return self.autoRead as! Option.Value
        case is ChannelOptions.Types.MaxMessagesPerReadOption:
            return self.maxMessagesPerRead as! Option.Value
        default:
            fatalError("option \(option) not supported")
        }
    }
704
705
    /// Triggers a `ChannelPipeline.read()` if `autoRead` is enabled and no read is pending yet.
    /// Does nothing on a channel that is not active.
    ///
    /// - Returns: `false` if the channel is not active; otherwise `true` if `readPending` is
    ///   `true` afterwards, `false` if it is not.
    @discardableResult
    func readIfNeeded0() -> Bool {
        self.eventLoop.assertInEventLoop()
        guard self.lifecycleManager.isActive else {
            return false
        }

        let shouldTriggerRead = !self.readPending && self.autoRead
        if shouldTriggerRead {
            self.pipeline.syncOperations.read()
        }
        return self.readPending
    }
719
720
    // Methods invoked from the HeadHandler of the ChannelPipeline
721
0
    /// Binds the socket to `address` and refreshes the cached addresses (without updating the
    /// remote one). The promise is completed with the outcome; on a closed channel it is failed
    /// with `ChannelError._ioOnClosedChannel`.
    public func bind0(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
        self.eventLoop.assertInEventLoop()

        if !self.isOpen {
            promise?.fail(ChannelError._ioOnClosedChannel)
            return
        }

        executeAndComplete(promise) {
            try self.socket.bind(to: address)
            self.updateCachedAddressesFromSocket(updateRemote: false)
        }
    }
734
735
0
    /// Buffers `data` as a pending write via `bufferPendingWrite(data:promise:)`.
    ///
    /// If the channel was already closed, the promise is failed with
    /// `ChannelError._ioOnClosedChannel` and the write is not even queued.
    public final func write0(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
        self.eventLoop.assertInEventLoop()

        if self.isOpen {
            self.bufferPendingWrite(data: data, promise: promise)
        } else {
            promise?.fail(ChannelError._ioOnClosedChannel)
        }
    }
746
747
0
    /// Adds `.write` to the set of interested events and re-registers, unless `.write` is already present.
    private func registerForWritable() {
        self.eventLoop.assertInEventLoop()

        if self.interestedEvent.contains(.write) {
            // Already interested in writability, so there is nothing to change.
            return
        }

        let withWrite = self.interestedEvent.union(.write)
        self.safeReregister(interested: withWrite)
    }
756
757
0
    /// Removes `.write` from the set of interested events and re-registers, unless `.write` is not present.
    func unregisterForWritable() {
        self.eventLoop.assertInEventLoop()

        if !self.interestedEvent.contains(.write) {
            // We never asked for writability, so there is nothing to change.
            return
        }

        let withoutWrite = self.interestedEvent.subtracting(.write)
        self.safeReregister(interested: withoutWrite)
    }
766
767
0
    /// Marks a flush point and, if the channel is active and not already waiting for writability, attempts
    /// to flush immediately.
    ///
    /// Does nothing on a closed channel. On an open but inactive channel only the flush point is marked.
    public final func flush0() {
        self.eventLoop.assertInEventLoop()

        if !self.isOpen {
            return
        }

        self.markFlushPoint()

        if !self.lifecycleManager.isActive {
            return
        }

        // If we are already registered for `.write`, the flush happens once the socket reports writable.
        guard !self.isWritePending() else {
            return
        }

        if self.flushNow() == .register {
            assert(self.lifecycleManager.isPreRegistered)
            self.registerForWritable()
        }
    }
785
786
0
    /// Records that a read is pending and, if the channel has begun registration, registers for readability.
    ///
    /// Does nothing on a closed channel.
    public func read0() {
        self.eventLoop.assertInEventLoop()

        if !self.isOpen {
            return
        }

        self.readPending = true

        guard self.lifecycleManager.isPreRegistered else {
            return
        }
        self.registerForReadable()
    }
798
799
0
    /// Stops listening for readability if the channel has begun registration.
    private final func pauseRead0() {
        self.eventLoop.assertInEventLoop()

        guard self.lifecycleManager.isPreRegistered else {
            return
        }
        self.unregisterForReadable()
    }
806
807
0
    /// Adds `.read` to the interested events and re-registers.
    ///
    /// No-op if an EOF notification has already been seen or `.read` is already part of the interested events.
    private final func registerForReadable() {
        self.eventLoop.assertInEventLoop()
        assert(self.lifecycleManager.isRegisteredFully)

        if self.lifecycleManager.hasSeenEOFNotification {
            // After an EOF notification there is no point in registering for reads.
            return
        }

        if self.interestedEvent.contains(.read) {
            return
        }

        let withRead = self.interestedEvent.union(.read)
        self.safeReregister(interested: withRead)
    }
822
823
0
    /// Adds `.readEOF` to the interested events and re-registers.
    ///
    /// No-op if an EOF notification has already been seen or `.readEOF` is already part of the interested events.
    private final func registerForReadEOF() {
        self.eventLoop.assertInEventLoop()
        assert(self.lifecycleManager.isRegisteredFully)

        if self.lifecycleManager.hasSeenEOFNotification {
            // An EOF notification was already delivered, so registering for another one is pointless.
            return
        }

        if self.interestedEvent.contains(.readEOF) {
            return
        }

        let withReadEOF = self.interestedEvent.union(.readEOF)
        self.safeReregister(interested: withReadEOF)
    }
838
839
0
    /// Removes `.read` from the interested events and re-registers, unless `.read` is not present.
    internal final func unregisterForReadable() {
        self.eventLoop.assertInEventLoop()
        assert(self.lifecycleManager.isRegisteredFully)

        if !self.interestedEvent.contains(.read) {
            return
        }

        let withoutRead = self.interestedEvent.subtracting(.read)
        self.safeReregister(interested: withoutRead)
    }
849
850
    /// Closes this channel and fulfills `promise` with the result of the _close_ operation.
    ///
    /// `promise` reports on the close itself: it is failed if the channel is already closed, if `mode` is not
    /// `.all`, or if closing the socket throws. `error` is not delivered to `promise`; it is used to fail
    /// outstanding writes (if any) and the pending connect promise (if set).
    ///
    /// - Parameters:
    ///    - error: The error to fail the outstanding (if any) writes/connect with.
    ///    - mode: The close mode, must be `.all` here; anything else fails `promise` with
    ///      `ChannelError._operationUnsupported`.
    ///    - promise: The promise that gets notified about the result of the close operation.
    public func close0(error: Error, mode: CloseMode, promise: EventLoopPromise<Void>?) {
        self.eventLoop.assertInEventLoop()

        if !self.isOpen {
            promise?.fail(ChannelError._alreadyClosed)
            return
        }

        if mode != .all {
            promise?.fail(ChannelError._operationUnsupported)
            return
        }

        // === BEGIN: No user callouts ===

        // Errors hit while tearing down are collected here and only reported once our own state has
        // transitioned, because reporting them calls out to user code.
        var deferredErrorCallouts: [(ChannelPipeline) -> Void] = []

        self.interestedEvent = .reset
        do {
            try self.selectableEventLoop.deregister(channel: self)
        } catch let deregisterError {
            deferredErrorCallouts.append { pipeline in
                pipeline.syncOperations.fireErrorCaught(deregisterError)
            }
        }

        // The promise handed to the lifecycle transition. It becomes `nil` when the socket close failed, as
        // `promise` is then failed by a deferred callout and must not be notified a second time.
        let transitionPromise: EventLoopPromise<Void>?
        do {
            try self.socket.close()
            transitionPromise = promise
        } catch let socketCloseError {
            deferredErrorCallouts.append { (_: ChannelPipeline) in
                promise?.fail(socketCloseError)
            }
            transitionPromise = nil
        }

        // Transition our internal state.
        let lifecycleCallouts = self.lifecycleManager.close()

        // === END: No user callouts (now that our state is reconciled, we can call out to user code.) ===

        // This must be the first callout: it moves the PendingWritesManager into the closed state, and we
        // assert elsewhere that it agrees with us on whether the channel is open.
        self.cancelWritesOnClose(error: error)

        // Normally there is nothing in here.
        deferredErrorCallouts.forEach { callout in
            callout(self.pipeline)
        }

        if let connectPromise = self.pendingConnect {
            self.pendingConnect = nil
            connectPromise.fail(error)
        }

        lifecycleCallouts(transitionPromise, self.pipeline)

        self.eventLoop.execute {
            // Deliberately delayed: user code may still be traversing the pipeline right now.
            self.removeHandlers(pipeline: self.pipeline)

            self.closePromise.succeed(())

            // All handlers and futures have been notified, so the cached addresses can be dropped.
            self.unsetCachedAddressesFromSocket()
        }
    }
928
929
0
    /// Begins registration of this channel (the "pre-registered" step).
    ///
    /// - Parameter promise: Failed with `ChannelError._ioOnClosedChannel` if the channel is closed, with
    ///   `ChannelError._inappropriateOperationForState` if registration has already begun, or with
    ///   `EventLoopError._shutdown` if the event loop is no longer open (in which case the channel is also
    ///   closed). Otherwise it is handed to the lifecycle manager's registration callout.
    public final func register0(promise: EventLoopPromise<Void>?) {
        self.eventLoop.assertInEventLoop()

        if !self.isOpen {
            promise?.fail(ChannelError._ioOnClosedChannel)
            return
        }

        if self.lifecycleManager.isPreRegistered {
            promise?.fail(ChannelError._inappropriateOperationForState)
            return
        }

        if !self.selectableEventLoop.isOpen {
            let shutdownError = EventLoopError._shutdown
            self.pipeline.syncOperations.fireErrorCaught(shutdownError)
            // The promise of `close0` reports on the close operation itself, ...
            self.close0(error: shutdownError, mode: .all, promise: nil)
            // ... so the registration `promise` has to be failed separately.
            promise?.fail(shutdownError)
            return
        }

        // We can't fully register yet as epoll would give us EPOLLHUP if bind/connect wasn't called yet.
        let registrationCallout = self.lifecycleManager.beginRegistration()
        registrationCallout(promise, self.pipeline)
    }
955
956
0
    /// Registers the channel, fully registers it with the selector and activates it in one go.
    ///
    /// If the initial registration fails, the channel is closed and the failure is cascaded to `promise`.
    ///
    /// - Parameter promise: Receives the registration failure, or is passed on to `becomeActive0`.
    public final func registerAlreadyConfigured0(promise: EventLoopPromise<Void>?) {
        self.eventLoop.assertInEventLoop()
        assert(self.isOpen)
        assert(!self.lifecycleManager.isActive)

        let registerPromise = self.eventLoop.makePromise(of: Void.self)
        self.register0(promise: registerPromise)

        let registration = registerPromise.futureResult
        registration.whenFailure { (_: Error) in
            self.close(promise: nil)
        }
        registration.cascadeFailure(to: promise)

        guard self.lifecycleManager.isPreRegistered else {
            return
        }

        // We expect kqueue/epoll registration to always succeed, which is basically true except for errors
        // that should be fatal (EBADF, EFAULT, ESRCH, ENOMEM) and two 'table full' error kinds
        // (EMFILE, ENFILE) which we don't handle yet but might do in the future (#469).
        try! self.becomeFullyRegistered0()

        if self.lifecycleManager.isRegisteredFully {
            self.becomeActive0(promise: promise)
        }
    }
977
978
0
    /// Handles user outbound events.
    ///
    /// Only `VsockChannelEvents.ConnectToAddress` is understood here; it is turned into a connect to the
    /// event's vsock address. Every other event fails `promise` with `ChannelError._operationUnsupported`.
    ///
    /// - Parameters:
    ///   - event: The user event.
    ///   - promise: Completed by the connect, or failed for unsupported events.
    public func triggerUserOutboundEvent0(_ event: Any, promise: EventLoopPromise<Void>?) {
        guard let connectEvent = event as? VsockChannelEvents.ConnectToAddress else {
            promise?.fail(ChannelError._operationUnsupported)
            return
        }
        self.connect0(to: .vsockAddress(connectEvent.address), promise: promise)
    }
986
987
    // Methods invoked from the EventLoop itself
988
0
    /// Reacts to a writability notification: completes a pending connect (if any) and then tries to flush.
    public final func writable() {
        self.eventLoop.assertInEventLoop()
        assert(self.isOpen)

        // If we were connecting, that has finished.
        self.finishConnect()

        let flushOutcome = self.flushNow()
        switch flushOutcome {
        case .unregister:
            // Everything was written or connect was complete, let's unregister from writable.
            self.finishWritable()
        case .register:
            // We just received `writable`, so we are still registered for it: nothing to do.
            assert(!self.isOpen || self.interestedEvent.contains(.write))
        }
    }
1003
1004
0
    /// Completes an in-flight connect, if there is one.
    ///
    /// On success the pending connect promise is cleared, the cached remote address is refreshed and the
    /// channel becomes active (which fulfills the promise). On failure the channel is closed, which fails
    /// the pending connect promise.
    private func finishConnect() {
        self.eventLoop.assertInEventLoop()
        assert(self.lifecycleManager.isPreRegistered)

        guard let connectPromise = self.pendingConnect else {
            // No connect in flight, so we must already be active.
            assert(self.lifecycleManager.isActive)
            return
        }

        assert(!self.lifecycleManager.isActive)

        do {
            try self.finishConnectSocket()
        } catch {
            assert(!self.lifecycleManager.isActive)
            // close0 fails the connectPromise itself so no need to do it here
            self.close0(error: error, mode: .all, promise: nil)
            return
        }

        // The connect succeeded; becomeActive0 is the one fulfilling the promise.
        self.pendingConnect = nil
        // We already know what the local address is.
        self.updateCachedAddressesFromSocket(updateLocal: false, updateRemote: true)
        self.becomeActive0(promise: connectPromise)
    }
1028
1029
0
    /// Stops listening for writability once there is nothing left to flush; no-op on a closed channel.
    private func finishWritable() {
        self.eventLoop.assertInEventLoop()

        guard self.isOpen else {
            return
        }

        assert(self.lifecycleManager.isPreRegistered)
        assert(!self.hasFlushedPendingWrites())
        self.unregisterForWritable()
    }
1038
1039
0
    /// Called for a `writeEOF` notification, which this channel never expects: always traps.
    func writeEOF() {
        fatalError("\(self) received writeEOF which is unexpected")
    }
1042
1043
0
    /// Handles the (one-shot) `readEOF` notification: records that it was seen and processes it via
    /// `readEOF0`.
    func readEOF() {
        assert(!self.lifecycleManager.hasSeenEOFNotification)
        self.lifecycleManager.hasSeenEOFNotification = true

        // Being notified while not fully registered would mean we received a notification about a channel
        // before we're ready to receive them.
        assert(
            self.lifecycleManager.isRegisteredFully,
            "illegal state: \(self): active: \(self.lifecycleManager.isActive), registered: \(self.lifecycleManager.isRegisteredFully)"
        )

        self.readEOF0()

        // Once EOF handling is done we must no longer be interested in either kind of read event.
        assert(!self.interestedEvent.contains(.read))
        assert(!self.interestedEvent.contains(.readEOF))
    }
1059
1060
0
    /// Unregisters from `.readEOF` and then reads from the channel until EOF, an error, or the channel
    /// becoming inactive. Does nothing unless the channel is fully registered.
    final func readEOF0() {
        guard self.lifecycleManager.isRegisteredFully else {
            return
        }

        // `readEOF` is meant to be one-shot, so drop our interest in it first. We then synchronously read all
        // input until the EOF that we're guaranteed to see. After that `readEOF` becomes uninteresting and
        // would anyway fire constantly.
        self.safeReregister(interested: self.interestedEvent.subtracting(.readEOF))

        while self.lifecycleManager.isActive {
            switch self.readable0() {
            case .eof:
                // EOF reached: we're done with our processing for `readEOF`. We could both be registered &
                // active (if our channel supports half-closure) or unregistered & inactive (if it doesn't).
                return
            case .error:
                // `readable0` would've called close, so we should be unregistered and inactive now.
                assert(!self.lifecycleManager.isActive)
                assert(!self.lifecycleManager.isPreRegistered)
                return
            case .normal(.none):
                preconditionFailure("got .readEOF and read returned not reading any bytes, nor EOF.")
            case .normal(.some):
                // Regular data. There is no guarantee we're still active (the user might have closed in a
                // callout), which the loop condition checks.
                continue
            }
        }
    }
1087
1088
    /// Handles a `reset` notification by draining remaining input and closing the channel.
    ///
    /// This _needs_ to synchronously cause the fd to be unregistered because we cannot unregister from
    /// `reset`. In other words: failing to unregister from the selector will cause NIO to spin at 100% CPU,
    /// constantly delivering the `reset` event.
    final func reset() {
        self.readEOF0()

        if self.socket.isOpen {
            assert(self.lifecycleManager.isPreRegistered)

            // The socket is still registered (and therefore open), so try to fetch the actual socket error.
            let closeReason: Error
            do {
                let pendingErrno: Int32 = try self.socket.getOption(level: .socket, name: .so_error)
                if pendingErrno == 0 {
                    // No socket error is set, so this must be a connection reset without an error.
                    // This path should only be executed on Linux (EPOLLHUP, no EPOLLERR).
                    #if os(Linux)
                    let message: String = "connection reset (no error set)"
                    #else
                    let message: String =
                        "BUG IN SwiftNIO (possibly #572), please report! Connection reset (no error set)."
                    #endif
                    closeReason = IOError(errnoCode: ECONNRESET, reason: message)
                } else {
                    // A socket error is set, forward it.
                    // This path will be executed on Linux (EPOLLERR) & Darwin (ev.fflags != 0) for
                    // stream sockets, and most (but not all) errors on datagram sockets.
                    closeReason = IOError(errnoCode: pendingErrno, reason: "connection reset (error set)")
                }
            } catch {
                // Querying the socket failed; close with that error instead.
                closeReason = error
            }
            self.close0(error: closeReason, mode: .all, promise: nil)
        }

        assert(!self.lifecycleManager.isPreRegistered)
    }
1123
1124
0
    /// Reacts to a readability notification by reading from the socket.
    ///
    /// Must not be called after a `.readEOF` notification has been seen.
    public final func readable() {
        assert(
            !self.lifecycleManager.hasSeenEOFNotification,
            "got a read notification after having already seen .readEOF"
        )

        // The resulting stream state is of no interest for a plain read notification.
        self.readable0()
    }
1131
1132
    /// Performs one read from the socket and reports what happened to the read stream.
    ///
    /// - Returns: `.eof` if the remote peer closed, `.error` if a read error closed the channel, otherwise
    ///   `.normal` wrapping the read result.
    @discardableResult
    private final func readable0() -> ReadStreamState {
        self.eventLoop.assertInEventLoop()
        assert(self.lifecycleManager.isActive)

        defer {
            // Whatever path we leave on: without a pending read there is no reason to stay registered.
            if self.isOpen && !self.readPending {
                self.unregisterForReadable()
            }
        }

        let readResult: ReadResult
        do {
            readResult = try self.readFromSocket()
        } catch let readError {
            let terminalState: ReadStreamState

            // ChannelError.eof is not something we want to fire through the pipeline as it just means the
            // remote peer closed / shutdown the connection.
            if let channelError = readError as? ChannelError, channelError == ChannelError.eof {
                terminalState = .eof

                if self.lifecycleManager.isActive {
                    // Directly call getOption0 as we are already on the EventLoop and so need not create an
                    // extra future.
                    //
                    // getOption0 can only fail if the channel is not active anymore but we assert further up
                    // that it is. If that's not the case this is a precondition failure and we would like to
                    // know.
                    let halfClosureAllowed = try! self.getOption0(.allowRemoteHalfClosure)

                    // For EOF, we always fire read complete.
                    self.pipeline.syncOperations.fireChannelReadComplete()

                    if halfClosureAllowed {
                        // With half closure allowed, only the input side of the channel is closed.
                        if self.shouldCloseOnReadError(readError) {
                            self.close0(error: readError, mode: .input, promise: nil)
                        }
                        self.readPending = false
                        return .eof
                    }
                }
            } else {
                terminalState = .error
                self.pipeline.syncOperations.fireErrorCaught(readError)
            }

            guard !self.shouldCloseOnReadError(readError) else {
                self.close0(error: readError, mode: .all, promise: nil)
                return terminalState
            }

            // This is non-fatal, so continue as normal.
            // This constitutes "some" as we did get at least an error from the socket.
            readResult = .some
        }

        // The assert below has to be disabled for io_uring. kqueue and epoll have an implicit synchronisation
        // point: once a modification of the poll mask has been issued, the next call to reap events will not
        // include events that don't match the new mask. For this assert that means we are guaranteed to never
        // receive a POLLIN notification unless there are bytes available to read.
        //
        // A fully asynchronous backend like io_uring has no such synchronisation point: after submitting the
        // asynchronous poll mask change we may still reap pending replies for the old mask, and thus receive
        // a POLLIN even though the mask was modified, which would trigger the assert.
        //
        // The only way to avoid that race would be heavy-handed synchronisation like IOSQE_IO_DRAIN
        // (basically flushing all pending requests and waiting for a fake event result to sync up), which
        // would be awful for performance, so the assert is skipped for io_uring instead.
        #if !SWIFTNIO_USE_IO_URING
        assert(readResult == .some)
        #endif

        if self.lifecycleManager.isActive {
            self.pipeline.syncOperations.fireChannelReadComplete()
        }
        self.readIfNeeded0()
        return .normal(readResult)
    }
1211
1212
    /// Decides whether an `Error` thrown by `readFromSocket` should lead to the `Channel` being closed.
    ///
    /// This implementation closes on every error.
    ///
    /// - Parameters:
    ///   - err: The `Error` which was thrown by `readFromSocket`.
    /// - Returns: `true` if the `Channel` should be closed, `false` otherwise.
    func shouldCloseOnReadError(_ err: Error) -> Bool {
        return true
    }
1220
1221
    /// Handles an error reported by the selector.
    ///
    /// The default behaviour is to treat the error exactly like a reset.
    ///
    /// - Returns: Always `.fatal`.
    func error() -> ErrorResult {
        self.reset()
        return ErrorResult.fatal
    }
1228
1229
0
    /// Refreshes the cached local and/or remote address from the socket.
    ///
    /// An address that is not refreshed keeps its previously cached value; an address whose lookup throws
    /// is cached as `nil`.
    ///
    /// - Parameters:
    ///   - updateLocal: Whether to re-query the local address.
    ///   - updateRemote: Whether to re-query the remote address.
    internal final func updateCachedAddressesFromSocket(updateLocal: Bool = true, updateRemote: Bool = true) {
        self.eventLoop.assertInEventLoop()
        assert(updateLocal || updateRemote)

        let previous = self.addressesCached

        var local = previous.local
        if updateLocal {
            local = try? self.localAddress0()
        }

        var remote = previous.remote
        if updateRemote {
            remote = try? self.remoteAddress0()
        }

        self.addressesCached = AddressCache(local: local, remote: remote)
    }
1237
1238
0
    /// Clears both the cached local and the cached remote address.
    internal final func unsetCachedAddressesFromSocket() {
        self.eventLoop.assertInEventLoop()

        let emptyCache = AddressCache(local: nil, remote: nil)
        self.addressesCached = emptyCache
    }
1242
1243
0
    /// Connects to a `SocketAddress` by forwarding to the `ConnectTarget` based `connect0`.
    ///
    /// - Parameters:
    ///   - address: The address to connect to.
    ///   - promise: Notified about the outcome of the connect.
    public final func connect0(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
        let target = ConnectTarget.socketAddress(address)
        self.connect0(to: target, promise: promise)
    }
1246
1247
0
    /// Starts connecting the socket to `target`.
    ///
    /// If the connect completes synchronously the channel becomes active right away. Otherwise `promise`
    /// (or a fresh promise if it is `nil`) is stored as the pending connect, and the channel registers for
    /// writability.
    ///
    /// - Parameters:
    ///   - target: Where to connect to.
    ///   - promise: Failed with `ChannelError._ioOnClosedChannel` on a closed channel, with
    ///     `ChannelError._connectPending` if a connect is already in flight, with
    ///     `ChannelError._inappropriateOperationForState` if registration has not begun; otherwise notified
    ///     about the outcome of the connect.
    internal final func connect0(to target: ConnectTarget, promise: EventLoopPromise<Void>?) {
        self.eventLoop.assertInEventLoop()

        if !self.isOpen {
            promise?.fail(ChannelError._ioOnClosedChannel)
            return
        }

        if self.pendingConnect != nil {
            promise?.fail(ChannelError._connectPending)
            return
        }

        if !self.lifecycleManager.isPreRegistered {
            promise?.fail(ChannelError._inappropriateOperationForState)
            return
        }

        do {
            let connectedImmediately = try self.connectSocket(to: target)
            if connectedImmediately {
                self.updateCachedAddressesFromSocket()
                self.becomeActive0(promise: promise)
            } else {
                // We aren't connected, we'll get the remote address later.
                self.updateCachedAddressesFromSocket(updateLocal: true, updateRemote: false)
                // A pending connect always needs a promise, so make one if the caller didn't supply any.
                self.pendingConnect = promise ?? self.eventLoop.makePromise(of: Void.self)
                try self.becomeFullyRegistered0()
                self.registerForWritable()
            }
        } catch let connectError {
            assert(self.lifecycleManager.isPreRegistered)
            // We would like to assert `!self.lifecycleManager.isActive` here, but we want to be able to go
            // through this code path in cases where connect() is being called on channels that are already
            // active.
            //
            // We're going to set the promise as the pending connect promise, and let close0 fail it for us.
            self.pendingConnect = promise
            self.close0(error: connectError, mode: .all, promise: nil)
        }
    }
1290
1291
0
    /// Receives data that reached the channel; this implementation ignores it.
    ///
    /// - Parameter data: The data that was read.
    public func channelRead0(_ data: NIOAny) {
        // Intentionally empty.
        // Note: we can't assert that we're active here as TailChannelHandler will call this on channelRead.
    }
1295
1296
0
    /// Receives an error that reached the channel; this implementation ignores it.
    ///
    /// - Parameter error: The error that was caught.
    public func errorCaught0(error: Error) {
        // Intentionally empty.
    }
1299
1300
0
    /// Whether `.write` is currently part of the interested events.
    private func isWritePending() -> Bool {
        return self.interestedEvent.contains(.write)
    }
1303
1304
0
    /// Changes the set of interested events to `interested` and re-registers with the event loop.
    ///
    /// No-op on a closed channel or when `interested` equals the current set. If re-registering throws, the
    /// error is fired through the pipeline and the channel is closed.
    ///
    /// - Parameter interested: The new set of events to be notified about.
    private final func safeReregister(interested: SelectorEventSet) {
        self.eventLoop.assertInEventLoop()
        assert(self.lifecycleManager.isRegisteredFully)

        if !self.isOpen {
            assert(self.interestedEvent == .reset, "interestedEvent=\(self.interestedEvent) even though we're closed")
            return
        }

        guard interested != self.interestedEvent else {
            // Already registered with exactly these events, so avoid an update and the syscall it causes.
            return
        }

        self.interestedEvent = interested
        do {
            try self.selectableEventLoop.reregister(channel: self)
        } catch let reregisterError {
            self.pipeline.syncOperations.fireErrorCaught(reregisterError)
            self.close0(error: reregisterError, mode: .all, promise: nil)
        }
    }
1324
1325
0
    /// Sets the interested events to `interested` and registers the channel with the event loop.
    ///
    /// - Parameter interested: The initial set of events to be notified about.
    /// - Throws: `ChannelError._ioOnClosedChannel` if the channel is closed, or the registration error. In
    ///   the latter case the error has been fired through the pipeline and the channel closed before it is
    ///   rethrown.
    private func safeRegister(interested: SelectorEventSet) throws {
        self.eventLoop.assertInEventLoop()
        assert(!self.lifecycleManager.isRegisteredFully)

        if !self.isOpen {
            throw ChannelError._ioOnClosedChannel
        }

        self.interestedEvent = interested
        do {
            try self.selectableEventLoop.register(channel: self)
        } catch let registerError {
            self.pipeline.syncOperations.fireErrorCaught(registerError)
            self.close0(error: registerError, mode: .all, promise: nil)
            throw registerError
        }
    }
1342
1343
0
    /// Moves the channel from pre-registered to fully registered by registering for `.reset` and `.error`.
    ///
    /// - Throws: Whatever `safeRegister` throws.
    final func becomeFullyRegistered0() throws {
        self.eventLoop.assertInEventLoop()
        assert(self.lifecycleManager.isPreRegistered)
        assert(!self.lifecycleManager.isRegisteredFully)

        // `.readEOF` must not be part of the initial set of interested events: when connect doesn't return
        // synchronously, kevent might send us a `readEOF` before the `writable` event that marks the connect
        // as completed.
        // See SocketChannelTest.testServerClosesTheConnectionImmediately for a regression test.
        let initialEvents: SelectorEventSet = [.reset, .error]
        try self.safeRegister(interested: initialEvents)

        let registrationCallout = self.lifecycleManager.finishRegistration()
        registrationCallout(nil, self.pipeline)
    }
1354
1355
0
    /// Activates the channel, fully registering it first if that has not happened yet.
    ///
    /// After activation the channel registers for `.readEOF`, flushes pending writes, adjusts its `.write`
    /// registration to the flush result and triggers a read if needed.
    ///
    /// - Parameter promise: Handed to the activation callout, or to `close0` if full registration fails.
    final func becomeActive0(promise: EventLoopPromise<Void>?) {
        self.eventLoop.assertInEventLoop()
        assert(self.lifecycleManager.isPreRegistered)

        if !self.lifecycleManager.isRegisteredFully {
            do {
                try self.becomeFullyRegistered0()
                assert(self.lifecycleManager.isRegisteredFully)
            } catch {
                self.close0(error: error, mode: .all, promise: promise)
                return
            }
        }

        let activationCallout = self.lifecycleManager.activate()
        activationCallout(promise, self.pipeline)

        if !self.lifecycleManager.isOpen {
            // The channel got closed in the user callout for `channelActive`.
            return
        }
        self.registerForReadEOF()

        // Flush any pending writes. If after the flush we're still open, make sure our registration is
        // appropriate.
        let flushOutcome = self.flushNow()
        if self.lifecycleManager.isOpen {
            let registeredForWrite = self.interestedEvent.contains(.write)
            switch flushOutcome {
            case .register:
                if !registeredForWrite {
                    self.registerForWritable()
                }
            case .unregister:
                if registeredForWrite {
                    self.unregisterForWritable()
                }
            }
        }

        self.readIfNeeded0()
    }
1389
1390
0
    /// Registers this channel with `selector`. This implementation always traps; it must be overridden.
    ///
    /// - Parameters:
    ///   - selector: The selector to register with.
    ///   - interested: The events to be notified about.
    func register(selector: Selector<NIORegistration>, interested: SelectorEventSet) throws {
        fatalError("must override")
    }
1393
1394
0
    /// Deregisters this channel from `selector`. This implementation always traps; it must be overridden.
    ///
    /// - Parameters:
    ///   - selector: The selector to deregister from.
    ///   - mode: The close mode the deregistration is performed for.
    func deregister(selector: Selector<NIORegistration>, mode: CloseMode) throws {
        fatalError("must override")
    }
1397
1398
0
    /// Updates this channel's registration with `selector`. This implementation always traps; it must be
    /// overridden.
    ///
    /// - Parameters:
    ///   - selector: The selector the channel is registered with.
    ///   - interested: The new set of events to be notified about.
    func reregister(selector: Selector<NIORegistration>, interested: SelectorEventSet) throws {
        fatalError("must override")
    }
1401
}
1402
1403
extension BaseSocketChannel {
    /// Synchronous access to the channel's options, forwarding directly to `setOption0` / `getOption0`.
    public struct SynchronousOptions: NIOSynchronousChannelOptions {
        /// The channel whose options are read and written.
        @usableFromInline  // should be private
        internal let _channel: BaseSocketChannel<SocketType>

        /// Wraps `channel`.
        @inlinable  // should be fileprivate
        internal init(_channel channel: BaseSocketChannel<SocketType>) {
            self._channel = channel
        }

        /// Sets `option` to `value` on the wrapped channel.
        ///
        /// - Throws: Whatever `setOption0` throws.
        @inlinable
        public func setOption<Option: ChannelOption>(_ option: Option, value: Option.Value) throws {
            let channel = self._channel
            try channel.setOption0(option, value: value)
        }

        /// Reads the current value of `option` from the wrapped channel.
        ///
        /// - Throws: Whatever `getOption0` throws.
        @inlinable
        public func getOption<Option: ChannelOption>(_ option: Option) throws -> Option.Value {
            let channel = self._channel
            return try channel.getOption0(option)
        }
    }

    /// The synchronous options view of this channel; never `nil` for this type.
    public final var syncOptions: NIOSynchronousChannelOptions? {
        return SynchronousOptions(_channel: self)
    }
}
1428
1429
/// Runs `body` and synchronously completes `promise` (if not `nil`) with its outcome.
///
/// `body` is always executed, even when `promise` is `nil`.
///
/// - Parameters:
///   - promise: Succeeded with the value returned by `body`, or failed with the error it throws.
///   - body: The work to perform.
func executeAndComplete<Value: Sendable>(_ promise: EventLoopPromise<Value>?, _ body: () throws -> Value) {
    let outcome: Result<Value, Error> = Result { try body() }

    switch outcome {
    case .success(let value):
        promise?.succeed(value)
    case .failure(let failure):
        promise?.fail(failure)
    }
}