Coverage Report

Created: 2026-06-07 06:20

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/grpc-swift/Sources/GRPC/_FakeResponseStream.swift
Line
Count
Source
1
/*
2
 * Copyright 2020, gRPC Authors All rights reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
import NIOCore
17
import NIOEmbedded
18
import NIOHPACK
19
20
public enum FakeRequestPart<Request> {
21
  case metadata(HPACKHeaders)
22
  case message(Request)
23
  case end
24
}
25
26
extension FakeRequestPart: Equatable where Request: Equatable {}
27
28
/// Sending on a fake response stream would have resulted in a protocol violation (such as
29
/// sending initial metadata multiple times or sending messages after the stream has closed).
30
public struct FakeResponseProtocolViolation: Error, Hashable {
31
  /// The reason that sending the message would have resulted in a protocol violation.
32
  public var reason: String
33
34
0
  init(_ reason: String) {
35
0
    self.reason = reason
36
0
  }
37
}
38
39
/// A fake response stream into which users may inject response parts for use in unit tests.
40
///
41
/// Users may not interact with this class directly but may do so via one of its subclasses
42
/// `FakeUnaryResponse` and `FakeStreamingResponse`.
43
public class _FakeResponseStream<Request, Response> {
44
  private enum StreamEvent {
45
    case responsePart(_GRPCClientResponsePart<Response>)
46
    case error(Error)
47
  }
48
49
  /// The channel to use for communication.
50
  internal let channel: EmbeddedChannel
51
52
  /// A buffer to hold responses in before the proxy is activated.
53
  private var responseBuffer: CircularBuffer<StreamEvent>
54
55
  /// The current state of the proxy.
56
  private var activeState: ActiveState
57
58
  /// The state of sending response parts.
59
  private var sendState: SendState
60
61
  private enum ActiveState {
62
    case inactive
63
    case active
64
  }
65
66
  private enum SendState {
67
    // Nothing has been sent; we can send initial metadata to become 'sending' or trailing metadata
68
    // to start 'closing'.
69
    case idle
70
71
    // We're sending messages. We can send more messages in this state or trailing metadata to
72
    // transition to 'closing'.
73
    case sending
74
75
    // We're closing: we've sent trailing metadata, we may only send a status now to close.
76
    case closing
77
78
    // Closed, nothing more can be sent.
79
    case closed
80
  }
81
82
0
  internal init(requestHandler: @escaping (FakeRequestPart<Request>) -> Void) {
83
0
    self.activeState = .inactive
84
0
    self.sendState = .idle
85
0
    self.responseBuffer = CircularBuffer()
86
0
    self.channel = EmbeddedChannel(handler: WriteCapturingHandler(requestHandler: requestHandler))
87
0
  }
88
89
  /// Activate the test proxy; this should be called
90
0
  internal func activate() {
91
0
    switch self.activeState {
92
0
    case .inactive:
93
0
      // Activate the channel. This will allow any request parts to be sent.
94
0
      self.channel.pipeline.fireChannelActive()
95
0
96
0
      // Unbuffer any response parts.
97
0
      while !self.responseBuffer.isEmpty {
98
0
        self.write(self.responseBuffer.removeFirst())
99
0
      }
100
0
101
0
      // Now we're active.
102
0
      self.activeState = .active
103
0
104
0
    case .active:
105
0
      ()
106
0
    }
107
0
  }
108
109
  /// Write or buffer the response part, depending on the our current state.
110
0
  internal func _sendResponsePart(_ part: _GRPCClientResponsePart<Response>) throws {
111
0
    try self.send(.responsePart(part))
112
0
  }
113
114
0
  internal func _sendError(_ error: Error) throws {
115
0
    try self.send(.error(error))
116
0
  }
117
118
0
  private func send(_ event: StreamEvent) throws {
119
0
    switch self.validate(event) {
120
0
    case .valid:
121
0
      self.writeOrBuffer(event)
122
0
123
0
    case let .validIfSentAfter(extraPart):
124
0
      self.writeOrBuffer(extraPart)
125
0
      self.writeOrBuffer(event)
126
0
127
0
    case let .invalid(reason):
128
0
      throw FakeResponseProtocolViolation(reason)
129
0
    }
130
0
  }
131
132
  /// Validate events the user wants to send on the stream.
133
0
  private func validate(_ event: StreamEvent) -> Validation {
134
0
    switch (event, self.sendState) {
135
0
    case (.responsePart(.initialMetadata), .idle):
136
0
      self.sendState = .sending
137
0
      return .valid
138
0
139
0
    case (.responsePart(.initialMetadata), .sending),
140
0
      (.responsePart(.initialMetadata), .closing),
141
0
      (.responsePart(.initialMetadata), .closed):
142
0
      // We can only send initial metadata from '.idle'.
143
0
      return .invalid(reason: "Initial metadata has already been sent")
144
0
145
0
    case (.responsePart(.message), .idle):
146
0
      // This is fine: we don't force the user to specify initial metadata so we send some on their
147
0
      // behalf.
148
0
      self.sendState = .sending
149
0
      return .validIfSentAfter(.responsePart(.initialMetadata([:])))
150
0
151
0
    case (.responsePart(.message), .sending):
152
0
      return .valid
153
0
154
0
    case (.responsePart(.message), .closing),
155
0
      (.responsePart(.message), .closed):
156
0
      // We can't send messages once we're closing or closed.
157
0
      return .invalid(reason: "Messages can't be sent after the stream has been closed")
158
0
159
0
    case (.responsePart(.trailingMetadata), .idle),
160
0
      (.responsePart(.trailingMetadata), .sending):
161
0
      self.sendState = .closing
162
0
      return .valid
163
0
164
0
    case (.responsePart(.trailingMetadata), .closing),
165
0
      (.responsePart(.trailingMetadata), .closed):
166
0
      // We're already closing or closed.
167
0
      return .invalid(reason: "Trailing metadata can't be sent after the stream has been closed")
168
0
169
0
    case (.responsePart(.status), .idle),
170
0
      (.error, .idle),
171
0
      (.responsePart(.status), .sending),
172
0
      (.error, .sending),
173
0
      (.responsePart(.status), .closed),
174
0
      (.error, .closed):
175
0
      // We can only error/close if we're closing (i.e. have already sent trailers which we enforce
176
0
      // from the API in the subclasses).
177
0
      return .invalid(reason: "Status/error can only be sent after trailing metadata has been sent")
178
0
179
0
    case (.responsePart(.status), .closing),
180
0
      (.error, .closing):
181
0
      self.sendState = .closed
182
0
      return .valid
183
0
    }
184
0
  }
185
186
  private enum Validation {
187
    /// Sending the part is valid.
188
    case valid
189
190
    /// Sending the part, if it is sent after the given part.
191
    case validIfSentAfter(_ part: StreamEvent)
192
193
    /// Sending the part would be a protocol violation.
194
    case invalid(reason: String)
195
  }
196
197
0
  private func writeOrBuffer(_ event: StreamEvent) {
198
0
    switch self.activeState {
199
0
    case .inactive:
200
0
      self.responseBuffer.append(event)
201
0
202
0
    case .active:
203
0
      self.write(event)
204
0
    }
205
0
  }
206
207
0
  private func write(_ part: StreamEvent) {
208
0
    switch part {
209
0
    case let .error(error):
210
0
      self.channel.pipeline.fireErrorCaught(error)
211
0
212
0
    case let .responsePart(responsePart):
213
0
      // We tolerate errors here: an error will be thrown if the write results in an error which
214
0
      // isn't caught in the channel. Errors in the channel get funnelled into the transport held
215
0
      // by the actual call object and handled there.
216
0
      _ = try? self.channel.writeInbound(responsePart)
217
0
    }
218
0
  }
219
}
220
221
// MARK: - Unary Response
222
223
/// A fake unary response to be used with a generated test client.
224
///
225
/// Users typically create fake responses via helper methods on their generated test clients
226
/// corresponding to the RPC which they intend to test.
227
///
228
/// For unary responses users may call one of two functions for each RPC:
229
/// - `sendMessage(_:initialMetadata:trailingMetadata:status)`, or
230
/// - `sendError(status:trailingMetadata)`
231
///
232
/// `sendMessage` sends a normal unary response with the provided message and allows the caller to
233
/// also specify initial metadata, trailing metadata and the status. Both metadata arguments are
234
/// empty by default and the status defaults to one with an 'ok' status code.
235
///
236
/// `sendError` may be used to terminate an RPC without providing a response. As for `sendMessage`,
237
/// the `trailingMetadata` defaults to being empty.
238
public class FakeUnaryResponse<Request, Response>: _FakeResponseStream<Request, Response> {
239
0
  override public init(requestHandler: @escaping (FakeRequestPart<Request>) -> Void = { _ in }) {
240
0
    super.init(requestHandler: requestHandler)
241
0
  }
242
243
  /// Send a response message to the client.
244
  ///
245
  /// - Parameters:
246
  ///   - response: The message to send.
247
  ///   - initialMetadata: The initial metadata to send. By default the metadata will be empty.
248
  ///   - trailingMetadata: The trailing metadata to send. By default the metadata will be empty.
249
  ///   - status: The status to send. By default this has an '.ok' status code.
250
  /// - Throws: FakeResponseProtocolViolation if sending the message would violate the gRPC
251
  ///   protocol, e.g. sending messages after the RPC has ended.
252
  public func sendMessage(
253
    _ response: Response,
254
    initialMetadata: HPACKHeaders = [:],
255
    trailingMetadata: HPACKHeaders = [:],
256
    status: GRPCStatus = .ok
257
0
  ) throws {
258
0
    try self._sendResponsePart(.initialMetadata(initialMetadata))
259
0
    try self._sendResponsePart(.message(.init(response, compressed: false)))
260
0
    try self._sendResponsePart(.trailingMetadata(trailingMetadata))
261
0
    try self._sendResponsePart(.status(status))
262
0
  }
263
264
  /// Send an error to the client.
265
  ///
266
  /// - Parameters:
267
  ///   - error: The error to send.
268
  ///   - trailingMetadata: The trailing metadata to send. By default the metadata will be empty.
269
0
  public func sendError(_ error: Error, trailingMetadata: HPACKHeaders = [:]) throws {
270
0
    try self._sendResponsePart(.trailingMetadata(trailingMetadata))
271
0
    try self._sendError(error)
272
0
  }
273
}
274
275
// MARK: - Streaming Response
276
277
/// A fake streaming response to be used with a generated test client.
278
///
279
/// Users typically create fake responses via helper methods on their generated test clients
280
/// corresponding to the RPC which they intend to test.
281
///
282
/// For streaming responses users have a number of methods available to them:
283
/// - `sendInitialMetadata(_:)`
284
/// - `sendMessage(_:)`
285
/// - `sendEnd(status:trailingMetadata:)`
286
/// - `sendError(_:trailingMetadata)`
287
///
288
/// `sendInitialMetadata` may be called to send initial metadata to the client, however, it
289
/// must be called first in order for the metadata to be sent. If it is not called, empty
290
/// metadata will be sent automatically if necessary.
291
///
292
/// `sendMessage` may be called to send a response message on the stream. This may be called
293
/// multiple times. Messages will be ignored if this is called after `sendEnd` or `sendError`.
294
///
295
/// `sendEnd` indicates that the response stream has closed. It – or `sendError` - must be called
296
/// once. The `status` defaults to a value with the `ok` code and `trailingMetadata` is empty by
297
/// default.
298
///
299
/// `sendError` may be called at any time to indicate an error on the response stream.
300
/// Like `sendEnd`, `trailingMetadata` is empty by default.
301
public class FakeStreamingResponse<Request, Response>: _FakeResponseStream<Request, Response> {
302
0
  override public init(requestHandler: @escaping (FakeRequestPart<Request>) -> Void = { _ in }) {
303
0
    super.init(requestHandler: requestHandler)
304
0
  }
305
306
  /// Send initial metadata to the client.
307
  ///
308
  /// Note that calling this function is not required; empty initial metadata will be sent
309
  /// automatically if necessary.
310
  ///
311
  /// - Parameter metadata: The metadata to send
312
  /// - Throws: FakeResponseProtocolViolation if sending initial metadata would violate the gRPC
313
  ///   protocol, e.g. sending metadata too many times, or out of order.
314
0
  public func sendInitialMetadata(_ metadata: HPACKHeaders) throws {
315
0
    try self._sendResponsePart(.initialMetadata(metadata))
316
0
  }
317
318
  /// Send a response message to the client.
319
  ///
320
  /// - Parameter response: The response to send.
321
  /// - Throws: FakeResponseProtocolViolation if sending the message would violate the gRPC
322
  ///   protocol, e.g. sending messages after the RPC has ended.
323
0
  public func sendMessage(_ response: Response) throws {
324
0
    try self._sendResponsePart(.message(.init(response, compressed: false)))
325
0
  }
326
327
  /// Send the RPC status and trailing metadata to the client.
328
  ///
329
  /// - Parameters:
330
  ///   - status: The status to send. By default the status code will be '.ok'.
331
  ///   - trailingMetadata: The trailing metadata to send. Empty by default.
332
  /// - Throws: FakeResponseProtocolViolation if ending the RPC would violate the gRPC
333
  ///   protocol, e.g. sending end after the RPC has already completed.
334
0
  public func sendEnd(status: GRPCStatus = .ok, trailingMetadata: HPACKHeaders = [:]) throws {
335
0
    try self._sendResponsePart(.trailingMetadata(trailingMetadata))
336
0
    try self._sendResponsePart(.status(status))
337
0
  }
338
339
  /// Send an error to the client.
340
  ///
341
  /// - Parameters:
342
  ///   - error: The error to send.
343
  ///   - trailingMetadata: The trailing metadata to send. By default the metadata will be empty.
344
  /// - Throws: FakeResponseProtocolViolation if sending the error would violate the gRPC
345
  ///   protocol, e.g. erroring after the RPC has already completed.
346
0
  public func sendError(_ error: Error, trailingMetadata: HPACKHeaders = [:]) throws {
347
0
    try self._sendResponsePart(.trailingMetadata(trailingMetadata))
348
0
    try self._sendError(error)
349
0
  }
350
}