/src/grpc-swift/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift
Line | Count | Source |
1 | | /* |
2 | | * Copyright 2021, gRPC Authors All rights reserved. |
3 | | * |
4 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | | * you may not use this file except in compliance with the License. |
6 | | * You may obtain a copy of the License at |
7 | | * |
8 | | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | | * |
10 | | * Unless required by applicable law or agreed to in writing, software |
11 | | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | | * See the License for the specific language governing permissions and |
14 | | * limitations under the License. |
15 | | */ |
16 | | import DequeModule |
17 | | import Logging |
18 | | import NIOCore |
19 | | import NIOHPACK |
20 | | |
21 | | @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) |
22 | | public struct GRPCAsyncServerHandler< |
23 | | Serializer: MessageSerializer, |
24 | | Deserializer: MessageDeserializer, |
25 | | Request: Sendable, |
26 | | Response: Sendable |
27 | | >: GRPCServerHandlerProtocol where Serializer.Input == Response, Deserializer.Output == Request { |
28 | | @usableFromInline |
29 | | internal let _handler: AsyncServerHandler<Serializer, Deserializer, Request, Response> |
30 | | |
31 | 0 | public func receiveMetadata(_ metadata: HPACKHeaders) { |
32 | 0 | self._handler.receiveMetadata(metadata) |
33 | 0 | } |
34 | | |
35 | 0 | public func receiveMessage(_ bytes: ByteBuffer) { |
36 | 0 | self._handler.receiveMessage(bytes) |
37 | 0 | } |
38 | | |
39 | 0 | public func receiveEnd() { |
40 | 0 | self._handler.receiveEnd() |
41 | 0 | } |
42 | | |
43 | 0 | public func receiveError(_ error: Error) { |
44 | 0 | self._handler.receiveError(error) |
45 | 0 | } |
46 | | |
47 | 0 | public func finish() { |
48 | 0 | self._handler.finish() |
49 | 0 | } |
50 | | } |
51 | | |
52 | | // MARK: - RPC Adapters |
53 | | |
54 | | @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) |
55 | | extension GRPCAsyncServerHandler { |
56 | | public typealias Request = Deserializer.Output |
57 | | public typealias Response = Serializer.Input |
58 | | |
59 | | @inlinable |
60 | | public init( |
61 | | context: CallHandlerContext, |
62 | | requestDeserializer: Deserializer, |
63 | | responseSerializer: Serializer, |
64 | | interceptors: [ServerInterceptor<Request, Response>], |
65 | | wrapping unary: |
66 | | @escaping @Sendable (Request, GRPCAsyncServerCallContext) async throws |
67 | | -> Response |
68 | 0 | ) { |
69 | 0 | self._handler = .init( |
70 | 0 | context: context, |
71 | 0 | requestDeserializer: requestDeserializer, |
72 | 0 | responseSerializer: responseSerializer, |
73 | 0 | callType: .unary, |
74 | 0 | interceptors: interceptors, |
75 | 0 | userHandler: { requestStream, responseStreamWriter, context in |
76 | 0 | var iterator = requestStream.makeAsyncIterator() |
77 | 0 | guard let request = try await iterator.next(), try await iterator.next() == nil else { |
78 | 0 | throw GRPCError.ProtocolViolation("Unary RPC expects exactly one request") |
79 | 0 | } |
80 | 0 | let response = try await unary(request, context) |
81 | 0 | try await responseStreamWriter.send(response) |
82 | 0 | } |
83 | 0 | ) |
84 | 0 | } |
85 | | |
86 | | @inlinable |
87 | | public init( |
88 | | context: CallHandlerContext, |
89 | | requestDeserializer: Deserializer, |
90 | | responseSerializer: Serializer, |
91 | | interceptors: [ServerInterceptor<Request, Response>], |
92 | | wrapping clientStreaming: |
93 | | @escaping @Sendable ( |
94 | | GRPCAsyncRequestStream<Request>, |
95 | | GRPCAsyncServerCallContext |
96 | | ) async throws -> Response |
97 | 0 | ) { |
98 | 0 | self._handler = .init( |
99 | 0 | context: context, |
100 | 0 | requestDeserializer: requestDeserializer, |
101 | 0 | responseSerializer: responseSerializer, |
102 | 0 | callType: .clientStreaming, |
103 | 0 | interceptors: interceptors, |
104 | 0 | userHandler: { requestStream, responseStreamWriter, context in |
105 | 0 | let response = try await clientStreaming(requestStream, context) |
106 | 0 | try await responseStreamWriter.send(response) |
107 | 0 | } |
108 | 0 | ) |
109 | 0 | } |
110 | | |
111 | | @inlinable |
112 | | public init( |
113 | | context: CallHandlerContext, |
114 | | requestDeserializer: Deserializer, |
115 | | responseSerializer: Serializer, |
116 | | interceptors: [ServerInterceptor<Request, Response>], |
117 | | wrapping serverStreaming: |
118 | | @escaping @Sendable ( |
119 | | Request, |
120 | | GRPCAsyncResponseStreamWriter<Response>, |
121 | | GRPCAsyncServerCallContext |
122 | | ) async throws -> Void |
123 | 0 | ) { |
124 | 0 | self._handler = .init( |
125 | 0 | context: context, |
126 | 0 | requestDeserializer: requestDeserializer, |
127 | 0 | responseSerializer: responseSerializer, |
128 | 0 | callType: .serverStreaming, |
129 | 0 | interceptors: interceptors, |
130 | 0 | userHandler: { requestStream, responseStreamWriter, context in |
131 | 0 | var iterator = requestStream.makeAsyncIterator() |
132 | 0 | guard let request = try await iterator.next(), try await iterator.next() == nil else { |
133 | 0 | throw GRPCError.ProtocolViolation("Server-streaming RPC expects exactly one request") |
134 | 0 | } |
135 | 0 | try await serverStreaming(request, responseStreamWriter, context) |
136 | 0 | } |
137 | 0 | ) |
138 | 0 | } |
139 | | |
140 | | @inlinable |
141 | | public init( |
142 | | context: CallHandlerContext, |
143 | | requestDeserializer: Deserializer, |
144 | | responseSerializer: Serializer, |
145 | | interceptors: [ServerInterceptor<Request, Response>], |
146 | | wrapping bidirectional: |
147 | | @escaping @Sendable ( |
148 | | GRPCAsyncRequestStream<Request>, |
149 | | GRPCAsyncResponseStreamWriter<Response>, |
150 | | GRPCAsyncServerCallContext |
151 | | ) async throws -> Void |
152 | 0 | ) { |
153 | 0 | self._handler = .init( |
154 | 0 | context: context, |
155 | 0 | requestDeserializer: requestDeserializer, |
156 | 0 | responseSerializer: responseSerializer, |
157 | 0 | callType: .bidirectionalStreaming, |
158 | 0 | interceptors: interceptors, |
159 | 0 | userHandler: bidirectional |
160 | 0 | ) |
161 | 0 | } |
162 | | } |
163 | | |
164 | | // MARK: - Server Handler |
165 | | |
166 | | @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) |
167 | | @usableFromInline |
168 | | internal final class AsyncServerHandler< |
169 | | Serializer: MessageSerializer, |
170 | | Deserializer: MessageDeserializer, |
171 | | Request: Sendable, |
172 | | Response: Sendable |
173 | | >: GRPCServerHandlerProtocol where Serializer.Input == Response, Deserializer.Output == Request { |
174 | | /// A response serializer. |
175 | | @usableFromInline |
176 | | internal let serializer: Serializer |
177 | | |
178 | | /// A request deserializer. |
179 | | @usableFromInline |
180 | | internal let deserializer: Deserializer |
181 | | |
182 | | /// The event loop that this handler executes on. |
183 | | @usableFromInline |
184 | | internal let eventLoop: EventLoop |
185 | | |
186 | | /// A `ByteBuffer` allocator provided by the underlying `Channel`. |
187 | | @usableFromInline |
188 | | internal let allocator: ByteBufferAllocator |
189 | | |
190 | | /// A user-provided error delegate which, if provided, is used to transform errors and potentially |
191 | | /// pack errors into trailers. |
192 | | @usableFromInline |
193 | | internal let errorDelegate: ServerErrorDelegate? |
194 | | |
195 | | /// A logger. |
196 | | @usableFromInline |
197 | | internal let logger: Logger |
198 | | |
199 | | /// A reference to the user info. This is shared with the interceptor pipeline and may be accessed |
200 | | /// from the async call context. `UserInfo` is _not_ `Sendable` and must always be accessed from |
201 | | /// an appropriate event loop. |
202 | | @usableFromInline |
203 | | internal let userInfoRef: Ref<UserInfo> |
204 | | |
205 | | /// Whether compression is enabled on the server and an algorithm has been negotiated with |
206 | | /// the client |
207 | | @usableFromInline |
208 | | internal let compressionEnabledOnRPC: Bool |
209 | | |
210 | | /// Whether the RPC method would like to compress responses (if possible). Defaults to true. |
211 | | @usableFromInline |
212 | | internal var compressResponsesIfPossible: Bool |
213 | | |
214 | | /// The interceptor pipeline does not track flushing as a separate event. The flush decision is |
215 | | /// included with metadata alongside each message. For the status and trailers the flush is |
216 | | /// implicit. For headers we track whether to flush here. |
217 | | /// |
218 | | /// In most cases the flush will be delayed until the first message is flushed and this will |
219 | | /// remain unset. However, this may be set when the server handler |
220 | | /// uses ``GRPCAsyncServerCallContext/sendHeaders(_:)``. |
221 | | @usableFromInline |
222 | | internal var flushNextHeaders: Bool |
223 | | |
224 | | /// A state machine for the interceptor pipeline. |
225 | | @usableFromInline |
226 | | internal private(set) var interceptorStateMachine: ServerInterceptorStateMachine |
227 | | /// The interceptor pipeline. |
228 | | @usableFromInline |
229 | | internal private(set) var interceptors: Optional<ServerInterceptorPipeline<Request, Response>> |
230 | | /// An object for writing intercepted responses to the channel. |
231 | | @usableFromInline |
232 | | internal private(set) var responseWriter: Optional<GRPCServerResponseWriter> |
233 | | |
234 | | /// A state machine for the user implemented function. |
235 | | @usableFromInline |
236 | | internal private(set) var handlerStateMachine: ServerHandlerStateMachine |
237 | | /// A bag of components used by the user handler. |
238 | | @usableFromInline |
239 | | internal private(set) var handlerComponents: |
240 | | Optional< |
241 | | ServerHandlerComponents< |
242 | | Request, |
243 | | Response, |
244 | | GRPCAsyncWriterSinkDelegate<(Response, Compression)> |
245 | | > |
246 | | > |
247 | | |
248 | | /// The user provided function to execute. |
249 | | @usableFromInline |
250 | | internal let userHandler: |
251 | | @Sendable ( |
252 | | GRPCAsyncRequestStream<Request>, |
253 | | GRPCAsyncResponseStreamWriter<Response>, |
254 | | GRPCAsyncServerCallContext |
255 | | ) async throws -> Void |
256 | | |
257 | | @usableFromInline |
258 | | internal typealias AsyncSequenceProducer = NIOThrowingAsyncSequenceProducer< |
259 | | Request, |
260 | | Error, |
261 | | NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, |
262 | | GRPCAsyncSequenceProducerDelegate |
263 | | > |
264 | | |
265 | | @inlinable |
266 | | internal init( |
267 | | context: CallHandlerContext, |
268 | | requestDeserializer: Deserializer, |
269 | | responseSerializer: Serializer, |
270 | | callType: GRPCCallType, |
271 | | interceptors: [ServerInterceptor<Request, Response>], |
272 | | userHandler: |
273 | | @escaping @Sendable ( |
274 | | GRPCAsyncRequestStream<Request>, |
275 | | GRPCAsyncResponseStreamWriter<Response>, |
276 | | GRPCAsyncServerCallContext |
277 | | ) async throws -> Void |
278 | 0 | ) { |
279 | 0 | self.serializer = responseSerializer |
280 | 0 | self.deserializer = requestDeserializer |
281 | 0 | self.eventLoop = context.eventLoop |
282 | 0 | self.allocator = context.allocator |
283 | 0 | self.responseWriter = context.responseWriter |
284 | 0 | self.errorDelegate = context.errorDelegate |
285 | 0 | self.compressionEnabledOnRPC = context.encoding.isEnabled |
286 | 0 | self.compressResponsesIfPossible = true |
287 | 0 | self.flushNextHeaders = false |
288 | 0 | self.logger = context.logger |
289 | 0 |
290 | 0 | self.userInfoRef = Ref(UserInfo()) |
291 | 0 | self.handlerStateMachine = .init() |
292 | 0 | self.handlerComponents = nil |
293 | 0 |
294 | 0 | self.userHandler = userHandler |
295 | 0 |
296 | 0 | self.interceptorStateMachine = .init() |
297 | 0 | self.interceptors = nil |
298 | 0 | self.interceptors = ServerInterceptorPipeline( |
299 | 0 | logger: context.logger, |
300 | 0 | eventLoop: context.eventLoop, |
301 | 0 | path: context.path, |
302 | 0 | callType: callType, |
303 | 0 | remoteAddress: context.remoteAddress, |
304 | 0 | userInfoRef: self.userInfoRef, |
305 | 0 | closeFuture: context.closeFuture, |
306 | 0 | interceptors: interceptors, |
307 | 0 | onRequestPart: self.receiveInterceptedPart(_:), Unexecuted instantiation: $s4GRPC18AsyncServerHandlerC7context19requestDeserializer18responseSerializer8callType12interceptors04userD0ACyxq_q0_q1_GAA04CallD7ContextV_q_xAA08GRPCCallK0OSayAA0C11InterceptorCyq0_q1_GGyAA22GRPCAsyncRequestStreamVyq0_G_AA0r8ResponseT6WriterVyq1_GAA0rcnO0VtYaYbKctcfcyAA010GRPCServerS4PartOyq0_GcAJcfu_ Unexecuted instantiation: $s4GRPC18AsyncServerHandlerC7context19requestDeserializer18responseSerializer8callType12interceptors04userD0ACyxq_q0_q1_GAA04CallD7ContextV_q_xAA08GRPCCallK0OSayAA0C11InterceptorCyq0_q1_GGyAA22GRPCAsyncRequestStreamVyq0_G_AA0r8ResponseT6WriterVyq1_GAA0rcnO0VtYaYbKctcfcyAA010GRPCServerS4PartOyq0_GcAJcfu_yA1_cfu0_ |
308 | 0 | onResponsePart: self.sendInterceptedPart(_:promise:) Unexecuted instantiation: $s4GRPC18AsyncServerHandlerC7context19requestDeserializer18responseSerializer8callType12interceptors04userD0ACyxq_q0_q1_GAA04CallD7ContextV_q_xAA08GRPCCallK0OSayAA0C11InterceptorCyq0_q1_GGyAA22GRPCAsyncRequestStreamVyq0_G_AA0r8ResponseT6WriterVyq1_GAA0rcnO0VtYaYbKctcfcyAA010GRPCServerU4PartOyq1_G_7NIOCore16EventLoopPromiseVyytGSgtcAJcfu1_ Unexecuted instantiation: $s4GRPC18AsyncServerHandlerC7context19requestDeserializer18responseSerializer8callType12interceptors04userD0ACyxq_q0_q1_GAA04CallD7ContextV_q_xAA08GRPCCallK0OSayAA0C11InterceptorCyq0_q1_GGyAA22GRPCAsyncRequestStreamVyq0_G_AA0r8ResponseT6WriterVyq1_GAA0rcnO0VtYaYbKctcfcyAA010GRPCServerU4PartOyq1_G_7NIOCore16EventLoopPromiseVyytGSgtcAJcfu1_yA1__A6_tcfu2_ |
309 | 0 | ) |
310 | 0 | } |
311 | | |
312 | | // MARK: - GRPCServerHandlerProtocol conformance |
313 | | |
314 | | @inlinable |
315 | 0 | internal func receiveMetadata(_ headers: HPACKHeaders) { |
316 | 0 | switch self.interceptorStateMachine.interceptRequestMetadata() { |
317 | 0 | case .intercept: |
318 | 0 | self.interceptors?.receive(.metadata(headers)) |
319 | 0 | case .cancel: |
320 | 0 | self.cancel(error: nil) |
321 | 0 | case .drop: |
322 | 0 | () |
323 | 0 | } |
324 | 0 | } |
325 | | |
326 | | @inlinable |
327 | 0 | internal func receiveMessage(_ bytes: ByteBuffer) { |
328 | 0 | let request: Request |
329 | 0 |
|
330 | 0 | do { |
331 | 0 | request = try self.deserializer.deserialize(byteBuffer: bytes) |
332 | 0 | } catch { |
333 | 0 | return self.cancel(error: error) |
334 | 0 | } |
335 | 0 |
|
336 | 0 | switch self.interceptorStateMachine.interceptRequestMessage() { |
337 | 0 | case .intercept: |
338 | 0 | self.interceptors?.receive(.message(request)) |
339 | 0 | case .cancel: |
340 | 0 | self.cancel(error: nil) |
341 | 0 | case .drop: |
342 | 0 | () |
343 | 0 | } |
344 | 0 | } |
345 | | |
346 | | @inlinable |
347 | 0 | internal func receiveEnd() { |
348 | 0 | switch self.interceptorStateMachine.interceptRequestEnd() { |
349 | 0 | case .intercept: |
350 | 0 | self.interceptors?.receive(.end) |
351 | 0 | case .cancel: |
352 | 0 | self.cancel(error: nil) |
353 | 0 | case .drop: |
354 | 0 | () |
355 | 0 | } |
356 | 0 | } |
357 | | |
358 | | @inlinable |
359 | 0 | internal func receiveError(_ error: Error) { |
360 | 0 | self.cancel(error: error) |
361 | 0 | } |
362 | | |
363 | | @inlinable |
364 | 0 | internal func finish() { |
365 | 0 | self.cancel(error: nil) |
366 | 0 | } |
367 | | |
368 | | @usableFromInline |
369 | 0 | internal func cancel(error: Error?) { |
370 | 0 | self.eventLoop.assertInEventLoop() |
371 | 0 |
|
372 | 0 | switch self.handlerStateMachine.cancel() { |
373 | 0 | case .cancelAndNilOutHandlerComponents: |
374 | 0 | // Cancel handler related things (task, response writer). |
375 | 0 | self.handlerComponents?.cancel() |
376 | 0 | self.handlerComponents = nil |
377 | 0 |
|
378 | 0 | // We don't distinguish between having sent the status or not; we just tell the interceptor |
379 | 0 | // state machine that we want to send a response status. It will inform us whether to |
380 | 0 | // generate and send one or not. |
381 | 0 | switch self.interceptorStateMachine.interceptedResponseStatus() { |
382 | 0 | case .forward: |
383 | 0 | let error = error ?? GRPCStatus.processingError |
384 | 0 | let (status, trailers) = ServerErrorProcessor.processLibraryError( |
385 | 0 | error, |
386 | 0 | delegate: self.errorDelegate |
387 | 0 | ) |
388 | 0 | self.responseWriter?.sendEnd(status: status, trailers: trailers, promise: nil) |
389 | 0 | case .drop, .cancel: |
390 | 0 | () |
391 | 0 | } |
392 | 0 |
|
393 | 0 | case .none: |
394 | 0 | () |
395 | 0 | } |
396 | 0 |
|
397 | 0 | switch self.interceptorStateMachine.cancel() { |
398 | 0 | case .sendStatusThenNilOutInterceptorPipeline: |
399 | 0 | self.responseWriter?.sendEnd(status: .processingError, trailers: [:], promise: nil) |
400 | 0 | fallthrough |
401 | 0 | case .nilOutInterceptorPipeline: |
402 | 0 | self.interceptors?.close() |
403 | 0 | self.interceptors = nil |
404 | 0 | self.responseWriter = nil |
405 | 0 | case .none: |
406 | 0 | () |
407 | 0 | } |
408 | 0 | } |
409 | | |
410 | | // MARK: - Interceptors to User Function |
411 | | |
412 | | @inlinable |
413 | 0 | internal func receiveInterceptedPart(_ part: GRPCServerRequestPart<Request>) { |
414 | 0 | switch part { |
415 | 0 | case let .metadata(headers): |
416 | 0 | self.receiveInterceptedMetadata(headers) |
417 | 0 | case let .message(message): |
418 | 0 | self.receiveInterceptedMessage(message) |
419 | 0 | case .end: |
420 | 0 | self.receiveInterceptedEnd() |
421 | 0 | } |
422 | 0 | } |
423 | | |
424 | | @inlinable |
425 | 0 | internal func receiveInterceptedMetadata(_ headers: HPACKHeaders) { |
426 | 0 | switch self.interceptorStateMachine.interceptedRequestMetadata() { |
427 | 0 | case .forward: |
428 | 0 | () // continue |
429 | 0 | case .cancel: |
430 | 0 | return self.cancel(error: nil) |
431 | 0 | case .drop: |
432 | 0 | return |
433 | 0 | } |
434 | 0 |
|
435 | 0 | switch self.handlerStateMachine.handleMetadata() { |
436 | 0 | case .invokeHandler: |
437 | 0 | // We're going to invoke the handler. We need to create a handful of things in order to do |
438 | 0 | // that: |
439 | 0 | // |
440 | 0 | // - A context which allows the handler to set response headers/trailers and provides them |
441 | 0 | // with a logger amongst other things. |
442 | 0 | // - A request source; we push request messages into this which the handler consumes via |
443 | 0 | // an async sequence. |
444 | 0 | // - An async writer and delegate. The delegate calls us back with responses. The writer is |
445 | 0 | // passed to the handler. |
446 | 0 | // |
447 | 0 | // All of these components are held in a bundle ("handler components") outside of the state |
448 | 0 | // machine. We release these when we eventually call cancel (either when we `self.cancel()` |
449 | 0 | // as a result of an error or when `self.finish()` is called). |
450 | 0 | let handlerContext = GRPCAsyncServerCallContext( |
451 | 0 | headers: headers, |
452 | 0 | logger: self.logger, |
453 | 0 | contextProvider: self |
454 | 0 | ) |
455 | 0 |
|
456 | 0 | let backpressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark( |
457 | 0 | lowWatermark: 10, |
458 | 0 | highWatermark: 50 |
459 | 0 | ) |
460 | 0 | let requestSequenceProducer = NIOThrowingAsyncSequenceProducer.makeSequence( |
461 | 0 | elementType: Request.self, |
462 | 0 | failureType: Error.self, |
463 | 0 | backPressureStrategy: backpressureStrategy, |
464 | 0 | delegate: GRPCAsyncSequenceProducerDelegate() |
465 | 0 | ) |
466 | 0 |
|
467 | 0 | let responseWriter = NIOAsyncWriter.makeWriter( |
468 | 0 | isWritable: true, |
469 | 0 | delegate: GRPCAsyncWriterSinkDelegate<(Response, Compression)>( |
470 | 0 | didYield: self.interceptResponseMessages, Unexecuted instantiation: $s4GRPC18AsyncServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFy11DequeModule0J0Vyq1__AA11CompressionVtGYbcACyxq_q0_q1_Gcfu_ Unexecuted instantiation: $s4GRPC18AsyncServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFy11DequeModule0J0Vyq1__AA11CompressionVtGYbcACyxq_q0_q1_Gcfu_yAMYbcfu0_ |
471 | 0 | didTerminate: { error in |
472 | 0 | self.interceptTermination(error) |
473 | 0 | } |
474 | 0 | ) |
475 | 0 | ) |
476 | 0 |
|
477 | 0 | // Update our state before invoking the handler. |
478 | 0 | self.handlerStateMachine.handlerInvoked(requestHeaders: headers) |
479 | 0 | self.handlerComponents = ServerHandlerComponents< |
480 | 0 | Request, |
481 | 0 | Response, |
482 | 0 | GRPCAsyncWriterSinkDelegate<(Response, Compression)> |
483 | 0 | >( |
484 | 0 | requestSource: requestSequenceProducer.source, |
485 | 0 | responseWriterSink: responseWriter.sink, |
486 | 0 | task: Task { |
487 | 0 | // We don't have a task cancellation handler here: we do it in `self.cancel()`. |
488 | 0 | await self.invokeUserHandler( |
489 | 0 | requestSequence: requestSequenceProducer, |
490 | 0 | responseWriter: responseWriter.writer, |
491 | 0 | callContext: handlerContext |
492 | 0 | ) |
493 | 0 | } |
494 | 0 | ) |
495 | 0 |
|
496 | 0 | case .cancel: |
497 | 0 | self.cancel(error: nil) |
498 | 0 | } |
499 | 0 | } |
500 | | |
501 | | @Sendable |
502 | | @usableFromInline |
503 | | internal func invokeUserHandler( |
504 | | requestSequence: AsyncSequenceProducer.NewSequence, |
505 | | responseWriter: NIOAsyncWriter< |
506 | | (Response, Compression), |
507 | | GRPCAsyncWriterSinkDelegate<(Response, Compression)> |
508 | | >, |
509 | | callContext: GRPCAsyncServerCallContext |
510 | 0 | ) async { |
511 | 0 | defer { |
512 | 0 | // It's possible the user handler completed before the end of the request stream. We |
513 | 0 | // explicitly finish it to drop any unconsumed inbound messages. |
514 | 0 | requestSequence.source.finish() |
515 | 0 | } |
516 | 0 |
|
517 | 0 | do { |
518 | 0 | let grpcRequestStream = GRPCAsyncRequestStream(requestSequence.sequence) |
519 | 0 | let grpcResponseStreamWriter = GRPCAsyncResponseStreamWriter(wrapping: responseWriter) |
520 | 0 | try await self.userHandler(grpcRequestStream, grpcResponseStreamWriter, callContext) |
521 | 0 |
|
522 | 0 | responseWriter.finish() |
523 | 0 | } catch { |
524 | 0 | responseWriter.finish(error: error) |
525 | 0 | } |
526 | 0 | } |
527 | | |
528 | | @inlinable |
529 | 0 | internal func receiveInterceptedMessage(_ request: Request) { |
530 | 0 | switch self.interceptorStateMachine.interceptedRequestMessage() { |
531 | 0 | case .forward: |
532 | 0 | switch self.handlerStateMachine.handleMessage() { |
533 | 0 | case .forward: |
534 | 0 | _ = self.handlerComponents?.requestSource.yield(request) |
535 | 0 | case .cancel: |
536 | 0 | self.cancel(error: nil) |
537 | 0 | } |
538 | 0 |
|
539 | 0 | case .cancel: |
540 | 0 | self.cancel(error: nil) |
541 | 0 |
|
542 | 0 | case .drop: |
543 | 0 | () |
544 | 0 | } |
545 | 0 | } |
546 | | |
547 | | @inlinable |
548 | 0 | internal func receiveInterceptedEnd() { |
549 | 0 | switch self.interceptorStateMachine.interceptedRequestEnd() { |
550 | 0 | case .forward: |
551 | 0 | switch self.handlerStateMachine.handleEnd() { |
552 | 0 | case .forward: |
553 | 0 | self.handlerComponents?.requestSource.finish() |
554 | 0 | case .cancel: |
555 | 0 | self.cancel(error: nil) |
556 | 0 | } |
557 | 0 | case .cancel: |
558 | 0 | self.cancel(error: nil) |
559 | 0 | case .drop: |
560 | 0 | () |
561 | 0 | } |
562 | 0 | } |
563 | | |
564 | | // MARK: - User Function To Interceptors |
565 | | |
566 | | @inlinable |
567 | 0 | internal func _interceptResponseMessage(_ response: Response, compression: Compression) { |
568 | 0 | self.eventLoop.assertInEventLoop() |
569 | 0 |
|
570 | 0 | switch self.handlerStateMachine.sendMessage() { |
571 | 0 | case let .intercept(.some(headers)): |
572 | 0 | switch self.interceptorStateMachine.interceptResponseMetadata() { |
573 | 0 | case .intercept: |
574 | 0 | self.interceptors?.send(.metadata(headers), promise: nil) |
575 | 0 | case .cancel: |
576 | 0 | return self.cancel(error: nil) |
577 | 0 | case .drop: |
578 | 0 | () |
579 | 0 | } |
580 | 0 | // Fall through to the next case to send the response message. |
581 | 0 | fallthrough |
582 | 0 |
|
583 | 0 | case .intercept(.none): |
584 | 0 | switch self.interceptorStateMachine.interceptResponseMessage() { |
585 | 0 | case .intercept: |
586 | 0 | let senderWantsCompression = compression.isEnabled( |
587 | 0 | callDefault: self.compressResponsesIfPossible |
588 | 0 | ) |
589 | 0 |
|
590 | 0 | let compress = self.compressionEnabledOnRPC && senderWantsCompression |
591 | 0 |
|
592 | 0 | let metadata = MessageMetadata(compress: compress, flush: true) |
593 | 0 | self.interceptors?.send(.message(response, metadata), promise: nil) |
594 | 0 | case .cancel: |
595 | 0 | return self.cancel(error: nil) |
596 | 0 | case .drop: |
597 | 0 | () |
598 | 0 | } |
599 | 0 |
|
600 | 0 | case .drop: |
601 | 0 | () |
602 | 0 | } |
603 | 0 | } |
604 | | |
605 | | @Sendable |
606 | | @inlinable |
607 | 0 | internal func interceptResponseMessages(_ messages: Deque<(Response, Compression)>) { |
608 | 0 | if self.eventLoop.inEventLoop { |
609 | 0 | for message in messages { |
610 | 0 | self._interceptResponseMessage(message.0, compression: message.1) |
611 | 0 | } |
612 | 0 | } else { |
613 | 0 | self.eventLoop.execute { |
614 | 0 | for message in messages { |
615 | 0 | self._interceptResponseMessage(message.0, compression: message.1) |
616 | 0 | } |
617 | 0 | } |
618 | 0 | } |
619 | 0 | } |
620 | | |
621 | | @inlinable |
622 | 0 | internal func _interceptTermination(_ error: Error?) { |
623 | 0 | self.eventLoop.assertInEventLoop() |
624 | 0 |
|
625 | 0 | let processedError: Error? |
626 | 0 | if let thrownStatus = error as? GRPCStatus, thrownStatus.isOk { |
627 | 0 | processedError = GRPCStatus( |
628 | 0 | code: .unknown, |
629 | 0 | message: "Handler threw error with status code 'ok'." |
630 | 0 | ) |
631 | 0 | } else { |
632 | 0 | processedError = error |
633 | 0 | } |
634 | 0 |
|
635 | 0 | switch self.handlerStateMachine.sendStatus() { |
636 | 0 | case let .intercept(requestHeaders, trailers): |
637 | 0 | let status: GRPCStatus |
638 | 0 | let processedTrailers: HPACKHeaders |
639 | 0 |
|
640 | 0 | if let processedError = processedError { |
641 | 0 | (status, processedTrailers) = ServerErrorProcessor.processObserverError( |
642 | 0 | processedError, |
643 | 0 | headers: requestHeaders, |
644 | 0 | trailers: trailers, |
645 | 0 | delegate: self.errorDelegate |
646 | 0 | ) |
647 | 0 | } else { |
648 | 0 | status = GRPCStatus.ok |
649 | 0 | processedTrailers = trailers |
650 | 0 | } |
651 | 0 |
|
652 | 0 | switch self.interceptorStateMachine.interceptResponseStatus() { |
653 | 0 | case .intercept: |
654 | 0 | self.interceptors?.send(.end(status, processedTrailers), promise: nil) |
655 | 0 | case .cancel: |
656 | 0 | return self.cancel(error: nil) |
657 | 0 | case .drop: |
658 | 0 | () |
659 | 0 | } |
660 | 0 |
|
661 | 0 | case .drop: |
662 | 0 | () |
663 | 0 | } |
664 | 0 | } |
665 | | |
666 | | @Sendable |
667 | | @inlinable |
668 | 0 | internal func interceptTermination(_ status: Error?) { |
669 | 0 | if self.eventLoop.inEventLoop { |
670 | 0 | self._interceptTermination(status) |
671 | 0 | } else { |
672 | 0 | self.eventLoop.execute { |
673 | 0 | self._interceptTermination(status) |
674 | 0 | } |
675 | 0 | } |
676 | 0 | } |
677 | | |
678 | | @inlinable |
679 | | internal func sendInterceptedPart( |
680 | | _ part: GRPCServerResponsePart<Response>, |
681 | | promise: EventLoopPromise<Void>? |
682 | 0 | ) { |
683 | 0 | switch part { |
684 | 0 | case let .metadata(headers): |
685 | 0 | self.sendInterceptedMetadata(headers, promise: promise) |
686 | 0 |
|
687 | 0 | case let .message(message, metadata): |
688 | 0 | do { |
689 | 0 | let bytes = try self.serializer.serialize(message, allocator: ByteBufferAllocator()) |
690 | 0 | self.sendInterceptedResponse(bytes, metadata: metadata, promise: promise) |
691 | 0 | } catch { |
692 | 0 | promise?.fail(error) |
693 | 0 | self.cancel(error: error) |
694 | 0 | } |
695 | 0 |
|
696 | 0 | case let .end(status, trailers): |
697 | 0 | self.sendInterceptedStatus(status, metadata: trailers, promise: promise) |
698 | 0 | } |
699 | 0 | } |
700 | | |
701 | | @inlinable |
702 | | internal func sendInterceptedMetadata( |
703 | | _ metadata: HPACKHeaders, |
704 | | promise: EventLoopPromise<Void>? |
705 | 0 | ) { |
706 | 0 | switch self.interceptorStateMachine.interceptedResponseMetadata() { |
707 | 0 | case .forward: |
708 | 0 | if let responseWriter = self.responseWriter { |
709 | 0 | let flush = self.flushNextHeaders |
710 | 0 | self.flushNextHeaders = false |
711 | 0 | responseWriter.sendMetadata(metadata, flush: flush, promise: promise) |
712 | 0 | } else if let promise = promise { |
713 | 0 | promise.fail(GRPCStatus.processingError) |
714 | 0 | } |
715 | 0 | case .cancel: |
716 | 0 | self.cancel(error: nil) |
717 | 0 | case .drop: |
718 | 0 | () |
719 | 0 | } |
720 | 0 | } |
721 | | |
722 | | @inlinable |
723 | | internal func sendInterceptedResponse( |
724 | | _ bytes: ByteBuffer, |
725 | | metadata: MessageMetadata, |
726 | | promise: EventLoopPromise<Void>? |
727 | 0 | ) { |
728 | 0 | switch self.interceptorStateMachine.interceptedResponseMessage() { |
729 | 0 | case .forward: |
730 | 0 | if let responseWriter = self.responseWriter { |
731 | 0 | responseWriter.sendMessage(bytes, metadata: metadata, promise: promise) |
732 | 0 | } else if let promise = promise { |
733 | 0 | promise.fail(GRPCStatus.processingError) |
734 | 0 | } |
735 | 0 | case .cancel: |
736 | 0 | self.cancel(error: nil) |
737 | 0 | case .drop: |
738 | 0 | () |
739 | 0 | } |
740 | 0 | } |
741 | | |
742 | | @inlinable |
743 | | internal func sendInterceptedStatus( |
744 | | _ status: GRPCStatus, |
745 | | metadata: HPACKHeaders, |
746 | | promise: EventLoopPromise<Void>? |
747 | 0 | ) { |
748 | 0 | switch self.interceptorStateMachine.interceptedResponseStatus() { |
749 | 0 | case .forward: |
750 | 0 | if let responseWriter = self.responseWriter { |
751 | 0 | responseWriter.sendEnd(status: status, trailers: metadata, promise: promise) |
752 | 0 | } else if let promise = promise { |
753 | 0 | promise.fail(GRPCStatus.processingError) |
754 | 0 | } |
755 | 0 | case .cancel: |
756 | 0 | self.cancel(error: nil) |
757 | 0 | case .drop: |
758 | 0 | () |
759 | 0 | } |
760 | 0 | } |
761 | | } |
762 | | |
763 | | // Sendability is unchecked as all mutable state is accessed/modified from an appropriate event |
764 | | // loop. |
765 | | @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) |
766 | | extension AsyncServerHandler: @unchecked Sendable {} |
767 | | |
768 | | @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) |
769 | | extension AsyncServerHandler: AsyncServerCallContextProvider { |
770 | | @usableFromInline |
771 | 0 | internal func setResponseHeaders(_ headers: HPACKHeaders) async throws { |
772 | 0 | let completed = self.eventLoop.submit { |
773 | 0 | if !self.handlerStateMachine.setResponseHeaders(headers) { |
774 | 0 | throw GRPCStatus( |
775 | 0 | code: .failedPrecondition, |
776 | 0 | message: "Tried to send response headers in an invalid state" |
777 | 0 | ) |
778 | 0 | } |
779 | 0 | } |
780 | 0 | try await completed.get() |
781 | 0 | } |
782 | | |
783 | | @usableFromInline |
784 | 0 | internal func acceptRPC(_ headers: HPACKHeaders) async { |
785 | 0 | let completed = self.eventLoop.submit { |
786 | 0 | guard self.handlerStateMachine.setResponseHeaders(headers) else { return } |
787 | 0 |
|
788 | 0 | // Shh, it's a lie! We don't really have a message to send but the state machine doesn't know |
789 | 0 | // (or care) about that. It will, however, tell us if we can send the headers or not. |
790 | 0 | switch self.handlerStateMachine.sendMessage() { |
791 | 0 | case let .intercept(.some(headers)): |
792 | 0 | switch self.interceptorStateMachine.interceptResponseMetadata() { |
793 | 0 | case .intercept: |
794 | 0 | self.flushNextHeaders = true |
795 | 0 | self.interceptors?.send(.metadata(headers), promise: nil) |
796 | 0 | case .cancel: |
797 | 0 | return self.cancel(error: nil) |
798 | 0 | case .drop: |
799 | 0 | () |
800 | 0 | } |
801 | 0 |
|
802 | 0 | case .intercept(.none), .drop: |
803 | 0 | // intercept(.none) means headers have already been sent; we should never hit this because |
804 | 0 | // we guard on setting the response headers above. |
805 | 0 | () |
806 | 0 | } |
807 | 0 | } |
808 | 0 | try? await completed.get() |
809 | 0 | } |
810 | | |
811 | | @usableFromInline |
812 | 0 | internal func setResponseTrailers(_ headers: HPACKHeaders) async throws { |
813 | 0 | let completed = self.eventLoop.submit { |
814 | 0 | self.handlerStateMachine.setResponseTrailers(headers) |
815 | 0 | } |
816 | 0 | try await completed.get() |
817 | 0 | } |
818 | | |
819 | | @usableFromInline |
820 | 0 | internal func setResponseCompression(_ enabled: Bool) async throws { |
821 | 0 | let completed = self.eventLoop.submit { |
822 | 0 | self.compressResponsesIfPossible = enabled |
823 | 0 | } |
824 | 0 | try await completed.get() |
825 | 0 | } |
826 | | |
827 | | @usableFromInline |
828 | | func withUserInfo<Result: Sendable>( |
829 | | _ modify: @Sendable @escaping (UserInfo) throws -> Result |
830 | 0 | ) async throws -> Result { |
831 | 0 | let result = self.eventLoop.submit { |
832 | 0 | try modify(self.userInfoRef.value) |
833 | 0 | } |
834 | 0 | return try await result.get() |
835 | 0 | } |
836 | | |
837 | | @usableFromInline |
838 | | func withMutableUserInfo<Result: Sendable>( |
839 | | _ modify: @Sendable @escaping (inout UserInfo) throws -> Result |
840 | 0 | ) async throws -> Result { |
841 | 0 | let result = self.eventLoop.submit { |
842 | 0 | try modify(&self.userInfoRef.value) |
843 | 0 | } |
844 | 0 | return try await result.get() |
845 | 0 | } |
846 | | } |
847 | | |
/// This protocol exists so that the generic server handler can be erased from the
/// `GRPCAsyncServerCallContext`.
///
/// It provides methods which update context on the async handler by first executing onto the
/// correct event loop.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@usableFromInline
protocol AsyncServerCallContextProvider: Sendable {
  /// Sets the response headers; throws if they may no longer be set for this RPC.
  func setResponseHeaders(_ headers: HPACKHeaders) async throws
  /// Accepts the RPC, eagerly sending the given response headers.
  func acceptRPC(_ headers: HPACKHeaders) async
  /// Sets the trailers to be sent at the end of the RPC.
  func setResponseTrailers(_ trailers: HPACKHeaders) async throws
  /// Enables or disables compression of responses.
  func setResponseCompression(_ enabled: Bool) async throws

