/src/swift-nio/Sources/NIOPosix/BaseSocketChannel.swift
Line | Count | Source (jump to first uncovered line) |
1 | | //===----------------------------------------------------------------------===// |
2 | | // |
3 | | // This source file is part of the SwiftNIO open source project |
4 | | // |
5 | | // Copyright (c) 2017-2021 Apple Inc. and the SwiftNIO project authors |
6 | | // Licensed under Apache License v2.0 |
7 | | // |
8 | | // See LICENSE.txt for license information |
9 | | // See CONTRIBUTORS.txt for the list of SwiftNIO project authors |
10 | | // |
11 | | // SPDX-License-Identifier: Apache-2.0 |
12 | | // |
13 | | //===----------------------------------------------------------------------===// |
14 | | |
15 | | import Atomics |
16 | | import NIOConcurrencyHelpers |
17 | | import NIOCore |
18 | | |
/// State machine describing the lifecycle of a socket channel.
///
/// Every transition is funnelled through `moveState(event:)`, which updates the state and hands back a
/// closure that completes the caller's promise and fires the matching pipeline events. Whether the
/// channel is active is mirrored into `isActiveAtomic` whenever the state changes.
private struct SocketChannelLifecycleManager {
    // MARK: Types
    private enum State {
        case fresh
        case preRegistered  // register() has been run but the selector doesn't know about it yet
        case fullyRegistered  // fully registered, ie. the selector knows about it
        case activated
        case closed
    }

    private enum Event {
        case activate
        case beginRegistration
        case finishRegistration
        case close
    }

    // MARK: properties
    private let eventLoop: EventLoop
    // this is queried from the Channel, ie. must be thread-safe
    internal let isActiveAtomic: ManagedAtomic<Bool>
    // these are only to be accessed on the EventLoop

    // have we seen the `.readEOF` notification
    // note: this can be `false` on a deactivated channel, we might just have torn it down.
    var hasSeenEOFNotification: Bool = false

    // Should we support transition from `active` to `active`, used by datagram sockets.
    let supportsReconnect: Bool

    private var currentState: State = .fresh {
        didSet {
            self.eventLoop.assertInEventLoop()
            // Entering `.activated` publishes `true`; leaving it publishes `false`.
            // Any other transition leaves the atomic untouched.
            if self.currentState == .activated {
                self.isActiveAtomic.store(true, ordering: .relaxed)
            } else if oldValue == .activated {
                self.isActiveAtomic.store(false, ordering: .relaxed)
            }
        }
    }

    // MARK: API
    // isActiveAtomic needs to be injected as it's accessed from arbitrary threads and `SocketChannelLifecycleManager` is usually held mutable
    internal init(
        eventLoop: EventLoop,
        isActiveAtomic: ManagedAtomic<Bool>,
        supportReconnect: Bool
    ) {
        self.eventLoop = eventLoop
        self.isActiveAtomic = isActiveAtomic
        self.supportsReconnect = supportReconnect
    }

    // this is called from Channel's deinit, so don't assert we're on the EventLoop!
    internal var canBeDestroyed: Bool {
        return self.currentState == .closed
    }

    // The four entry points below return a closure; to not suffer from a potential allocation for
    // that closure they must be inlined.

    /// Applies the `.beginRegistration` event.
    @inline(__always)
    internal mutating func beginRegistration() -> ((EventLoopPromise<Void>?, ChannelPipeline) -> Void) {
        return self.moveState(event: .beginRegistration)
    }

    /// Applies the `.finishRegistration` event.
    @inline(__always)
    internal mutating func finishRegistration() -> ((EventLoopPromise<Void>?, ChannelPipeline) -> Void) {
        return self.moveState(event: .finishRegistration)
    }

    /// Applies the `.close` event.
    @inline(__always)
    internal mutating func close() -> ((EventLoopPromise<Void>?, ChannelPipeline) -> Void) {
        return self.moveState(event: .close)
    }

    /// Applies the `.activate` event.
    @inline(__always)
    internal mutating func activate() -> ((EventLoopPromise<Void>?, ChannelPipeline) -> Void) {
        return self.moveState(event: .activate)
    }

    // MARK: private API
    /// Performs the transition for `event` from the current state and returns the closure that
    /// completes the promise and fires the pipeline events belonging to that transition.
    /// Illegal combinations end in `badTransition(event:)`.
    // we need to return a closure here and to not suffer from a potential allocation for that this must be inlined
    @inline(__always)
    private mutating func moveState(event: Event) -> ((EventLoopPromise<Void>?, ChannelPipeline) -> Void) {
        self.eventLoop.assertInEventLoop()

        switch self.currentState {
        case .fresh:
            switch event {
            case .beginRegistration:
                self.currentState = .preRegistered
                return { promise, pipeline in
                    promise?.succeed(())
                    pipeline.syncOperations.fireChannelRegistered()
                }
            case .close:
                self.currentState = .closed
                return { (promise, _: ChannelPipeline) in
                    promise?.succeed(())
                }
            case .activate,  // must be registered first
                .finishRegistration:  // need to begin registration first
                self.badTransition(event: event)
            }

        case .preRegistered:
            switch event {
            case .finishRegistration:
                self.currentState = .fullyRegistered
                return { (promise, _: ChannelPipeline) in
                    promise?.succeed(())
                }
            case .close:
                self.currentState = .closed
                return { promise, pipeline in
                    promise?.succeed(())
                    pipeline.syncOperations.fireChannelUnregistered()
                }
            case .activate,  // need to first be fully registered
                .beginRegistration:  // already registered
                self.badTransition(event: event)
            }

        case .fullyRegistered:
            switch event {
            case .activate:
                self.currentState = .activated
                return { promise, pipeline in
                    promise?.succeed(())
                    pipeline.syncOperations.fireChannelActive()
                }
            case .close:
                self.currentState = .closed
                return { promise, pipeline in
                    promise?.succeed(())
                    pipeline.syncOperations.fireChannelUnregistered()
                }
            case .beginRegistration,  // already registered
                .finishRegistration:  // already fully registered
                self.badTransition(event: event)
            }

        case .activated:
            switch event {
            case .close:
                self.currentState = .closed
                return { promise, pipeline in
                    promise?.succeed(())
                    pipeline.syncOperations.fireChannelInactive()
                    pipeline.syncOperations.fireChannelUnregistered()
                }
            case .activate where self.supportsReconnect:
                // active -> active: the state is left alone and no pipeline event is fired.
                return { promise, pipeline in
                    promise?.succeed(())
                }
            case .activate,  // already activated
                .beginRegistration,  // already fully registered (and activated)
                .finishRegistration:  // already fully registered (and activated)
                self.badTransition(event: event)
            }

        case .closed:
            // already closed
            self.badTransition(event: event)
        }
    }

    private func badTransition(event: Event) -> Never {
        preconditionFailure("illegal transition: state=\(self.currentState), event=\(event)")
    }

    // MARK: convenience properties
    internal var isActive: Bool {
        self.eventLoop.assertInEventLoop()
        return self.currentState == .activated
    }

    internal var isPreRegistered: Bool {
        self.eventLoop.assertInEventLoop()
        switch self.currentState {
        case .preRegistered, .fullyRegistered, .activated:
            return true
        case .fresh, .closed:
            return false
        }
    }

    internal var isRegisteredFully: Bool {
        self.eventLoop.assertInEventLoop()
        switch self.currentState {
        case .fullyRegistered, .activated:
            return true
        case .fresh, .preRegistered, .closed:
            return false
        }
    }

