/src/grpc-swift/Sources/GRPC/ClientCalls/Call.swift
Line | Count | Source |
1 | | /* |
2 | | * Copyright 2020, gRPC Authors All rights reserved. |
3 | | * |
4 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | | * you may not use this file except in compliance with the License. |
6 | | * You may obtain a copy of the License at |
7 | | * |
8 | | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | | * |
10 | | * Unless required by applicable law or agreed to in writing, software |
11 | | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | | * See the License for the specific language governing permissions and |
14 | | * limitations under the License. |
15 | | */ |
16 | | import Logging |
17 | | import NIOCore |
18 | | import NIOHPACK |
19 | | import NIOHTTP2 |
20 | | |
21 | | import protocol SwiftProtobuf.Message |
22 | | |
/// A single RPC as seen by a client. Through it the caller sends request parts, asks for
/// cancellation, and receives response parts via a callback supplied at invocation.
///
/// The call sits on top of an interceptor pipeline (see ``ClientInterceptor``), so request and
/// response streams may be arbitrarily transformed or observed. Request parts traverse the
/// pipeline before reaching the network; response parts traverse it after being received from
/// the network.
///
/// This is a lower-level API than the wrapped calls (such as ``UnaryCall`` and
/// ``BidirectionalStreamingCall``), so more is asked of the caller. Callers must call
/// ``invoke(onError:onResponsePart:)`` to start the call and must send the correct request parts
/// in the correct order: exactly one `metadata`, followed by at most one `message` for unary and
/// server streaming calls, or any number of `message` parts for client streaming and
/// bidirectional streaming calls. Every call type must terminate its request stream by sending
/// one `end` part.
///
/// ``Call`` objects cannot be created directly; they are created via an object conforming to
/// ``GRPCChannel`` such as ``ClientConnection``.
public final class Call<Request, Response> {
  /// The lifecycle of a call.
  @usableFromInline
  internal enum State {
    /// Not yet invoked. Holds the factory used to build the transport on invocation.
    case idle(ClientTransportFactory<Request, Response>)

    /// Invoked: there is a transport on which to send requests. The transport may be closed if
    /// the RPC has already completed.
    case invoked(ClientTransport<Request, Response>)
  }

  /// Where the call currently is in its lifecycle.
  @usableFromInline
  internal var _state: State

  /// Interceptors supplied by the user for this call.
  @usableFromInline
  internal let _interceptors: [ClientInterceptor<Request, Response>]

  /// Whether request compression is enabled by the call options.
  private var isCompressionEnabled: Bool {
    self.options.messageEncoding.enabledForRequests
  }

  /// The `EventLoop` the call is being invoked on.
  public let eventLoop: EventLoop

  /// The path of the RPC, usually generated from a service definition, e.g. "/echo.Echo/Get".
  public let path: String

  /// The type of the RPC, e.g. unary, bidirectional streaming.
  public let type: GRPCCallType

  /// Options used to invoke the call.
  public let options: CallOptions

  /// A promise for the underlying `Channel`. It is only allocated when the `Channel` is requested
  /// while the call is still idle, i.e. before a transport exists to ask.
  private var channelPromise: EventLoopPromise<Channel>?

  /// A future for the underlying `Channel`. The lookup itself always runs on `eventLoop`.
  internal var channel: EventLoopFuture<Channel> {
    guard self.eventLoop.inEventLoop else {
      return self.eventLoop.flatSubmit {
        self._channel()
      }
    }
    return self._channel()
  }

  // Calls can't be constructed directly: users must make them using a `GRPCChannel`.
  @inlinable
  internal init(
    path: String,
    type: GRPCCallType,
    eventLoop: EventLoop,
    options: CallOptions,
    interceptors: [ClientInterceptor<Request, Response>],
    transportFactory: ClientTransportFactory<Request, Response>
  ) {
    self.eventLoop = eventLoop
    self.path = path
    self.type = type
    self.options = options
    self._interceptors = interceptors
    self._state = .idle(transportFactory)
  }

