Coverage Report

Created: 2026-06-07 06:20

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/grpc-swift/Sources/GRPC/ClientCalls/ResponseContainers.swift
Line
Count
Source
1
/*
2
 * Copyright 2020, gRPC Authors All rights reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
import NIOCore
17
import NIOHPACK
18
19
/// A bucket of promises for a unary-response RPC.
20
internal class UnaryResponseParts<Response> {
21
  /// The `EventLoop` we expect to receive these response parts on.
22
  private let eventLoop: EventLoop
23
24
  /// A promise for the `Response` message.
25
  private let responsePromise: EventLoopPromise<Response>
26
27
  /// Lazy promises for the status, initial-, and trailing-metadata.
28
  private var initialMetadataPromise: LazyEventLoopPromise<HPACKHeaders>
29
  private var trailingMetadataPromise: LazyEventLoopPromise<HPACKHeaders>
30
  private var statusPromise: LazyEventLoopPromise<GRPCStatus>
31
32
0
  internal var response: EventLoopFuture<Response> {
33
0
    return self.responsePromise.futureResult
34
0
  }
35
36
0
  internal var initialMetadata: EventLoopFuture<HPACKHeaders> {
37
0
    return self.eventLoop.executeOrFlatSubmit {
38
0
      return self.initialMetadataPromise.getFutureResult()
39
0
    }
40
0
  }
41
42
0
  internal var trailingMetadata: EventLoopFuture<HPACKHeaders> {
43
0
    return self.eventLoop.executeOrFlatSubmit {
44
0
      return self.trailingMetadataPromise.getFutureResult()
45
0
    }
46
0
  }
47
48
0
  internal var status: EventLoopFuture<GRPCStatus> {
49
0
    return self.eventLoop.executeOrFlatSubmit {
50
0
      return self.statusPromise.getFutureResult()
51
0
    }
52
0
  }
53
54
0
  internal init(on eventLoop: EventLoop) {
55
0
    self.eventLoop = eventLoop
56
0
    self.responsePromise = eventLoop.makePromise()
57
0
    self.initialMetadataPromise = eventLoop.makeLazyPromise()
58
0
    self.trailingMetadataPromise = eventLoop.makeLazyPromise()
59
0
    self.statusPromise = eventLoop.makeLazyPromise()
60
0
  }
61
62
  /// Handle the response part, completing any promises as necessary.
63
  /// - Important: This *must* be called on `eventLoop`.
64
0
  internal func handle(_ part: GRPCClientResponsePart<Response>) {
65
0
    self.eventLoop.assertInEventLoop()
66
0
67
0
    switch part {
68
0
    case let .metadata(metadata):
69
0
      self.initialMetadataPromise.succeed(metadata)
70
0
71
0
    case let .message(response):
72
0
      self.responsePromise.succeed(response)
73
0
74
0
    case let .end(status, trailers):
75
0
      // In case of a "Trailers-Only" RPC (i.e. just the trailers and status), fail the initial
76
0
      // metadata and status.
77
0
      self.initialMetadataPromise.fail(status)
78
0
      self.responsePromise.fail(status)
79
0
80
0
      self.trailingMetadataPromise.succeed(trailers)
81
0
      self.statusPromise.succeed(status)
82
0
    }
83
0
  }
84
85
0
  internal func handleError(_ error: Error) {
86
0
    let withoutContext = error.removingContext()
87
0
    let status = withoutContext.makeGRPCStatus()
88
0
    self.initialMetadataPromise.fail(withoutContext)
89
0
    self.responsePromise.fail(withoutContext)
90
0
    self.trailingMetadataPromise.fail(withoutContext)
91
0
    self.statusPromise.succeed(status)
92
0
  }
93
}
94
95
/// A bucket of promises for a streaming-response RPC.
96
internal class StreamingResponseParts<Response> {
97
  /// The `EventLoop` we expect to receive these response parts on.
98
  private let eventLoop: EventLoop
99
100
  /// A callback for response messages.
101
  private var responseCallback: Optional<(Response) -> Void>
102
103
  /// Lazy promises for the status, initial-, and trailing-metadata.
104
  private var initialMetadataPromise: LazyEventLoopPromise<HPACKHeaders>
105
  private var trailingMetadataPromise: LazyEventLoopPromise<HPACKHeaders>
106
  private var statusPromise: LazyEventLoopPromise<GRPCStatus>
107
108
0
  internal var initialMetadata: EventLoopFuture<HPACKHeaders> {
109
0
    return self.eventLoop.executeOrFlatSubmit {
110
0
      return self.initialMetadataPromise.getFutureResult()
111
0
    }
112
0
  }
113
114
0
  internal var trailingMetadata: EventLoopFuture<HPACKHeaders> {
115
0
    return self.eventLoop.executeOrFlatSubmit {
116
0
      return self.trailingMetadataPromise.getFutureResult()
117
0
    }
118
0
  }
119
120
0
  internal var status: EventLoopFuture<GRPCStatus> {
121
0
    return self.eventLoop.executeOrFlatSubmit {
122
0
      return self.statusPromise.getFutureResult()
123
0
    }
124
0
  }
125
126
0
  internal init(on eventLoop: EventLoop, _ responseCallback: @escaping (Response) -> Void) {
127
0
    self.eventLoop = eventLoop
128
0
    self.responseCallback = responseCallback
129
0
    self.initialMetadataPromise = eventLoop.makeLazyPromise()
130
0
    self.trailingMetadataPromise = eventLoop.makeLazyPromise()
131
0
    self.statusPromise = eventLoop.makeLazyPromise()
132
0
  }
133
134
0
  internal func handle(_ part: GRPCClientResponsePart<Response>) {
135
0
    self.eventLoop.assertInEventLoop()
136
0
137
0
    switch part {
138
0
    case let .metadata(metadata):
139
0
      self.initialMetadataPromise.succeed(metadata)
140
0
141
0
    case let .message(response):
142
0
      self.responseCallback?(response)
143
0
144
0
    case let .end(status, trailers):
145
0
      // Once the stream has finished, we must release the callback, to make sure don't
146
0
      // break potential retain cycles (the callback may reference other object's that in
147
0
      // turn reference `StreamingResponseParts`).
148
0
      self.responseCallback = nil
149
0
      self.initialMetadataPromise.fail(status)
150
0
      self.trailingMetadataPromise.succeed(trailers)
151
0
      self.statusPromise.succeed(status)
152
0
    }
153
0
  }
154
155
0
  internal func handleError(_ error: Error) {
156
0
    self.eventLoop.assertInEventLoop()
157
0
158
0
    // Once the stream has finished, we must release the callback, to make sure don't
159
0
    // break potential retain cycles (the callback may reference other object's that in
160
0
    // turn reference `StreamingResponseParts`).
161
0
    self.responseCallback = nil
162
0
    let withoutContext = error.removingContext()
163
0
    let status = withoutContext.makeGRPCStatus()
164
0
    self.initialMetadataPromise.fail(withoutContext)
165
0
    self.trailingMetadataPromise.fail(withoutContext)
166
0
    self.statusPromise.succeed(status)
167
0
  }
168
}
169
170
extension EventLoop {
171
  fileprivate func executeOrFlatSubmit<Result>(
172
    _ body: @escaping () -> EventLoopFuture<Result>
173
0
  ) -> EventLoopFuture<Result> {
174
0
    if self.inEventLoop {
175
0
      return body()
176
0
    } else {
177
0
      return self.flatSubmit {
178
0
        return body()
179
0
      }
180
0
    }
181
0
  }
182
}
183
184
extension Error {
185
0
  fileprivate func removingContext() -> Error {
186
0
    if let withContext = self as? GRPCError.WithContext {
187
0
      return withContext.error
188
0
    } else {
189
0
      return self
190
0
    }
191
0
  }
192
193
0
  fileprivate func makeGRPCStatus() -> GRPCStatus {
194
0
    if let withContext = self as? GRPCError.WithContext {
195
0
      return withContext.error.makeGRPCStatus()
196
0
    } else if let transformable = self as? GRPCStatusTransformable {
197
0
      return transformable.makeGRPCStatus()
198
0
    } else {
199
0
      return GRPCStatus(code: .unknown, message: String(describing: self))
200
0
    }
201
0
  }
202
}
203
204
// @unchecked is ok: all mutable state is accessed/modified from an appropriate event loop.
205
extension UnaryResponseParts: @unchecked Sendable where Response: Sendable {}
206
extension StreamingResponseParts: @unchecked Sendable where Response: Sendable {}