    /// Returns whether the underlying file descriptor is open. This property will always be true (even before registration)
    /// until the Channel is closed.
    internal var isOpen: Bool {
        self.eventLoop.assertInEventLoop()
        return !(self.currentState == .closed)
    }
}
215 | | |
216 | | /// The base class for all socket-based channels in NIO. |
217 | | /// |
218 | | /// There are many types of specialised socket-based channel in NIO. Each of these |
219 | | /// has different logic regarding how exactly they read from and write to the network. |
220 | | /// However, they share a great deal of common logic around the managing of their |
221 | | /// file descriptors. |
222 | | /// |
223 | | /// For this reason, `BaseSocketChannel` exists to provide a common core implementation of |
224 | | /// the `SelectableChannel` protocol. It uses a number of private functions to provide hooks |
225 | | /// for subclasses to implement the specific logic to handle their writes and reads. |
226 | | class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, ChannelCore, @unchecked Sendable { |
227 | | typealias SelectableType = SocketType.SelectableType |
228 | | |
/// Snapshot of the channel's local and remote address.
struct AddressCache {
    // Both fields are `let` on purpose: they must always be replaced together, which forces
    // callers to go through `init`.
    let local: SocketAddress?
    let remote: SocketAddress?

    init(local localAddress: SocketAddress?, remote remoteAddress: SocketAddress?) {
        self.local = localAddress
        self.remote = remoteAddress
    }
}
239 | | |
// MARK: - Stored Properties
// MARK: Constants & atomics (accessible everywhere)
public let parent: Channel?
internal let socket: SocketType
private let closePromise: EventLoopPromise<Void>
internal let selectableEventLoop: SelectableEventLoop
private let _offEventLoopLock: NIOLock = NIOLock()
private let isActiveAtomic = ManagedAtomic<Bool>(false)
// just a thread-safe way of having something to print about the socket from any thread
internal let socketDescription: String

// MARK: Variables, on EventLoop thread only
var readPending: Bool = false
var pendingConnect: Optional<EventLoopPromise<Void>>
var recvBufferPool: NIOPooledRecvBufferAllocator
var maxMessagesPerRead: UInt = 4
private var inFlushNow = false  // Guard against re-entrance of flushNow() method.
private var autoRead = true

// MARK: Variables that are really constant
// this is really a constant (set in .init) but needs `self` to be constructed and
// therefore a `var`. Do not change as this needs to be accessed from arbitrary threads
private var _pipeline: ChannelPipeline! = nil

// MARK: Special variables, please read comments.
// Reads are guarded by _either_ `self._offEventLoopLock` or the EL thread.
// Writes are guarded by `self._offEventLoopLock` _and_ the EL thread.
// PLEASE don't use these directly and use the non-underscored computed properties instead.
private var _addressCache: AddressCache = .init(local: nil, remote: nil)  // please use `self.addressesCached` instead
private var _bufferAllocatorCache: ByteBufferAllocator  // please use `self.bufferAllocatorCached` instead.
270 | | |
271 | | // MARK: - Computed properties |
// This is called from arbitrary threads.
// Reads on the EventLoop go straight to the backing storage, reads from elsewhere take
// `_offEventLoopLock`. Writes must happen on the EventLoop and additionally take the lock.
internal var addressesCached: AddressCache {
    get {
        guard self.eventLoop.inEventLoop else {
            return self._offEventLoopLock.withLock { self._addressCache }
        }
        return self._addressCache
    }
    set(updatedCache) {
        self.eventLoop.preconditionInEventLoop()
        self._offEventLoopLock.withLock { self._addressCache = updatedCache }
    }
}
290 | | |
// This is called from arbitrary threads.
// Reads on the EventLoop go straight to the backing storage, reads from elsewhere take
// `_offEventLoopLock`. Writes must happen on the EventLoop and additionally take the lock.
private var bufferAllocatorCached: ByteBufferAllocator {
    get {
        guard self.eventLoop.inEventLoop else {
            return self._offEventLoopLock.withLock { self._bufferAllocatorCache }
        }
        return self._bufferAllocatorCache
    }
    set(updatedAllocator) {
        self.eventLoop.preconditionInEventLoop()
        self._offEventLoopLock.withLock { self._bufferAllocatorCache = updatedAllocator }
    }
}
309 | | |
// We start with the invalid empty set of selector events we're interested in. This is to make sure we later on
// (in `becomeFullyRegistered0`) seed the initial event correctly.
// Every subsequent assignment is asserted (debug builds only) to contain `.reset`.
internal var interestedEvent: SelectorEventSet = [] {
    didSet {
        assert(self.interestedEvent.contains(.reset), "impossible to unregister for reset")
    }
}
317 | | |
// The channel's lifecycle state machine. Any mutation is asserted to happen on the EventLoop.
private var lifecycleManager: SocketChannelLifecycleManager {
    didSet {
        self.eventLoop.assertInEventLoop()
    }
}
323 | | |
// The EventLoop-confined allocator. Each assignment is copied into `bufferAllocatorCached`,
// which is the copy that may be read from other threads.
private var bufferAllocator = ByteBufferAllocator() {
    didSet {
        self.eventLoop.assertInEventLoop()
        let current = self.bufferAllocator
        self.bufferAllocatorCached = current
    }
}
330 | | |
// The channel acts as its own `ChannelCore`.
public final var _channelCore: ChannelCore {
    return self
}
332 | | |
// This is `Channel` API so must be thread-safe.
// Served from the address cache rather than by querying the socket.
public final var localAddress: SocketAddress? {
    let cached = self.addressesCached
    return cached.local
}
337 | | |
// This is `Channel` API so must be thread-safe.
// Served from the address cache rather than by querying the socket.
public final var remoteAddress: SocketAddress? {
    let cached = self.addressesCached
    return cached.remote
}
342 | | |
/// `false` if the whole `Channel` is closed and so no more IO operation can be done.
///
/// Must only be read on the EventLoop.
var isOpen: Bool {
    self.eventLoop.assertInEventLoop()
    let open = self.lifecycleManager.isOpen
    return open
}
348 | | |
/// `true` once registration has at least begun (pre-registered, fully registered or activated).
///
/// Must only be read on the EventLoop.
var isRegistered: Bool {
    self.eventLoop.assertInEventLoop()
    let registered = self.lifecycleManager.isPreRegistered
    return registered
}
353 | | |
// This is `Channel` API so must be thread-safe.
// Reads the atomic flag instead of the EventLoop-confined lifecycle state.
public var isActive: Bool {
    return self.isActiveAtomic.load(ordering: .relaxed)
}
358 | | |
// This is `Channel` API so must be thread-safe.
// The future side of the channel's close promise.
public final var closeFuture: EventLoopFuture<Void> {
    return self.closePromise.futureResult
}
363 | | |
// The `SelectableEventLoop` this channel was created with, exposed as a plain `EventLoop`.
public final var eventLoop: EventLoop {
    return self.selectableEventLoop
}
367 | | |
// This is `Channel` API so must be thread-safe.
// The base implementation always reports writable; the property is not `final`.
public var isWritable: Bool {
    return true
}
372 | | |
// This is `Channel` API so must be thread-safe.
// Goes through the lock-protected allocator cache.
public final var allocator: ByteBufferAllocator {
    return self.bufferAllocatorCached
}
377 | | |
// This is `Channel` API so must be thread-safe.
// `_pipeline` is assigned in `init` and treated as a constant from then on.
public final var pipeline: ChannelPipeline {
    return self._pipeline
}
382 | | |
383 | | // MARK: Methods to override in subclasses. |
/// Write pending data to the underlying socket.
///
/// This base implementation traps unconditionally; subclasses must override it.
func writeToSocket() throws -> OverallWriteResult {
    fatalError("must be overridden")
}
387 | | |
/// Read data from the underlying socket and dispatch it to the `ChannelPipeline`
///
/// This base implementation traps unconditionally; subclasses must override it.
///
/// - Returns: `ReadResult.some` if any data was read, `ReadResult.none` otherwise.
@discardableResult func readFromSocket() throws -> ReadResult {
    fatalError("this must be overridden by sub class")
}
394 | | |
395 | | // MARK: - Datatypes |
396 | | |
/// Indicates if a selectable should be registered or not for IO notifications.
enum IONotificationState {
    /// We should be registered for IO notifications.
    case register

    /// We should not be registered for IO notifications.
    case unregister
}
405 | | |
/// The outcome of a single read operation.
enum ReadResult {
    /// Nothing was read by the read operation.
    case none

    /// Some data was read by the read operation.
    case some
}
413 | | |
/// Returned by the `private func readable0()` to inform the caller about the current state of the underlying read stream.
/// This is mostly useful when receiving `.readEOF`, as we then need to drain the read stream fully (ie. until we receive EOF or an error).
private enum ReadStreamState: Equatable {
    /// Everything seems normal; carries the result of the read.
    case normal(ReadResult)

    /// We saw EOF.
    case eof

    /// A read error was received.
    case error
}
426 | | |
/// Begin connection of the underlying socket.
///
/// This base implementation traps unconditionally; subclasses must override it.
///
/// - Parameters:
///   - to: The `SocketAddress` to connect to.
/// - Returns: `true` if the socket connected synchronously, `false` otherwise.
func connectSocket(to address: SocketAddress) throws -> Bool {
    fatalError("this must be overridden by sub class")
}
435 | | |
/// Begin connection of the underlying socket.
///
/// This base implementation traps unconditionally; subclasses must override it.
///
/// - Parameters:
///   - to: The `VsockAddress` to connect to.
/// - Returns: `true` if the socket connected synchronously, `false` otherwise.
func connectSocket(to address: VsockAddress) throws -> Bool {
    fatalError("this must be overridden by sub class")
}
444 | | |
/// The kinds of address a connect can be directed at.
enum ConnectTarget {
    /// Connect to a `SocketAddress`.
    case socketAddress(SocketAddress)
    /// Connect to a `VsockAddress`.
    case vsockAddress(VsockAddress)
}
449 | | |
/// Begin connection of the underlying socket.
///
/// Unwraps the target and forwards to the `connectSocket(to:)` overload for that address type.
///
/// - Parameters:
///   - to: The target to connect to.
/// - Returns: `true` if the socket connected synchronously, `false` otherwise.
final func connectSocket(to target: ConnectTarget) throws -> Bool {
    let connectedSynchronously: Bool
    switch target {
    case .socketAddress(let socketAddress):
        connectedSynchronously = try self.connectSocket(to: socketAddress)
    case .vsockAddress(let vsockAddress):
        connectedSynchronously = try self.connectSocket(to: vsockAddress)
    }
    return connectedSynchronously
}
463 | | |
/// Make any state changes needed to complete the connection process.
///
/// This base implementation traps unconditionally; subclasses must override it.
func finishConnectSocket() throws {
    fatalError("this must be overridden by sub class")
}
468 | | |
/// Returns if there are any flushed, pending writes to be sent over the network.
///
/// This base implementation traps unconditionally; subclasses must override it.
func hasFlushedPendingWrites() -> Bool {
    fatalError("this must be overridden by sub class")
}
473 | | |
/// Buffer a write in preparation for a flush.
///
/// This base implementation traps unconditionally; subclasses must override it.
///
/// - Parameters:
///   - data: The data to buffer.
///   - promise: The promise associated with this write, if any.
func bufferPendingWrite(data: NIOAny, promise: EventLoopPromise<Void>?) {
    fatalError("this must be overridden by sub class")
}
478 | | |
/// Mark a flush point. This is called when flush is received, and instructs
/// the implementation to record the flush.
///
/// This base implementation traps unconditionally; subclasses must override it.
func markFlushPoint() {
    fatalError("this must be overridden by sub class")
}
484 | | |
/// Called when closing, to instruct the specific implementation to discard all pending
/// writes.
///
/// This base implementation traps unconditionally; subclasses must override it.
///
/// - Parameters:
///   - error: The error associated with the close.
func cancelWritesOnClose(error: Error) {
    fatalError("this must be overridden by sub class")
}
490 | | |
491 | | // MARK: Common base socket logic. |
/// Creates the channel around an existing socket.
///
/// - Parameters:
///   - socket: The socket this channel wraps.
///   - parent: The parent channel, if any.
///   - eventLoop: The event loop the channel is bound to.
///   - recvAllocator: Receive-buffer allocator handed to the receive buffer pool.
///   - supportReconnect: Forwarded to the lifecycle manager to allow `active` -> `active` transitions.
init(
    socket: SocketType,
    parent: Channel?,
    eventLoop: SelectableEventLoop,
    recvAllocator: RecvByteBufferAllocator,
    supportReconnect: Bool
) throws {
    self._bufferAllocatorCache = self.bufferAllocator
    self.socket = socket
    self.selectableEventLoop = eventLoop
    self.closePromise = eventLoop.makePromise()
    self.parent = parent

    let poolCapacity = Int(self.maxMessagesPerRead)
    self.recvBufferPool = .init(capacity: poolCapacity, recvAllocator: recvAllocator)

    // As the socket may already be connected we should ensure we start with the correct addresses cached.
    // Failures to query either address simply leave that entry `nil`.
    let initialLocal = try? socket.localAddress()
    let initialRemote = try? socket.remoteAddress()
    self._addressCache = .init(local: initialLocal, remote: initialRemote)

    self.lifecycleManager = SocketChannelLifecycleManager(
        eventLoop: eventLoop,
        isActiveAtomic: self.isActiveAtomic,
        supportReconnect: supportReconnect
    )
    self.socketDescription = socket.description
    self.pendingConnect = nil
    // The pipeline needs `self`, so it has to come last.
    self._pipeline = ChannelPipeline(channel: self)
}
516 | | |
// Debug-only leak check: a channel must have reached the closed state before it is deallocated.
deinit {
    assert(
        self.lifecycleManager.canBeDestroyed,
        "leak of open Channel, state: \(String(describing: self.lifecycleManager))"
    )
}
523 | | |
/// Queries the socket for its local address. Must be called on the EventLoop.
///
/// - Returns: The local address as reported by the socket.
/// - Throws: `ChannelError._ioOnClosedChannel` if the channel is closed, or whatever the socket throws.
public final func localAddress0() throws -> SocketAddress {
    self.eventLoop.assertInEventLoop()
    if !self.isOpen {
        throw ChannelError._ioOnClosedChannel
    }
    let address = try self.socket.localAddress()
    return address
}
531 | | |
/// Queries the socket for its remote address. Must be called on the EventLoop.
///
/// - Returns: The remote address as reported by the socket.
/// - Throws: `ChannelError._ioOnClosedChannel` if the channel is closed, or whatever the socket throws.
public final func remoteAddress0() throws -> SocketAddress {
    self.eventLoop.assertInEventLoop()
    if !self.isOpen {
        throw ChannelError._ioOnClosedChannel
    }
    let address = try self.socket.remoteAddress()
    return address
}
539 | | |
/// Flush data to the underlying socket and return if this socket needs to be registered for write notifications.
///
/// This method can be called re-entrantly but it will return immediately because the first call is responsible
/// for sending all flushed writes, even the ones that are accumulated whilst `flushNow()` is running.
///
/// If writing throws, the inbound side is drained on a best-effort basis and the channel is closed with
/// the write error.
///
/// - Returns: If this socket should be registered for write notifications. Ie. `IONotificationState.register` if
///            _not_ all data could be written, so notifications are necessary; and `IONotificationState.unregister`
///             if everything was written and we don't need to be notified about writability at the moment.
func flushNow() -> IONotificationState {
    self.eventLoop.assertInEventLoop()

    // Guard against re-entry as data that will be put into `pendingWrites` will just be picked up by
    // `writeToSocket`.
    guard !self.inFlushNow else {
        return .unregister
    }

    assert(!self.inFlushNow)
    self.inFlushNow = true
    // Clear the re-entrancy flag on every exit path, including the early return in the `catch` below.
    defer {
        self.inFlushNow = false
    }

    var newWriteRegistrationState: IONotificationState = .unregister
    // Keep writing while nothing asked us to wait for writability, there are still flushed writes
    // and the channel has not been closed in the meantime.
    while newWriteRegistrationState == .unregister && self.hasFlushedPendingWrites() && self.isOpen {
        let writeResult: OverallWriteResult
        do {
            assert(self.lifecycleManager.isActive)
            writeResult = try self.writeToSocket()
            if writeResult.writabilityChange {
                // We went from not writable to writable.
                self.pipeline.syncOperations.fireChannelWritabilityChanged()
            }
        } catch let err {
            // If there is a write error we should try drain the inbound before closing the socket as there may be some data pending.
            // We ignore any error that is thrown as we will use the original err to close the channel and notify the user.
            if self.readIfNeeded0() {
                assert(self.lifecycleManager.isActive)

                // We need to continue reading until there is nothing more to be read from the socket as we will not have another chance to drain it.
                var readAtLeastOnce = false
                while let read = try? self.readFromSocket(), read == .some {
                    readAtLeastOnce = true
                }
                // Only signal read completion if something was read and the channel is still active.
                if readAtLeastOnce && self.lifecycleManager.isActive {
                    self.pipeline.fireChannelReadComplete()
                }
            }

            self.close0(error: err, mode: .all, promise: nil)

            // we handled all writes
            return .unregister
        }

        switch writeResult.writeResult {
        case .couldNotWriteEverything:
            newWriteRegistrationState = .register
        case .writtenCompletely(let closeState):
            newWriteRegistrationState = .unregister
            switch closeState {
            case .open:
                ()
            case .readyForClose:
                // Everything was written and the write side is ready to be closed: close the output half.
                self.close0(error: ChannelError.outputClosed, mode: .output, promise: nil)
            case .closed:
                ()  // we can be flushed before becoming active
            }
        }

        if !self.isOpen || !self.hasFlushedPendingWrites() {
            // No further writes, unregister. We won't re-enter the loop as both of these would have to be true.
            newWriteRegistrationState = .unregister
        }
    }

    // Sanity check (debug builds only): we ask for write notifications exactly when flushed writes remain.
    assert(
        (newWriteRegistrationState == .register && self.hasFlushedPendingWrites())
            || (newWriteRegistrationState == .unregister && !self.hasFlushedPendingWrites()),
        "illegal flushNow decision: \(newWriteRegistrationState) and \(self.hasFlushedPendingWrites())"
    )
    return newWriteRegistrationState
}
623 | | |
/// Sets a channel option from any thread.
///
/// On the EventLoop the option is applied synchronously and the result is delivered through a fresh
/// promise; from any other thread the work is submitted to the EventLoop.
///
/// - Parameters:
///   - option: The option to set.
///   - value: The new value for the option.
/// - Returns: A future that completes once the option has been applied, or fails with the error thrown.
public final func setOption<Option: ChannelOption>(_ option: Option, value: Option.Value) -> EventLoopFuture<Void> {
    guard self.eventLoop.inEventLoop else {
        return self.eventLoop.submit { try self.setOption0(option, value: value) }
    }

    let promise = self.eventLoop.makePromise(of: Void.self)
    executeAndComplete(promise) { try self.setOption0(option, value: value) }
    return promise.futureResult
}
633 | | |
/// Applies a channel option. Must be called on the EventLoop.
///
/// Handles socket options, the buffer allocator, the receive allocator, auto-read and
/// max-messages-per-read; any other option traps.
///
/// - Parameters:
///   - option: The option to set.
///   - value: The new value for the option.
/// - Throws: `ChannelError._ioOnClosedChannel` if the channel is closed, or the error from setting a socket option.
func setOption0<Option: ChannelOption>(_ option: Option, value: Option.Value) throws {
    self.eventLoop.assertInEventLoop()

    if !self.isOpen {
        throw ChannelError._ioOnClosedChannel
    }

    switch option {
    case let socketOption as ChannelOptions.Types.SocketOption:
        try self.setSocketOption0(level: socketOption.optionLevel, name: socketOption.optionName, value: value)

    case is ChannelOptions.Types.AllocatorOption:
        self.bufferAllocator = value as! ByteBufferAllocator

    case is ChannelOptions.Types.RecvAllocatorOption:
        self.recvBufferPool.recvAllocator = value as! RecvByteBufferAllocator

    case is ChannelOptions.Types.AutoReadOption:
        let requestedAutoRead = value as! Bool
        let previousAutoRead = self.autoRead
        self.autoRead = requestedAutoRead

        // We only want to call read0() or pauseRead0() if we already registered to the EventLoop; if not, this will be
        // done automatically once register0 is called. Besides this we also only need to do it when the value actually changes.
        if self.lifecycleManager.isPreRegistered && previousAutoRead != requestedAutoRead {
            if requestedAutoRead {
                self.read0()
            } else {
                self.pauseRead0()
            }
        }

    case is ChannelOptions.Types.MaxMessagesPerReadOption:
        self.maxMessagesPerRead = value as! UInt
        // Keep the receive buffer pool sized to the new per-read message limit.
        self.recvBufferPool.updateCapacity(to: Int(self.maxMessagesPerRead))

    default:
        fatalError("option \(option) not supported")
    }
}
669 | | |
/// Reads a channel option from any thread.
///
/// On the EventLoop the option is read synchronously and wrapped in an already-completed future;
/// from any other thread the read is submitted to the EventLoop.
///
/// - Parameters:
///   - option: The option to read.
/// - Returns: A future carrying the option's value, or the error thrown while reading it.
public func getOption<Option: ChannelOption>(_ option: Option) -> EventLoopFuture<Option.Value> {
    guard self.eventLoop.inEventLoop else {
        return self.eventLoop.submit { try self.getOption0(option) }
    }

    let currentValue: Option.Value
    do {
        currentValue = try self.getOption0(option)
    } catch {
        return self.eventLoop.makeFailedFuture(error)
    }
    return self.eventLoop.makeSucceededFuture(currentValue)
}
681 | | |
/// Reads a channel option. Must be called on the EventLoop.
///
/// Handles socket options, the buffer allocator, the receive allocator, auto-read and
/// max-messages-per-read; any other option traps.
///
/// - Parameters:
///   - option: The option to read.
/// - Returns: The current value of the option.
/// - Throws: `ChannelError._ioOnClosedChannel` if the channel is closed, or the error from reading a socket option.
func getOption0<Option: ChannelOption>(_ option: Option) throws -> Option.Value {
    self.eventLoop.assertInEventLoop()

    if !self.isOpen {
        throw ChannelError._ioOnClosedChannel
    }

    switch option {
    case let socketOption as ChannelOptions.Types.SocketOption:
        return try self.getSocketOption0(level: socketOption.optionLevel, name: socketOption.optionName)
    case is ChannelOptions.Types.AllocatorOption:
        return self.bufferAllocator as! Option.Value
    case is ChannelOptions.Types.RecvAllocatorOption:
        return self.recvBufferPool.recvAllocator as! Option.Value
    case is ChannelOptions.Types.AutoReadOption:
        return self.autoRead as! Option.Value
    case is ChannelOptions.Types.MaxMessagesPerReadOption:
        return self.maxMessagesPerRead as! Option.Value
    default:
        fatalError("option \(option) not supported")
    }
}
704 | | |
/// Triggers a `ChannelPipeline.read()` if `autoRead` is enabled and no read is pending yet.
///
/// Does nothing and returns `false` when the channel is not active.
///
/// - Returns: `true` if `readPending` is `true`, `false` otherwise.
@discardableResult func readIfNeeded0() -> Bool {
    self.eventLoop.assertInEventLoop()
    guard self.lifecycleManager.isActive else {
        return false
    }

    let shouldTriggerRead = !self.readPending && self.autoRead
    if shouldTriggerRead {
        self.pipeline.syncOperations.read()
    }
    // Re-read the flag: the pipeline read above may have changed it.
    return self.readPending
}
719 | | |
720 | | // Methods invoked from the HeadHandler of the ChannelPipeline |
/// Binds the socket to `address` and refreshes the cached local address.
///
/// - Parameters:
///   - address: The address to bind to.
///   - promise: Completed with the outcome of the bind; failed with
///     `ChannelError._ioOnClosedChannel` if the channel is already closed.
public func bind0(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
    self.eventLoop.assertInEventLoop()

    if !self.isOpen {
        promise?.fail(ChannelError._ioOnClosedChannel)
        return
    }

    executeAndComplete(promise) {
        try self.socket.bind(to: address)
        // Only the local side changes on bind, so leave the cached remote address alone.
        self.updateCachedAddressesFromSocket(updateRemote: false)
    }
}
734 | | |
/// Buffers a write until the next flush.
///
/// - Parameters:
///   - data: The data to write.
///   - promise: The promise for this write; failed immediately with
///     `ChannelError._ioOnClosedChannel` if the channel is already closed.
public final func write0(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
    self.eventLoop.assertInEventLoop()

    if !self.isOpen {
        // Channel was already closed, fail the promise and not even queue it.
        promise?.fail(ChannelError._ioOnClosedChannel)
        return
    }

    self.bufferPendingWrite(data: data, promise: promise)
}
746 | | |
/// Adds `.write` to the set of selector events we are interested in, if it is not in there yet.
private func registerForWritable() {
    self.eventLoop.assertInEventLoop()

    if self.interestedEvent.contains(.write) {
        // nothing to do if we were previously interested in write
        return
    }
    let withWrite = self.interestedEvent.union(.write)
    self.safeReregister(interested: withWrite)
}
756 | | |
/// Removes `.write` from the set of interested events, if it is currently part of the set.
func unregisterForWritable() {
    self.eventLoop.assertInEventLoop()

    let currentInterest = self.interestedEvent
    if !currentInterest.contains(.write) {
        // We were not interested in writability in the first place.
        return
    }
    self.safeReregister(interested: currentInterest.subtracting(.write))
}
766 | | |
/// Marks the current flush point and, on an active channel, tries to write the flushed data out.
///
/// Does nothing on a closed channel. On an open but inactive channel only the flush point is marked.
public final func flush0() {
    self.eventLoop.assertInEventLoop()

    if !self.isOpen {
        return
    }

    self.markFlushPoint()

    // The conditions are evaluated in order and short-circuit: `flushNow()` only runs on an active
    // channel that is not already waiting for writability.
    guard self.lifecycleManager.isActive,
        !self.isWritePending(),
        self.flushNow() == .register
    else {
        return
    }

    assert(self.lifecycleManager.isPreRegistered)
    self.registerForWritable()
}
785 | | |
/// Records that a read has been requested and, if the channel is registered, asks for read readiness.
///
/// Does nothing on a closed channel.
public func read0() {
    self.eventLoop.assertInEventLoop()

    if !self.isOpen {
        return
    }
    self.readPending = true

    // NOTE(review): `registerForReadable()` asserts `isRegisteredFully`, which is stricter than the
    // `isPreRegistered` check below. Confirm this cannot be reached between pre- and full registration.
    guard self.lifecycleManager.isPreRegistered else {
        return
    }
    self.registerForReadable()
}
798 | | |
/// Stops asking the selector for read readiness, if the channel is fully registered with it.
///
/// Must be called on the channel's `EventLoop`.
private final func pauseRead0() {
    self.eventLoop.assertInEventLoop()

    // `unregisterForReadable()` asserts `isRegisteredFully`. Guarding on the weaker `isPreRegistered`
    // would trip that assertion in debug builds when this runs after `register0` but before the channel
    // has been registered with the selector. Presumably `.read` cannot be part of the interest set before
    // full registration, so skipping the call there loses nothing (verify against the initial value of
    // `interestedEvent`).
    guard self.lifecycleManager.isRegisteredFully else {
        return
    }
    self.unregisterForReadable()
}
806 | | |
/// Adds `.read` to the set of interested events.
///
/// Requires the channel to be fully registered. This is a no-op once an EOF notification has been seen
/// or when `.read` is part of the interest set already.
private final func registerForReadable() {
    self.eventLoop.assertInEventLoop()
    assert(self.lifecycleManager.isRegisteredFully)

    if self.lifecycleManager.hasSeenEOFNotification {
        // After an EOF notification there is no point in registering for reads anymore.
        return
    }

    let currentInterest = self.interestedEvent
    if currentInterest.contains(.read) {
        return
    }

    self.safeReregister(interested: currentInterest.union(.read))
}
822 | | |
/// Adds `.readEOF` to the set of interested events.
///
/// Requires the channel to be fully registered. This is a no-op once an EOF notification has been seen
/// or when `.readEOF` is part of the interest set already.
private final func registerForReadEOF() {
    self.eventLoop.assertInEventLoop()
    assert(self.lifecycleManager.isRegisteredFully)

    if self.lifecycleManager.hasSeenEOFNotification {
        // The EOF notification has been delivered already, so asking for it again is pointless.
        return
    }

    let currentInterest = self.interestedEvent
    if currentInterest.contains(.readEOF) {
        return
    }

    self.safeReregister(interested: currentInterest.union(.readEOF))
}
838 | | |
/// Removes `.read` from the set of interested events, if it is currently part of the set.
///
/// Requires the channel to be fully registered.
internal final func unregisterForReadable() {
    self.eventLoop.assertInEventLoop()
    assert(self.lifecycleManager.isRegisteredFully)

    let currentInterest = self.interestedEvent
    if !currentInterest.contains(.read) {
        return
    }

    self.safeReregister(interested: currentInterest.subtracting(.read))
}
849 | | |
/// Closes this `BaseSocketChannel` and fulfills `promise` with the result of the _close_ operation.
/// So unless closing the socket itself fails, `promise` is handed to the lifecycle callouts regardless of
/// `error`. `error` is used to fail outstanding writes (if any) and the pending connect promise (if set).
/// A failing deregistration is reported through the pipeline as an error.
///
/// - Parameters:
///   - error: The error to fail the outstanding (if any) writes/connect with.
///   - mode: The close mode, must be `.all` for `BaseSocketChannel`.
///   - promise: The promise that gets notified about the result of the close operation.
public func close0(error: Error, mode: CloseMode, promise: EventLoopPromise<Void>?) {
    self.eventLoop.assertInEventLoop()

    if !self.isOpen {
        promise?.fail(ChannelError._alreadyClosed)
        return
    }

    if mode != .all {
        promise?.fail(ChannelError._operationUnsupported)
        return
    }

    // === BEGIN: No user callouts ===

    // Callouts caused by failures below are only collected here; all of them must run after our state
    // has been transitioned.
    var deferredErrorCallouts: [(ChannelPipeline) -> Void] = []

    self.interestedEvent = .reset
    do {
        try self.selectableEventLoop.deregister(channel: self)
    } catch let deregisterError {
        deferredErrorCallouts.append { pipeline in
            pipeline.syncOperations.fireErrorCaught(deregisterError)
        }
    }

    // The promise handed to the lifecycle callouts. It is `nil` when closing the socket failed: `promise`
    // is then failed by a deferred callout and must not be notified a second time.
    let lifecyclePromise: EventLoopPromise<Void>?
    do {
        try self.socket.close()
        lifecyclePromise = promise
    } catch let socketCloseError {
        deferredErrorCallouts.append { (_: ChannelPipeline) in
            promise?.fail(socketCloseError)
        }
        lifecyclePromise = nil
    }

    // Transition our internal state.
    let lifecycleCallouts = self.lifecycleManager.close()

    // === END: No user callouts (now that our state is reconciled, we can call out to user code.) ===

    // This has to be the first callout: it moves the PendingWritesManager into the closed state, and we
    // assert elsewhere that the PendingWritesManager agrees with us about being 'open'.
    self.cancelWritesOnClose(error: error)

    // Normally there is nothing to run here, as neither deregistration nor close are expected to fail.
    deferredErrorCallouts.forEach { callout in
        callout(self.pipeline)
    }

    if let connectPromise = self.pendingConnect {
        self.pendingConnect = nil
        connectPromise.fail(error)
    }

    lifecycleCallouts(lifecyclePromise, self.pipeline)

    self.eventLoop.execute {
        // Run this delayed, as user code may still be traversing the pipeline right now.
        self.removeHandlers(pipeline: self.pipeline)

        self.closePromise.succeed(())

        // All handlers / futures have been notified, so the cached addresses can go now.
        self.unsetCachedAddressesFromSocket()
    }
}
928 | | |
/// Begins the registration of this channel.
///
/// - Parameter promise: Failed with `ChannelError._ioOnClosedChannel` if the channel is closed, with
///   `ChannelError._inappropriateOperationForState` if registration has already begun, and with
///   `EventLoopError._shutdown` if the event loop is no longer open (the channel is closed in that case).
///   Otherwise it is handed to the lifecycle manager's registration callouts.
public final func register0(promise: EventLoopPromise<Void>?) {
    self.eventLoop.assertInEventLoop()

    if !self.isOpen {
        promise?.fail(ChannelError._ioOnClosedChannel)
        return
    }

    if self.lifecycleManager.isPreRegistered {
        promise?.fail(ChannelError._inappropriateOperationForState)
        return
    }

    if !self.selectableEventLoop.isOpen {
        let shutdownError = EventLoopError._shutdown
        self.pipeline.syncOperations.fireErrorCaught(shutdownError)
        // The promise given to `close0` reports on the `close` operation itself, ...
        self.close0(error: shutdownError, mode: .all, promise: nil)
        // ... so the registration `promise` has to be failed separately.
        promise?.fail(shutdownError)
        return
    }

    // Full registration has to wait: epoll would give us EPOLLHUP if bind/connect wasn't called yet.
    let registrationCallouts = self.lifecycleManager.beginRegistration()
    registrationCallouts(promise, self.pipeline)
}
955 | | |
/// Registers a channel that is open but not yet active, then registers it fully and activates it.
///
/// - Parameter promise: Receives a registration failure (the channel is closed in that case); otherwise
///   it is handed to `becomeActive0` once the channel is fully registered.
public final func registerAlreadyConfigured0(promise: EventLoopPromise<Void>?) {
    self.eventLoop.assertInEventLoop()
    assert(self.isOpen)
    assert(!self.lifecycleManager.isActive)

    let registration = self.eventLoop.makePromise(of: Void.self)
    self.register0(promise: registration)

    let registrationResult = registration.futureResult
    registrationResult.whenFailure { (_: Error) in
        self.close(promise: nil)
    }
    registrationResult.cascadeFailure(to: promise)

    guard self.lifecycleManager.isPreRegistered else {
        return
    }

    // kqueue/epoll registration is expected to always succeed. That is basically true, apart from errors
    // that should be fatal (EBADF, EFAULT, ESRCH, ENOMEM) and two 'table full' error kinds (EMFILE, ENFILE)
    // which aren't handled yet but might be in the future (#469).
    try! self.becomeFullyRegistered0()

    guard self.lifecycleManager.isRegisteredFully else {
        return
    }
    self.becomeActive0(promise: promise)
}
977 | | |
/// Handles user outbound events.
///
/// The only event acted upon is `VsockChannelEvents.ConnectToAddress`, which starts a connect to the
/// carried address. Every other event fails `promise` with `ChannelError._operationUnsupported`.
public func triggerUserOutboundEvent0(_ event: Any, promise: EventLoopPromise<Void>?) {
    guard let connectEvent = event as? VsockChannelEvents.ConnectToAddress else {
        promise?.fail(ChannelError._operationUnsupported)
        return
    }
    self.connect0(to: .vsockAddress(connectEvent.address), promise: promise)
}
986 | | |
987 | | // Methods invoked from the EventLoop itself |
/// Handles a writability notification: completes a pending connect (if any) and flushes pending writes.
public final func writable() {
    self.eventLoop.assertInEventLoop()
    assert(self.isOpen)

    // A connect that was in flight has finished by the time we become writable.
    self.finishConnect()

    switch self.flushNow() {
    case .register:
        // We just received `writable`, so we are still registered for it and need not do anything.
        assert(!self.isOpen || self.interestedEvent.contains(.write))
        ()
    case .unregister:
        // Everything was written or the connect completed: stop listening for writability.
        self.finishWritable()
    }
}
1003 | | |
/// Completes a pending connect, if there is one.
///
/// On success the channel becomes active and the connect promise is handed to `becomeActive0`. If
/// finishing the connect throws, the channel is closed with the thrown error.
private func finishConnect() {
    self.eventLoop.assertInEventLoop()
    assert(self.lifecycleManager.isPreRegistered)

    guard let connectPromise = self.pendingConnect else {
        // Without a pending connect we must be active already.
        assert(self.lifecycleManager.isActive)
        return
    }
    assert(!self.lifecycleManager.isActive)

    do {
        try self.finishConnectSocket()
    } catch {
        assert(!self.lifecycleManager.isActive)
        // `close0` fails the pending connect promise itself, so it is not failed here.
        self.close0(error: error, mode: .all, promise: nil)
        return
    }

    // The connect succeeded; `becomeActive0` is the one to fulfill the promise.
    self.pendingConnect = nil
    // The local address is known already, only the remote one needs refreshing.
    self.updateCachedAddressesFromSocket(updateLocal: false, updateRemote: true)
    self.becomeActive0(promise: connectPromise)
}
1028 | | |
/// Stops listening for writability on a channel that is still open.
private func finishWritable() {
    self.eventLoop.assertInEventLoop()

    guard self.isOpen else {
        return
    }
    assert(self.lifecycleManager.isPreRegistered)
    assert(!self.hasFlushedPendingWrites())
    self.unregisterForWritable()
}
1038 | | |
/// Traps unconditionally: this channel does not expect to receive a `writeEOF` notification.
func writeEOF() {
    let failureDescription = "\(self) received writeEOF which is unexpected"
    fatalError(failureDescription)
}
1042 | | |
/// Handles the `readEOF` notification, which is expected to be delivered at most once.
///
/// Records that the notification has been seen and delegates the processing to `readEOF0`. Afterwards
/// neither `.read` nor `.readEOF` may remain in the interest set.
func readEOF() {
    assert(!self.lifecycleManager.hasSeenEOFNotification)
    self.lifecycleManager.hasSeenEOFNotification = true

    // Being notified without being fully registered would mean that we received a notification for a
    // channel before we are ready to receive them.
    assert(
        self.lifecycleManager.isRegisteredFully,
        "illegal state: \(self): active: \(self.lifecycleManager.isActive), registered: \(self.lifecycleManager.isRegisteredFully)"
    )

    self.readEOF0()

    let remainingInterest = self.interestedEvent
    assert(!remainingInterest.contains(.read))
    assert(!remainingInterest.contains(.readEOF))
}
1059 | | |
/// Drops the interest in `.readEOF` and synchronously reads from the channel until EOF or an error is
/// hit, or the channel stops being active. Does nothing unless the channel is fully registered.
final func readEOF0() {
    guard self.lifecycleManager.isRegisteredFully else {
        return
    }

    // `readEOF` is meant to be one-shot, so we unregister from it first. Then all input is read
    // synchronously up to the EOF we are guaranteed to see. Past that point `readEOF` is of no interest
    // and would fire constantly anyway.
    self.safeReregister(interested: self.interestedEvent.subtracting(.readEOF))

    drain: while self.lifecycleManager.isActive {
        switch self.readable0() {
        case .normal(.some):
            // The regular case. Note that we may not be active anymore, the user might have closed the
            // channel in a callout; the loop condition takes care of that.
            continue drain
        case .normal(.none):
            preconditionFailure("got .readEOF and read returned not reading any bytes, nor EOF.")
        case .eof:
            // EOF ends the processing for `readEOF`. We may be registered & active (if the channel supports
            // half-closure) or unregistered & inactive (if it doesn't).
            break drain
        case .error:
            // `readable0` would have called close, so we should be unregistered and inactive by now.
            assert(!self.lifecycleManager.isActive)
            assert(!self.lifecycleManager.isPreRegistered)
            break drain
        }
    }
}
1087 | | |
// This _needs_ to synchronously cause the fd to be unregistered because we cannot unregister from `reset`.
// In other words: failing to unregister from the selector will cause NIO to spin at 100% CPU, constantly
// delivering the `reset` event.
/// Handles a `reset` notification: drains readable data, then closes the channel if its socket is still
/// open, using the socket's pending error (`SO_ERROR`) when there is one.
final func reset() {
    self.readEOF0()

    if self.socket.isOpen {
        assert(self.lifecycleManager.isPreRegistered)

        // Work out which error to close with; the socket is still open, so ask it for its actual error.
        let closeReason: Error
        do {
            let pendingErrno: Int32 = try self.socket.getOption(level: .socket, name: .so_error)
            if pendingErrno == 0 {
                // No socket error is set, so this must be a connection reset without an error.
                // This path should only be executed on Linux (EPOLLHUP, no EPOLLERR).
                #if os(Linux)
                let message: String = "connection reset (no error set)"
                #else
                let message: String =
                    "BUG IN SwiftNIO (possibly #572), please report! Connection reset (no error set)."
                #endif
                closeReason = IOError(errnoCode: ECONNRESET, reason: message)
            } else {
                // The socket carries an error, forward it.
                // This path will be executed on Linux (EPOLLERR) & Darwin (ev.fflags != 0) for
                // stream sockets, and most (but not all) errors on datagram sockets.
                closeReason = IOError(errnoCode: pendingErrno, reason: "connection reset (error set)")
            }
        } catch {
            // Querying the socket failed; close with that failure instead.
            closeReason = error
        }
        self.close0(error: closeReason, mode: .all, promise: nil)
    }
    assert(!self.lifecycleManager.isPreRegistered)
}
1123 | | |
/// Handles a read readiness notification by reading from the socket.
///
/// Such a notification is not expected after `.readEOF` has been seen.
public final func readable() {
    let hasSeenEOF = self.lifecycleManager.hasSeenEOFNotification
    assert(
        !hasSeenEOF,
        "got a read notification after having already seen .readEOF"
    )
    self.readable0()
}
1131 | | |
/// Reads from the socket once and reports what happened to the read stream.
///
/// Errors thrown while reading are handled here: `ChannelError.eof` is not fired through the pipeline,
/// any other error is. Whether the channel is closed as a result is decided by `shouldCloseOnReadError`.
///
/// - Returns: `.eof` or `.error` if reading failed and the channel was (half-)closed because of it,
///   `.normal` carrying the read result otherwise.
@discardableResult
private final func readable0() -> ReadStreamState {
    self.eventLoop.assertInEventLoop()
    assert(self.lifecycleManager.isActive)

    defer {
        // On every way out: stop listening for reads if the channel is open and no read is pending.
        if self.isOpen && !self.readPending {
            unregisterForReadable()
        }
    }

    let readResult: ReadResult
    do {
        readResult = try self.readFromSocket()
    } catch let readError {
        let streamState: ReadStreamState
        // `ChannelError.eof` only means that the remote peer closed / shut down the connection, which is
        // not something to fire through the pipeline as an error.
        if let channelError = readError as? ChannelError, channelError == ChannelError.eof {
            streamState = .eof

            if self.lifecycleManager.isActive {
                // We are on the EventLoop already, so `getOption0` is called directly rather than creating
                // an extra future.
                //
                // `getOption0` can only fail if the channel is not active anymore, which we just checked.
                // Should it fail regardless, that is a precondition failure we would like to know about.
                let allowRemoteHalfClosure = try! self.getOption0(.allowRemoteHalfClosure)

                // EOF always gets a read complete.
                self.pipeline.syncOperations.fireChannelReadComplete()

                if allowRemoteHalfClosure {
                    // With half closure allowed, only the input side of the channel is marked as closed.
                    if self.shouldCloseOnReadError(readError) {
                        self.close0(error: readError, mode: .input, promise: nil)
                    }
                    self.readPending = false
                    return .eof
                }
            }
        } else {
            streamState = .error
            self.pipeline.syncOperations.fireErrorCaught(readError)
        }

        if !self.shouldCloseOnReadError(readError) {
            // Non-fatal, so carry on as normal. This counts as "some" because we did get at least an
            // error from the socket.
            readResult = .some
        } else {
            self.close0(error: readError, mode: .all, promise: nil)
            return streamState
        }
    }

    // The assertion below has to be disabled for io_uring. kqueue and epoll implicitly synchronise
    // modifications of the poll mask with the returned events: once a modification has been issued, the
    // next batch of reaped events will not contain events that don't match the new mask. For this
    // assertion that means we never receive a POLLIN notification unless there are bytes to read.
    //
    // A fully asynchronous backend like io_uring has no such synchronisation point. After submitting the
    // asynchronous poll mask change we may still reap pending replies for the old mask, and thus receive
    // a POLLIN even though the mask has been modified vis-a-vis the kernel, which would trip the
    // assertion.
    //
    // Avoiding that race would take heavy handed synchronisation primitives like IOSQE_IO_DRAIN
    // (basically flushing all pending requests and waiting for a fake event result to sync up), which
    // would be awful for performance. Skipping the assertion for io_uring is the better trade.
    #if !SWIFTNIO_USE_IO_URING
    assert(readResult == .some)
    #endif

    if self.lifecycleManager.isActive {
        self.pipeline.syncOperations.fireChannelReadComplete()
    }
    self.readIfNeeded0()
    return .normal(readResult)
}
1211 | | |
/// Decides whether the `Channel` should be closed because of an `Error` that happened during
/// `readFromSocket`. This implementation always answers `true`.
///
/// - Parameters:
///   - err: The `Error` which was thrown by `readFromSocket`.
/// - Returns: `true` if the `Channel` should be closed, `false` otherwise.
func shouldCloseOnReadError(_ err: Error) -> Bool {
    return true
}
1220 | | |
/// Handles an error reported by the selector.
///
/// The default behaviour is to treat the error exactly like a reset.
///
/// - Returns: Always `.fatal`.
func error() -> ErrorResult {
    self.reset()
    let outcome: ErrorResult = .fatal
    return outcome
}
1228 | | |
/// Refreshes the cached local and/or remote address from the socket.
///
/// An address that is not refreshed keeps its cached value. An address that cannot be obtained is
/// cached as `nil`.
///
/// - Parameters:
///   - updateLocal: Whether to re-query the local address.
///   - updateRemote: Whether to re-query the remote address.
internal final func updateCachedAddressesFromSocket(updateLocal: Bool = true, updateRemote: Bool = true) {
    self.eventLoop.assertInEventLoop()
    assert(updateLocal || updateRemote)

    let previous = self.addressesCached

    var local = previous.local
    if updateLocal {
        local = try? self.localAddress0()
    }

    var remote = previous.remote
    if updateRemote {
        remote = try? self.remoteAddress0()
    }

    self.addressesCached = AddressCache(local: local, remote: remote)
}
1237 | | |
/// Clears both the cached local and the cached remote address.
internal final func unsetCachedAddressesFromSocket() {
    self.eventLoop.assertInEventLoop()
    let emptyCache = AddressCache(local: nil, remote: nil)
    self.addressesCached = emptyCache
}
1242 | | |
/// Connects to a `SocketAddress` by forwarding to the `ConnectTarget` based `connect0`.
public final func connect0(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
    let target: ConnectTarget = .socketAddress(address)
    self.connect0(to: target, promise: promise)
}
1246 | | |
/// Starts connecting the socket to `target`.
///
/// If the connect completes right away the channel becomes active. Otherwise the promise is parked as the
/// pending connect and the channel waits for writability. Any error thrown on the way closes the channel.
///
/// - Parameters:
///   - target: What to connect to.
///   - promise: Failed with `ChannelError._ioOnClosedChannel` if the channel is closed, with
///     `ChannelError._connectPending` if a connect is in flight already, and with
///     `ChannelError._inappropriateOperationForState` if the channel has not been registered.
internal final func connect0(to target: ConnectTarget, promise: EventLoopPromise<Void>?) {
    self.eventLoop.assertInEventLoop()

    if !self.isOpen {
        promise?.fail(ChannelError._ioOnClosedChannel)
        return
    }

    if self.pendingConnect != nil {
        promise?.fail(ChannelError._connectPending)
        return
    }

    if !self.lifecycleManager.isPreRegistered {
        promise?.fail(ChannelError._inappropriateOperationForState)
        return
    }

    do {
        let connectedImmediately = try self.connectSocket(to: target)
        if connectedImmediately {
            self.updateCachedAddressesFromSocket()
            self.becomeActive0(promise: promise)
        } else {
            // Not connected yet, so the remote address has to wait until later.
            self.updateCachedAddressesFromSocket(updateLocal: true, updateRemote: false)
            // A pending connect is always tracked, even if the caller did not hand us a promise.
            self.pendingConnect = promise ?? self.eventLoop.makePromise(of: Void.self)
            try self.becomeFullyRegistered0()
            self.registerForWritable()
        }
    } catch let connectError {
        assert(self.lifecycleManager.isPreRegistered)
        // An `assert(!self.lifecycleManager.isActive)` would be nice to have here, but this code path must
        // stay reachable when connect() is called on channels that are already active.
        // The promise is stored as the pending connect promise so that `close0` fails it for us.
        self.pendingConnect = promise
        self.close0(error: connectError, mode: .all, promise: nil)
    }
}
1290 | | |
/// Receives data that reached the channel itself. The default implementation ignores it.
public func channelRead0(_ data: NIOAny) {
    // Intentionally empty.
    // Asserting that we are active is not possible here: TailChannelHandler calls this on channelRead.
}
1295 | | |
/// Receives an error that reached the channel itself. The default implementation ignores it.
public func errorCaught0(error: Error) {
    // Intentionally empty.
}
1299 | | |
/// Whether `.write` is currently part of the set of interested events.
private func isWritePending() -> Bool {
    let currentInterest = self.interestedEvent
    return currentInterest.contains(.write)
}
1303 | | |
/// Replaces the set of interested events and re-registers the channel with the event loop.
///
/// Does nothing on a closed channel or when `interested` equals the current set. If re-registering
/// throws, the error is fired through the pipeline and the channel is closed.
///
/// - Parameter interested: The complete new set of events to be notified about.
private final func safeReregister(interested: SelectorEventSet) {
    self.eventLoop.assertInEventLoop()
    assert(self.lifecycleManager.isRegisteredFully)

    if !self.isOpen {
        assert(self.interestedEvent == .reset, "interestedEvent=\(self.interestedEvent) even though we're closed")
        return
    }

    guard interested != self.interestedEvent else {
        // Already registered for exactly these events; skip the update and the syscall it would cause.
        return
    }

    self.interestedEvent = interested
    do {
        try self.selectableEventLoop.reregister(channel: self)
    } catch {
        self.pipeline.syncOperations.fireErrorCaught(error)
        self.close0(error: error, mode: .all, promise: nil)
    }
}
1324 | | |
/// Sets the set of interested events and registers the channel with the event loop for the first time.
///
/// - Parameter interested: The initial set of events to be notified about.
/// - Throws: `ChannelError._ioOnClosedChannel` if the channel is closed. A registration error is rethrown
///   after it has been fired through the pipeline and the channel has been closed.
private func safeRegister(interested: SelectorEventSet) throws {
    self.eventLoop.assertInEventLoop()
    assert(!self.lifecycleManager.isRegisteredFully)

    if !self.isOpen {
        throw ChannelError._ioOnClosedChannel
    }

    self.interestedEvent = interested
    do {
        try self.selectableEventLoop.register(channel: self)
    } catch let registrationError {
        self.pipeline.syncOperations.fireErrorCaught(registrationError)
        self.close0(error: registrationError, mode: .all, promise: nil)
        throw registrationError
    }
}
1342 | | |
/// Moves a pre-registered channel to the fully registered state by registering it with the event loop.
///
/// - Throws: Whatever `safeRegister` throws.
final func becomeFullyRegistered0() throws {
    self.eventLoop.assertInEventLoop()
    assert(self.lifecycleManager.isPreRegistered)
    assert(!self.lifecycleManager.isRegisteredFully)

    // `.readEOF` must not be part of the initial set of interested events: when connect doesn't return
    // synchronously, kevent might send us a `readEOF` because of the `writable` event that marks the
    // connect as completed.
    // See SocketChannelTest.testServerClosesTheConnectionImmediately for a regression test.
    let initialInterest: SelectorEventSet = [.reset, .error]
    try self.safeRegister(interested: initialInterest)

    let registrationCallouts = self.lifecycleManager.finishRegistration()
    registrationCallouts(nil, self.pipeline)
}
1354 | | |
/// Activates the channel, fully registering it first if that has not happened yet.
///
/// After activation the channel registers for `readEOF`, flushes pending writes, adjusts its write
/// registration to the outcome of the flush, and triggers a read if needed.
///
/// - Parameter promise: Handed to the activation callouts, or to `close0` if full registration fails.
final func becomeActive0(promise: EventLoopPromise<Void>?) {
    self.eventLoop.assertInEventLoop()
    assert(self.lifecycleManager.isPreRegistered)

    if !self.lifecycleManager.isRegisteredFully {
        do {
            try self.becomeFullyRegistered0()
            assert(self.lifecycleManager.isRegisteredFully)
        } catch {
            self.close0(error: error, mode: .all, promise: promise)
            return
        }
    }

    let activationCallouts = self.lifecycleManager.activate()
    activationCallouts(promise, self.pipeline)

    if !self.lifecycleManager.isOpen {
        // The channel got closed in the user callout for `channelActive`.
        return
    }
    self.registerForReadEOF()

    // Flush whatever is pending. Should we still be open afterwards, bring the write registration in line
    // with what the flush asked for.
    let flushOutcome = self.flushNow()
    if self.lifecycleManager.isOpen {
        let writeRegistered = self.interestedEvent.contains(.write)
        switch flushOutcome {
        case .register:
            if !writeRegistered {
                self.registerForWritable()
            }
        case .unregister:
            if writeRegistered {
                self.unregisterForWritable()
            }
        }
    }

    self.readIfNeeded0()
}
1389 | | |
/// Registration hook; this base implementation traps, as its message says it must be overridden.
///
/// - Parameters:
///   - selector: The selector to register with.
///   - interested: The set of events to register for.
func register(selector: Selector<NIORegistration>, interested: SelectorEventSet) throws {
    fatalError("must override")
}
1393 | | |
/// Deregistration hook; this base implementation traps, as its message says it must be overridden.
///
/// - Parameters:
///   - selector: The selector to deregister from.
///   - mode: The close mode the deregistration is performed for.
func deregister(selector: Selector<NIORegistration>, mode: CloseMode) throws {
    fatalError("must override")
}
1397 | | |
/// Re-registration hook; this base implementation traps, as its message says it must be overridden.
///
/// - Parameters:
///   - selector: The selector to re-register with.
///   - interested: The new set of events to register for.
func reregister(selector: Selector<NIORegistration>, interested: SelectorEventSet) throws {
    fatalError("must override")
}
1401 | | } |
1402 | | |
extension BaseSocketChannel {
    /// Synchronous access to the channel's options, forwarding straight to `setOption0` / `getOption0`.
    public struct SynchronousOptions: NIOSynchronousChannelOptions {
        /// The channel whose options are read and written.
        @usableFromInline  // should be private
        internal let _channel: BaseSocketChannel<SocketType>

