/src/grpc-swift/Sources/GRPC/CallHandlers/ServerStreamingServerHandler.swift
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Copyright 2021, gRPC Authors All rights reserved. |
3 | | * |
4 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | | * you may not use this file except in compliance with the License. |
6 | | * You may obtain a copy of the License at |
7 | | * |
8 | | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | | * |
10 | | * Unless required by applicable law or agreed to in writing, software |
11 | | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | | * See the License for the specific language governing permissions and |
14 | | * limitations under the License. |
15 | | */ |
16 | | import NIOCore |
17 | | import NIOHPACK |
18 | | |
19 | | public final class ServerStreamingServerHandler< |
20 | | Serializer: MessageSerializer, |
21 | | Deserializer: MessageDeserializer |
22 | | >: GRPCServerHandlerProtocol { |
23 | | public typealias Request = Deserializer.Output |
24 | | public typealias Response = Serializer.Input |
25 | | |
26 | | /// A response serializer. |
27 | | @usableFromInline |
28 | | internal let serializer: Serializer |
29 | | |
30 | | /// A request deserializer. |
31 | | @usableFromInline |
32 | | internal let deserializer: Deserializer |
33 | | |
34 | | /// A pipeline of user provided interceptors. |
35 | | @usableFromInline |
36 | | internal var interceptors: ServerInterceptorPipeline<Request, Response>! |
37 | | |
38 | | /// The context required in order create the function. |
39 | | @usableFromInline |
40 | | internal let context: CallHandlerContext |
41 | | |
42 | | /// A reference to a `UserInfo`. |
43 | | @usableFromInline |
44 | | internal let userInfoRef: Ref<UserInfo> |
45 | | |
46 | | /// The user provided function to execute. |
47 | | @usableFromInline |
48 | | internal let userFunction: |
49 | | (Request, StreamingResponseCallContext<Response>) |
50 | | -> EventLoopFuture<GRPCStatus> |
51 | | |
52 | | /// The state of the handler. |
53 | | @usableFromInline |
54 | 89.2k | internal var state: State = .idle |
55 | | |
56 | | @usableFromInline |
57 | | internal enum State { |
58 | | // Initial state. Nothing has happened yet. |
59 | | case idle |
60 | | // Headers have been received and now we're holding a context with which to invoke the user |
61 | | // function when we receive a message. |
62 | | case createdContext(_StreamingResponseCallContext<Request, Response>) |
63 | | // The user function has been invoked, we're waiting for the status promise to be completed. |
64 | | case invokedFunction(_StreamingResponseCallContext<Request, Response>) |
65 | | // The function has completed or we are no longer proceeding with execution (because of an error |
66 | | // or unexpected closure). |
67 | | case completed |
68 | | } |
69 | | |
70 | | @inlinable |
71 | | public init( |
72 | | context: CallHandlerContext, |
73 | | requestDeserializer: Deserializer, |
74 | | responseSerializer: Serializer, |
75 | | interceptors: [ServerInterceptor<Request, Response>], |
76 | | userFunction: @escaping (Request, StreamingResponseCallContext<Response>) |
77 | | -> EventLoopFuture<GRPCStatus> |
78 | 42.0k | ) { |
79 | 42.0k | self.serializer = responseSerializer |
80 | 42.0k | self.deserializer = requestDeserializer |
81 | 42.0k | self.context = context |
82 | 42.0k | self.userFunction = userFunction |
83 | 42.0k | |
84 | 42.0k | let userInfoRef = Ref(UserInfo()) |
85 | 42.0k | self.userInfoRef = userInfoRef |
86 | 42.0k | self.interceptors = ServerInterceptorPipeline( |
87 | 42.0k | logger: context.logger, |
88 | 42.0k | eventLoop: context.eventLoop, |
89 | 42.0k | path: context.path, |
90 | 42.0k | callType: .serverStreaming, |
91 | 42.0k | remoteAddress: context.remoteAddress, |
92 | 42.0k | userInfoRef: userInfoRef, |
93 | 42.0k | closeFuture: context.closeFuture, |
94 | 42.0k | interceptors: interceptors, |
95 | 113k | onRequestPart: self.receiveInterceptedPart(_:), $s4GRPC015ServerStreamingB7HandlerC7context19requestDeserializer18responseSerializer12interceptors12userFunctionACyxq_GAA04CallD7ContextV_q_xSayAA0B11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyAA10GRPCStatusVGAO_AA0c8ResponsemN0CyAQGtctcfcyAA21GRPCServerRequestPartOyAOGcAIcfu_ Line | Count | Source | 95 | 42.0k | onRequestPart: self.receiveInterceptedPart(_:), |
$s4GRPC015ServerStreamingB7HandlerC7context19requestDeserializer18responseSerializer12interceptors12userFunctionACyxq_GAA04CallD7ContextV_q_xSayAA0B11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyAA10GRPCStatusVGAO_AA0c8ResponsemN0CyAQGtctcfcyAA21GRPCServerRequestPartOyAOGcAIcfu_yA3_cfu0_ Line | Count | Source | 95 | 71.1k | onRequestPart: self.receiveInterceptedPart(_:), |
|
96 | 374k | onResponsePart: self.sendInterceptedPart(_:promise:) $s4GRPC015ServerStreamingB7HandlerC7context19requestDeserializer18responseSerializer12interceptors12userFunctionACyxq_GAA04CallD7ContextV_q_xSayAA0B11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyAA10GRPCStatusVGAO_AA0c8ResponsemN0CyAQGtctcfcyAA010GRPCServerW4PartOyAQG_AT0sT7PromiseVyytGSgtcAIcfu1_ Line | Count | Source | 96 | 42.0k | onResponsePart: self.sendInterceptedPart(_:promise:) |
$s4GRPC015ServerStreamingB7HandlerC7context19requestDeserializer18responseSerializer12interceptors12userFunctionACyxq_GAA04CallD7ContextV_q_xSayAA0B11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyAA10GRPCStatusVGAO_AA0c8ResponsemN0CyAQGtctcfcyAA010GRPCServerW4PartOyAQG_AT0sT7PromiseVyytGSgtcAIcfu1_yA3__A7_tcfu2_ Line | Count | Source | 96 | 332k | onResponsePart: self.sendInterceptedPart(_:promise:) |
|
97 | 42.0k | ) |
98 | 42.0k | } |
99 | | |
100 | | // MARK: Public API; gRPC to Handler |
101 | | |
102 | | @inlinable |
103 | 13.7k | public func receiveMetadata(_ headers: HPACKHeaders) { |
104 | 13.7k | self.interceptors.receive(.metadata(headers)) |
105 | 13.7k | } |
106 | | |
107 | | @inlinable |
108 | 8.83k | public func receiveMessage(_ bytes: ByteBuffer) { |
109 | 8.83k | do { |
110 | 8.83k | let message = try self.deserializer.deserialize(byteBuffer: bytes) |
111 | 8.83k | self.interceptors.receive(.message(message)) |
112 | 8.83k | } catch { |
113 | 2.66k | self.handleError(error) |
114 | 8.83k | } |
115 | 8.83k | } |
116 | | |
117 | | @inlinable |
118 | 3.61k | public func receiveEnd() { |
119 | 3.61k | self.interceptors.receive(.end) |
120 | 3.61k | } |
121 | | |
122 | | @inlinable |
123 | 1.68k | public func receiveError(_ error: Error) { |
124 | 1.68k | self.handleError(error) |
125 | 1.68k | self.finish() |
126 | 1.68k | } |
127 | | |
128 | | @inlinable |
129 | 15.4k | public func finish() { |
130 | 15.4k | switch self.state { |
131 | 15.4k | case .idle: |
132 | 0 | self.interceptors = nil |
133 | 0 | self.state = .completed |
134 | 15.4k | |
135 | 15.4k | case let .createdContext(context), |
136 | 0 | let .invokedFunction(context): |
137 | 0 | context.statusPromise.fail(GRPCStatus(code: .unavailable, message: nil)) |
138 | 0 | self.context.eventLoop.execute { |
139 | 0 | self.interceptors = nil |
140 | 0 | } |
141 | 15.4k | |
142 | 15.4k | case .completed: |
143 | 15.4k | self.interceptors = nil |
144 | 15.4k | } |
145 | 15.4k | } |
146 | | |
147 | | // MARK: - Interceptors to User Function |
148 | | |
149 | | @inlinable |
150 | 71.1k | internal func receiveInterceptedPart(_ part: GRPCServerRequestPart<Request>) { |
151 | 71.1k | switch part { |
152 | 71.1k | case let .metadata(headers): |
153 | 42.0k | self.receiveInterceptedMetadata(headers) |
154 | 71.1k | case let .message(message): |
155 | 17.4k | self.receiveInterceptedMessage(message) |
156 | 71.1k | case .end: |
157 | 11.5k | self.receiveInterceptedEnd() |
158 | 71.1k | } |
159 | 71.1k | } |
160 | | |
161 | | @inlinable |
162 | 42.0k | internal func receiveInterceptedMetadata(_ headers: HPACKHeaders) { |
163 | 42.0k | switch self.state { |
164 | 42.0k | case .idle: |
165 | 42.0k | // Make a context to invoke the observer block factory with. |
166 | 42.0k | let context = _StreamingResponseCallContext<Request, Response>( |
167 | 42.0k | eventLoop: self.context.eventLoop, |
168 | 42.0k | headers: headers, |
169 | 42.0k | logger: self.context.logger, |
170 | 42.0k | userInfoRef: self.userInfoRef, |
171 | 42.0k | compressionIsEnabled: self.context.encoding.isEnabled, |
172 | 42.0k | closeFuture: self.context.closeFuture, |
173 | 289k | sendResponse: self.interceptResponse(_:metadata:promise:) $s4GRPC015ServerStreamingB7HandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFy5InputQz_AA07MessageG0V7NIOCore16EventLoopPromiseVyytGSgtcACyxq_Gcfu_ Line | Count | Source | 173 | 42.0k | sendResponse: self.interceptResponse(_:metadata:promise:) |
$s4GRPC015ServerStreamingB7HandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFy5InputQz_AA07MessageG0V7NIOCore16EventLoopPromiseVyytGSgtcACyxq_Gcfu_yAI_AkPtcfu0_ Line | Count | Source | 173 | 247k | sendResponse: self.interceptResponse(_:metadata:promise:) |
|
174 | 42.0k | ) |
175 | 42.0k | |
176 | 42.0k | // Move to the next state. |
177 | 42.0k | self.state = .createdContext(context) |
178 | 42.0k | |
179 | 42.0k | // Register a callback on the status future. |
180 | 84.1k | context.statusPromise.futureResult.whenComplete(self.userFunctionCompletedWithResult(_:)) $s4GRPC015ServerStreamingB7HandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFys6ResultOyAA10GRPCStatusVs5Error_pGcACyxq_Gcfu1_ Line | Count | Source | 180 | 42.0k | context.statusPromise.futureResult.whenComplete(self.userFunctionCompletedWithResult(_:)) |
$s4GRPC015ServerStreamingB7HandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFys6ResultOyAA10GRPCStatusVs5Error_pGcACyxq_Gcfu1_yAMcfu2_ Line | Count | Source | 180 | 42.0k | context.statusPromise.futureResult.whenComplete(self.userFunctionCompletedWithResult(_:)) |
|
181 | 42.0k | |
182 | 42.0k | // Send response headers back via the interceptors. |
183 | 42.0k | self.interceptors.send(.metadata([:]), promise: nil) |
184 | 42.0k | |
185 | 42.0k | case .createdContext, .invokedFunction: |
186 | 0 | self.handleError(GRPCError.InvalidState("Protocol violation: already received headers")) |
187 | 42.0k | |
188 | 42.0k | case .completed: |
189 | 0 | // We may receive headers from the interceptor pipeline if we have already finished (i.e. due |
190 | 0 | // to an error or otherwise) and an interceptor doing some async work later emitting headers. |
191 | 0 | // Dropping them is fine. |
192 | 0 | () |
193 | 42.0k | } |
194 | 42.0k | } |
195 | | |
196 | | @inlinable |
197 | 17.4k | internal func receiveInterceptedMessage(_ request: Request) { |
198 | 17.4k | switch self.state { |
199 | 17.4k | case .idle: |
200 | 0 | self.handleError(GRPCError.ProtocolViolation("Message received before headers")) |
201 | 17.4k | |
202 | 17.4k | case let .createdContext(context): |
203 | 17.4k | self.state = .invokedFunction(context) |
204 | 17.4k | // Complete the status promise with the function outcome. |
205 | 17.4k | context.statusPromise.completeWith(self.userFunction(request, context)) |
206 | 17.4k | |
207 | 17.4k | case .invokedFunction: |
208 | 0 | let error = GRPCError.ProtocolViolation("Multiple messages received on server streaming RPC") |
209 | 0 | self.handleError(error) |
210 | 17.4k | |
211 | 17.4k | case .completed: |
212 | 0 | // We received a message but we're already done: this may happen if we terminate the RPC |
213 | 0 | // due to a channel error, for example. |
214 | 0 | () |
215 | 17.4k | } |
216 | 17.4k | } |
217 | | |
218 | | @inlinable |
219 | 11.5k | internal func receiveInterceptedEnd() { |
220 | 11.5k | switch self.state { |
221 | 11.5k | case .idle: |
222 | 0 | self.handleError(GRPCError.ProtocolViolation("End received before headers")) |
223 | 11.5k | |
224 | 11.5k | case .createdContext: |
225 | 11.5k | self.handleError(GRPCError.ProtocolViolation("End received before message")) |
226 | 11.5k | |
227 | 11.5k | case .invokedFunction, .completed: |
228 | 0 | () |
229 | 11.5k | } |
230 | 11.5k | } |
231 | | |
232 | | // MARK: - User Function To Interceptors |
233 | | |
234 | | @inlinable |
235 | | internal func interceptResponse( |
236 | | _ response: Response, |
237 | | metadata: MessageMetadata, |
238 | | promise: EventLoopPromise<Void>? |
239 | 247k | ) { |
240 | 247k | switch self.state { |
241 | 247k | case .idle: |
242 | 0 | // The observer block can't send responses if it doesn't exist. |
243 | 0 | preconditionFailure() |
244 | 247k | |
245 | 247k | case .createdContext, .invokedFunction: |
246 | 247k | // The user has access to the response context before returning a future observer, |
247 | 247k | // so 'createdContext' is valid here (if a little strange). |
248 | 247k | self.interceptors.send(.message(response, metadata), promise: promise) |
249 | 247k | |
250 | 247k | case .completed: |
251 | 0 | promise?.fail(GRPCError.AlreadyComplete()) |
252 | 247k | } |
253 | 247k | } |
254 | | |
255 | | @inlinable |
256 | 42.0k | internal func userFunctionCompletedWithResult(_ result: Result<GRPCStatus, Error>) { |
257 | 42.0k | switch self.state { |
258 | 42.0k | case .idle: |
259 | 0 | // Invalid state: the user function can only completed if it was created. |
260 | 0 | preconditionFailure() |
261 | 42.0k | |
262 | 42.0k | case let .createdContext(context), |
263 | 17.4k | let .invokedFunction(context): |
264 | 17.4k | |
265 | 17.4k | switch result { |
266 | 17.4k | case let .success(status): |
267 | 17.4k | // We're sending end back, we're done. |
268 | 17.4k | self.state = .completed |
269 | 17.4k | self.interceptors.send(.end(status, context.trailers), promise: nil) |
270 | 17.4k | |
271 | 17.4k | case let .failure(error): |
272 | 0 | self.handleError(error, thrownFromHandler: true) |
273 | 42.0k | } |
274 | 42.0k | |
275 | 42.0k | case .completed: |
276 | 24.5k | // We've already completed. Ignore this. |
277 | 24.5k | () |
278 | 42.0k | } |
279 | 42.0k | } |
280 | | |
281 | | @inlinable |
282 | | internal func sendInterceptedPart( |
283 | | _ part: GRPCServerResponsePart<Response>, |
284 | | promise: EventLoopPromise<Void>? |
285 | 332k | ) { |
286 | 332k | switch part { |
287 | 332k | case let .metadata(headers): |
288 | 42.0k | self.context.responseWriter.sendMetadata(headers, flush: true, promise: promise) |
289 | 332k | |
290 | 332k | case let .message(message, metadata): |
291 | 247k | do { |
292 | 247k | let bytes = try self.serializer.serialize(message, allocator: self.context.allocator) |
293 | 247k | self.context.responseWriter.sendMessage(bytes, metadata: metadata, promise: promise) |
294 | 247k | } catch { |
295 | 0 | // Serialization failed: fail the promise and send end. |
296 | 0 | promise?.fail(error) |
297 | 0 | let (status, trailers) = ServerErrorProcessor.processLibraryError( |
298 | 0 | error, |
299 | 0 | delegate: self.context.errorDelegate |
300 | 0 | ) |
301 | 0 | // Loop back via the interceptors. |
302 | 0 | self.interceptors.send(.end(status, trailers), promise: nil) |
303 | 332k | } |
304 | 332k | |
305 | 332k | case let .end(status, trailers): |
306 | 42.0k | self.context.responseWriter.sendEnd(status: status, trailers: trailers, promise: promise) |
307 | 332k | } |
308 | 332k | } |
309 | | |
310 | | @inlinable |
311 | 25.4k | internal func handleError(_ error: Error, thrownFromHandler isHandlerError: Bool = false) { |
312 | 25.4k | switch self.state { |
313 | 25.4k | case .idle: |
314 | 0 | assert(!isHandlerError) |
315 | 0 | self.state = .completed |
316 | 0 | // We don't have a promise to fail. Just send back end. |
317 | 0 | let (status, trailers) = ServerErrorProcessor.processLibraryError( |
318 | 0 | error, |
319 | 0 | delegate: self.context.errorDelegate |
320 | 0 | ) |
321 | 0 | self.interceptors.send(.end(status, trailers), promise: nil) |
322 | 25.4k | |
323 | 25.4k | case let .createdContext(context), |
324 | 24.5k | let .invokedFunction(context): |
325 | 24.5k | // We don't have a promise to fail. Just send back end. |
326 | 24.5k | self.state = .completed |
327 | 24.5k | |
328 | 24.5k | let status: GRPCStatus |
329 | 24.5k | let trailers: HPACKHeaders |
330 | 24.5k | |
331 | 24.5k | if isHandlerError { |
332 | 0 | (status, trailers) = ServerErrorProcessor.processObserverError( |
333 | 0 | error, |
334 | 0 | headers: context.headers, |
335 | 0 | trailers: context.trailers, |
336 | 0 | delegate: self.context.errorDelegate |
337 | 0 | ) |
338 | 24.5k | } else { |
339 | 24.5k | (status, trailers) = ServerErrorProcessor.processLibraryError( |
340 | 24.5k | error, |
341 | 24.5k | delegate: self.context.errorDelegate |
342 | 24.5k | ) |
343 | 24.5k | } |
344 | 24.5k | |
345 | 24.5k | self.interceptors.send(.end(status, trailers), promise: nil) |
346 | 24.5k | // We're already in the 'completed' state so failing the promise will be a no-op in the |
347 | 24.5k | // callback to 'userFunctionCompletedWithResult' (but we also need to avoid leaking the |
348 | 24.5k | // promise.) |
349 | 24.5k | context.statusPromise.fail(error) |
350 | 25.4k | |
351 | 25.4k | case .completed: |
352 | 897 | () |
353 | 25.4k | } |
354 | 25.4k | } |
355 | | } |