  /// Starts the call, registering the callbacks used to deliver errors and response parts
  /// received from the server.
  ///
  /// This must be called prior to ``send(_:)`` or ``cancel()``.
  ///
  /// - Parameters:
  ///   - onError: A callback invoked when an error is received.
  ///   - onResponsePart: A callback which is invoked on every response part.
  /// - Important: This function should only be called once. Subsequent calls will be ignored.
  @inlinable
  public func invoke(
    onError: @escaping (Error) -> Void,
    onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
  ) {
    self.options.logger.debug("starting rpc", metadata: ["path": "\(self.path)"], source: "GRPC")

    guard self.eventLoop.inEventLoop else {
      // Hop onto the call's event loop before touching any state.
      self.eventLoop.execute {
        self._invoke(onStart: {}, onError: onError, onResponsePart: onResponsePart)
      }
      return
    }

    self._invoke(onStart: {}, onError: onError, onResponsePart: onResponsePart)
  }

  /// Send a request part on the RPC.
  /// - Parameters:
  ///   - part: The request part to send.
  ///   - promise: A promise which will be completed when the request part has been handled.
  /// - Note: Sending will always fail if ``invoke(onError:onResponsePart:)`` has not been called.
  @inlinable
  public func send(_ part: GRPCClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
    guard self.eventLoop.inEventLoop else {
      self.eventLoop.execute {
        self._send(part, promise: promise)
      }
      return
    }

    self._send(part, promise: promise)
  }

  /// Attempt to cancel the RPC.
  /// - Parameter promise: A promise which will be completed once the cancellation request has been
  ///   dealt with.
  /// - Note: If ``invoke(onError:onResponsePart:)`` has not been called yet there is no transport
  ///   to cancel: the promise is succeeded straight away and any pending request for the
  ///   underlying `Channel` is failed with a `cancelled` status.
  public func cancel(promise: EventLoopPromise<Void>?) {
    guard self.eventLoop.inEventLoop else {
      self.eventLoop.execute {
        self._cancel(promise: promise)
      }
      return
    }

    self._cancel(promise: promise)
  }
}
165 | | |
extension Call {
  /// Send a request part on the RPC.
  /// - Parameter part: The request part to send.
  /// - Returns: A future which will be resolved when the request part has been handled.
  /// - Note: Sending will always fail if ``invoke(onError:onResponsePart:)`` has not been called.
  @inlinable
  public func send(_ part: GRPCClientRequestPart<Request>) -> EventLoopFuture<Void> {
    let completion: EventLoopPromise<Void> = self.eventLoop.makePromise()
    self.send(part, promise: completion)
    return completion.futureResult
  }

  /// Attempt to cancel the RPC.
  /// - Note: If ``invoke(onError:onResponsePart:)`` has not been called yet there is no transport
  ///   to cancel and the returned future is succeeded straight away.
  /// - Returns: A future which will be resolved once the cancellation request has been dealt
  ///   with.
  public func cancel() -> EventLoopFuture<Void> {
    let completion: EventLoopPromise<Void> = self.eventLoop.makePromise()
    self.cancel(promise: completion)
    return completion.futureResult
  }
}
187 | | |
extension Call {
  /// Resolves a per-message compression setting against the call-level default.
  /// - Parameter compression: The compression setting requested for the message(s).
  /// - Returns: Whether the message(s) should be compressed.
  internal func compress(_ compression: Compression) -> Bool {
    return compression.isEnabled(callDefault: self.isCompressionEnabled)
  }

  /// Sends a sequence of messages on the RPC, flushing only after the final message.
  ///
  /// May be called from any thread; the work is executed on `eventLoop`.
  ///
  /// - Parameters:
  ///   - messages: The messages to send, in order.
  ///   - compression: The compression setting applied to every message in `messages`.
  ///   - promise: A promise attached to the final message. If `messages` is empty there is nothing
  ///     to send and the promise is succeeded immediately.
  internal func sendMessages<Messages>(
    _ messages: Messages,
    compression: Compression,
    promise: EventLoopPromise<Void>?
  ) where Messages: Sequence, Messages.Element == Request {
    if self.eventLoop.inEventLoop {
      self._sendMessages(messages, compression: compression, promise: promise)
    } else {
      self.eventLoop.execute {
        self._sendMessages(messages, compression: compression, promise: promise)
      }
    }
  }