        /// Wraps `channel`.
        @inlinable  // should be fileprivate
        internal init(_channel channel: BaseSocketChannel<SocketType>) {
            self._channel = channel
        }

        /// Sets `option` to `value` on the wrapped channel.
        @inlinable
        public func setOption<Option: ChannelOption>(_ option: Option, value: Option.Value) throws {
            try self._channel.setOption0(option, value: value)
        }

        /// Reads the current value of `option` from the wrapped channel.
        @inlinable
        public func getOption<Option: ChannelOption>(_ option: Option) throws -> Option.Value {
            let currentValue = try self._channel.getOption0(option)
            return currentValue
        }
    }

    /// The synchronous options view of this channel; never `nil` for this channel type.
    public final var syncOptions: NIOSynchronousChannelOptions? {
        return SynchronousOptions(_channel: self)
    }
}
1428 | | |
/// Runs `body` and synchronously completes `promise` (if not `nil`) with its outcome.
///
/// - Parameters:
///   - promise: Succeeded with the value returned by `body`, or failed with the error thrown by it.
///   - body: The work to run. It is run exactly once, even if `promise` is `nil`.
func executeAndComplete<Value: Sendable>(_ promise: EventLoopPromise<Value>?, _ body: () throws -> Value) {
    let outcome: Result<Value, Error>
    do {
        outcome = .success(try body())
    } catch {
        outcome = .failure(error)
    }

    switch outcome {
    case .success(let value):
        promise?.succeed(value)
    case .failure(let failure):
        promise?.fail(failure)
    }
}