/src/swift-nio/Sources/NIOPosix/BaseSocketChannel.swift
Line | Count | Source |
1 | | //===----------------------------------------------------------------------===// |
2 | | // |
3 | | // This source file is part of the SwiftNIO open source project |
4 | | // |
5 | | // Copyright (c) 2017-2021 Apple Inc. and the SwiftNIO project authors |
6 | | // Licensed under Apache License v2.0 |
7 | | // |
8 | | // See LICENSE.txt for license information |
9 | | // See CONTRIBUTORS.txt for the list of SwiftNIO project authors |
10 | | // |
11 | | // SPDX-License-Identifier: Apache-2.0 |
12 | | // |
13 | | //===----------------------------------------------------------------------===// |
14 | | |
15 | | #if !os(WASI) |
16 | | |
17 | | import Atomics |
18 | | import NIOConcurrencyHelpers |
19 | | import NIOCore |
20 | | |
/// Drives the lifecycle state machine of a socket channel.
///
/// A transition is requested through one of the `begin…`/`finish…`/`close()` methods. Each of them returns a
/// closure which the caller invokes with an optional promise and the `ChannelPipeline`; the closure completes the
/// promise and fires the pipeline events that belong to the transition. Illegal transitions trap via
/// `preconditionFailure`.
private struct SocketChannelLifecycleManager {
    // MARK: Types
    private enum State {
        case fresh
        case preRegistered  // `register()` has run, but the selector does not know about the channel yet
        case fullyRegistered  // fully registered, i.e. the selector knows about the channel
        case preActivation  // activated, but `channelActive` has not been fired yet
        case fullyActivated  // fully activated, i.e. `channelActive` has been fired
        case closed
    }

    private enum Event {
        case beginRegistration
        case finishRegistration
        case beginActivation
        case finishActivation
        case close
    }

    // MARK: Properties
    private let eventLoop: EventLoop
    // Queried from the Channel, i.e. it must be thread-safe.
    internal let isActiveAtomic: ManagedAtomic<Bool>

    // Everything below must only be accessed on the EventLoop.

    // Whether the `.readEOF` notification has been seen.
    // Note: this can be `false` on a deactivated channel, we might just have torn it down.
    var hasSeenEOFNotification: Bool = false

    // Whether a transition from `active` to `active` is supported (used by datagram sockets).
    let supportsReconnect: Bool

    // Every state change mirrors the "is active" bit into `isActiveAtomic`.
    private var currentState: State = .fresh {
        didSet {
            self.eventLoop.assertInEventLoop()
            switch (oldValue, self.currentState) {
            case (_, .preActivation):
                // Activation has begun.
                self.isActiveAtomic.store(true, ordering: .relaxed)
            case (.preActivation, .fullyActivated):
                break  // Stay active
            case (.preActivation, _), (.fullyActivated, _):
                // Left one of the two active states.
                self.isActiveAtomic.store(false, ordering: .relaxed)
            default:
                break
            }
        }
    }

    // MARK: API
    // `isActiveAtomic` is injected because it is accessed from arbitrary threads, whereas
    // `SocketChannelLifecycleManager` itself is usually held mutable.
    internal init(
        eventLoop: EventLoop,
        isActiveAtomic: ManagedAtomic<Bool>,
        supportReconnect: Bool
    ) {
        self.supportsReconnect = supportReconnect
        self.isActiveAtomic = isActiveAtomic
        self.eventLoop = eventLoop
    }

    // This is called from the Channel's deinit, so don't assert we're on the EventLoop!
    internal var canBeDestroyed: Bool {
        return self.currentState == .closed
    }

    // The methods below return a closure; they are force-inlined so that returning it does not
    // suffer from a potential allocation.

    @inline(__always)
    internal mutating func beginRegistration() -> ((EventLoopPromise<Void>?, ChannelPipeline) -> Void) {
        return self.moveState(event: .beginRegistration)
    }

    @inline(__always)
    internal mutating func finishRegistration() -> ((EventLoopPromise<Void>?, ChannelPipeline) -> Void) {
        return self.moveState(event: .finishRegistration)
    }

    @inline(__always)
    internal mutating func close() -> ((EventLoopPromise<Void>?, ChannelPipeline) -> Void) {
        return self.moveState(event: .close)
    }

    @inline(__always)
    internal mutating func beginActivation() -> ((EventLoopPromise<Void>?, ChannelPipeline) -> Void) {
        return self.moveState(event: .beginActivation)
    }

    @inline(__always)
    internal mutating func finishActivation() -> ((EventLoopPromise<Void>?, ChannelPipeline) -> Void) {
        return self.moveState(event: .finishActivation)
    }

    // MARK: private API
    // Applies `event` to the current state and returns the closure that performs the follow-up work.
    // Force-inlined for the same reason as the public entry points above.
    @inline(__always)
    private mutating func moveState(event: Event) -> ((EventLoopPromise<Void>?, ChannelPipeline) -> Void) {
        self.eventLoop.assertInEventLoop()

        switch self.currentState {
        case .fresh:
            switch event {
            case .beginRegistration:
                self.currentState = .preRegistered
                return { promise, pipeline in
                    promise?.succeed(())
                    pipeline.syncOperations.fireChannelRegistered()
                }
            case .close:
                self.currentState = .closed
                return { promise, _ in
                    promise?.succeed(())
                }
            case .finishRegistration,  // need to begin registration first
                .beginActivation,  // should go through registration first
                .finishActivation:  // should go through registration first
                self.badTransition(event: event)
            }

        case .preRegistered:
            switch event {
            case .finishRegistration:
                self.currentState = .fullyRegistered
                return { promise, _ in
                    promise?.succeed(())
                }
            case .close:
                self.currentState = .closed
                return { promise, pipeline in
                    promise?.succeed(())
                    pipeline.syncOperations.fireChannelUnregistered()
                }
            case .beginRegistration,  // already registered
                .beginActivation,  // need to first be fully registered
                .finishActivation:  // need to first be fully registered
                self.badTransition(event: event)
            }

        case .fullyRegistered:
            switch event {
            case .beginActivation:
                self.currentState = .preActivation
                return { promise, _ in
                    promise?.succeed(())
                }
            case .close:
                self.currentState = .closed
                return { promise, pipeline in
                    promise?.succeed(())
                    pipeline.syncOperations.fireChannelUnregistered()
                }
            case .beginRegistration,  // already registered
                .finishRegistration,  // already fully registered
                .finishActivation:  // need to begin activation first
                self.badTransition(event: event)
            }

        case .preActivation:
            switch event {
            case .finishActivation:
                self.currentState = .fullyActivated
                return { promise, pipeline in
                    assert(promise == nil)
                    pipeline.syncOperations.fireChannelActive()
                }
            case .close:
                self.currentState = .closed
                return { promise, pipeline in
                    promise?.succeed(())
                    // Don't fire channelInactive here.
                    pipeline.syncOperations.fireChannelUnregistered()
                }
            case .beginActivation where self.supportsReconnect:
                // Reconnect: the state is left untouched.
                return { promise, _ in
                    promise?.succeed(())
                }
            case .beginActivation,  // already activated (and reconnect is not supported)
                .beginRegistration,  // already fully registered (and activated)
                .finishRegistration:  // already fully registered (and activated)
                self.badTransition(event: event)
            }

        case .fullyActivated:
            switch event {
            case .close:
                self.currentState = .closed
                return { promise, pipeline in
                    promise?.succeed(())
                    pipeline.syncOperations.fireChannelInactive()
                    pipeline.syncOperations.fireChannelUnregistered()
                }
            case .beginActivation where self.supportsReconnect:
                // Reconnect: the state is left untouched.
                return { promise, _ in
                    promise?.succeed(())
                }
            case .finishActivation where self.supportsReconnect:
                // Reconnect: the state is left untouched and nothing is fired.
                return { promise, _ in
                    assert(promise == nil)
                }
            case .beginActivation,  // already activated (and reconnect is not supported)
                .finishActivation,  // already signaled (and reconnect is not supported)
                .beginRegistration,  // already fully registered
                .finishRegistration:  // already fully registered
                self.badTransition(event: event)
            }

        case .closed:
            // already closed, no event is legal any more
            self.badTransition(event: event)
        }
    }

