/src/grpc-swift/Sources/GRPC/CallHandlers/UnaryServerHandler.swift
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Copyright 2021, gRPC Authors All rights reserved. |
3 | | * |
4 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | | * you may not use this file except in compliance with the License. |
6 | | * You may obtain a copy of the License at |
7 | | * |
8 | | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | | * |
10 | | * Unless required by applicable law or agreed to in writing, software |
11 | | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | | * See the License for the specific language governing permissions and |
14 | | * limitations under the License. |
15 | | */ |
16 | | import NIOCore |
17 | | import NIOHPACK |
18 | | |
19 | | public final class UnaryServerHandler< |
20 | | Serializer: MessageSerializer, |
21 | | Deserializer: MessageDeserializer |
22 | | >: GRPCServerHandlerProtocol { |
23 | | public typealias Request = Deserializer.Output |
24 | | public typealias Response = Serializer.Input |
25 | | |
26 | | /// A response serializer. |
27 | | @usableFromInline |
28 | | internal let serializer: Serializer |
29 | | |
30 | | /// A request deserializer. |
31 | | @usableFromInline |
32 | | internal let deserializer: Deserializer |
33 | | |
34 | | /// A pipeline of user provided interceptors. |
35 | | @usableFromInline |
36 | | internal var interceptors: ServerInterceptorPipeline<Request, Response>! |
37 | | |
38 | | /// The context required in order create the function. |
39 | | @usableFromInline |
40 | | internal let context: CallHandlerContext |
41 | | |
42 | | /// A reference to a `UserInfo`. |
43 | | @usableFromInline |
44 | | internal let userInfoRef: Ref<UserInfo> |
45 | | |
46 | | /// The user provided function to execute. |
47 | | @usableFromInline |
48 | | internal let userFunction: (Request, StatusOnlyCallContext) -> EventLoopFuture<Response> |
49 | | |
50 | | /// The state of the function invocation. |
51 | | @usableFromInline |
52 | 164k | internal var state: State = .idle |
53 | | |
54 | | @usableFromInline |
55 | | internal enum State { |
56 | | // Initial state. Nothing has happened yet. |
57 | | case idle |
58 | | // Headers have been received and now we're holding a context with which to invoke the user |
59 | | // function when we receive a message. |
60 | | case createdContext(UnaryResponseCallContext<Response>) |
61 | | // The user function has been invoked, we're waiting for the response. |
62 | | case invokedFunction(UnaryResponseCallContext<Response>) |
63 | | // The function has completed or we are no longer proceeding with execution (because of an error |
64 | | // or unexpected closure). |
65 | | case completed |
66 | | } |
67 | | |
68 | | @inlinable |
69 | | public init( |
70 | | context: CallHandlerContext, |
71 | | requestDeserializer: Deserializer, |
72 | | responseSerializer: Serializer, |
73 | | interceptors: [ServerInterceptor<Request, Response>], |
74 | | userFunction: @escaping (Request, StatusOnlyCallContext) -> EventLoopFuture<Response> |
75 | 81.7k | ) { |
76 | 81.7k | self.userFunction = userFunction |
77 | 81.7k | self.serializer = responseSerializer |
78 | 81.7k | self.deserializer = requestDeserializer |
79 | 81.7k | self.context = context |
80 | 81.7k | |
81 | 81.7k | let userInfoRef = Ref(UserInfo()) |
82 | 81.7k | self.userInfoRef = userInfoRef |
83 | 81.7k | self.interceptors = ServerInterceptorPipeline( |
84 | 81.7k | logger: context.logger, |
85 | 81.7k | eventLoop: context.eventLoop, |
86 | 81.7k | path: context.path, |
87 | 81.7k | callType: .unary, |
88 | 81.7k | remoteAddress: context.remoteAddress, |
89 | 81.7k | userInfoRef: userInfoRef, |
90 | 81.7k | closeFuture: context.closeFuture, |
91 | 81.7k | interceptors: interceptors, |
92 | 215k | onRequestPart: self.receiveInterceptedPart(_:), $s4GRPC18UnaryServerHandlerC7context19requestDeserializer18responseSerializer12interceptors12userFunctionACyxq_GAA04CallD7ContextV_q_xSayAA0C11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyAQGAO_AA010StatusOnlymN0_ptctcfcyAA21GRPCServerRequestPartOyAOGcAIcfu_ Line | Count | Source | 92 | 81.7k | onRequestPart: self.receiveInterceptedPart(_:), |
$s4GRPC18UnaryServerHandlerC7context19requestDeserializer18responseSerializer12interceptors12userFunctionACyxq_GAA04CallD7ContextV_q_xSayAA0C11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyAQGAO_AA010StatusOnlymN0_ptctcfcyAA21GRPCServerRequestPartOyAOGcAIcfu_yA_cfu0_ Line | Count | Source | 92 | 133k | onRequestPart: self.receiveInterceptedPart(_:), |
|
93 | 271k | onResponsePart: self.sendInterceptedPart(_:promise:) $s4GRPC18UnaryServerHandlerC7context19requestDeserializer18responseSerializer12interceptors12userFunctionACyxq_GAA04CallD7ContextV_q_xSayAA0C11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyAQGAO_AA010StatusOnlymN0_ptctcfcyAA22GRPCServerResponsePartOyAQG_AT0sT7PromiseVyytGSgtcAIcfu1_ Line | Count | Source | 93 | 81.7k | onResponsePart: self.sendInterceptedPart(_:promise:) |
$s4GRPC18UnaryServerHandlerC7context19requestDeserializer18responseSerializer12interceptors12userFunctionACyxq_GAA04CallD7ContextV_q_xSayAA0C11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyAQGAO_AA010StatusOnlymN0_ptctcfcyAA22GRPCServerResponsePartOyAQG_AT0sT7PromiseVyytGSgtcAIcfu1_yA__A3_tcfu2_ Line | Count | Source | 93 | 189k | onResponsePart: self.sendInterceptedPart(_:promise:) |
|
94 | 81.7k | ) |
95 | 81.7k | } |
96 | | |
97 | | // MARK: - Public API: gRPC to Interceptors |
98 | | |
99 | | @inlinable |
100 | 32.0k | public func receiveMetadata(_ metadata: HPACKHeaders) { |
101 | 32.0k | self.interceptors.receive(.metadata(metadata)) |
102 | 32.0k | } |
103 | | |
104 | | @inlinable |
105 | 19.0k | public func receiveMessage(_ bytes: ByteBuffer) { |
106 | 19.0k | do { |
107 | 19.0k | let message = try self.deserializer.deserialize(byteBuffer: bytes) |
108 | 19.0k | self.interceptors.receive(.message(message)) |
109 | 19.0k | } catch { |
110 | 7.66k | self.handleError(error) |
111 | 19.0k | } |
112 | 19.0k | } |
113 | | |
114 | | @inlinable |
115 | 10.8k | public func receiveEnd() { |
116 | 10.8k | self.interceptors.receive(.end) |
117 | 10.8k | } |
118 | | |
119 | | @inlinable |
120 | 2.94k | public func receiveError(_ error: Error) { |
121 | 2.94k | self.handleError(error) |
122 | 2.94k | self.finish() |
123 | 2.94k | } |
124 | | |
125 | | @inlinable |
126 | 35.0k | public func finish() { |
127 | 35.0k | switch self.state { |
128 | 35.0k | case .idle: |
129 | 0 | self.interceptors = nil |
130 | 0 | self.state = .completed |
131 | 35.0k | |
132 | 35.0k | case let .createdContext(context), |
133 | 0 | let .invokedFunction(context): |
134 | 0 | context.responsePromise.fail(GRPCStatus(code: .unavailable, message: nil)) |
135 | 0 | self.context.eventLoop.execute { |
136 | 0 | self.interceptors = nil |
137 | 0 | } |
138 | 35.0k | |
139 | 35.0k | case .completed: |
140 | 35.0k | self.interceptors = nil |
141 | 35.0k | } |
142 | 35.0k | } |
143 | | |
144 | | // MARK: - Interceptors to User Function |
145 | | |
146 | | @inlinable |
147 | 133k | internal func receiveInterceptedPart(_ part: GRPCServerRequestPart<Request>) { |
148 | 133k | switch part { |
149 | 133k | case let .metadata(headers): |
150 | 81.7k | self.receiveInterceptedMetadata(headers) |
151 | 133k | case let .message(message): |
152 | 26.2k | self.receiveInterceptedMessage(message) |
153 | 133k | case .end: |
154 | 25.4k | self.receiveInterceptedEnd() |
155 | 133k | } |
156 | 133k | } |
157 | | |
158 | | @inlinable |
159 | 81.7k | internal func receiveInterceptedMetadata(_ headers: HPACKHeaders) { |
160 | 81.7k | switch self.state { |
161 | 81.7k | case .idle: |
162 | 81.7k | // Make a context to invoke the user function with. |
163 | 81.7k | let context = UnaryResponseCallContext<Response>( |
164 | 81.7k | eventLoop: self.context.eventLoop, |
165 | 81.7k | headers: headers, |
166 | 81.7k | logger: self.context.logger, |
167 | 81.7k | userInfoRef: self.userInfoRef, |
168 | 81.7k | closeFuture: self.context.closeFuture |
169 | 81.7k | ) |
170 | 81.7k | |
171 | 81.7k | // Move to the next state. |
172 | 81.7k | self.state = .createdContext(context) |
173 | 81.7k | |
174 | 81.7k | // Register a callback on the response future. The user function will complete this promise. |
175 | 163k | context.responsePromise.futureResult.whenComplete(self.userFunctionCompletedWithResult(_:)) $s4GRPC18UnaryServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFys6ResultOy5InputQzs5Error_pGcACyxq_Gcfu_ Line | Count | Source | 175 | 81.7k | context.responsePromise.futureResult.whenComplete(self.userFunctionCompletedWithResult(_:)) |
$s4GRPC18UnaryServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFys6ResultOy5InputQzs5Error_pGcACyxq_Gcfu_yAMcfu0_ Line | Count | Source | 175 | 81.7k | context.responsePromise.futureResult.whenComplete(self.userFunctionCompletedWithResult(_:)) |
|
176 | 81.7k | |
177 | 81.7k | // Send back response headers. |
178 | 81.7k | self.interceptors.send(.metadata([:]), promise: nil) |
179 | 81.7k | |
180 | 81.7k | case .createdContext, .invokedFunction: |
181 | 0 | self.handleError(GRPCError.ProtocolViolation("Multiple header blocks received on RPC")) |
182 | 81.7k | |
183 | 81.7k | case .completed: |
184 | 0 | // We may receive headers from the interceptor pipeline if we have already finished (i.e. due |
185 | 0 | // to an error or otherwise) and an interceptor doing some async work later emitting headers. |
186 | 0 | // Dropping them is fine. |
187 | 0 | () |
188 | 81.7k | } |
189 | 81.7k | } |
190 | | |
191 | | @inlinable |
192 | 26.2k | internal func receiveInterceptedMessage(_ request: Request) { |
193 | 26.2k | switch self.state { |
194 | 26.2k | case .idle: |
195 | 0 | self.handleError(GRPCError.ProtocolViolation("Message received before headers")) |
196 | 26.2k | |
197 | 26.2k | case let .createdContext(context): |
198 | 26.2k | // Happy path: execute the function; complete the promise with the result. |
199 | 26.2k | self.state = .invokedFunction(context) |
200 | 26.2k | context.responsePromise.completeWith(self.userFunction(request, context)) |
201 | 26.2k | |
202 | 26.2k | case .invokedFunction: |
203 | 0 | // The function's already been invoked with a message. |
204 | 0 | self.handleError(GRPCError.ProtocolViolation("Multiple messages received on unary RPC")) |
205 | 26.2k | |
206 | 26.2k | case .completed: |
207 | 0 | // We received a message but we're already done: this may happen if we terminate the RPC |
208 | 0 | // due to a channel error, for example. |
209 | 0 | () |
210 | 26.2k | } |
211 | 26.2k | } |
212 | | |
213 | | @inlinable |
214 | 25.4k | internal func receiveInterceptedEnd() { |
215 | 25.4k | switch self.state { |
216 | 25.4k | case .idle: |
217 | 0 | self.handleError(GRPCError.ProtocolViolation("End received before headers")) |
218 | 25.4k | |
219 | 25.4k | case .createdContext: |
220 | 25.4k | self.handleError(GRPCError.ProtocolViolation("End received before message")) |
221 | 25.4k | |
222 | 25.4k | case .invokedFunction, .completed: |
223 | 0 | () |
224 | 25.4k | } |
225 | 25.4k | } |
226 | | |
227 | | // MARK: - User Function To Interceptors |
228 | | |
229 | | @inlinable |
230 | 81.7k | internal func userFunctionCompletedWithResult(_ result: Result<Response, Error>) { |
231 | 81.7k | switch self.state { |
232 | 81.7k | case .idle: |
233 | 0 | // Invalid state: the user function can only complete if it was executed. |
234 | 0 | preconditionFailure() |
235 | 81.7k | |
236 | 81.7k | // 'created' is allowed here: we may have to (and tear down) after receiving headers |
237 | 81.7k | // but before receiving a message. |
238 | 81.7k | case let .createdContext(context), |
239 | 26.2k | let .invokedFunction(context): |
240 | 26.2k | |
241 | 26.2k | switch result { |
242 | 26.2k | case let .success(response): |
243 | 26.2k | // Complete, as we're sending 'end'. |
244 | 26.2k | self.state = .completed |
245 | 26.2k | |
246 | 26.2k | // Compression depends on whether it's enabled on the server and the setting in the caller |
247 | 26.2k | // context. |
248 | 26.2k | let compress = self.context.encoding.isEnabled && context.compressionEnabled |
249 | 26.2k | let metadata = MessageMetadata(compress: compress, flush: false) |
250 | 26.2k | self.interceptors.send(.message(response, metadata), promise: nil) |
251 | 26.2k | self.interceptors.send(.end(context.responseStatus, context.trailers), promise: nil) |
252 | 26.2k | |
253 | 26.2k | case let .failure(error): |
254 | 0 | self.handleError(error, thrownFromHandler: true) |
255 | 81.7k | } |
256 | 81.7k | |
257 | 81.7k | case .completed: |
258 | 55.5k | // We've already failed. Ignore this. |
259 | 55.5k | () |
260 | 81.7k | } |
261 | 81.7k | } |
262 | | |
263 | | @inlinable |
264 | | internal func sendInterceptedPart( |
265 | | _ part: GRPCServerResponsePart<Response>, |
266 | | promise: EventLoopPromise<Void>? |
267 | 189k | ) { |
268 | 189k | switch part { |
269 | 189k | case let .metadata(headers): |
270 | 81.7k | // We can delay this flush until the end of the RPC. |
271 | 81.7k | self.context.responseWriter.sendMetadata(headers, flush: false, promise: promise) |
272 | 189k | |
273 | 189k | case let .message(message, metadata): |
274 | 26.2k | do { |
275 | 26.2k | let bytes = try self.serializer.serialize(message, allocator: self.context.allocator) |
276 | 26.2k | self.context.responseWriter.sendMessage(bytes, metadata: metadata, promise: promise) |
277 | 26.2k | } catch { |
278 | 0 | // Serialization failed: fail the promise and send end. |
279 | 0 | promise?.fail(error) |
280 | 0 | let (status, trailers) = ServerErrorProcessor.processLibraryError( |
281 | 0 | error, |
282 | 0 | delegate: self.context.errorDelegate |
283 | 0 | ) |
284 | 0 | // Loop back via the interceptors. |
285 | 0 | self.interceptors.send(.end(status, trailers), promise: nil) |
286 | 189k | } |
287 | 189k | |
288 | 189k | case let .end(status, trailers): |
289 | 81.7k | self.context.responseWriter.sendEnd(status: status, trailers: trailers, promise: promise) |
290 | 189k | } |
291 | 189k | } |
292 | | |
293 | | @inlinable |
294 | 58.0k | internal func handleError(_ error: Error, thrownFromHandler isHandlerError: Bool = false) { |
295 | 58.0k | switch self.state { |
296 | 58.0k | case .idle: |
297 | 0 | assert(!isHandlerError) |
298 | 0 | self.state = .completed |
299 | 0 | // We don't have a promise to fail. Just send back end. |
300 | 0 | let (status, trailers) = ServerErrorProcessor.processLibraryError( |
301 | 0 | error, |
302 | 0 | delegate: self.context.errorDelegate |
303 | 0 | ) |
304 | 0 | self.interceptors.send(.end(status, trailers), promise: nil) |
305 | 58.0k | |
306 | 58.0k | case let .createdContext(context), |
307 | 55.5k | let .invokedFunction(context): |
308 | 55.5k | // We don't have a promise to fail. Just send back end. |
309 | 55.5k | self.state = .completed |
310 | 55.5k | |
311 | 55.5k | let status: GRPCStatus |
312 | 55.5k | let trailers: HPACKHeaders |
313 | 55.5k | |
314 | 55.5k | if isHandlerError { |
315 | 0 | (status, trailers) = ServerErrorProcessor.processObserverError( |
316 | 0 | error, |
317 | 0 | headers: context.headers, |
318 | 0 | trailers: context.trailers, |
319 | 0 | delegate: self.context.errorDelegate |
320 | 0 | ) |
321 | 55.5k | } else { |
322 | 55.5k | (status, trailers) = ServerErrorProcessor.processLibraryError( |
323 | 55.5k | error, |
324 | 55.5k | delegate: self.context.errorDelegate |
325 | 55.5k | ) |
326 | 55.5k | } |
327 | 55.5k | |
328 | 55.5k | self.interceptors.send(.end(status, trailers), promise: nil) |
329 | 55.5k | // We're already in the 'completed' state so failing the promise will be a no-op in the |
330 | 55.5k | // callback to 'userFunctionCompletedWithResult' (but we also need to avoid leaking the |
331 | 55.5k | // promise.) |
332 | 55.5k | context.responsePromise.fail(error) |
333 | 58.0k | |
334 | 58.0k | case .completed: |
335 | 2.50k | () |
336 | 58.0k | } |
337 | 58.0k | } |
338 | | } |