/src/grpc-swift/Sources/GRPC/_FakeResponseStream.swift
Line | Count | Source |
1 | | /* |
2 | | * Copyright 2020, gRPC Authors All rights reserved. |
3 | | * |
4 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | | * you may not use this file except in compliance with the License. |
6 | | * You may obtain a copy of the License at |
7 | | * |
8 | | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | | * |
10 | | * Unless required by applicable law or agreed to in writing, software |
11 | | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | | * See the License for the specific language governing permissions and |
14 | | * limitations under the License. |
15 | | */ |
16 | | import NIOCore |
17 | | import NIOEmbedded |
18 | | import NIOHPACK |
19 | | |
20 | | public enum FakeRequestPart<Request> { |
21 | | case metadata(HPACKHeaders) |
22 | | case message(Request) |
23 | | case end |
24 | | } |
25 | | |
26 | | extension FakeRequestPart: Equatable where Request: Equatable {} |
27 | | |
28 | | /// Sending on a fake response stream would have resulted in a protocol violation (such as |
29 | | /// sending initial metadata multiple times or sending messages after the stream has closed). |
30 | | public struct FakeResponseProtocolViolation: Error, Hashable { |
31 | | /// The reason that sending the message would have resulted in a protocol violation. |
32 | | public var reason: String |
33 | | |
34 | 0 | init(_ reason: String) { |
35 | 0 | self.reason = reason |
36 | 0 | } |
37 | | } |
38 | | |
39 | | /// A fake response stream into which users may inject response parts for use in unit tests. |
40 | | /// |
41 | | /// Users may not interact with this class directly but may do so via one of its subclasses |
42 | | /// `FakeUnaryResponse` and `FakeStreamingResponse`. |
43 | | public class _FakeResponseStream<Request, Response> { |
44 | | private enum StreamEvent { |
45 | | case responsePart(_GRPCClientResponsePart<Response>) |
46 | | case error(Error) |
47 | | } |
48 | | |
49 | | /// The channel to use for communication. |
50 | | internal let channel: EmbeddedChannel |
51 | | |
52 | | /// A buffer to hold responses in before the proxy is activated. |
53 | | private var responseBuffer: CircularBuffer<StreamEvent> |
54 | | |
55 | | /// The current state of the proxy. |
56 | | private var activeState: ActiveState |
57 | | |
58 | | /// The state of sending response parts. |
59 | | private var sendState: SendState |
60 | | |
61 | | private enum ActiveState { |
62 | | case inactive |
63 | | case active |
64 | | } |
65 | | |
66 | | private enum SendState { |
67 | | // Nothing has been sent; we can send initial metadata to become 'sending' or trailing metadata |
68 | | // to start 'closing'. |
69 | | case idle |
70 | | |
71 | | // We're sending messages. We can send more messages in this state or trailing metadata to |
72 | | // transition to 'closing'. |
73 | | case sending |
74 | | |
75 | | // We're closing: we've sent trailing metadata, we may only send a status now to close. |
76 | | case closing |
77 | | |
78 | | // Closed, nothing more can be sent. |
79 | | case closed |
80 | | } |
81 | | |
82 | 0 | internal init(requestHandler: @escaping (FakeRequestPart<Request>) -> Void) { |
83 | 0 | self.activeState = .inactive |
84 | 0 | self.sendState = .idle |
85 | 0 | self.responseBuffer = CircularBuffer() |
86 | 0 | self.channel = EmbeddedChannel(handler: WriteCapturingHandler(requestHandler: requestHandler)) |
87 | 0 | } |
88 | | |
89 | | /// Activate the test proxy; this should be called |
90 | 0 | internal func activate() { |
91 | 0 | switch self.activeState { |
92 | 0 | case .inactive: |
93 | 0 | // Activate the channel. This will allow any request parts to be sent. |
94 | 0 | self.channel.pipeline.fireChannelActive() |
95 | 0 |
|
96 | 0 | // Unbuffer any response parts. |
97 | 0 | while !self.responseBuffer.isEmpty { |
98 | 0 | self.write(self.responseBuffer.removeFirst()) |
99 | 0 | } |
100 | 0 |
|
101 | 0 | // Now we're active. |
102 | 0 | self.activeState = .active |
103 | 0 |
|
104 | 0 | case .active: |
105 | 0 | () |
106 | 0 | } |
107 | 0 | } |
108 | | |
109 | | /// Write or buffer the response part, depending on the our current state. |
110 | 0 | internal func _sendResponsePart(_ part: _GRPCClientResponsePart<Response>) throws { |
111 | 0 | try self.send(.responsePart(part)) |
112 | 0 | } |
113 | | |
114 | 0 | internal func _sendError(_ error: Error) throws { |
115 | 0 | try self.send(.error(error)) |
116 | 0 | } |
117 | | |
118 | 0 | private func send(_ event: StreamEvent) throws { |
119 | 0 | switch self.validate(event) { |
120 | 0 | case .valid: |
121 | 0 | self.writeOrBuffer(event) |
122 | 0 |
|
123 | 0 | case let .validIfSentAfter(extraPart): |
124 | 0 | self.writeOrBuffer(extraPart) |
125 | 0 | self.writeOrBuffer(event) |
126 | 0 |
|
127 | 0 | case let .invalid(reason): |
128 | 0 | throw FakeResponseProtocolViolation(reason) |
129 | 0 | } |
130 | 0 | } |
131 | | |
132 | | /// Validate events the user wants to send on the stream. |
133 | 0 | private func validate(_ event: StreamEvent) -> Validation { |
134 | 0 | switch (event, self.sendState) { |
135 | 0 | case (.responsePart(.initialMetadata), .idle): |
136 | 0 | self.sendState = .sending |
137 | 0 | return .valid |
138 | 0 |
|
139 | 0 | case (.responsePart(.initialMetadata), .sending), |
140 | 0 | (.responsePart(.initialMetadata), .closing), |
141 | 0 | (.responsePart(.initialMetadata), .closed): |
142 | 0 | // We can only send initial metadata from '.idle'. |
143 | 0 | return .invalid(reason: "Initial metadata has already been sent") |
144 | 0 |
|
145 | 0 | case (.responsePart(.message), .idle): |
146 | 0 | // This is fine: we don't force the user to specify initial metadata so we send some on their |
147 | 0 | // behalf. |
148 | 0 | self.sendState = .sending |
149 | 0 | return .validIfSentAfter(.responsePart(.initialMetadata([:]))) |
150 | 0 |
|
151 | 0 | case (.responsePart(.message), .sending): |
152 | 0 | return .valid |
153 | 0 |
|
154 | 0 | case (.responsePart(.message), .closing), |
155 | 0 | (.responsePart(.message), .closed): |
156 | 0 | // We can't send messages once we're closing or closed. |
157 | 0 | return .invalid(reason: "Messages can't be sent after the stream has been closed") |
158 | 0 |
|
159 | 0 | case (.responsePart(.trailingMetadata), .idle), |
160 | 0 | (.responsePart(.trailingMetadata), .sending): |
161 | 0 | self.sendState = .closing |
162 | 0 | return .valid |
163 | 0 |
|
164 | 0 | case (.responsePart(.trailingMetadata), .closing), |
165 | 0 | (.responsePart(.trailingMetadata), .closed): |
166 | 0 | // We're already closing or closed. |
167 | 0 | return .invalid(reason: "Trailing metadata can't be sent after the stream has been closed") |
168 | 0 |
|
169 | 0 | case (.responsePart(.status), .idle), |
170 | 0 | (.error, .idle), |
171 | 0 | (.responsePart(.status), .sending), |
172 | 0 | (.error, .sending), |
173 | 0 | (.responsePart(.status), .closed), |
174 | 0 | (.error, .closed): |
175 | 0 | // We can only error/close if we're closing (i.e. have already sent trailers which we enforce |
176 | 0 | // from the API in the subclasses). |
177 | 0 | return .invalid(reason: "Status/error can only be sent after trailing metadata has been sent") |
178 | 0 |
|
179 | 0 | case (.responsePart(.status), .closing), |
180 | 0 | (.error, .closing): |
181 | 0 | self.sendState = .closed |
182 | 0 | return .valid |
183 | 0 | } |
184 | 0 | } |
185 | | |
186 | | private enum Validation { |
187 | | /// Sending the part is valid. |
188 | | case valid |
189 | | |
190 | | /// Sending the part, if it is sent after the given part. |
191 | | case validIfSentAfter(_ part: StreamEvent) |
192 | | |
193 | | /// Sending the part would be a protocol violation. |
194 | | case invalid(reason: String) |
195 | | } |
196 | | |
197 | 0 | private func writeOrBuffer(_ event: StreamEvent) { |
198 | 0 | switch self.activeState { |
199 | 0 | case .inactive: |
200 | 0 | self.responseBuffer.append(event) |
201 | 0 |
|
202 | 0 | case .active: |
203 | 0 | self.write(event) |
204 | 0 | } |
205 | 0 | } |
206 | | |
207 | 0 | private func write(_ part: StreamEvent) { |
208 | 0 | switch part { |
209 | 0 | case let .error(error): |
210 | 0 | self.channel.pipeline.fireErrorCaught(error) |
211 | 0 |
|
212 | 0 | case let .responsePart(responsePart): |
213 | 0 | // We tolerate errors here: an error will be thrown if the write results in an error which |
214 | 0 | // isn't caught in the channel. Errors in the channel get funnelled into the transport held |
215 | 0 | // by the actual call object and handled there. |
216 | 0 | _ = try? self.channel.writeInbound(responsePart) |
217 | 0 | } |
218 | 0 | } |
219 | | } |
220 | | |
221 | | // MARK: - Unary Response |
222 | | |
223 | | /// A fake unary response to be used with a generated test client. |
224 | | /// |
225 | | /// Users typically create fake responses via helper methods on their generated test clients |
226 | | /// corresponding to the RPC which they intend to test. |
227 | | /// |
228 | | /// For unary responses users may call one of two functions for each RPC: |
229 | | /// - `sendMessage(_:initialMetadata:trailingMetadata:status)`, or |
230 | | /// - `sendError(status:trailingMetadata)` |
231 | | /// |
232 | | /// `sendMessage` sends a normal unary response with the provided message and allows the caller to |
233 | | /// also specify initial metadata, trailing metadata and the status. Both metadata arguments are |
234 | | /// empty by default and the status defaults to one with an 'ok' status code. |
235 | | /// |
236 | | /// `sendError` may be used to terminate an RPC without providing a response. As for `sendMessage`, |
237 | | /// the `trailingMetadata` defaults to being empty. |
238 | | public class FakeUnaryResponse<Request, Response>: _FakeResponseStream<Request, Response> { |
239 | 0 | override public init(requestHandler: @escaping (FakeRequestPart<Request>) -> Void = { _ in }) { |
240 | 0 | super.init(requestHandler: requestHandler) |
241 | 0 | } |
242 | | |
243 | | /// Send a response message to the client. |
244 | | /// |
245 | | /// - Parameters: |
246 | | /// - response: The message to send. |
247 | | /// - initialMetadata: The initial metadata to send. By default the metadata will be empty. |
248 | | /// - trailingMetadata: The trailing metadata to send. By default the metadata will be empty. |
249 | | /// - status: The status to send. By default this has an '.ok' status code. |
250 | | /// - Throws: FakeResponseProtocolViolation if sending the message would violate the gRPC |
251 | | /// protocol, e.g. sending messages after the RPC has ended. |
252 | | public func sendMessage( |
253 | | _ response: Response, |
254 | | initialMetadata: HPACKHeaders = [:], |
255 | | trailingMetadata: HPACKHeaders = [:], |
256 | | status: GRPCStatus = .ok |
257 | 0 | ) throws { |
258 | 0 | try self._sendResponsePart(.initialMetadata(initialMetadata)) |
259 | 0 | try self._sendResponsePart(.message(.init(response, compressed: false))) |
260 | 0 | try self._sendResponsePart(.trailingMetadata(trailingMetadata)) |
261 | 0 | try self._sendResponsePart(.status(status)) |
262 | 0 | } |
263 | | |
264 | | /// Send an error to the client. |
265 | | /// |
266 | | /// - Parameters: |
267 | | /// - error: The error to send. |
268 | | /// - trailingMetadata: The trailing metadata to send. By default the metadata will be empty. |
269 | 0 | public func sendError(_ error: Error, trailingMetadata: HPACKHeaders = [:]) throws { |
270 | 0 | try self._sendResponsePart(.trailingMetadata(trailingMetadata)) |
271 | 0 | try self._sendError(error) |
272 | 0 | } |
273 | | } |
274 | | |
275 | | // MARK: - Streaming Response |
276 | | |
277 | | /// A fake streaming response to be used with a generated test client. |
278 | | /// |
279 | | /// Users typically create fake responses via helper methods on their generated test clients |
280 | | /// corresponding to the RPC which they intend to test. |
281 | | /// |
282 | | /// For streaming responses users have a number of methods available to them: |
283 | | /// - `sendInitialMetadata(_:)` |
284 | | /// - `sendMessage(_:)` |
285 | | /// - `sendEnd(status:trailingMetadata:)` |
286 | | /// - `sendError(_:trailingMetadata)` |
287 | | /// |
288 | | /// `sendInitialMetadata` may be called to send initial metadata to the client, however, it |
289 | | /// must be called first in order for the metadata to be sent. If it is not called, empty |
290 | | /// metadata will be sent automatically if necessary. |
291 | | /// |
292 | | /// `sendMessage` may be called to send a response message on the stream. This may be called |
293 | | /// multiple times. Messages will be ignored if this is called after `sendEnd` or `sendError`. |
294 | | /// |
295 | | /// `sendEnd` indicates that the response stream has closed. It – or `sendError` - must be called |
296 | | /// once. The `status` defaults to a value with the `ok` code and `trailingMetadata` is empty by |
297 | | /// default. |
298 | | /// |
299 | | /// `sendError` may be called at any time to indicate an error on the response stream. |
300 | | /// Like `sendEnd`, `trailingMetadata` is empty by default. |
301 | | public class FakeStreamingResponse<Request, Response>: _FakeResponseStream<Request, Response> { |
302 | 0 | override public init(requestHandler: @escaping (FakeRequestPart<Request>) -> Void = { _ in }) { |
303 | 0 | super.init(requestHandler: requestHandler) |
304 | 0 | } |
305 | | |
306 | | /// Send initial metadata to the client. |
307 | | /// |
308 | | /// Note that calling this function is not required; empty initial metadata will be sent |
309 | | /// automatically if necessary. |
310 | | /// |
311 | | /// - Parameter metadata: The metadata to send |
312 | | /// - Throws: FakeResponseProtocolViolation if sending initial metadata would violate the gRPC |
313 | | /// protocol, e.g. sending metadata too many times, or out of order. |
314 | 0 | public func sendInitialMetadata(_ metadata: HPACKHeaders) throws { |
315 | 0 | try self._sendResponsePart(.initialMetadata(metadata)) |
316 | 0 | } |
317 | | |
318 | | /// Send a response message to the client. |
319 | | /// |
320 | | /// - Parameter response: The response to send. |
321 | | /// - Throws: FakeResponseProtocolViolation if sending the message would violate the gRPC |
322 | | /// protocol, e.g. sending messages after the RPC has ended. |
323 | 0 | public func sendMessage(_ response: Response) throws { |
324 | 0 | try self._sendResponsePart(.message(.init(response, compressed: false))) |
325 | 0 | } |
326 | | |
327 | | /// Send the RPC status and trailing metadata to the client. |
328 | | /// |
329 | | /// - Parameters: |
330 | | /// - status: The status to send. By default the status code will be '.ok'. |
331 | | /// - trailingMetadata: The trailing metadata to send. Empty by default. |
332 | | /// - Throws: FakeResponseProtocolViolation if ending the RPC would violate the gRPC |
333 | | /// protocol, e.g. sending end after the RPC has already completed. |
334 | 0 | public func sendEnd(status: GRPCStatus = .ok, trailingMetadata: HPACKHeaders = [:]) throws { |
335 | 0 | try self._sendResponsePart(.trailingMetadata(trailingMetadata)) |
336 | 0 | try self._sendResponsePart(.status(status)) |
337 | 0 | } |
338 | | |
339 | | /// Send an error to the client. |
340 | | /// |
341 | | /// - Parameters: |
342 | | /// - error: The error to send. |
343 | | /// - trailingMetadata: The trailing metadata to send. By default the metadata will be empty. |
344 | | /// - Throws: FakeResponseProtocolViolation if sending the error would violate the gRPC |
345 | | /// protocol, e.g. erroring after the RPC has already completed. |
346 | 0 | public func sendError(_ error: Error, trailingMetadata: HPACKHeaders = [:]) throws { |
347 | 0 | try self._sendResponsePart(.trailingMetadata(trailingMetadata)) |
348 | 0 | try self._sendError(error) |
349 | 0 | } |
350 | | } |