Coverage Report

Created: 2026-04-29 06:25

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/swift-nio/Sources/NIOPosix/PipeChannel.swift
Line
Count
Source
1
//===----------------------------------------------------------------------===//
2
//
3
// This source file is part of the SwiftNIO open source project
4
//
5
// Copyright (c) 2019-2021 Apple Inc. and the SwiftNIO project authors
6
// Licensed under Apache License v2.0
7
//
8
// See LICENSE.txt for license information
9
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
10
//
11
// SPDX-License-Identifier: Apache-2.0
12
//
13
//===----------------------------------------------------------------------===//
14
15
#if !os(WASI)
16
17
import NIOCore
18
19
final class PipeChannel: BaseStreamSocketChannel<PipePair>, @unchecked Sendable {
20
    private let pipePair: PipePair
21
22
    internal enum Direction {
23
        case input
24
        case output
25
    }
26
27
    init(
28
        eventLoop: SelectableEventLoop,
29
        input: SelectablePipeHandle?,
30
        output: SelectablePipeHandle?
31
0
    ) throws {
32
0
        self.pipePair = try PipePair(input: input, output: output)
33
0
        try super.init(
34
0
            socket: self.pipePair,
35
0
            parent: nil,
36
0
            eventLoop: eventLoop,
37
0
            recvAllocator: AdaptiveRecvByteBufferAllocator()
38
0
        )
39
0
    }
40
41
0
    func registrationForInput(interested: SelectorEventSet, registrationID: SelectorRegistrationID) -> NIORegistration {
42
0
        NIORegistration(
43
0
            channel: .pipeChannel(self, .input),
44
0
            interested: interested,
45
0
            registrationID: registrationID
46
0
        )
47
0
    }
48
49
    func registrationForOutput(interested: SelectorEventSet, registrationID: SelectorRegistrationID) -> NIORegistration
50
0
    {
51
0
        NIORegistration(
52
0
            channel: .pipeChannel(self, .output),
53
0
            interested: interested,
54
0
            registrationID: registrationID
55
0
        )
56
0
    }
57
58
0
    override func connectSocket(to address: SocketAddress) throws -> Bool {
59
0
        throw ChannelError._operationUnsupported
60
0
    }
61
62
0
    override func connectSocket(to address: VsockAddress) throws -> Bool {
63
0
        throw ChannelError._operationUnsupported
64
0
    }
65
66
0
    override func finishConnectSocket() throws {
67
0
        throw ChannelError._inappropriateOperationForState
68
0
    }
69
70
0
    override func register(selector: Selector<NIORegistration>, interested: SelectorEventSet) throws {
71
0
        if let inputSPH = self.pipePair.input {
72
0
            try selector.register(
73
0
                selectable: inputSPH,
74
0
                interested: interested.intersection([.read, .reset, .error]),
75
0
                makeRegistration: self.registrationForInput
Unexecuted instantiation: $s8NIOPosix11PipeChannelC8register8selector10interestedyAA8SelectorCyAA15NIORegistrationVG_AA0G8EventSetVtKFAjM_AA0G14RegistrationIDVtYbcACYbcfu_
Unexecuted instantiation: $s8NIOPosix11PipeChannelC8register8selector10interestedyAA8SelectorCyAA15NIORegistrationVG_AA0G8EventSetVtKFAjM_AA0G14RegistrationIDVtYbcACYbcfu_AjM_AOtYbcfu0_
76
0
            )
77
0
        }
78
0
79
0
        if let outputSPH = self.pipePair.output {
80
0
            try selector.register(
81
0
                selectable: outputSPH,
82
0
                interested: interested.intersection([.write, .reset, .error]),
83
0
                makeRegistration: self.registrationForOutput
Unexecuted instantiation: $s8NIOPosix11PipeChannelC8register8selector10interestedyAA8SelectorCyAA15NIORegistrationVG_AA0G8EventSetVtKFAjM_AA0G14RegistrationIDVtYbcACYbcfu1_
Unexecuted instantiation: $s8NIOPosix11PipeChannelC8register8selector10interestedyAA8SelectorCyAA15NIORegistrationVG_AA0G8EventSetVtKFAjM_AA0G14RegistrationIDVtYbcACYbcfu1_AjM_AOtYbcfu2_
84
0
            )
85
0
        }
86
0
    }
87
88
0
    override func deregister(selector: Selector<NIORegistration>, mode: CloseMode) throws {
89
0
        if let inputSPH = self.pipePair.input, (mode == .all || mode == .input) && inputSPH.isOpen {
90
0
            try selector.deregister(selectable: inputSPH)
91
0
        }
92
0
        if let outputSPH = self.pipePair.output, (mode == .all || mode == .output) && outputSPH.isOpen {
93
0
            try selector.deregister(selectable: outputSPH)
94
0
        }
95
0
    }
96
97
0
    override func reregister(selector: Selector<NIORegistration>, interested: SelectorEventSet) throws {
98
0
        if let inputSPH = self.pipePair.input, inputSPH.isOpen {
99
0
            try selector.reregister(
100
0
                selectable: inputSPH,
101
0
                interested: interested.intersection([.read, .reset, .error])
102
0
            )
103
0
        }
104
0
        if let outputSPH = self.pipePair.output, outputSPH.isOpen {
105
0
            try selector.reregister(
106
0
                selectable: outputSPH,
107
0
                interested: interested.intersection([.write, .reset, .error])
108
0
            )
109
0
        }
110
0
    }
111
112
0
    override func readEOF() {
113
0
        super.readEOF()
114
0
        guard let inputSPH = self.pipePair.input, inputSPH.isOpen else {
115
0
            return
116
0
        }
117
0
        try! self.selectableEventLoop.deregister(channel: self, mode: .input)
118
0
        try! inputSPH.close()
119
0
    }
120
121
0
    override func writeEOF() {
122
0
        guard let outputSPH = self.pipePair.output, outputSPH.isOpen else {
123
0
            return
124
0
        }
125
0
        try! self.selectableEventLoop.deregister(channel: self, mode: .output)
126
0
        try! outputSPH.close()
127
0
128
0
        // Only close the entire channel if the input is already closed.
129
0
        // If input is still open, we can continue half-duplex operation.
130
0
        var inputIsClosed = true
131
0
        if let inputSPH = self.pipePair.input {
132
0
            inputIsClosed = !inputSPH.isOpen
133
0
        }
134
0
        if inputIsClosed {
135
0
            let error = IOError(errnoCode: EPIPE, reason: "Broken pipe")
136
0
            self.close0(error: error, mode: .all, promise: nil)
137
0
        }
138
0
    }
139
140
0
    override func shutdownSocket(mode: CloseMode) throws {
141
0
        switch mode {
142
0
        case .input:
143
0
            try! self.selectableEventLoop.deregister(channel: self, mode: .input)
144
0
        case .output:
145
0
            try! self.selectableEventLoop.deregister(channel: self, mode: .output)
146
0
        case .all:
147
0
            break
148
0
        }
149
0
        try super.shutdownSocket(mode: mode)
150
0
    }
151
}
152
153
extension PipeChannel: CustomStringConvertible {
154
0
    var description: String {
155
0
        "PipeChannel { \(self.socketDescription), active = \(self.isActive), localAddress = \(self.localAddress.debugDescription), remoteAddress = \(self.remoteAddress.debugDescription) }"
156
0
    }
157
}
158
#endif  // !os(WASI)