    private func badTransition(event: Event) -> Never {
        preconditionFailure("illegal transition: state=\(self.currentState), event=\(event)")
    }

    // MARK: convenience properties
    internal var isActive: Bool {
        self.eventLoop.assertInEventLoop()
        switch self.currentState {
        case .fresh, .preRegistered, .fullyRegistered, .closed:
            return false
        case .preActivation, .fullyActivated:
            return true
        }
    }

    internal var isPreRegistered: Bool {
        self.eventLoop.assertInEventLoop()
        switch self.currentState {
        case .preRegistered, .fullyRegistered, .preActivation, .fullyActivated:
            return true
        case .fresh, .closed:
            return false
        }
    }

    internal var isRegisteredFully: Bool {
        self.eventLoop.assertInEventLoop()
        switch self.currentState {
        case .fullyRegistered, .preActivation, .fullyActivated:
            return true
        case .fresh, .preRegistered, .closed:
            return false
        }
    }

    /// Returns whether the underlying file descriptor is open. This property will always be true (even before
    /// registration) until the Channel is closed.
    internal var isOpen: Bool {
        self.eventLoop.assertInEventLoop()
        let isClosed = self.currentState == .closed
        return !isClosed
    }
}
262 | | |
263 | | /// The base class for all socket-based channels in NIO. |
264 | | /// |
265 | | /// There are many types of specialised socket-based channel in NIO. Each of these |
266 | | /// has different logic regarding how exactly they read from and write to the network. |
267 | | /// However, they share a great deal of common logic around the managing of their |
268 | | /// file descriptors. |
269 | | /// |
270 | | /// For this reason, `BaseSocketChannel` exists to provide a common core implementation of |
271 | | /// the `SelectableChannel` protocol. It uses a number of private functions to provide hooks |
272 | | /// for subclasses to implement the specific logic to handle their writes and reads. |
273 | | class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, ChannelCore, @unchecked Sendable { |
274 | | typealias SelectableType = SocketType.SelectableType |
275 | | |
/// Snapshot of the local and remote address of the socket.
struct AddressCache {
    // Both fields are deliberately `let`s: they must always be updated together, so the only way to
    // change them is to build a whole new value through the (memberwise) `init(local:remote:)`.
    let local: SocketAddress?
    let remote: SocketAddress?
}
286 | | |
// MARK: - Stored Properties
// MARK: Constants & atomics (accessible everywhere)
public let parent: Channel?
internal let socket: SocketType
private let closePromise: EventLoopPromise<Void>
internal let selectableEventLoop: SelectableEventLoop
private let _offEventLoopLock: NIOLock = NIOLock()
private let isActiveAtomic = ManagedAtomic<Bool>(false)
// A thread-safe way of having something to print about the socket from any thread.
internal let socketDescription: String

// MARK: Variables, on EventLoop thread only
var readPending: Bool = false
var pendingConnect: Optional<EventLoopPromise<Void>>
var recvBufferPool: NIOPooledRecvBufferAllocator
var maxMessagesPerRead: UInt = 4
private var inFlushNow = false  // Guard against re-entrance of flushNow() method.
private var autoRead = true

// MARK: Variables that are really constant
// This is really a constant (set in `init`), but constructing it needs `self`, hence the `var`.
// Do not change it, as it needs to be accessed from arbitrary threads.
private var _pipeline: ChannelPipeline! = nil

// MARK: Special variables, please read comments.
// Reads are guarded by _either_ `self._offEventLoopLock` or the EL thread.
// Writes are guarded by `self._offEventLoopLock` _and_ the EL thread.
// PLEASE don't use these directly, use the non-underscored computed properties instead.
private var _addressCache: AddressCache = .init(local: nil, remote: nil)  // please use `self.addressesCached` instead
private var _bufferAllocatorCache: ByteBufferAllocator  // please use `self.bufferAllocatorCached` instead.
317 | | |
318 | | // MARK: - Computed properties |
// This is called from arbitrary threads.
// On the EventLoop the cache is read directly; off the EventLoop the read takes `_offEventLoopLock`.
// Writes must happen on the EventLoop and additionally take the lock.
internal var addressesCached: AddressCache {
    get {
        guard self.eventLoop.inEventLoop else {
            return self._offEventLoopLock.withLock { self._addressCache }
        }
        return self._addressCache
    }
    set {
        self.eventLoop.preconditionInEventLoop()
        self._offEventLoopLock.withLock { self._addressCache = newValue }
    }
}
337 | | |
// This is called from arbitrary threads.
// On the EventLoop the cache is read directly; off the EventLoop the read takes `_offEventLoopLock`.
// Writes must happen on the EventLoop and additionally take the lock.
private var bufferAllocatorCached: ByteBufferAllocator {
    get {
        guard self.eventLoop.inEventLoop else {
            return self._offEventLoopLock.withLock { self._bufferAllocatorCache }
        }
        return self._bufferAllocatorCache
    }
    set {
        self.eventLoop.preconditionInEventLoop()
        self._offEventLoopLock.withLock { self._bufferAllocatorCache = newValue }
    }
}
356 | | |
// The initial value is the (invalid) empty set of selector events. This is deliberate: it makes sure
// that the initial event gets seeded correctly later on (in `becomeFullyRegistered0`).
// Any value assigned afterwards must contain `.reset`.
internal var interestedEvent: SelectorEventSet = [] {
    didSet {
        assert(
            self.interestedEvent.contains(.reset),
            "impossible to unregister for reset"
        )
    }
}
364 | | |
// The lifecycle state machine of this channel; it may only be modified on the EventLoop.
private var lifecycleManager: SocketChannelLifecycleManager {
    didSet { self.eventLoop.assertInEventLoop() }
}
370 | | |
// The allocator as seen on the EventLoop. Every change is mirrored into `bufferAllocatorCached`.
private var bufferAllocator = ByteBufferAllocator() {
    didSet {
        self.eventLoop.assertInEventLoop()
        let updated = self.bufferAllocator
        self.bufferAllocatorCached = updated
    }
}
377 | | |
// This channel acts as its own `ChannelCore`.
public final var _channelCore: ChannelCore {
    return self
}
379 | | |
// This is `Channel` API so must be thread-safe: it is served from the address cache.
public final var localAddress: SocketAddress? {
    let cached = self.addressesCached
    return cached.local
}
384 | | |
// This is `Channel` API so must be thread-safe: it is served from the address cache.
public final var remoteAddress: SocketAddress? {
    let cached = self.addressesCached
    return cached.remote
}
389 | | |
/// `false` if the whole `Channel` is closed and so no more IO operation can be done.
///
/// Must be called on the EventLoop.
var isOpen: Bool {
    self.eventLoop.assertInEventLoop()
    let open = self.lifecycleManager.isOpen
    return open
}
395 | | |
/// Whether the channel is at least pre-registered (and not closed). Must be called on the EventLoop.
var isRegistered: Bool {
    self.eventLoop.assertInEventLoop()
    let registered = self.lifecycleManager.isPreRegistered
    return registered
}
400 | | |
// This is `Channel` API so must be thread-safe: the value is read from the atomic.
public var isActive: Bool {
    return self.isActiveAtomic.load(ordering: .relaxed)
}
405 | | |
// This is `Channel` API so must be thread-safe.
public final var closeFuture: EventLoopFuture<Void> {
    return self.closePromise.futureResult
}
410 | | |
// The `SelectableEventLoop` of this channel, exposed as a plain `EventLoop`.
public final var eventLoop: EventLoop {
    return self.selectableEventLoop
}
414 | | |
// This is `Channel` API so must be thread-safe.
// The base implementation always reports the channel as writable.
public var isWritable: Bool {
    return true
}
419 | | |
// This is `Channel` API so must be thread-safe: it is served from the allocator cache.
public final var allocator: ByteBufferAllocator {
    return self.bufferAllocatorCached
}
424 | | |
// This is `Channel` API so must be thread-safe.
public final var pipeline: ChannelPipeline {
    return self._pipeline
}
429 | | |
430 | | // MARK: Methods to override in subclasses. |
/// Write pending data to the underlying socket.
///
/// The base implementation traps; this must be overridden.
func writeToSocket() throws -> OverallWriteResult {
    fatalError("must be overridden")
}
434 | | |
/// Read data from the underlying socket and dispatch it to the `ChannelPipeline`.
///
/// The base implementation traps; this must be overridden by the sub class.
///
/// - Returns: `ReadResult.some` if any data was read, `ReadResult.none` otherwise.
@discardableResult
func readFromSocket() throws -> ReadResult {
    fatalError("this must be overridden by sub class")
}
441 | | |
442 | | // MARK: - Datatypes |
443 | | |
/// Indicates whether a selectable should be registered for IO notifications or not.
enum IONotificationState {
    /// Registration for IO notifications is wanted.
    case register

    /// Registration for IO notifications is not wanted.
    case unregister
}
452 | | |
/// The outcome of a read operation.
enum ReadResult {
    /// The read operation did not read anything.
    case none

    /// The read operation read some data.
    case some
}
460 | | |
/// Returned by the `private func readable0()` to tell the caller about the current state of the underlying
/// read stream.
///
/// This is mostly useful when receiving `.readEOF`, as the read stream then needs to be drained fully
/// (i.e. until EOF or an error is received).
private enum ReadStreamState: Equatable {
    /// Nothing unusual happened; carries the result of the read.
    case normal(ReadResult)

    /// EOF was seen.
    case eof

    /// A read error was received.
    case error
}
473 | | |
/// Begin connecting the underlying socket to a `SocketAddress`.
///
/// The base implementation traps; this must be overridden by the sub class.
///
/// - Parameters:
///   - to: The `SocketAddress` to connect to.
/// - Returns: `true` if the socket connected synchronously, `false` otherwise.
func connectSocket(to address: SocketAddress) throws -> Bool {
    fatalError("this must be overridden by sub class")
}
482 | | |
/// Begin connecting the underlying socket to a `VsockAddress`.
///
/// The base implementation traps; this must be overridden by the sub class.
///
/// - Parameters:
///   - to: The `VsockAddress` to connect to.
/// - Returns: `true` if the socket connected synchronously, `false` otherwise.
func connectSocket(to address: VsockAddress) throws -> Bool {
    fatalError("this must be overridden by sub class")
}
491 | | |
/// The kinds of address a connect can be directed at.
enum ConnectTarget {
    /// Connect to a `SocketAddress`.
    case socketAddress(SocketAddress)
    /// Connect to a `VsockAddress`.
    case vsockAddress(VsockAddress)
}
496 | | |
/// Begin connecting the underlying socket, dispatching to the overload that matches the kind of `target`.
///
/// - Parameters:
///   - to: The target to connect to.
/// - Returns: `true` if the socket connected synchronously, `false` otherwise.
final func connectSocket(to target: ConnectTarget) throws -> Bool {
    let connectedSynchronously: Bool
    switch target {
    case let .socketAddress(socketAddress):
        connectedSynchronously = try self.connectSocket(to: socketAddress)
    case let .vsockAddress(vsockAddress):
        connectedSynchronously = try self.connectSocket(to: vsockAddress)
    }
    return connectedSynchronously
}
510 | | |
/// Make any state changes needed to complete the connection process.
///
/// The base implementation traps; this must be overridden by the sub class.
func finishConnectSocket() throws {
    fatalError("this must be overridden by sub class")
}
515 | | |
/// Returns whether there are any flushed, pending writes that still have to be sent over the network.
///
/// The base implementation traps; this must be overridden by the sub class.
func hasFlushedPendingWrites() -> Bool {
    fatalError("this must be overridden by sub class")
}
520 | | |
/// Buffer a write in preparation for a flush.
///
/// The base implementation traps; this must be overridden by the sub class.
func bufferPendingWrite(data: NIOAny, promise: EventLoopPromise<Void>?) {
    fatalError("this must be overridden by sub class")
}
525 | | |
/// Mark a flush point. Called when a flush is received; it instructs the implementation to record the flush.
///
/// The base implementation traps; this must be overridden by the sub class.
func markFlushPoint() {
    fatalError("this must be overridden by sub class")
}
531 | | |
/// Called when closing, to instruct the specific implementation to discard all pending writes.
///
/// The base implementation traps; this must be overridden by the sub class.
func cancelWritesOnClose(error: Error) {
    fatalError("this must be overridden by sub class")
}
537 | | |
538 | | // MARK: Common base socket logic. |
/// Creates the common channel state around `socket`.
///
/// - Parameters:
///   - socket: The socket this channel wraps.
///   - parent: The parent channel, if any.
///   - eventLoop: The `SelectableEventLoop` this channel belongs to.
///   - recvAllocator: The allocator handed to the receive buffer pool.
///   - supportReconnect: Forwarded to the lifecycle manager.
init(
    socket: SocketType,
    parent: Channel?,
    eventLoop: SelectableEventLoop,
    recvAllocator: RecvByteBufferAllocator,
    supportReconnect: Bool
) throws {
    // Seed the thread-safe cache from the default allocator.
    self._bufferAllocatorCache = self.bufferAllocator
    self.parent = parent
    self.socket = socket
    self.selectableEventLoop = eventLoop
    self.pendingConnect = nil
    self.closePromise = eventLoop.makePromise()
    self.recvBufferPool = NIOPooledRecvBufferAllocator(
        capacity: Int(self.maxMessagesPerRead),
        recvAllocator: recvAllocator
    )
    // As the socket may already be connected we should ensure we start with the correct addresses cached.
    self._addressCache = AddressCache(
        local: try? socket.localAddress(),
        remote: try? socket.remoteAddress()
    )
    self.lifecycleManager = SocketChannelLifecycleManager(
        eventLoop: eventLoop,
        isActiveAtomic: self.isActiveAtomic,
        supportReconnect: supportReconnect
    )
    self.socketDescription = socket.description
    // Needs `self`, so it has to come after every other stored property has been set.
    self._pipeline = ChannelPipeline(channel: self)
}
563 | | |
// A channel must have reached the closed state by the time it is deallocated.
deinit {
    let destroyable = self.lifecycleManager.canBeDestroyed
    assert(destroyable, "leak of open Channel, state: \(String(describing: self.lifecycleManager))")
}
570 | | |
/// Asks the socket for its local address. Must be called on the EventLoop.
///
/// - Throws: `ChannelError._ioOnClosedChannel` if the channel is closed, or whatever the socket throws.
public final func localAddress0() throws -> SocketAddress {
    self.eventLoop.assertInEventLoop()
    if !self.isOpen {
        throw ChannelError._ioOnClosedChannel
    }
    let address = try self.socket.localAddress()
    return address
}
578 | | |
/// Asks the socket for its remote address. Must be called on the EventLoop.
///
/// - Throws: `ChannelError._ioOnClosedChannel` if the channel is closed, or whatever the socket throws.
public final func remoteAddress0() throws -> SocketAddress {
    self.eventLoop.assertInEventLoop()
    if !self.isOpen {
        throw ChannelError._ioOnClosedChannel
    }
    let address = try self.socket.remoteAddress()
    return address
}
586 | | |
/// Flush data to the underlying socket and return if this socket needs to be registered for write notifications.
///
/// This method can be called re-entrantly but it will return immediately because the first call is responsible
/// for sending all flushed writes, even the ones that are accumulated whilst `flushNow()` is running.
///
/// If writing to the socket throws, the inbound side is drained (when a read is pending) and the channel is
/// closed with the write error.
///
/// - Returns: If this socket should be registered for write notifications. Ie. `IONotificationState.register` if
///            _not_ all data could be written, so notifications are necessary; and `IONotificationState.unregister`
///             if everything was written and we don't need to be notified about writability at the moment.
func flushNow() -> IONotificationState {
    self.eventLoop.assertInEventLoop()

    // Guard against re-entry as data that will be put into `pendingWrites` will just be picked up by
    // `writeToSocket`.
    guard !self.inFlushNow else {
        return .unregister
    }

    self.inFlushNow = true
    defer {
        self.inFlushNow = false
    }

    var newWriteRegistrationState: IONotificationState = .unregister
    while newWriteRegistrationState == .unregister && self.hasFlushedPendingWrites() && self.isOpen {
        let writeResult: OverallWriteResult
        do {
            assert(self.lifecycleManager.isActive)
            writeResult = try self.writeToSocket()
            if writeResult.writabilityChange {
                // We went from not writable to writable.
                self.pipeline.syncOperations.fireChannelWritabilityChanged()
            }
        } catch let err {
            // If there is a write error we should try to drain the inbound before closing the socket as there
            // may be some data pending.
            // We ignore any error that is thrown as we will use the original err to close the channel and notify
            // the user.
            if self.readIfNeeded0() {
                assert(self.lifecycleManager.isActive)

                // We need to continue reading until there is nothing more to be read from the socket as we will
                // not have another chance to drain it.
                var readAtLeastOnce = false
                while let read = try? self.readFromSocket(), read == .some {
                    readAtLeastOnce = true
                }
                if readAtLeastOnce && self.lifecycleManager.isActive {
                    // We are on the EventLoop, so use the synchronous operations like the rest of this method.
                    self.pipeline.syncOperations.fireChannelReadComplete()
                }
            }

            self.close0(error: err, mode: .all, promise: nil)

            // we handled all writes
            return .unregister
        }

        switch writeResult.writeResult {
        case .couldNotWriteEverything:
            newWriteRegistrationState = .register
        case .writtenCompletely(let closeState):
            newWriteRegistrationState = .unregister
            switch closeState {
            case .open:
                ()
            case .readyForClose:
                self.close0(error: ChannelError.outputClosed, mode: .output, promise: nil)
            case .closed:
                ()  // we can be flushed before becoming active
            }
        }

        if !self.isOpen || !self.hasFlushedPendingWrites() {
            // No further writes, unregister. We won't re-enter the loop as both of these would have to be true.
            newWriteRegistrationState = .unregister
        }
    }

    assert(
        (newWriteRegistrationState == .register && self.hasFlushedPendingWrites())
            || (newWriteRegistrationState == .unregister && !self.hasFlushedPendingWrites()),
        "illegal flushNow decision: \(newWriteRegistrationState) and \(self.hasFlushedPendingWrites())"
    )
    return newWriteRegistrationState
}
670 | | |
/// Sets `option` to `value`.
///
/// On the EventLoop the option is applied right away; otherwise the work is submitted to the EventLoop.
///
/// - Returns: A future that completes once the option has been applied, or fails with the thrown error.
public final func setOption<Option: ChannelOption>(_ option: Option, value: Option.Value) -> EventLoopFuture<Void> {
    guard self.eventLoop.inEventLoop else {
        return self.eventLoop.submit { try self.setOption0(option, value: value) }
    }

    let promise = self.eventLoop.makePromise(of: Void.self)
    executeAndComplete(promise) { try self.setOption0(option, value: value) }
    return promise.futureResult
}
680 | | |
/// Applies `option` on the EventLoop.
///
/// Options that are not handled here are a programmer error and trap.
///
/// - Throws: `ChannelError._ioOnClosedChannel` if the channel is closed, or whatever setting a socket option throws.
func setOption0<Option: ChannelOption>(_ option: Option, value: Option.Value) throws {
    self.eventLoop.assertInEventLoop()

    if !self.isOpen {
        throw ChannelError._ioOnClosedChannel
    }

    switch option {
    case let socketOption as ChannelOptions.Types.SocketOption:
        try self.setSocketOption0(level: socketOption.optionLevel, name: socketOption.optionName, value: value)
    case is ChannelOptions.Types.AllocatorOption:
        self.bufferAllocator = value as! ByteBufferAllocator
    case is ChannelOptions.Types.RecvAllocatorOption:
        self.recvBufferPool.recvAllocator = value as! RecvByteBufferAllocator
    case is ChannelOptions.Types.AutoReadOption:
        let enabled = value as! Bool
        let wasEnabled = self.autoRead
        self.autoRead = enabled

        // `read0()` / `pauseRead0()` only need to be called if the channel is already registered with the
        // EventLoop; otherwise this happens automatically once `register0` is called. On top of that it is
        // only necessary when the value actually changes.
        if self.lifecycleManager.isPreRegistered, wasEnabled != enabled {
            if enabled {
                self.read0()
            } else {
                self.pauseRead0()
            }
        }
    case is ChannelOptions.Types.MaxMessagesPerReadOption:
        self.maxMessagesPerRead = value as! UInt
        self.recvBufferPool.updateCapacity(to: Int(self.maxMessagesPerRead))
    default:
        fatalError("option \(option) not supported")
    }
}
716 | | |
/// Reads the current value of `option`.
///
/// On the EventLoop the value is read right away; otherwise the work is submitted to the EventLoop.
///
/// - Returns: A future carrying the option's value, or the error thrown while reading it.
public func getOption<Option: ChannelOption>(_ option: Option) -> EventLoopFuture<Option.Value> {
    guard self.eventLoop.inEventLoop else {
        return self.eventLoop.submit { try self.getOption0(option) }
    }

    do {
        let value = try self.getOption0(option)
        return self.eventLoop.makeSucceededFuture(value)
    } catch {
        return self.eventLoop.makeFailedFuture(error)
    }
}
728 | | |
/// Reads `option` on the EventLoop.
///
/// Options that are not handled here are a programmer error and trap.
///
/// - Throws: `ChannelError._ioOnClosedChannel` if the channel is closed, or whatever reading a socket option throws.
func getOption0<Option: ChannelOption>(_ option: Option) throws -> Option.Value {
    self.eventLoop.assertInEventLoop()

    if !self.isOpen {
        throw ChannelError._ioOnClosedChannel
    }

    switch option {
    case let socketOption as ChannelOptions.Types.SocketOption:
        return try self.getSocketOption0(level: socketOption.optionLevel, name: socketOption.optionName)
    case is ChannelOptions.Types.AllocatorOption:
        return self.bufferAllocator as! Option.Value
    case is ChannelOptions.Types.RecvAllocatorOption:
        return self.recvBufferPool.recvAllocator as! Option.Value
    case is ChannelOptions.Types.AutoReadOption:
        return self.autoRead as! Option.Value
    case is ChannelOptions.Types.MaxMessagesPerReadOption:
        return self.maxMessagesPerRead as! Option.Value
    default:
        fatalError("option \(option) not supported")
    }
}
751 | | |
/// Triggers a `ChannelPipeline.read()` if `autoRead` is enabled and no read is pending yet.
///
/// - Returns: `false` if the channel is not active; otherwise the value of `readPending` after the
///   (optional) read has been triggered.
@discardableResult func readIfNeeded0() -> Bool {
    self.eventLoop.assertInEventLoop()
    guard self.lifecycleManager.isActive else {
        return false
    }

    let shouldTriggerRead = !self.readPending && autoRead
    if shouldTriggerRead {
        self.pipeline.syncOperations.read()
    }
    return self.readPending
}
766 | | |
767 | | // Methods invoked from the HeadHandler of the ChannelPipeline |
/// Binds the socket to `address` and refreshes the cached local address.
///
/// - Parameters:
///   - address: The address to bind to.
///   - promise: Succeeded once the bind went through; failed with `ChannelError._ioOnClosedChannel` if the
///     channel is not open, or with the error thrown by the socket's `bind`.
public func bind0(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
    self.eventLoop.assertInEventLoop()

    if !self.isOpen {
        promise?.fail(ChannelError._ioOnClosedChannel)
        return
    }

    executeAndComplete(promise) {
        try self.socket.bind(to: address)
        // Only the local side can have changed by binding.
        self.updateCachedAddressesFromSocket(updateRemote: false)
    }
}
781 | | |
/// Queues `data` as a pending write.
///
/// - Parameters:
///   - data: The payload to buffer.
///   - promise: Handed to `bufferPendingWrite` together with the data; failed immediately with
///     `ChannelError._ioOnClosedChannel` if the channel is not open.
public final func write0(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
    self.eventLoop.assertInEventLoop()

    if self.isOpen {
        bufferPendingWrite(data: data, promise: promise)
    } else {
        // Channel was already closed: fail the promise and don't even queue the write.
        promise?.fail(ChannelError._ioOnClosedChannel)
    }
}
793 | | |
/// Adds `.write` to the set of interested events, unless it is already part of it.
private func registerForWritable() {
    self.eventLoop.assertInEventLoop()

    if self.interestedEvent.contains(.write) {
        // already interested in write, nothing to do
        return
    }
    let withWrite = self.interestedEvent.union(.write)
    self.safeReregister(interested: withWrite)
}
803 | | |
/// Removes `.write` from the set of interested events, if it is currently part of it.
func unregisterForWritable() {
    self.eventLoop.assertInEventLoop()

    if !self.interestedEvent.contains(.write) {
        // we were not interested in write to begin with, nothing to do
        return
    }
    let withoutWrite = self.interestedEvent.subtracting(.write)
    self.safeReregister(interested: withoutWrite)
}
813 | | |
/// Marks a flush point and, if the channel is active, tries to write out the flushed data.
///
/// Does nothing if the channel is not open. If a write registration is already in place no flush is attempted
/// here; otherwise `flushNow()` runs and, if it asks for it, the channel registers for writability.
public final func flush0() {
    self.eventLoop.assertInEventLoop()

    guard self.isOpen else { return }

    self.markFlushPoint()

    guard self.lifecycleManager.isActive else { return }

    // `flushNow()` must only run when we're not already waiting for writability.
    guard !self.isWritePending(), self.flushNow() == .register else { return }

    assert(self.lifecycleManager.isPreRegistered)
    self.registerForWritable()
}
832 | | |
/// Records that a read is wanted and, if the channel is already registered, registers for readability.
///
/// Does nothing if the channel is not open.
public func read0() {
    self.eventLoop.assertInEventLoop()

    if !self.isOpen {
        return
    }
    self.readPending = true

    guard self.lifecycleManager.isPreRegistered else {
        return
    }
    self.registerForReadable()
}
845 | | |
/// Stops listening for readability if the channel has been registered.
private final func pauseRead0() {
    self.eventLoop.assertInEventLoop()

    guard self.lifecycleManager.isPreRegistered else {
        return
    }
    self.unregisterForReadable()
}
853 | | |
/// Adds `.read` to the set of interested events.
///
/// No-op if an EOF notification has already been seen or if `.read` is already registered.
private final func registerForReadable() {
    self.eventLoop.assertInEventLoop()
    assert(self.lifecycleManager.isRegisteredFully)

    // After an EOF notification there's no point in registering for reads; and if we're already
    // interested in `.read` there's nothing to change.
    if self.lifecycleManager.hasSeenEOFNotification || self.interestedEvent.contains(.read) {
        return
    }

    let withRead = self.interestedEvent.union(.read)
    self.safeReregister(interested: withRead)
}
869 | | |
/// Adds `.readEOF` to the set of interested events.
///
/// No-op if an EOF notification has already been seen or if `.readEOF` is already registered.
private final func registerForReadEOF() {
    self.eventLoop.assertInEventLoop()
    assert(self.lifecycleManager.isRegisteredFully)

    // After an EOF notification there's no point in registering for `readEOF` again; and if we're
    // already interested in `.readEOF` there's nothing to change.
    if self.lifecycleManager.hasSeenEOFNotification || self.interestedEvent.contains(.readEOF) {
        return
    }

    let withReadEOF = self.interestedEvent.union(.readEOF)
    self.safeReregister(interested: withReadEOF)
}
885 | | |
/// Removes `.read` from the set of interested events, if it is currently part of it.
internal final func unregisterForReadable() {
    self.eventLoop.assertInEventLoop()
    assert(self.lifecycleManager.isRegisteredFully)

    if !self.interestedEvent.contains(.read) {
        return
    }

    let withoutRead = self.interestedEvent.subtracting(.read)
    self.safeReregister(interested: withoutRead)
}
896 | | |
/// Closes this `BaseSocketChannel` and notifies `promise` about the result of the _close_ operation.
///
/// If closing the socket throws, `promise` is failed with that error. Otherwise `promise` is handed to the
/// lifecycle manager's close callouts, regardless of `error`. A failure to deregister from the event loop is
/// reported through `fireErrorCaught` and does not by itself fail `promise`.
/// `error` is used to fail outstanding writes (if any) and the pending connect promise if set.
///
/// - Parameters:
///   - error: The error to fail the outstanding (if any) writes/connect with.
///   - mode: The close mode, must be `.all` for `BaseSocketChannel`; any other mode fails `promise` with
///     `ChannelError._operationUnsupported`.
///   - promise: The promise that gets notified about the result of the close operation. Failed with
///     `ChannelError._alreadyClosed` if the channel is not open.
public func close0(error: Error, mode: CloseMode, promise: EventLoopPromise<Void>?) {
    self.eventLoop.assertInEventLoop()

    guard self.isOpen else {
        promise?.fail(ChannelError._alreadyClosed)
        return
    }

    guard mode == .all else {
        promise?.fail(ChannelError._operationUnsupported)
        return
    }

    // === BEGIN: No user callouts ===

    // this is to register all error callouts as all the callouts must happen after we transition our state
    var errorCallouts: [(ChannelPipeline) -> Void] = []

    self.interestedEvent = .reset
    do {
        try selectableEventLoop.deregister(channel: self)
    } catch let err {
        // Deferred: reported through the pipeline once our state has been reconciled.
        errorCallouts.append { pipeline in
            pipeline.syncOperations.fireErrorCaught(err)
        }
    }

    let p: EventLoopPromise<Void>?
    do {
        try socket.close()
        p = promise
    } catch {
        // Deferred: the promise is failed with the close error once our state has been reconciled.
        errorCallouts.append { (_: ChannelPipeline) in
            promise?.fail(error)
        }
        // Set p to nil as we want to ensure we pass nil to the lifecycle callouts below so we do not try to
        // notify the promise again.
        p = nil
    }

    // Transition our internal state.
    let callouts = self.lifecycleManager.close()

    // === END: No user callouts (now that our state is reconciled, we can call out to user code.) ===

    // this must be the first to call out as it transitions the PendingWritesManager into the closed state
    // and we assert elsewhere that the PendingWritesManager has the same idea of 'open' as we have in here.
    self.cancelWritesOnClose(error: error)

    // this should be a no-op as we shouldn't have any
    for callout in errorCallouts {
        callout(self.pipeline)
    }

    // A connect that was still in flight fails with the close error.
    if let connectPromise = self.pendingConnect {
        self.pendingConnect = nil
        connectPromise.fail(error)
    }

    callouts(p, self.pipeline)

    eventLoop.execute {
        // ensure this is executed in a delayed fashion as the users code may still traverse the pipeline
        self.removeHandlers(pipeline: self.pipeline)

        self.closePromise.succeed(())

        // Now reset the addresses as we notified all handlers / futures.
        self.unsetCachedAddressesFromSocket()
    }
}
975 | | |
/// Begins the registration of this channel.
///
/// - Parameter promise: Passed on to the lifecycle manager's registration callouts on success. Failed with
///   `ChannelError._ioOnClosedChannel` if the channel is not open, with
///   `ChannelError._inappropriateOperationForState` if registration has already begun, or with
///   `EventLoopError._shutdown` if the selectable event loop is no longer open (in which case the channel is
///   closed as well).
public final func register0(promise: EventLoopPromise<Void>?) {
    self.eventLoop.assertInEventLoop()

    if !self.isOpen {
        promise?.fail(ChannelError._ioOnClosedChannel)
        return
    }

    if self.lifecycleManager.isPreRegistered {
        promise?.fail(ChannelError._inappropriateOperationForState)
        return
    }

    if !self.selectableEventLoop.isOpen {
        let shutdownError = EventLoopError._shutdown
        self.pipeline.syncOperations.fireErrorCaught(shutdownError)
        // `close0`'s promise is about the result of the `close` operation, ...
        self.close0(error: shutdownError, mode: .all, promise: nil)
        // ... therefore we need to fail the registration `promise` separately.
        promise?.fail(shutdownError)
        return
    }

    // we can't fully register yet as epoll would give us EPOLLHUP if bind/connect wasn't called yet.
    let registrationCallouts = self.lifecycleManager.beginRegistration()
    registrationCallouts(promise, self.pipeline)
}
1002 | | |
/// Registers the channel, then immediately makes it fully registered and active.
///
/// The name suggests this is meant for sockets that need no further bind/connect step — NOTE(review): confirm
/// against callers.
///
/// - Parameter promise: Receives a registration failure (via `cascadeFailure`); otherwise it is passed to
///   `becomeActive0`.
public final func registerAlreadyConfigured0(promise: EventLoopPromise<Void>?) {
    self.eventLoop.assertInEventLoop()
    assert(self.isOpen)
    assert(!self.lifecycleManager.isActive)
    // Register through an intermediate promise so a failure can both close the channel and reach `promise`.
    let registerPromise = self.eventLoop.makePromise(of: Void.self)
    self.register0(promise: registerPromise)
    registerPromise.futureResult.whenFailure { (_: Error) in
        self.close(promise: nil)
    }
    registerPromise.futureResult.cascadeFailure(to: promise)

    if self.lifecycleManager.isPreRegistered {
        // we expect kqueue/epoll registration to always succeed which is basically true, except for errors that
        // should be fatal (EBADF, EFAULT, ESRCH, ENOMEM) and two 'table full' (EMFILE, ENFILE) error kinds which
        // we don't handle yet but might do in the future (#469).
        try! becomeFullyRegistered0()
        if self.lifecycleManager.isRegisteredFully {
            self.becomeActive0(promise: promise)
        }
    }
}
1024 | | |
/// Handles a user outbound event.
///
/// A `VsockChannelEvents.ConnectToAddress` event is turned into a connect to the given vsock address; every
/// other event fails `promise` with `ChannelError._operationUnsupported`.
///
/// - Parameters:
///   - event: The user event.
///   - promise: The promise to notify about the outcome.
public func triggerUserOutboundEvent0(_ event: Any, promise: EventLoopPromise<Void>?) {
    if let connectEvent = event as? VsockChannelEvents.ConnectToAddress {
        self.connect0(to: .vsockAddress(connectEvent.address), promise: promise)
    } else {
        promise?.fail(ChannelError._operationUnsupported)
    }
}
1033 | | |
1034 | | // Methods invoked from the EventLoop itself |
/// Handles a writability notification: completes a pending connect (if any) and flushes pending writes.
public final func writable() {
    self.eventLoop.assertInEventLoop()
    assert(self.isOpen)

    // If we were connecting, that has finished.
    self.finishConnect()

    let nextStep = self.flushNow()
    switch nextStep {
    case .register:
        // nothing to do because given that we just received `writable`, we're still registered for writable.
        assert(!self.isOpen || self.interestedEvent.contains(.write))
    case .unregister:
        // Everything was written or connect was complete, let's unregister from writable.
        self.finishWritable()
    }
}
1050 | | |
/// Completes a pending connect, if there is one.
///
/// On success the channel becomes active and the connect promise is passed to `becomeActive0`; on failure
/// the channel is closed with the error (and `close0` fails the connect promise).
private func finishConnect() {
    self.eventLoop.assertInEventLoop()
    assert(self.lifecycleManager.isPreRegistered)

    guard let connectPromise = self.pendingConnect else {
        // No connect in flight, so we must already be active.
        assert(self.lifecycleManager.isActive)
        return
    }

    assert(!self.lifecycleManager.isActive)

    do {
        try self.finishConnectSocket()
    } catch {
        assert(!self.lifecycleManager.isActive)
        // close0 fails the connectPromise itself so no need to do it here
        self.close0(error: error, mode: .all, promise: nil)
        return
    }

    // now this has succeeded, becomeActive0 will actually fulfill this.
    self.pendingConnect = nil
    // We already know what the local address is.
    self.updateCachedAddressesFromSocket(updateLocal: false, updateRemote: true)
    self.becomeActive0(promise: connectPromise)
}
1075 | | |
/// Stops listening for writability, provided the channel is still open.
private func finishWritable() {
    self.eventLoop.assertInEventLoop()

    guard self.isOpen else {
        return
    }

    assert(self.lifecycleManager.isPreRegistered)
    assert(!self.hasFlushedPendingWrites())
    self.unregisterForWritable()
}
1085 | | |
/// Handles a `writeEOF` notification. This base implementation treats it as unexpected and crashes.
func writeEOF() {
    fatalError("\(self) received writeEOF which is unexpected")
}
1089 | | |
/// Handles a `readEOF` notification: records that it has been seen and processes it via `readEOF0`.
///
/// May be delivered at most once per channel (asserted).
func readEOF() {
    assert(!self.lifecycleManager.hasSeenEOFNotification)
    self.lifecycleManager.hasSeenEOFNotification = true

    // we can't be not active but still registered here; this would mean that we got a notification about a
    // channel before we're ready to receive them.
    assert(
        self.lifecycleManager.isRegisteredFully,
        "illegal state: \(self): active: \(self.lifecycleManager.isActive), registered: \(self.lifecycleManager.isRegisteredFully)"
    )

    self.readEOF0()

    // After processing the EOF we must no longer be interested in reads or further `readEOF` events.
    assert(!self.interestedEvent.contains(.read))
    assert(!self.interestedEvent.contains(.readEOF))
}
1106 | | |
/// Processes a read-EOF condition: deregisters from `readEOF` and then reads until EOF, an error, or until
/// the channel is no longer active. Does nothing unless the channel is fully registered.
final func readEOF0() {
    guard self.lifecycleManager.isRegisteredFully else {
        return
    }

    // we're unregistering from `readEOF` here as we want this to be one-shot. We're then synchronously
    // reading all input until the EOF that we're guaranteed to see. After that `readEOF` becomes uninteresting
    // and would anyway fire constantly.
    self.safeReregister(interested: self.interestedEvent.subtracting(.readEOF))

    while self.lifecycleManager.isActive {
        switch self.readable0() {
        case .eof:
            // on EOF we're done with our processing for `readEOF`.
            // we could both be registered & active (if our channel supports half-closure) or unregistered & inactive (if it doesn't).
            return
        case .error:
            // we should be unregistered and inactive now (as `readable0` would've called close).
            assert(!self.lifecycleManager.isActive)
            assert(!self.lifecycleManager.isPreRegistered)
            return
        case .normal(.none):
            preconditionFailure("got .readEOF and read returned not reading any bytes, nor EOF.")
        case .normal(.some):
            // normal, note that there is no guarantee we're still active (as the user might have closed in callout)
            continue
        }
    }
}
1134 | | |
/// Handles a `reset` notification: drains readable input via `readEOF0`, then closes the channel with the
/// most specific error available if the socket is still open.
///
/// this _needs_ to synchronously cause the fd to be unregistered because we cannot unregister from `reset`. In
/// other words: Failing to unregister the whole selector will cause NIO to spin at 100% CPU constantly delivering
/// the `reset` event.
final func reset() {
    self.readEOF0()

    if self.socket.isOpen {
        assert(self.lifecycleManager.isPreRegistered)
        let error: IOError
        // if the socket is still registered (and therefore open), let's try to get the actual socket error from the socket
        do {
            let result: Int32 = try self.socket.getOption(level: .socket, name: .so_error)
            if result != 0 {
                // we have a socket error, let's forward
                // this path will be executed on Linux (EPOLLERR) & Darwin (ev.fflags != 0) for
                // stream sockets, and most (but not all) errors on datagram sockets
                error = IOError(errnoCode: result, reason: "connection reset (error set)")
            } else {
                // we don't have a socket error, this must be connection reset without an error then
                // this path should only be executed on Linux (EPOLLHUP, no EPOLLERR)
                #if os(Linux)
                let message: String = "connection reset (no error set)"
                #else
                let message: String =
                    "BUG IN SwiftNIO (possibly #572), please report! Connection reset (no error set)."
                #endif
                error = IOError(errnoCode: ECONNRESET, reason: message)
            }
            self.close0(error: error, mode: .all, promise: nil)
        } catch {
            // Reading SO_ERROR itself failed: close with that error instead.
            self.close0(error: error, mode: .all, promise: nil)
        }
    }
    // Whichever path we took, the channel must be deregistered by now (see above).
    assert(!self.lifecycleManager.isPreRegistered)
}
1170 | | |
/// Handles a readability notification by performing a read via `readable0`.
///
/// Must not be delivered after a `readEOF` notification has been seen (asserted).
public final func readable() {
    assert(
        !self.lifecycleManager.hasSeenEOFNotification,
        "got a read notification after having already seen .readEOF"
    )
    self.readable0()
}
1178 | | |
/// Performs one read from the socket and reports the resulting state of the read stream.
///
/// - Returns:
///   - `.eof` if `readFromSocket` threw `ChannelError.eof` and either remote half-closure is allowed on the
///     (still active) channel or `shouldCloseOnReadError` asked for the channel to be closed,
///   - `.error` if `readFromSocket` threw any other error and `shouldCloseOnReadError` asked for the channel
///     to be closed,
///   - `.normal(readResult)` otherwise.
@discardableResult
private final func readable0() -> ReadStreamState {
    self.eventLoop.assertInEventLoop()
    assert(self.lifecycleManager.isActive)

    // On every exit: if we're still open and no further read was requested, stop listening for readability.
    defer {
        if self.isOpen && !self.readPending {
            unregisterForReadable()
        }
    }

    let readResult: ReadResult
    do {
        readResult = try self.readFromSocket()
    } catch let err {
        let readStreamState: ReadStreamState
        // ChannelError.eof is not something we want to fire through the pipeline as it just means the remote
        // peer closed / shutdown the connection.
        if let channelErr = err as? ChannelError, channelErr == ChannelError.eof {
            readStreamState = .eof

            if self.lifecycleManager.isActive {
                // Directly call getOption0 as we are already on the EventLoop and so not need to create an extra future.
                //
                // getOption0 can only fail if the channel is not active anymore but we assert further up that it is. If
                // that's not the case this is a precondition failure and we would like to know.
                let allowRemoteHalfClosure = try! self.getOption0(.allowRemoteHalfClosure)

                // For EOF, we always fire read complete.
                self.pipeline.syncOperations.fireChannelReadComplete()

                if allowRemoteHalfClosure {
                    // If we want to allow half closure we will just mark the input side of the Channel
                    // as closed.
                    if self.shouldCloseOnReadError(err) {
                        self.close0(error: err, mode: .input, promise: nil)
                    }
                    self.readPending = false
                    return .eof
                }
            }
        } else {
            readStreamState = .error
            self.pipeline.syncOperations.fireErrorCaught(err)
        }

        if self.shouldCloseOnReadError(err) {
            self.close0(error: err, mode: .all, promise: nil)
            return readStreamState
        } else {
            // This is non-fatal, so continue as normal.
            // This constitutes "some" as we did get at least an error from the socket.
            readResult = .some
        }
    }
    // This assert needs to be disabled for io_uring, as the io_uring backend does not have the implicit synchronisation between
    // modifications to the poll mask and the actual returned events on the completion queue that kqueue and epoll have.
    // For kqueue and epoll, there is an implicit synchronisation point such that after a modification of the poll mask has been
    // issued, the next call to reap events will be sure to not include events which do not match the new poll mask.
    // Specifically for this assert, it means that we will be guaranteed to never receive a POLLIN notification unless there are
    // bytes available to read.

    // For a fully asynchronous backend like io_uring, there is no such implicit synchronisation point, so after we have
    // submitted the asynchronous event to change the poll mask, we may still reap pending asynchronous replies for the old
    // poll mask, and thus receive a POLLIN even though we have modified the mask vis-à-vis the kernel.
    // Which would trigger the assert.

    // The only way to avoid that race would be to use heavy-handed synchronisation primitives like IOSQE_IO_DRAIN (basically
    // flushing all pending requests and waiting for a fake event result to sync up) which would be awful for performance,
    // so better skip the assert() for io_uring instead.
    #if !SWIFTNIO_USE_IO_URING
    assert(readResult == .some)
    #endif
    if self.lifecycleManager.isActive {
        self.pipeline.syncOperations.fireChannelReadComplete()
    }
    self.readIfNeeded0()
    return .normal(readResult)
}
1258 | | |
/// Decides whether the `Channel` has to be closed because of an `Error` thrown by `readFromSocket`.
///
/// This implementation treats every read error as fatal.
///
/// - Parameters:
///   - err: The `Error` which was thrown by `readFromSocket`.
/// - Returns: `true` if the `Channel` should be closed, `false` otherwise.
func shouldCloseOnReadError(_ err: Error) -> Bool {
    return true
}
1267 | | |
/// Handles an error reported by the selector.
///
/// Default behaviour is to treat this as if it were a reset.
///
/// - Returns: `.fatal`, always, in this implementation.
func error() -> ErrorResult {
    self.reset()
    return .fatal
}
1275 | | |
/// Refreshes the cached local and/or remote address.
///
/// An address that is not refreshed keeps its cached value; an address whose lookup throws is cached as `nil`.
///
/// - Parameters:
///   - updateLocal: Whether to re-read the local address via `localAddress0()`.
///   - updateRemote: Whether to re-read the remote address via `remoteAddress0()`.
internal final func updateCachedAddressesFromSocket(updateLocal: Bool = true, updateRemote: Bool = true) {
    self.eventLoop.assertInEventLoop()
    assert(updateLocal || updateRemote)
    let previous = self.addressesCached
    self.addressesCached = AddressCache(
        local: updateLocal ? try? self.localAddress0() : previous.local,
        remote: updateRemote ? try? self.remoteAddress0() : previous.remote
    )
}
1284 | | |
/// Clears both the cached local and the cached remote address.
internal final func unsetCachedAddressesFromSocket() {
    self.eventLoop.assertInEventLoop()
    let cleared = AddressCache(local: nil, remote: nil)
    self.addressesCached = cleared
}
1289 | | |
/// Connects to a socket address by forwarding to `connect0(to:promise:)` with a `.socketAddress` target.
///
/// - Parameters:
///   - address: The address to connect to.
///   - promise: The promise to notify about the outcome of the connect.
public final func connect0(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
    let target = ConnectTarget.socketAddress(address)
    self.connect0(to: target, promise: promise)
}
1293 | | |
/// Starts connecting the socket to `target`.
///
/// If the connect completes synchronously the channel becomes active right away. Otherwise the promise is
/// parked as the pending connect promise and the channel registers for writability. Any error thrown on the
/// way closes the channel, and `close0` fails the promise.
///
/// - Parameters:
///   - target: Where to connect to.
///   - promise: The promise to notify about the outcome. Failed immediately with
///     `ChannelError._ioOnClosedChannel` (channel not open), `ChannelError._connectPending` (another connect
///     in flight) or `ChannelError._inappropriateOperationForState` (not registered yet).
internal final func connect0(to target: ConnectTarget, promise: EventLoopPromise<Void>?) {
    self.eventLoop.assertInEventLoop()

    if !self.isOpen {
        promise?.fail(ChannelError._ioOnClosedChannel)
        return
    }

    if self.pendingConnect != nil {
        promise?.fail(ChannelError._connectPending)
        return
    }

    if !self.lifecycleManager.isPreRegistered {
        promise?.fail(ChannelError._inappropriateOperationForState)
        return
    }

    do {
        let connectedSynchronously = try self.connectSocket(to: target)
        if connectedSynchronously {
            self.updateCachedAddressesFromSocket()
            self.becomeActive0(promise: promise)
        } else {
            // We aren't connected, we'll get the remote address later.
            self.updateCachedAddressesFromSocket(updateLocal: true, updateRemote: false)
            // Always park a promise, even if the caller didn't give us one.
            self.pendingConnect = promise ?? self.eventLoop.makePromise()
            try self.becomeFullyRegistered0()
            self.registerForWritable()
        }
    } catch let error {
        assert(self.lifecycleManager.isPreRegistered)
        // We would like to have this assertion here, but we want to be able to go through this
        // code path in cases where connect() is being called on channels that are already active.
        //assert(!self.lifecycleManager.isActive)
        // We're going to set the promise as the pending connect promise, and let close0 fail it for us.
        self.pendingConnect = promise
        self.close0(error: error, mode: .all, promise: nil)
    }
}
1337 | | |
/// Receives data that was read on the channel. This implementation intentionally ignores it.
///
/// - Parameter data: The data that was read.
public func channelRead0(_ data: NIOAny) {
    // Do nothing by default
    // note: we can't assert that we're active here as TailChannelHandler will call this on channelRead
}
1342 | | |
/// Receives an error that was caught on the channel. This implementation intentionally ignores it.
///
/// - Parameter error: The error that was caught.
public func errorCaught0(error: Error) {
    // Do nothing
}
1346 | | |
/// Whether the channel is currently registered for `.write` events.
private func isWritePending() -> Bool {
    return self.interestedEvent.contains(.write)
}
1350 | | |
/// Updates the set of interested events and re-registers with the event loop.
///
/// No-op if the channel is not open or if `interested` equals the current set. If re-registration throws,
/// the error is fired through the pipeline and the channel is closed.
///
/// - Parameter interested: The new set of interested events.
private final func safeReregister(interested: SelectorEventSet) {
    self.eventLoop.assertInEventLoop()
    assert(self.lifecycleManager.isRegisteredFully)

    if !self.isOpen {
        assert(self.interestedEvent == .reset, "interestedEvent=\(self.interestedEvent) even though we're closed")
        return
    }

    // we don't need to update and so cause a syscall if we already are registered with the correct event
    guard interested != self.interestedEvent else {
        return
    }

    self.interestedEvent = interested
    do {
        try self.selectableEventLoop.reregister(channel: self)
    } catch {
        self.pipeline.syncOperations.fireErrorCaught(error)
        self.close0(error: error, mode: .all, promise: nil)
    }
}
1371 | | |
/// Sets the initial set of interested events and registers with the event loop.
///
/// - Parameter interested: The set of interested events to register with.
/// - Throws: `ChannelError._ioOnClosedChannel` if the channel is not open, or the registration error (after
///   it has been fired through the pipeline and the channel has been closed).
private func safeRegister(interested: SelectorEventSet) throws {
    self.eventLoop.assertInEventLoop()
    assert(!self.lifecycleManager.isRegisteredFully)

    if !self.isOpen {
        throw ChannelError._ioOnClosedChannel
    }

    self.interestedEvent = interested
    do {
        try self.selectableEventLoop.register(channel: self)
    } catch let registrationError {
        self.pipeline.syncOperations.fireErrorCaught(registrationError)
        self.close0(error: registrationError, mode: .all, promise: nil)
        throw registrationError
    }
}
1389 | | |
/// Completes registration: registers with the event loop for `.reset` and `.error`, then runs the lifecycle
/// manager's finish-registration callouts.
///
/// - Throws: Whatever `safeRegister` throws.
final func becomeFullyRegistered0() throws {
    self.eventLoop.assertInEventLoop()
    assert(self.lifecycleManager.isPreRegistered)
    assert(!self.lifecycleManager.isRegisteredFully)

    // The initial set of interested events must not contain `.readEOF` because when connect doesn't return
    // synchronously, kevent might send us a `readEOF` because the `writable` event that marks the connect as completed.
    // See SocketChannelTest.testServerClosesTheConnectionImmediately for a regression test.
    let initialInterest: SelectorEventSet = [.reset, .error]
    try self.safeRegister(interested: initialInterest)

    let registrationCallouts = self.lifecycleManager.finishRegistration()
    registrationCallouts(nil, self.pipeline)
}
1401 | | |
/// Activates the channel: finishes registration if necessary, runs the activation callouts, registers for
/// `readEOF`, flushes pending writes and triggers a read if needed.
///
/// The channel may get closed from within any of the callouts, which is why its state is re-checked after
/// each of them.
///
/// - Parameter promise: Passed to the begin-activation callouts, or to `close0` if completing the
///   registration fails.
final func becomeActive0(promise: EventLoopPromise<Void>?) {
    self.eventLoop.assertInEventLoop()
    assert(self.lifecycleManager.isPreRegistered)
    if !self.lifecycleManager.isRegisteredFully {
        do {
            try self.becomeFullyRegistered0()
            assert(self.lifecycleManager.isRegisteredFully)
        } catch {
            self.close0(error: error, mode: .all, promise: promise)
            return
        }
    }
    self.lifecycleManager.beginActivation()(promise, self.pipeline)
    guard self.lifecycleManager.isActive else {
        // the channel got closed in the promise succeed closure
        return
    }
    // if the channel wasn't closed, continue to fully activate
    self.lifecycleManager.finishActivation()(nil, self.pipeline)
    guard self.lifecycleManager.isOpen else {
        // in the user callout for `channelActive` the channel got closed.
        return
    }
    self.registerForReadEOF()

    // Flush any pending writes. If after the flush we're still open, make sure
    // our registration is appropriate.
    switch self.flushNow() {
    case .register:
        if self.lifecycleManager.isOpen && !self.interestedEvent.contains(.write) {
            self.registerForWritable()
        }
    case .unregister:
        if self.lifecycleManager.isOpen && self.interestedEvent.contains(.write) {
            self.unregisterForWritable()
        }
    }

    self.readIfNeeded0()
}
1442 | | |
/// Registers this channel with `selector` for the `interested` events.
///
/// Abstract: this base implementation crashes and must be overridden.
func register(selector: Selector<NIORegistration>, interested: SelectorEventSet) throws {
    fatalError("must override")
}
1446 | | |
/// Deregisters this channel from `selector`.
///
/// Abstract: this base implementation crashes and must be overridden.
func deregister(selector: Selector<NIORegistration>, mode: CloseMode) throws {
    fatalError("must override")
}
1450 | | |
/// Re-registers this channel with `selector` for the `interested` events.
///
/// Abstract: this base implementation crashes and must be overridden.
func reregister(selector: Selector<NIORegistration>, interested: SelectorEventSet) throws {
    fatalError("must override")
}
1454 | | } |
1455 | | |
extension BaseSocketChannel {
    /// Synchronous access to the channel's options, implemented by calling `setOption0`/`getOption0` directly.
    ///
    /// `getOption0` asserts that it runs on the channel's event loop, so these calls must be made from there.
    public struct SynchronousOptions: NIOSynchronousChannelOptions {
        @usableFromInline  // should be private
        internal let _channel: BaseSocketChannel<SocketType>