  /// Invokes `modify` with the RPC's `UserInfo` and returns its result.
  func withUserInfo<Result: Sendable>(
    _ modify: @Sendable @escaping (UserInfo) throws -> Result
  ) async throws -> Result

  /// Invokes `modify` with mutable access to the RPC's `UserInfo` and returns its result.
  func withMutableUserInfo<Result: Sendable>(
    _ modify: @Sendable @escaping (inout UserInfo) throws -> Result
  ) async throws -> Result
}
869 | | |
/// Bundles the pieces that make up a running async server handler: the task executing the user
/// function, the sink responses are written to, and the source requests are fed through.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@usableFromInline
internal struct ServerHandlerComponents<
  Request: Sendable,
  Response: Sendable,
  Delegate: NIOAsyncWriterSinkDelegate
> where Delegate.Element == (Response, Compression) {
  @usableFromInline
  internal typealias AsyncWriterSink = NIOAsyncWriter<(Response, Compression), Delegate>.Sink

  @usableFromInline
  internal typealias AsyncSequenceSource = NIOThrowingAsyncSequenceProducer<
    Request,
    Error,
    NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
    GRPCAsyncSequenceProducerDelegate
  >.Source

  /// The task running the user-provided handler.
  @usableFromInline
  internal let task: Task<Void, Never>
  /// The sink through which responses are written.
  @usableFromInline
  internal let responseWriterSink: AsyncWriterSink
  /// The source used to deliver requests to the handler.
  @usableFromInline
  internal let requestSource: AsyncSequenceSource

  @inlinable
  init(
    requestSource: AsyncSequenceSource,
    responseWriterSink: AsyncWriterSink,
    task: Task<Void, Never>
  ) {
    self.requestSource = requestSource
    self.responseWriterSink = responseWriterSink
    self.task = task
  }

  /// Tears down the handler: stops request delivery, fails the response writer, and cancels the
  /// user handler's task.
  func cancel() {
    // Cancellation in the user handler is cooperative and may well be ignored, so cut off its
    // inputs and outputs too: finishing the request source stops further requests from being
    // delivered, and failing the writer sink ensures no further responses are written. With
    // nothing useful left to do, the handler should stop running sooner.
    self.requestSource.finish()
    self.responseWriterSink.finish(error: CancellationError())
    self.task.cancel()
  }
}