/src/grpc-swift/Sources/GRPC/CallHandlers/BidirectionalStreamingServerHandler.swift
Line | Count | Source |
1 | | /* |
2 | | * Copyright 2021, gRPC Authors All rights reserved. |
3 | | * |
4 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | | * you may not use this file except in compliance with the License. |
6 | | * You may obtain a copy of the License at |
7 | | * |
8 | | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | | * |
10 | | * Unless required by applicable law or agreed to in writing, software |
11 | | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | | * See the License for the specific language governing permissions and |
14 | | * limitations under the License. |
15 | | */ |
16 | | import NIOCore |
17 | | import NIOHPACK |
18 | | |
19 | | public final class BidirectionalStreamingServerHandler< |
20 | | Serializer: MessageSerializer, |
21 | | Deserializer: MessageDeserializer |
22 | | >: GRPCServerHandlerProtocol { |
23 | | public typealias Request = Deserializer.Output |
24 | | public typealias Response = Serializer.Input |
25 | | |
26 | | /// A response serializer. |
27 | | @usableFromInline |
28 | | internal let serializer: Serializer |
29 | | |
30 | | /// A request deserializer. |
31 | | @usableFromInline |
32 | | internal let deserializer: Deserializer |
33 | | |
34 | | /// A pipeline of user provided interceptors. |
35 | | @usableFromInline |
36 | | internal var interceptors: ServerInterceptorPipeline<Request, Response>! |
37 | | |
38 | | /// Stream events which have arrived before the stream observer future has been resolved. |
39 | | @usableFromInline |
40 | 129k | internal var requestBuffer: CircularBuffer<StreamEvent<Request>> = CircularBuffer() |
41 | | |
42 | | /// The context required in order to create the function. |
43 | | @usableFromInline |
44 | | internal let context: CallHandlerContext |
45 | | |
46 | | /// A reference to a `UserInfo`. |
47 | | @usableFromInline |
48 | | internal let userInfoRef: Ref<UserInfo> |
49 | | |
50 | | /// The user provided function to execute. |
51 | | @usableFromInline |
52 | | internal let observerFactory: |
53 | | (_StreamingResponseCallContext<Request, Response>) |
54 | | -> EventLoopFuture<(StreamEvent<Request>) -> Void> |
55 | | |
56 | | /// The state of the handler. |
57 | | @usableFromInline |
58 | 129k | internal var state: State = .idle |
59 | | |
60 | | @usableFromInline |
61 | | internal enum State { |
62 | | // No headers have been received. |
63 | | case idle |
64 | | // Headers have been received, a context has been created and the user code has been called to |
65 | | // make a stream observer with. The observer is yet to see any messages. |
66 | | case creatingObserver(_StreamingResponseCallContext<Request, Response>) |
67 | | // The observer future has resolved and the observer may have seen messages. |
68 | | case observing((StreamEvent<Request>) -> Void, _StreamingResponseCallContext<Request, Response>) |
69 | | // The observer has completed by completing the status promise. |
70 | | case completed |
71 | | } |
72 | | |
73 | | @inlinable |
74 | | public init( |
75 | | context: CallHandlerContext, |
76 | | requestDeserializer: Deserializer, |
77 | | responseSerializer: Serializer, |
78 | | interceptors: [ServerInterceptor<Request, Response>], |
79 | | observerFactory: |
80 | | @escaping (StreamingResponseCallContext<Response>) |
81 | | -> EventLoopFuture<(StreamEvent<Request>) -> Void> |
82 | 57.4k | ) { |
83 | 57.4k | self.serializer = responseSerializer |
84 | 57.4k | self.deserializer = requestDeserializer |
85 | 57.4k | self.context = context |
86 | 57.4k | self.observerFactory = observerFactory |
87 | 57.4k | |
88 | 57.4k | let userInfoRef = Ref(UserInfo()) |
89 | 57.4k | self.userInfoRef = userInfoRef |
90 | 57.4k | self.interceptors = ServerInterceptorPipeline( |
91 | 57.4k | logger: context.logger, |
92 | 57.4k | eventLoop: context.eventLoop, |
93 | 57.4k | path: context.path, |
94 | 57.4k | callType: .bidirectionalStreaming, |
95 | 57.4k | remoteAddress: context.remoteAddress, |
96 | 57.4k | userInfoRef: userInfoRef, |
97 | 57.4k | closeFuture: context.closeFuture, |
98 | 57.4k | interceptors: interceptors, |
99 | 3.92M | onRequestPart: self.receiveInterceptedPart(_:), $s4GRPC35BidirectionalStreamingServerHandlerC7context19requestDeserializer18responseSerializer12interceptors15observerFactoryACyxq_GAA04CallE7ContextV_q_xSayAA0D11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyyAA06StreamT0OyAOGcGAA0c8ResponsenO0CyAQGctcfcyAA21GRPCServerRequestPartOyAOGcAIcfu_ Line | Count | Source | 99 | 57.4k | onRequestPart: self.receiveInterceptedPart(_:), |
$s4GRPC35BidirectionalStreamingServerHandlerC7context19requestDeserializer18responseSerializer12interceptors15observerFactoryACyxq_GAA04CallE7ContextV_q_xSayAA0D11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyyAA06StreamT0OyAOGcGAA0c8ResponsenO0CyAQGctcfcyAA21GRPCServerRequestPartOyAOGcAIcfu_yA4_cfu0_ Line | Count | Source | 99 | 3.86M | onRequestPart: self.receiveInterceptedPart(_:), |
|
100 | 3.95M | onResponsePart: self.sendInterceptedPart(_:promise:) $s4GRPC35BidirectionalStreamingServerHandlerC7context19requestDeserializer18responseSerializer12interceptors15observerFactoryACyxq_GAA04CallE7ContextV_q_xSayAA0D11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyyAA06StreamT0OyAOGcGAA0c8ResponsenO0CyAQGctcfcyAA010GRPCServerX4PartOyAQG_AT0tU7PromiseVyytGSgtcAIcfu1_ Line | Count | Source | 100 | 57.4k | onResponsePart: self.sendInterceptedPart(_:promise:) |
$s4GRPC35BidirectionalStreamingServerHandlerC7context19requestDeserializer18responseSerializer12interceptors15observerFactoryACyxq_GAA04CallE7ContextV_q_xSayAA0D11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyyAA06StreamT0OyAOGcGAA0c8ResponsenO0CyAQGctcfcyAA010GRPCServerX4PartOyAQG_AT0tU7PromiseVyytGSgtcAIcfu1_yA4__A8_tcfu2_ Line | Count | Source | 100 | 3.89M | onResponsePart: self.sendInterceptedPart(_:promise:) |
|
101 | 57.4k | ) |
102 | 57.4k | } |
103 | | |
104 | | // MARK: - Public API: gRPC to Handler |
105 | | |
106 | | @inlinable |
107 | 14.3k | public func receiveMetadata(_ headers: HPACKHeaders) { |
108 | 14.3k | self.interceptors.receive(.metadata(headers)) |
109 | 14.3k | } |
110 | | |
111 | | @inlinable |
112 | 950k | public func receiveMessage(_ bytes: ByteBuffer) { |
113 | 950k | do { |
114 | 950k | let message = try self.deserializer.deserialize(byteBuffer: bytes) |
115 | 944k | self.interceptors.receive(.message(message)) |
116 | 944k | } catch { |
117 | 5.73k | self.handleError(error) |
118 | 950k | } |
119 | 950k | } |
120 | | |
121 | | @inlinable |
122 | 7.20k | public func receiveEnd() { |
123 | 7.20k | self.interceptors.receive(.end) |
124 | 7.20k | } |
125 | | |
126 | | @inlinable |
127 | 1.68k | public func receiveError(_ error: Error) { |
128 | 1.68k | self.handleError(error) |
129 | 1.68k | self.finish() |
130 | 1.68k | } |
131 | | |
132 | | @inlinable |
133 | 16.0k | public func finish() { |
134 | 16.0k | switch self.state { |
135 | 16.0k | case .idle: |
136 | 0 | self.interceptors = nil |
137 | 0 | self.state = .completed |
138 | 16.0k | |
139 | 16.0k | case let .creatingObserver(context), |
140 | 0 | let .observing(_, context): |
141 | 0 | context.statusPromise.fail(GRPCStatus(code: .unavailable, message: nil)) |
142 | 0 | self.context.eventLoop.execute { |
143 | 0 | self.interceptors = nil |
144 | 0 | } |
145 | 16.0k | |
146 | 16.0k | case .completed: |
147 | 16.0k | self.interceptors = nil |
148 | 16.0k | } |
149 | 16.0k | } |
150 | | |
151 | | // MARK: - Interceptors to User Function |
152 | | |
153 | | @inlinable |
154 | 3.86M | internal func receiveInterceptedPart(_ part: GRPCServerRequestPart<Request>) { |
155 | 3.86M | switch part { |
156 | 3.86M | case let .metadata(headers): |
157 | 57.4k | self.receiveInterceptedMetadata(headers) |
158 | 3.86M | case let .message(message): |
159 | 3.77M | self.receiveInterceptedMessage(message) |
160 | 3.86M | case .end: |
161 | 28.8k | self.receiveInterceptedEnd() |
162 | 3.86M | } |
163 | 3.86M | } |
164 | | |
165 | | @inlinable |
166 | 57.4k | internal func receiveInterceptedMetadata(_ headers: HPACKHeaders) { |
167 | 57.4k | switch self.state { |
168 | 57.4k | case .idle: |
169 | 57.4k | // Make a context to invoke the observer block factory with. |
170 | 57.4k | let context = _StreamingResponseCallContext<Request, Response>( |
171 | 57.4k | eventLoop: self.context.eventLoop, |
172 | 57.4k | headers: headers, |
173 | 57.4k | logger: self.context.logger, |
174 | 57.4k | userInfoRef: self.userInfoRef, |
175 | 57.4k | compressionIsEnabled: self.context.encoding.isEnabled, |
176 | 57.4k | closeFuture: self.context.closeFuture, |
177 | 3.83M | sendResponse: self.interceptResponse(_:metadata:promise:) $s4GRPC35BidirectionalStreamingServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFy5InputQz_AA07MessageH0V7NIOCore16EventLoopPromiseVyytGSgtcACyxq_Gcfu_ Line | Count | Source | 177 | 57.4k | sendResponse: self.interceptResponse(_:metadata:promise:) |
$s4GRPC35BidirectionalStreamingServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFy5InputQz_AA07MessageH0V7NIOCore16EventLoopPromiseVyytGSgtcACyxq_Gcfu_yAI_AkPtcfu0_ Line | Count | Source | 177 | 3.77M | sendResponse: self.interceptResponse(_:metadata:promise:) |
|
178 | 57.4k | ) |
179 | 57.4k | |
180 | 57.4k | // Move to the next state. |
181 | 57.4k | self.state = .creatingObserver(context) |
182 | 57.4k | |
183 | 57.4k | // Send response headers back via the interceptors. |
184 | 57.4k | self.interceptors.send(.metadata([:]), promise: nil) |
185 | 57.4k | |
186 | 57.4k | // Register callbacks on the status future. |
187 | 114k | context.statusPromise.futureResult.whenComplete(self.userFunctionStatusResolved(_:)) $s4GRPC35BidirectionalStreamingServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFys6ResultOyAA10GRPCStatusVs5Error_pGcACyxq_Gcfu1_ Line | Count | Source | 187 | 57.4k | context.statusPromise.futureResult.whenComplete(self.userFunctionStatusResolved(_:)) |
$s4GRPC35BidirectionalStreamingServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFys6ResultOyAA10GRPCStatusVs5Error_pGcACyxq_Gcfu1_yAMcfu2_ Line | Count | Source | 187 | 57.4k | context.statusPromise.futureResult.whenComplete(self.userFunctionStatusResolved(_:)) |
|
188 | 57.4k | |
189 | 57.4k | // Make an observer block and register a completion block. |
190 | 114k | self.observerFactory(context).whenComplete(self.userFunctionResolvedWithResult(_:)) $s4GRPC35BidirectionalStreamingServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFys6ResultOyyAA11StreamEventOy6OutputQy_Gcs5Error_pGcACyxq_Gcfu3_ Line | Count | Source | 190 | 57.4k | self.observerFactory(context).whenComplete(self.userFunctionResolvedWithResult(_:)) |
$s4GRPC35BidirectionalStreamingServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFys6ResultOyyAA11StreamEventOy6OutputQy_Gcs5Error_pGcACyxq_Gcfu3_yAPcfu4_ Line | Count | Source | 190 | 57.4k | self.observerFactory(context).whenComplete(self.userFunctionResolvedWithResult(_:)) |
|
191 | 57.4k | |
192 | 57.4k | case .creatingObserver, .observing: |
193 | 0 | self.handleError(GRPCError.ProtocolViolation("Multiple header blocks received on RPC")) |
194 | 57.4k | |
195 | 57.4k | case .completed: |
196 | 0 | // We may receive headers from the interceptor pipeline if we have already finished (i.e. due |
197 | 0 | // to an error or otherwise) and an interceptor doing some async work emits headers later. |
198 | 0 | // Dropping them is fine. |
199 | 0 | () |
200 | 57.4k | } |
201 | 57.4k | } |
202 | | |
203 | | @inlinable |
204 | 3.77M | internal func receiveInterceptedMessage(_ request: Request) { |
205 | 3.77M | switch self.state { |
206 | 3.77M | case .idle: |
207 | 0 | self.handleError(GRPCError.ProtocolViolation("Message received before headers")) |
208 | 3.77M | case .creatingObserver: |
209 | 0 | self.requestBuffer.append(.message(request)) |
210 | 3.77M | case let .observing(observer, _): |
211 | 3.77M | observer(.message(request)) |
212 | 3.77M | case .completed: |
213 | 0 | // We received a message but we're already done: this may happen if we terminate the RPC |
214 | 0 | // due to a channel error, for example. |
215 | 0 | () |
216 | 3.77M | } |
217 | 3.77M | } |
218 | | |
219 | | @inlinable |
220 | 28.8k | internal func receiveInterceptedEnd() { |
221 | 28.8k | switch self.state { |
222 | 28.8k | case .idle: |
223 | 0 | self.handleError(GRPCError.ProtocolViolation("End of stream received before headers")) |
224 | 28.8k | case .creatingObserver: |
225 | 0 | self.requestBuffer.append(.end) |
226 | 28.8k | case let .observing(observer, _): |
227 | 28.8k | observer(.end) |
228 | 28.8k | case .completed: |
229 | 0 | // We received end of stream but we're already done: this may happen if we terminate the RPC |
230 | 0 | // due to a channel error, for example. |
231 | 0 | () |
232 | 28.8k | } |
233 | 28.8k | } |
234 | | |
235 | | // MARK: - User Function To Interceptors |
236 | | |
237 | | @inlinable |
238 | | internal func userFunctionResolvedWithResult( |
239 | | _ result: Result<(StreamEvent<Request>) -> Void, Error> |
240 | 57.4k | ) { |
241 | 57.4k | switch self.state { |
242 | 57.4k | case .idle, .observing: |
243 | 0 | // The observer block can't resolve if it hasn't been created ('idle') and it can't be |
244 | 0 | // resolved more than once ('observing'). |
245 | 0 | preconditionFailure() |
246 | 57.4k | |
247 | 57.4k | case let .creatingObserver(context): |
248 | 57.4k | switch result { |
249 | 57.4k | case let .success(observer): |
250 | 57.4k | // We have an observer block now; unbuffer any requests. |
251 | 57.4k | self.state = .observing(observer, context) |
252 | 57.4k | while let request = self.requestBuffer.popFirst() { |
253 | 0 | observer(request) |
254 | 0 | } |
255 | 57.4k | |
256 | 57.4k | case let .failure(error): |
257 | 0 | self.handleError(error, thrownFromHandler: true) |
258 | 57.4k | } |
259 | 57.4k | |
260 | 57.4k | case .completed: |
261 | 0 | // We've already completed. That's fine. |
262 | 0 | () |
263 | 57.4k | } |
264 | 57.4k | } |
265 | | |
266 | | @inlinable |
267 | | internal func interceptResponse( |
268 | | _ response: Response, |
269 | | metadata: MessageMetadata, |
270 | | promise: EventLoopPromise<Void>? |
271 | 3.77M | ) { |
272 | 3.77M | switch self.state { |
273 | 3.77M | case .idle: |
274 | 0 | // The observer block can't send responses if it doesn't exist! |
275 | 0 | preconditionFailure() |
276 | 3.77M | |
277 | 3.77M | case .creatingObserver, .observing: |
278 | 3.77M | // The user has access to the response context before returning a future observer, |
279 | 3.77M | // so 'creatingObserver' is valid here (if a little strange). |
280 | 3.77M | self.interceptors.send(.message(response, metadata), promise: promise) |
281 | 3.77M | |
282 | 3.77M | case .completed: |
283 | 0 | promise?.fail(GRPCError.AlreadyComplete()) |
284 | 3.77M | } |
285 | 3.77M | } |
286 | | |
287 | | @inlinable |
288 | 57.4k | internal func userFunctionStatusResolved(_ result: Result<GRPCStatus, Error>) { |
289 | 57.4k | switch self.state { |
290 | 57.4k | case .idle: |
291 | 0 | // The promise can't fail before we create it. |
292 | 0 | preconditionFailure() |
293 | 57.4k | |
294 | 57.4k | // 'creatingObserver' is possible: the user can complete the status before returning a stream handler. |
295 | 57.4k | case let .creatingObserver(context), let .observing(_, context): |
296 | 28.8k | switch result { |
297 | 28.8k | case let .success(status): |
298 | 28.8k | // We're sending end back, we're done. |
299 | 28.8k | self.state = .completed |
300 | 28.8k | self.interceptors.send(.end(status, context.trailers), promise: nil) |
301 | 28.8k | |
302 | 28.8k | case let .failure(error): |
303 | 0 | self.handleError(error, thrownFromHandler: true) |
304 | 28.8k | } |
305 | 57.4k | |
306 | 57.4k | case .completed: |
307 | 28.6k | () |
308 | 57.4k | } |
309 | 57.4k | } |
310 | | |
311 | | @inlinable |
312 | 29.6k | internal func handleError(_ error: Error, thrownFromHandler isHandlerError: Bool = false) { |
313 | 29.6k | switch self.state { |
314 | 29.6k | case .idle: |
315 | 0 | assert(!isHandlerError) |
316 | 0 | self.state = .completed |
317 | 0 | // We don't have a promise to fail. Just send back end. |
318 | 0 | let (status, trailers) = ServerErrorProcessor.processLibraryError( |
319 | 0 | error, |
320 | 0 | delegate: self.context.errorDelegate |
321 | 0 | ) |
322 | 0 | self.interceptors.send(.end(status, trailers), promise: nil) |
323 | 29.6k | |
324 | 29.6k | case let .creatingObserver(context), |
325 | 28.6k | let .observing(_, context): |
326 | 28.6k | // Send back end; the status promise is failed afterwards so that it isn't leaked. |
327 | 28.6k | self.state = .completed |
328 | 28.6k | |
329 | 28.6k | let status: GRPCStatus |
330 | 28.6k | let trailers: HPACKHeaders |
331 | 28.6k | |
332 | 28.6k | if isHandlerError { |
333 | 0 | (status, trailers) = ServerErrorProcessor.processObserverError( |
334 | 0 | error, |
335 | 0 | headers: context.headers, |
336 | 0 | trailers: context.trailers, |
337 | 0 | delegate: self.context.errorDelegate |
338 | 0 | ) |
339 | 28.6k | } else { |
340 | 28.6k | (status, trailers) = ServerErrorProcessor.processLibraryError( |
341 | 28.6k | error, |
342 | 28.6k | delegate: self.context.errorDelegate |
343 | 28.6k | ) |
344 | 28.6k | } |
345 | 28.6k | |
346 | 28.6k | self.interceptors.send(.end(status, trailers), promise: nil) |
347 | 28.6k | // We're already in the 'completed' state so failing the promise will be a no-op in the |
348 | 28.6k | // callback to 'userFunctionStatusResolved' (but we also need to avoid leaking the promise.) |
349 | 28.6k | context.statusPromise.fail(error) |
350 | 29.6k | |
351 | 29.6k | case .completed: |
352 | 1.02k | () |
353 | 29.6k | } |
354 | 29.6k | } |
355 | | |
356 | | @inlinable |
357 | | internal func sendInterceptedPart( |
358 | | _ part: GRPCServerResponsePart<Response>, |
359 | | promise: EventLoopPromise<Void>? |
360 | 3.89M | ) { |
361 | 3.89M | switch part { |
362 | 3.89M | case let .metadata(headers): |
363 | 57.4k | self.context.responseWriter.sendMetadata(headers, flush: true, promise: promise) |
364 | 3.89M | |
365 | 3.89M | case let .message(message, metadata): |
366 | 3.77M | do { |
367 | 3.77M | let bytes = try self.serializer.serialize(message, allocator: ByteBufferAllocator()) |
368 | 3.77M | self.context.responseWriter.sendMessage(bytes, metadata: metadata, promise: promise) |
369 | 3.77M | } catch { |
370 | 0 | // Serialization failed: fail the promise and send end. |
371 | 0 | promise?.fail(error) |
372 | 0 | let (status, trailers) = ServerErrorProcessor.processLibraryError( |
373 | 0 | error, |
374 | 0 | delegate: self.context.errorDelegate |
375 | 0 | ) |
376 | 0 | // Loop back via the interceptors. |
377 | 0 | self.interceptors.send(.end(status, trailers), promise: nil) |
378 | 0 | } |
379 | 3.89M | |
380 | 3.89M | case let .end(status, trailers): |
381 | 57.4k | self.context.responseWriter.sendEnd(status: status, trailers: trailers, promise: promise) |
382 | 3.89M | } |
383 | 3.89M | } |
384 | | } |