  /// On-`EventLoop` implementation of `sendMessages(_:compression:promise:)`.
  /// - Important: This *must* be called from the `eventLoop`.
  private func _sendMessages<Messages>(
    _ messages: Messages,
    compression: Compression,
    promise: EventLoopPromise<Void>?
  ) where Messages: Sequence, Messages.Element == Request {
    self.eventLoop.assertInEventLoop()
    let compress = self.compress(compression)

    var iterator = messages.makeIterator()
    guard var current = iterator.next() else {
      // No messages means no send will ever carry the promise: complete it here so the caller
      // is not left waiting on (and leaking) a promise that can never be fulfilled.
      promise?.succeed(())
      return
    }

    // Look one element ahead so that we know which message is the last one. Every message
    // before the last is written without a flush and without a promise.
    while let next = iterator.next() {
      self._send(.message(current, .init(compress: compress, flush: false)), promise: nil)
      current = next
    }

    // Only flush and attach the promise to the last message.
    self._send(.message(current, .init(compress: compress, flush: true)), promise: promise)
  }
}
260 | | |
extension Call {
  /// Invoke the RPC with these callbacks: builds the transport from the stored factory and moves
  /// the call into the `invoked` state. Invoking an already invoked call is ignored.
  ///
  /// - Parameters:
  ///   - onStart: A callback handed to the transport when it is configured.
  ///   - onError: A callback invoked when an error is received.
  ///   - onResponsePart: A callback invoked for each response part received.
  /// - Important: This *must* be called from the `eventLoop`.
  @usableFromInline
  internal func _invoke(
    onStart: @escaping () -> Void,
    onError: @escaping (Error) -> Void,
    onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
  ) {
    self.eventLoop.assertInEventLoop()

    switch self._state {
    case let .idle(factory):
      let transport = factory.makeConfiguredTransport(
        to: self.path,
        for: self.type,
        withOptions: self.options,
        onEventLoop: self.eventLoop,
        interceptedBy: self._interceptors,
        onStart: onStart,
        onError: onError,
        onResponsePart: onResponsePart
      )
      self._state = .invoked(transport)

      // If the `Channel` was requested while we were idle then `_channel()` handed out a future
      // backed by `channelPromise`. Now that a transport exists, forward its answer so that
      // future completes instead of staying pending forever.
      // NOTE(review): relies on completing an already-completed promise being a no-op for the
      // "cancelled while idle, then invoked" sequence; confirm against the NIO version in use.
      if let pendingChannel = self.channelPromise {
        transport.getChannel().cascade(to: pendingChannel)
      }

    case .invoked:
      // We can't be invoked twice. Just ignore this.
      ()
    }
  }

  /// Send a request part on the transport. If the call has not been invoked the promise is
  /// failed with an invalid-state error.
  /// - Parameters:
  ///   - part: The request part to send.
  ///   - promise: A promise completed when the request part has been handled.
  /// - Important: This *must* be called from the `eventLoop`.
  @inlinable
  internal func _send(_ part: GRPCClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
    self.eventLoop.assertInEventLoop()

    switch self._state {
    case .idle:
      promise?.fail(GRPCError.InvalidState("Call must be invoked before sending request parts"))

    case let .invoked(transport):
      transport.send(part, promise: promise)
    }
  }

  /// Attempt to cancel the call.
  ///
  /// While idle there is no transport to cancel: the promise is succeeded and anyone waiting on
  /// the underlying `Channel` is failed with a `cancelled` status. Once invoked, cancellation is
  /// delegated to the transport.
  ///
  /// - Parameter promise: A promise completed once the cancellation request has been dealt with.
  /// - Important: This *must* be called from the `eventLoop`.
  private func _cancel(promise: EventLoopPromise<Void>?) {
    self.eventLoop.assertInEventLoop()

    switch self._state {
    case .idle:
      promise?.succeed(())
      self.channelPromise?.fail(GRPCStatus(code: .cancelled))

    case let .invoked(transport):
      transport.cancel(promise: promise)
    }
  }

  /// Get the underlying `Channel` for this call.
  /// - Returns: A future for the `Channel`, backed either by a promise allocated here or by the
  ///   transport.
  /// - Important: This *must* be called from the `eventLoop`.
  private func _channel() -> EventLoopFuture<Channel> {
    self.eventLoop.assertInEventLoop()

    switch (self.channelPromise, self._state) {
    case let (.some(promise), .idle),
         let (.some(promise), .invoked):
      // We already have a promise, just use that.
      return promise.futureResult

    case (.none, .idle):
      // No transport yet: allocate a promise which is completed from the transport's channel
      // when the call is invoked, or failed if the call is cancelled first.
      let promise = self.eventLoop.makePromise(of: Channel.self)
      self.channelPromise = promise
      return promise.futureResult

    case let (.none, .invoked(transport)):
      // Just ask the transport.
      return transport.getChannel()
    }
  }
}
345 | | |
extension Call {
  // These helpers are for our wrapping call objects (`UnaryCall`, etc.).