        @inlinable  // should be fileprivate
        internal init(_channel wrapped: BaseSocketChannel<SocketType>) {
            self._channel = wrapped
        }

        /// Sets `option` to `value`, rethrowing whatever `setOption0` throws.
        @inlinable
        public func setOption<Option: ChannelOption>(_ option: Option, value: Option.Value) throws {
            let channel = self._channel
            try channel.setOption0(option, value: value)
        }

        /// Returns the current value of `option`, rethrowing whatever `getOption0` throws.
        @inlinable
        public func getOption<Option: ChannelOption>(_ option: Option) throws -> Option.Value {
            let channel = self._channel
            return try channel.getOption0(option)
        }
    }

    /// The synchronous options view of this channel; never `nil` for this channel type.
    public final var syncOptions: NIOSynchronousChannelOptions? {
        return SynchronousOptions(_channel: self)
    }
}
1481 | | |
/// Runs `body` and synchronously completes `promise` (if not `nil`) with its outcome.
///
/// - Parameters:
///   - promise: Succeeded with the value returned by `body`, or failed with the error it throws.
///   - body: The work to run.
func executeAndComplete<Value: Sendable>(_ promise: EventLoopPromise<Value>?, _ body: () throws -> Value) {
    let outcome = Result { try body() }
    switch outcome {
    case .success(let value):
        promise?.succeed(value)
    case .failure(let error):
        promise?.fail(error)
    }
}
1491 | | #endif // !os(WASI) |