/src/grpc-swift/Sources/GRPC/CallHandlers/BidirectionalStreamingServerHandler.swift
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Copyright 2021, gRPC Authors All rights reserved. |
3 | | * |
4 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | | * you may not use this file except in compliance with the License. |
6 | | * You may obtain a copy of the License at |
7 | | * |
8 | | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | | * |
10 | | * Unless required by applicable law or agreed to in writing, software |
11 | | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | | * See the License for the specific language governing permissions and |
14 | | * limitations under the License. |
15 | | */ |
16 | | import NIOCore |
17 | | import NIOHPACK |
18 | | |
19 | | public final class BidirectionalStreamingServerHandler< |
20 | | Serializer: MessageSerializer, |
21 | | Deserializer: MessageDeserializer |
22 | | >: GRPCServerHandlerProtocol { |
23 | | public typealias Request = Deserializer.Output |
24 | | public typealias Response = Serializer.Input |
25 | | |
26 | | /// A response serializer. |
27 | | @usableFromInline |
28 | | internal let serializer: Serializer |
29 | | |
30 | | /// A request deserializer. |
31 | | @usableFromInline |
32 | | internal let deserializer: Deserializer |
33 | | |
34 | | /// A pipeline of user provided interceptors. |
35 | | @usableFromInline |
36 | | internal var interceptors: ServerInterceptorPipeline<Request, Response>! |
37 | | |
38 | | /// Stream events which have arrived before the stream observer future has been resolved. |
39 | | @usableFromInline |
40 | 216k | internal var requestBuffer: CircularBuffer<StreamEvent<Request>> = CircularBuffer() |
41 | | |
42 | | /// The context required in order to create the function. |
43 | | @usableFromInline |
44 | | internal let context: CallHandlerContext |
45 | | |
46 | | /// A reference to a `UserInfo`. |
47 | | @usableFromInline |
48 | | internal let userInfoRef: Ref<UserInfo> |
49 | | |
50 | | /// The user provided function to execute. |
51 | | @usableFromInline |
52 | | internal let observerFactory: |
53 | | (_StreamingResponseCallContext<Request, Response>) |
54 | | -> EventLoopFuture<(StreamEvent<Request>) -> Void> |
55 | | |
56 | | /// The state of the handler. |
57 | | @usableFromInline |
58 | 216k | internal var state: State = .idle |
59 | | |
60 | | @usableFromInline |
61 | | internal enum State { |
62 | | // No headers have been received. |
63 | | case idle |
64 | | // Headers have been received, a context has been created and the user code has been called to |
65 | | // make a stream observer with. The observer is yet to see any messages. |
66 | | case creatingObserver(_StreamingResponseCallContext<Request, Response>) |
67 | | // The observer future has resolved and the observer may have seen messages. |
68 | | case observing((StreamEvent<Request>) -> Void, _StreamingResponseCallContext<Request, Response>) |
69 | | // The observer has completed by completing the status promise. |
70 | | case completed |
71 | | } |
72 | | |
73 | | @inlinable |
74 | | public init( |
75 | | context: CallHandlerContext, |
76 | | requestDeserializer: Deserializer, |
77 | | responseSerializer: Serializer, |
78 | | interceptors: [ServerInterceptor<Request, Response>], |
79 | | observerFactory: @escaping (StreamingResponseCallContext<Response>) |
80 | | -> EventLoopFuture<(StreamEvent<Request>) -> Void> |
81 | 99.8k | ) { |
82 | 99.8k | self.serializer = responseSerializer |
83 | 99.8k | self.deserializer = requestDeserializer |
84 | 99.8k | self.context = context |
85 | 99.8k | self.observerFactory = observerFactory |
86 | 99.8k | |
87 | 99.8k | let userInfoRef = Ref(UserInfo()) |
88 | 99.8k | self.userInfoRef = userInfoRef |
89 | 99.8k | self.interceptors = ServerInterceptorPipeline( |
90 | 99.8k | logger: context.logger, |
91 | 99.8k | eventLoop: context.eventLoop, |
92 | 99.8k | path: context.path, |
93 | 99.8k | callType: .bidirectionalStreaming, |
94 | 99.8k | remoteAddress: context.remoteAddress, |
95 | 99.8k | userInfoRef: userInfoRef, |
96 | 99.8k | closeFuture: context.closeFuture, |
97 | 99.8k | interceptors: interceptors, |
98 | 3.57M | onRequestPart: self.receiveInterceptedPart(_:), $s4GRPC35BidirectionalStreamingServerHandlerC7context19requestDeserializer18responseSerializer12interceptors15observerFactoryACyxq_GAA04CallE7ContextV_q_xSayAA0D11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyyAA06StreamT0OyAOGcGAA0c8ResponsenO0CyAQGctcfcyAA21GRPCServerRequestPartOyAOGcAIcfu_ Line | Count | Source | 98 | 99.8k | onRequestPart: self.receiveInterceptedPart(_:), |
$s4GRPC35BidirectionalStreamingServerHandlerC7context19requestDeserializer18responseSerializer12interceptors15observerFactoryACyxq_GAA04CallE7ContextV_q_xSayAA0D11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyyAA06StreamT0OyAOGcGAA0c8ResponsenO0CyAQGctcfcyAA21GRPCServerRequestPartOyAOGcAIcfu_yA4_cfu0_ Line | Count | Source | 98 | 3.47M | onRequestPart: self.receiveInterceptedPart(_:), |
|
99 | 3.65M | onResponsePart: self.sendInterceptedPart(_:promise:) $s4GRPC35BidirectionalStreamingServerHandlerC7context19requestDeserializer18responseSerializer12interceptors15observerFactoryACyxq_GAA04CallE7ContextV_q_xSayAA0D11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyyAA06StreamT0OyAOGcGAA0c8ResponsenO0CyAQGctcfcyAA010GRPCServerX4PartOyAQG_AT0tU7PromiseVyytGSgtcAIcfu1_ Line | Count | Source | 99 | 99.8k | onResponsePart: self.sendInterceptedPart(_:promise:) |
$s4GRPC35BidirectionalStreamingServerHandlerC7context19requestDeserializer18responseSerializer12interceptors15observerFactoryACyxq_GAA04CallE7ContextV_q_xSayAA0D11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyyAA06StreamT0OyAOGcGAA0c8ResponsenO0CyAQGctcfcyAA010GRPCServerX4PartOyAQG_AT0tU7PromiseVyytGSgtcAIcfu1_yA4__A8_tcfu2_ Line | Count | Source | 99 | 3.55M | onResponsePart: self.sendInterceptedPart(_:promise:) |
|
100 | 99.8k | ) |
101 | 99.8k | } |
102 | | |
103 | | // MARK: - Public API: gRPC to Handler |
104 | | |
105 | | @inlinable |
106 | 30.1k | public func receiveMetadata(_ headers: HPACKHeaders) { |
107 | 30.1k | self.interceptors.receive(.metadata(headers)) |
108 | 30.1k | } |
109 | | |
110 | | @inlinable |
111 | 1.11M | public func receiveMessage(_ bytes: ByteBuffer) { |
112 | 1.11M | do { |
113 | 1.11M | let message = try self.deserializer.deserialize(byteBuffer: bytes) |
114 | 1.11M | self.interceptors.receive(.message(message)) |
115 | 1.11M | } catch { |
116 | 21.7k | self.handleError(error) |
117 | 1.11M | } |
118 | 1.11M | } |
119 | | |
120 | | @inlinable |
121 | 5.45k | public func receiveEnd() { |
122 | 5.45k | self.interceptors.receive(.end) |
123 | 5.45k | } |
124 | | |
125 | | @inlinable |
126 | 3.43k | public func receiveError(_ error: Error) { |
127 | 3.43k | self.handleError(error) |
128 | 3.43k | self.finish() |
129 | 3.43k | } |
130 | | |
131 | | @inlinable |
132 | 33.5k | public func finish() { |
133 | 33.5k | switch self.state { |
134 | 33.5k | case .idle: |
135 | 0 | self.interceptors = nil |
136 | 0 | self.state = .completed |
137 | 33.5k | |
138 | 33.5k | case let .creatingObserver(context), |
139 | 0 | let .observing(_, context): |
140 | 0 | context.statusPromise.fail(GRPCStatus(code: .unavailable, message: nil)) |
141 | 0 | self.context.eventLoop.execute { |
142 | 0 | self.interceptors = nil |
143 | 0 | } |
144 | 33.5k | |
145 | 33.5k | case .completed: |
146 | 33.5k | self.interceptors = nil |
147 | 33.5k | } |
148 | 33.5k | } |
149 | | |
150 | | // MARK: - Interceptors to User Function |
151 | | |
152 | | @inlinable |
153 | 3.47M | internal func receiveInterceptedPart(_ part: GRPCServerRequestPart<Request>) { |
154 | 3.47M | switch part { |
155 | 3.47M | case let .metadata(headers): |
156 | 99.8k | self.receiveInterceptedMetadata(headers) |
157 | 3.47M | case let .message(message): |
158 | 3.35M | self.receiveInterceptedMessage(message) |
159 | 3.47M | case .end: |
160 | 19.8k | self.receiveInterceptedEnd() |
161 | 3.47M | } |
162 | 3.47M | } |
163 | | |
164 | | @inlinable |
165 | 99.8k | internal func receiveInterceptedMetadata(_ headers: HPACKHeaders) { |
166 | 99.8k | switch self.state { |
167 | 99.8k | case .idle: |
168 | 99.8k | // Make a context to invoke the observer block factory with. |
169 | 99.8k | let context = _StreamingResponseCallContext<Request, Response>( |
170 | 99.8k | eventLoop: self.context.eventLoop, |
171 | 99.8k | headers: headers, |
172 | 99.8k | logger: self.context.logger, |
173 | 99.8k | userInfoRef: self.userInfoRef, |
174 | 99.8k | compressionIsEnabled: self.context.encoding.isEnabled, |
175 | 99.8k | closeFuture: self.context.closeFuture, |
176 | 3.45M | sendResponse: self.interceptResponse(_:metadata:promise:) $s4GRPC35BidirectionalStreamingServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFy5InputQz_AA07MessageH0V7NIOCore16EventLoopPromiseVyytGSgtcACyxq_Gcfu_ Line | Count | Source | 176 | 99.8k | sendResponse: self.interceptResponse(_:metadata:promise:) |
$s4GRPC35BidirectionalStreamingServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFy5InputQz_AA07MessageH0V7NIOCore16EventLoopPromiseVyytGSgtcACyxq_Gcfu_yAI_AkPtcfu0_ Line | Count | Source | 176 | 3.35M | sendResponse: self.interceptResponse(_:metadata:promise:) |
|
177 | 99.8k | ) |
178 | 99.8k | |
179 | 99.8k | // Move to the next state. |
180 | 99.8k | self.state = .creatingObserver(context) |
181 | 99.8k | |
182 | 99.8k | // Send response headers back via the interceptors. |
183 | 99.8k | self.interceptors.send(.metadata([:]), promise: nil) |
184 | 99.8k | |
185 | 99.8k | // Register callbacks on the status future. |
186 | 199k | context.statusPromise.futureResult.whenComplete(self.userFunctionStatusResolved(_:)) $s4GRPC35BidirectionalStreamingServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFys6ResultOyAA10GRPCStatusVs5Error_pGcACyxq_Gcfu1_ Line | Count | Source | 186 | 99.8k | context.statusPromise.futureResult.whenComplete(self.userFunctionStatusResolved(_:)) |
$s4GRPC35BidirectionalStreamingServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFys6ResultOyAA10GRPCStatusVs5Error_pGcACyxq_Gcfu1_yAMcfu2_ Line | Count | Source | 186 | 99.8k | context.statusPromise.futureResult.whenComplete(self.userFunctionStatusResolved(_:)) |
|
187 | 99.8k | |
188 | 99.8k | // Make an observer block and register a completion block. |
189 | 199k | self.observerFactory(context).whenComplete(self.userFunctionResolvedWithResult(_:)) $s4GRPC35BidirectionalStreamingServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFys6ResultOyyAA11StreamEventOy6OutputQy_Gcs5Error_pGcACyxq_Gcfu3_ Line | Count | Source | 189 | 99.8k | self.observerFactory(context).whenComplete(self.userFunctionResolvedWithResult(_:)) |
$s4GRPC35BidirectionalStreamingServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFys6ResultOyyAA11StreamEventOy6OutputQy_Gcs5Error_pGcACyxq_Gcfu3_yAPcfu4_ Line | Count | Source | 189 | 99.8k | self.observerFactory(context).whenComplete(self.userFunctionResolvedWithResult(_:)) |
|
190 | 99.8k | |
191 | 99.8k | case .creatingObserver, .observing: |
192 | 0 | self.handleError(GRPCError.ProtocolViolation("Multiple header blocks received on RPC")) |
193 | 99.8k | |
194 | 99.8k | case .completed: |
195 | 0 | // We may receive headers from the interceptor pipeline if we have already finished (e.g. due |
196 | 0 | // to an error or otherwise) and an interceptor doing some async work later emits headers. |
197 | 0 | // Dropping them is fine. |
198 | 0 | () |
199 | 99.8k | } |
200 | 99.8k | } |
201 | | |
202 | | @inlinable |
203 | 3.35M | internal func receiveInterceptedMessage(_ request: Request) { |
204 | 3.35M | switch self.state { |
205 | 3.35M | case .idle: |
206 | 0 | self.handleError(GRPCError.ProtocolViolation("Message received before headers")) |
207 | 3.35M | case .creatingObserver: |
208 | 0 | self.requestBuffer.append(.message(request)) |
209 | 3.35M | case let .observing(observer, _): |
210 | 3.35M | observer(.message(request)) |
211 | 3.35M | case .completed: |
212 | 0 | // We received a message but we're already done: this may happen if we terminate the RPC |
213 | 0 | // due to a channel error, for example. |
214 | 0 | () |
215 | 3.35M | } |
216 | 3.35M | } |
217 | | |
218 | | @inlinable |
219 | 19.8k | internal func receiveInterceptedEnd() { |
220 | 19.8k | switch self.state { |
221 | 19.8k | case .idle: |
222 | 0 | self.handleError(GRPCError.ProtocolViolation("End of stream received before headers")) |
223 | 19.8k | case .creatingObserver: |
224 | 0 | self.requestBuffer.append(.end) |
225 | 19.8k | case let .observing(observer, _): |
226 | 19.8k | observer(.end) |
227 | 19.8k | case .completed: |
228 | 0 | // We received end of stream but we're already done: this may happen if we terminate the RPC |
229 | 0 | // due to a channel error, for example. |
230 | 0 | () |
231 | 19.8k | } |
232 | 19.8k | } |
233 | | |
234 | | // MARK: - User Function To Interceptors |
235 | | |
236 | | @inlinable |
237 | | internal func userFunctionResolvedWithResult( |
238 | | _ result: Result<(StreamEvent<Request>) -> Void, Error> |
239 | 99.8k | ) { |
240 | 99.8k | switch self.state { |
241 | 99.8k | case .idle, .observing: |
242 | 0 | // The observer block can't resolve if it hasn't been created ('idle') and it can't be |
243 | 0 | // resolved more than once ('observing'). |
244 | 0 | preconditionFailure() |
245 | 99.8k | |
246 | 99.8k | case let .creatingObserver(context): |
247 | 99.8k | switch result { |
248 | 99.8k | case let .success(observer): |
249 | 99.8k | // We have an observer block now; unbuffer any requests. |
250 | 99.8k | self.state = .observing(observer, context) |
251 | 99.8k | while let request = self.requestBuffer.popFirst() { |
252 | 0 | observer(request) |
253 | 99.8k | } |
254 | 99.8k | |
255 | 99.8k | case let .failure(error): |
256 | 0 | self.handleError(error, thrownFromHandler: true) |
257 | 99.8k | } |
258 | 99.8k | |
259 | 99.8k | case .completed: |
260 | 0 | // We've already completed. That's fine. |
261 | 0 | () |
262 | 99.8k | } |
263 | 99.8k | } |
264 | | |
265 | | @inlinable |
266 | | internal func interceptResponse( |
267 | | _ response: Response, |
268 | | metadata: MessageMetadata, |
269 | | promise: EventLoopPromise<Void>? |
270 | 3.35M | ) { |
271 | 3.35M | switch self.state { |
272 | 3.35M | case .idle: |
273 | 0 | // The observer block can't send responses if it doesn't exist! |
274 | 0 | preconditionFailure() |
275 | 3.35M | |
276 | 3.35M | case .creatingObserver, .observing: |
277 | 3.35M | // The user has access to the response context before returning a future observer, |
278 | 3.35M | // so 'creatingObserver' is valid here (if a little strange). |
279 | 3.35M | self.interceptors.send(.message(response, metadata), promise: promise) |
280 | 3.35M | |
281 | 3.35M | case .completed: |
282 | 0 | promise?.fail(GRPCError.AlreadyComplete()) |
283 | 3.35M | } |
284 | 3.35M | } |
285 | | |
286 | | @inlinable |
287 | 99.8k | internal func userFunctionStatusResolved(_ result: Result<GRPCStatus, Error>) { |
288 | 99.8k | switch self.state { |
289 | 99.8k | case .idle: |
290 | 0 | // The promise can't fail before we create it. |
291 | 0 | preconditionFailure() |
292 | 99.8k | |
293 | 99.8k | // 'creatingObserver' is possible: the user can complete the status before returning a stream observer. |
294 | 99.8k | case let .creatingObserver(context), let .observing(_, context): |
295 | 19.8k | switch result { |
296 | 19.8k | case let .success(status): |
297 | 19.8k | // We're sending end back, we're done. |
298 | 19.8k | self.state = .completed |
299 | 19.8k | self.interceptors.send(.end(status, context.trailers), promise: nil) |
300 | 19.8k | |
301 | 19.8k | case let .failure(error): |
302 | 0 | self.handleError(error, thrownFromHandler: true) |
303 | 99.8k | } |
304 | 99.8k | |
305 | 99.8k | case .completed: |
306 | 80.0k | () |
307 | 99.8k | } |
308 | 99.8k | } |
309 | | |
310 | | @inlinable |
311 | 81.4k | internal func handleError(_ error: Error, thrownFromHandler isHandlerError: Bool = false) { |
312 | 81.4k | switch self.state { |
313 | 81.4k | case .idle: |
314 | 0 | assert(!isHandlerError) |
315 | 0 | self.state = .completed |
316 | 0 | // We don't have a promise to fail. Just send back end. |
317 | 0 | let (status, trailers) = ServerErrorProcessor.processLibraryError( |
318 | 0 | error, |
319 | 0 | delegate: self.context.errorDelegate |
320 | 0 | ) |
321 | 0 | self.interceptors.send(.end(status, trailers), promise: nil) |
322 | 81.4k | |
323 | 81.4k | case let .creatingObserver(context), |
324 | 80.0k | let .observing(_, context): |
325 | 80.0k | // Mark the RPC as completed, send back end, then fail the status promise (see below). |
326 | 80.0k | self.state = .completed |
327 | 80.0k | |
328 | 80.0k | let status: GRPCStatus |
329 | 80.0k | let trailers: HPACKHeaders |
330 | 80.0k | |
331 | 80.0k | if isHandlerError { |
332 | 0 | (status, trailers) = ServerErrorProcessor.processObserverError( |
333 | 0 | error, |
334 | 0 | headers: context.headers, |
335 | 0 | trailers: context.trailers, |
336 | 0 | delegate: self.context.errorDelegate |
337 | 0 | ) |
338 | 80.0k | } else { |
339 | 80.0k | (status, trailers) = ServerErrorProcessor.processLibraryError( |
340 | 80.0k | error, |
341 | 80.0k | delegate: self.context.errorDelegate |
342 | 80.0k | ) |
343 | 80.0k | } |
344 | 80.0k | |
345 | 80.0k | self.interceptors.send(.end(status, trailers), promise: nil) |
346 | 80.0k | // We're already in the 'completed' state so failing the promise will be a no-op in the |
347 | 80.0k | // callback to 'userFunctionStatusResolved' (but we also need to avoid leaking the promise.) |
348 | 80.0k | context.statusPromise.fail(error) |
349 | 81.4k | |
350 | 81.4k | case .completed: |
351 | 1.39k | () |
352 | 81.4k | } |
353 | 81.4k | } |
354 | | |
355 | | @inlinable |
356 | | internal func sendInterceptedPart( |
357 | | _ part: GRPCServerResponsePart<Response>, |
358 | | promise: EventLoopPromise<Void>? |
359 | 3.55M | ) { |
360 | 3.55M | switch part { |
361 | 3.55M | case let .metadata(headers): |
362 | 99.8k | self.context.responseWriter.sendMetadata(headers, flush: true, promise: promise) |
363 | 3.55M | |
364 | 3.55M | case let .message(message, metadata): |
365 | 3.35M | do { |
366 | 3.35M | let bytes = try self.serializer.serialize(message, allocator: ByteBufferAllocator()) |
367 | 3.35M | self.context.responseWriter.sendMessage(bytes, metadata: metadata, promise: promise) |
368 | 3.35M | } catch { |
369 | 0 | // Serialization failed: fail the promise and send end. |
370 | 0 | promise?.fail(error) |
371 | 0 | let (status, trailers) = ServerErrorProcessor.processLibraryError( |
372 | 0 | error, |
373 | 0 | delegate: self.context.errorDelegate |
374 | 0 | ) |
375 | 0 | // Loop back via the interceptors. |
376 | 0 | self.interceptors.send(.end(status, trailers), promise: nil) |
377 | 3.55M | } |
378 | 3.55M | |
379 | 3.55M | case let .end(status, trailers): |
380 | 99.8k | self.context.responseWriter.sendEnd(status: status, trailers: trailers, promise: promise) |
381 | 3.55M | } |
382 | 3.55M | } |
383 | | } |