  /// Invokes the call with a single request: sends the metadata and the request, then closes the
  /// request stream. May be called from any thread; the work runs on `eventLoop`.
  /// - Parameters:
  ///   - request: The request to send.
  ///   - onStart: A callback handed to the transport when the call is invoked.
  ///   - onError: A callback invoked when an error is received.
  ///   - onResponsePart: A callback invoked for each response part received.
  @inlinable
  internal func invokeUnaryRequest(
    _ request: Request,
    onStart: @escaping () -> Void,
    onError: @escaping (Error) -> Void,
    onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
  ) {
    guard self.eventLoop.inEventLoop else {
      self.eventLoop.execute {
        self._invokeUnaryRequest(
          request: request,
          onStart: onStart,
          onError: onError,
          onResponsePart: onResponsePart
        )
      }
      return
    }

    self._invokeUnaryRequest(
      request: request,
      onStart: onStart,
      onError: onError,
      onResponsePart: onResponsePart
    )
  }

  /// Invokes the call for streaming requests and sends the initial call metadata. Callers can send
  /// additional messages and end the stream by calling `send(_:promise:)`. May be called from any
  /// thread; the work runs on `eventLoop`.
  /// - Parameters:
  ///   - onStart: A callback handed to the transport when the call is invoked.
  ///   - onError: A callback invoked when an error is received.
  ///   - onResponsePart: A callback invoked for each response part received.
  @inlinable
  internal func invokeStreamingRequests(
    onStart: @escaping () -> Void,
    onError: @escaping (Error) -> Void,
    onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
  ) {
    guard self.eventLoop.inEventLoop else {
      self.eventLoop.execute {
        self._invokeStreamingRequests(
          onStart: onStart,
          onError: onError,
          onResponsePart: onResponsePart
        )
      }
      return
    }

    self._invokeStreamingRequests(
      onStart: onStart,
      onError: onError,
      onResponsePart: onResponsePart
    )
  }

  /// On-`EventLoop` implementation of `invokeUnaryRequest(_:onStart:onError:onResponsePart:)`.
  /// - Important: This *must* be called from the `eventLoop`.
  @usableFromInline
  internal func _invokeUnaryRequest(
    request: Request,
    onStart: @escaping () -> Void,
    onError: @escaping (Error) -> Void,
    onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
  ) {
    self.eventLoop.assertInEventLoop()
    assert(self.type == .unary || self.type == .serverStreaming)

    self._invoke(onStart: onStart, onError: onError, onResponsePart: onResponsePart)

    // The whole request stream is written back to back: metadata, the one message, then end.
    // The message itself is written without a flush.
    self._send(.metadata(self.options.customMetadata), promise: nil)
    let shouldCompress = self.isCompressionEnabled
    self._send(.message(request, .init(compress: shouldCompress, flush: false)), promise: nil)
    self._send(.end, promise: nil)
  }

  /// On-`EventLoop` implementation of `invokeStreamingRequests(onStart:onError:onResponsePart:)`.
  /// - Important: This *must* be called from the `eventLoop`.
  @usableFromInline
  internal func _invokeStreamingRequests(
    onStart: @escaping () -> Void,
    onError: @escaping (Error) -> Void,
    onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
  ) {
    self.eventLoop.assertInEventLoop()
    assert(self.type == .clientStreaming || self.type == .bidirectionalStreaming)

    self._invoke(onStart: onStart, onError: onError, onResponsePart: onResponsePart)
    // Only the metadata is sent here; messages and `end` are left to the caller.
    self._send(.metadata(self.options.customMetadata), promise: nil)
  }
}
443 | | |
// `@unchecked` is ok: all mutable state (`_state` and `channelPromise`) is accessed/modified from
// the appropriate event loop. Entry points callable from other threads hop onto `eventLoop`
// before touching that state.
extension Call: @unchecked Sendable where Request: Sendable, Response: Sendable {}