/src/grpc-swift/Sources/GRPC/HTTP2ToRawGRPCStateMachine.swift
Line | Count | Source |
1 | | /* |
2 | | * Copyright 2020, gRPC Authors All rights reserved. |
3 | | * |
4 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | | * you may not use this file except in compliance with the License. |
6 | | * You may obtain a copy of the License at |
7 | | * |
8 | | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | | * |
10 | | * Unless required by applicable law or agreed to in writing, software |
11 | | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | | * See the License for the specific language governing permissions and |
14 | | * limitations under the License. |
15 | | */ |
16 | | import Logging |
17 | | import NIOCore |
18 | | import NIOHPACK |
19 | | import NIOHTTP2 |
20 | | |
/// State machine for a single RPC. `State` records how far the request and response streams
/// have each progressed.
struct HTTP2ToRawGRPCStateMachine {
  /// Where the RPC currently is. A new machine starts with both streams idle.
  private var state = State.requestIdleResponseIdle
}
25 | | |
extension HTTP2ToRawGRPCStateMachine {
  /// Every combination of request-stream and response-stream progress the machine can be in.
  enum State {
    /// Both peers are idle: nothing has happened on the stream yet.
    case requestIdleResponseIdle

    /// Valid request headers were received; nothing has been sent in response.
    case requestOpenResponseIdle(RequestOpenResponseIdleState)

    /// Valid headers and request(s) were received; response headers have been sent.
    case requestOpenResponseOpen(RequestOpenResponseOpenState)

    /// Valid headers and request(s) were received, but not the end of the request stream. The
    /// response stream has been closed.
    case requestOpenResponseClosed

    /// The request stream is closed; nothing has been sent in response.
    case requestClosedResponseIdle(RequestClosedResponseIdleState)

    /// The request stream is closed; response headers have been sent.
    case requestClosedResponseOpen(RequestClosedResponseOpenState)

    /// Terminal state: both streams are closed.
    case requestClosedResponseClosed
  }

  /// Data held while the request stream is open and no response has been sent.
  struct RequestOpenResponseIdleState {
    /// Reads length-prefixed request messages.
    var reader: LengthPrefixedMessageReader

    /// Writes length-prefixed response messages.
    var writer: CoalescingLengthPrefixedMessageWriter

    /// The content type of the RPC.
    var contentType: ContentType

    /// The accept-encoding value to send in the response headers, indicating the message
    /// encodings the server supports. `nil` if there is nothing to send.
    var acceptEncoding: String?

    /// The message-encoding value to send in the response headers, indicating the encoding
    /// used for responses. `nil` if there is nothing to send.
    var responseEncoding: String?

    /// Whether user-provided metadata should be normalized.
    var normalizeHeaders: Bool

    /// How far pipeline configuration has progressed.
    var configurationState: ConfigurationState
  }

  /// Data held while the request stream is closed and no response has been sent.
  struct RequestClosedResponseIdleState {
    /// Reads length-prefixed request messages.
    var reader: LengthPrefixedMessageReader

    /// Writes length-prefixed response messages.
    var writer: CoalescingLengthPrefixedMessageWriter

    /// The content type of the RPC.
    var contentType: ContentType

    /// The accept-encoding value to send in the response headers, indicating the message
    /// encodings the server supports. `nil` if there is nothing to send.
    var acceptEncoding: String?

    /// The message-encoding value to send in the response headers, indicating the encoding
    /// used for responses. `nil` if there is nothing to send.
    var responseEncoding: String?

    /// Whether user-provided metadata should be normalized.
    var normalizeHeaders: Bool

    /// How far pipeline configuration has progressed.
    var configurationState: ConfigurationState

    /// Carries every field across when the request stream closes before a response is sent.
    init(from open: RequestOpenResponseIdleState) {
      self.reader = open.reader
      self.writer = open.writer
      self.contentType = open.contentType
      self.acceptEncoding = open.acceptEncoding
      self.responseEncoding = open.responseEncoding
      self.normalizeHeaders = open.normalizeHeaders
      self.configurationState = open.configurationState
    }
  }

  /// Data held while the request stream is open and response headers have been sent.
  struct RequestOpenResponseOpenState {
    /// Reads length-prefixed request messages.
    var reader: LengthPrefixedMessageReader

    /// Writes length-prefixed response messages.
    var writer: CoalescingLengthPrefixedMessageWriter

    /// Whether user-provided metadata should be normalized.
    var normalizeHeaders: Bool

    /// Keeps the reader, writer and normalization flag of the idle state; the remaining
    /// fields are not carried over.
    init(from idle: RequestOpenResponseIdleState) {
      self.reader = idle.reader
      self.writer = idle.writer
      self.normalizeHeaders = idle.normalizeHeaders
    }
  }

  /// Data held while the request stream is closed and response headers have been sent.
  struct RequestClosedResponseOpenState {
    /// Reads length-prefixed request messages.
    var reader: LengthPrefixedMessageReader

    /// Writes length-prefixed response messages.
    var writer: CoalescingLengthPrefixedMessageWriter

    /// Whether user-provided metadata should be normalized.
    var normalizeHeaders: Bool

    /// Used when the request stream closes after response headers were sent.
    init(from open: RequestOpenResponseOpenState) {
      self.reader = open.reader
      self.writer = open.writer
      self.normalizeHeaders = open.normalizeHeaders
    }

    /// Used when response headers are sent after the request stream has closed.
    init(from idle: RequestClosedResponseIdleState) {
      self.reader = idle.reader
      self.writer = idle.writer
      self.normalizeHeaders = idle.normalizeHeaders
    }
  }

  /// The pipeline configuration state.
  enum ConfigurationState {
    /// The pipeline is being configured; the request headers are held until it is done. Any
    /// message data will be buffered into an appropriate message reader.
    case configuring(HPACKHeaders)

    /// The pipeline is configured.
    case configured

    /// `true` only in the `.configured` state.
    var isConfigured: Bool {
      if case .configured = self {
        return true
      }
      return false
    }

    /// Marks configuration as complete and hands back the headers held while configuring.
    ///
    /// Traps if configuration had already completed.
    mutating func configured() -> HPACKHeaders {
      guard case let .configuring(heldHeaders) = self else {
        preconditionFailure("Invalid state: already configured")
      }

      self = .configured
      return heldHeaders
    }
  }
}
183 | | |
extension HTTP2ToRawGRPCStateMachine {
  /// What to do once the pipeline has been configured.
  enum PipelineConfiguredAction {
    /// Forward the given headers.
    case forwardHeaders(HPACKHeaders)
    /// Forward the given headers, then try reading the next message.
    case forwardHeadersAndRead(HPACKHeaders)
  }

  /// What to do after request headers have been received.
  enum ReceiveHeadersAction {
    /// Configure the RPC to use the given server handler.
    case configure(GRPCServerHandlerProtocol)
    /// Reject the RPC by writing out the given headers and setting end-stream.
    case rejectRPC(HPACKHeaders)
  }

  /// What to do after attempting to read the next request message.
  enum ReadNextMessageAction {
    /// Do nothing.
    case none
    /// Forward the buffer.
    case forwardMessage(ByteBuffer)
    /// Forward the buffer, then try reading the next message.
    case forwardMessageThenReadNextMessage(ByteBuffer)
    /// Forward the 'end' of stream request part.
    case forwardEnd
    /// Throw an error down the pipeline.
    case errorCaught(Error)
  }

  /// A next state paired with the action to take after receiving headers.
  struct StateAndReceiveHeadersAction {
    /// The next state.
    var state: State
    /// The action to take.
    var action: ReceiveHeadersAction
  }

  /// A next state paired with the action to take after receiving data.
  struct StateAndReceiveDataAction {
    /// The next state.
    var state: State
    /// The action to take.
    var action: ReceiveDataAction
  }

  /// What to do after request data has been received.
  enum ReceiveDataAction: Hashable {
    /// Try to read the next message from the state machine.
    case tryReading
    /// Invoke 'finish' on the RPC handler.
    case finishHandler
    /// Do nothing.
    case nothing
  }

  /// What to do when the response stream is being ended.
  enum SendEndAction {
    /// Send trailers to the client.
    case sendTrailers(HPACKHeaders)
    /// Send trailers to the client and invoke 'finish' on the RPC handler.
    case sendTrailersAndFinish(HPACKHeaders)
    /// Fail any promise associated with this send.
    case failure(Error)
  }
}
244 | | |
245 | | // MARK: Receive Headers |
246 | | |
247 | | // This is the only state in which we can receive headers. |
extension HTTP2ToRawGRPCStateMachine.State {
  /// Validates request headers and decides whether to configure a handler or reject the RPC.
  ///
  /// Checks run in this order: content type, request message encoding, ':path', service lookup,
  /// method lookup. The first failure rejects the RPC and moves to the terminal state.
  ///
  /// - Returns: `.requestOpenResponseIdle` with a `.configure` action when a handler was found,
  ///   otherwise `.requestClosedResponseClosed` with a `.rejectRPC` action.
  private func _receive(
    headers: HPACKHeaders,
    eventLoop: EventLoop,
    errorDelegate: ServerErrorDelegate?,
    remoteAddress: SocketAddress?,
    logger: Logger,
    allocator: ByteBufferAllocator,
    responseWriter: GRPCServerResponseWriter,
    closeFuture: EventLoopFuture<Void>,
    services: [Substring: CallHandlerProvider],
    encoding: ServerMessageEncoding,
    normalizeHeaders: Bool
  ) -> HTTP2ToRawGRPCStateMachine.StateAndReceiveHeadersAction {
    // Without a valid content type the stream has to be closed.
    guard let contentType = extractContentType(from: headers) else {
      return unsupportedContentType()
    }

    // Work out the request message encoding and build a matching reader. A list of acceptable
    // request encodings may need to be sent back as well.
    let requestReader: LengthPrefixedMessageReader
    let acceptEncodingToSend: String?

    switch extractRequestEncoding(from: headers, encoding: encoding) {
    case let .valid(validReader, validAcceptEncoding):
      requestReader = validReader
      acceptEncodingToSend = validAcceptEncoding

    case let .invalid(failureStatus, failureAcceptEncoding):
      return invalidRequestEncoding(
        status: failureStatus,
        acceptableRequestEncoding: failureAcceptEncoding,
        contentType: contentType
      )
    }

    // Decide how responses will be encoded.
    let (responseMessageWriter, responseEncoding) = extractResponseEncoding(
      from: headers,
      encoding: encoding,
      allocator: allocator
    )

    // A missing ':path' is reported as an unimplemented method with an empty path.
    guard let path = headers.first(name: ":path") else {
      return methodNotImplemented("", contentType: contentType)
    }

    // The path must parse and must name a service we know about.
    guard let callPath = CallPath(requestURI: path),
      let service = services[Substring(callPath.service)]
    else {
      return methodNotImplemented(path, contentType: contentType)
    }

    // Everything needed to create the handler, some of which is exposed to service providers.
    let context = CallHandlerContext(
      errorDelegate: errorDelegate,
      logger: logger,
      encoding: encoding,
      eventLoop: eventLoop,
      path: path,
      remoteAddress: remoteAddress,
      responseWriter: responseWriter,
      allocator: allocator,
      closeFuture: closeFuture
    )

    // The service matched; it still has to provide a handler for the method.
    let method = Substring(callPath.method)
    guard let handler = service.handle(method: method, context: context) else {
      return methodNotImplemented(path, contentType: contentType)
    }

    // Hold on to the request headers until the pipeline has been configured.
    let openState = HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState(
      reader: requestReader,
      writer: responseMessageWriter,
      contentType: contentType,
      acceptEncoding: acceptEncodingToSend,
      responseEncoding: responseEncoding,
      normalizeHeaders: normalizeHeaders,
      configurationState: .configuring(headers)
    )

    return HTTP2ToRawGRPCStateMachine.StateAndReceiveHeadersAction(
      state: .requestOpenResponseIdle(openState),
      action: .configure(handler)
    )
  }

  /// The 'content-type' is not supported; close with status code 415.
  private func unsupportedContentType() -> HTTP2ToRawGRPCStateMachine.StateAndReceiveHeadersAction {
    // From: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
    //
    //   If 'content-type' does not begin with "application/grpc", gRPC servers SHOULD respond
    //   with HTTP status of 415 (Unsupported Media Type). This will prevent other HTTP/2
    //   clients from interpreting a gRPC error response, which uses status 200 (OK), as
    //   successful.
    let rejection = HPACKHeaders([(":status", "415")])
    return HTTP2ToRawGRPCStateMachine.StateAndReceiveHeadersAction(
      state: .requestClosedResponseClosed,
      action: .rejectRPC(rejection)
    )
  }

  /// The RPC method is not implemented. Close with an 'unimplemented' status.
  ///
  /// - Parameters:
  ///   - path: The request path, quoted in the status message.
  ///   - contentType: The content type of the RPC.
  private func methodNotImplemented(
    _ path: String,
    contentType: ContentType
  ) -> HTTP2ToRawGRPCStateMachine.StateAndReceiveHeadersAction {
    let status = GRPCStatus(code: .unimplemented, message: "'\(path)' is not implemented")
    let rejection = HTTP2ToRawGRPCStateMachine.makeResponseTrailersOnly(
      for: status,
      contentType: contentType,
      acceptableRequestEncoding: nil,
      userProvidedHeaders: nil,
      normalizeUserProvidedHeaders: false
    )

    return HTTP2ToRawGRPCStateMachine.StateAndReceiveHeadersAction(
      state: .requestClosedResponseClosed,
      action: .rejectRPC(rejection)
    )
  }

  /// The request encoding specified by the client is not supported. Close with the given
  /// status.
  ///
  /// - Parameters:
  ///   - status: The status to close the RPC with.
  ///   - acceptableRequestEncoding: The accept-encoding value to return to the client, if any.
  ///   - contentType: The content type of the RPC.
  private func invalidRequestEncoding(
    status: GRPCStatus,
    acceptableRequestEncoding: String?,
    contentType: ContentType
  ) -> HTTP2ToRawGRPCStateMachine.StateAndReceiveHeadersAction {
    let rejection = HTTP2ToRawGRPCStateMachine.makeResponseTrailersOnly(
      for: status,
      contentType: contentType,
      acceptableRequestEncoding: acceptableRequestEncoding,
      userProvidedHeaders: nil,
      normalizeUserProvidedHeaders: false
    )

    return HTTP2ToRawGRPCStateMachine.StateAndReceiveHeadersAction(
      state: .requestClosedResponseClosed,
      action: .rejectRPC(rejection)
    )
  }

  /// Makes a 'GRPCStatus' and an accept-encoding value suitable for returning to the client
  /// when the request message encoding is not supported.
  ///
  /// - Parameters:
  ///   - encoding: The unsupported request message encoding sent by the client.
  ///   - advertisedEncoding: The list of request message encodings the server advertises.
  /// - Returns: The status, plus the advertised encodings joined with "," (or `nil` when the
  ///   list is empty).
  private func makeStatusAndTrailersForUnsupportedEncoding(
    _ encoding: String,
    advertisedEncoding: [String]
  ) -> (GRPCStatus, acceptEncoding: String?) {
    // No compression is supported; there's nothing to tell the client about.
    guard !advertisedEncoding.isEmpty else {
      let status = GRPCStatus(code: .unimplemented, message: "compression is not supported")
      return (status, nil)
    }

    // Return the encodings we advertise. (The list we advertise may be a subset of the
    // encodings we support.)
    let advertised = advertisedEncoding.joined(separator: ",")
    let status = GRPCStatus(
      code: .unimplemented,
      message: "\(encoding) compression is not supported, supported algorithms are "
        + "listed in '\(GRPCHeaderName.acceptEncoding)'"
    )
    return (status, advertised)
  }

  /// Extract and validate the 'content-type' sent by the client.
  /// - Parameter headers: The headers to extract the 'content-type' from.
  /// - Returns: The content type, or `nil` if the header is absent or not accepted by
  ///   `ContentType`.
  private func extractContentType(from headers: HPACKHeaders) -> ContentType? {
    let rawContentType = headers.first(name: GRPCHeaderName.contentType)
    return rawContentType.flatMap(ContentType.init)
  }

  /// The result of validating the request encoding header.
  private enum RequestEncodingValidation {
    /// The encoding was valid.
    case valid(messageReader: LengthPrefixedMessageReader, acceptEncoding: String?)
    /// The encoding was invalid, the RPC should be terminated with this status.
    case invalid(status: GRPCStatus, acceptEncoding: String?)
  }

  /// Extract and validate the request message encoding header.
  /// - Parameters:
  ///   - headers: The headers to extract the message encoding header from.
  ///   - encoding: The server's message encoding configuration, used to validate the header.
  /// - Returns: `RequestEncodingValidation`: either a message reader suitable for decoding
  ///   requests plus an accept-encoding response header value, or a `GRPCStatus` plus an
  ///   accept-encoding value to close the RPC with.
  private func extractRequestEncoding(
    from headers: HPACKHeaders,
    encoding: ServerMessageEncoding
  ) -> RequestEncodingValidation {
    var remainingValues = headers
      .values(forHeader: GRPCHeaderName.encoding, canonicalForm: true)
      .makeIterator()
    let firstValue = remainingValues.next()

    // Fail if there's more than one encoding value.
    if let first = firstValue, let second = remainingValues.next() {
      var allValues: [Substring] = []
      allValues.reserveCapacity(8)
      allValues.append(first)
      allValues.append(second)
      // Drain the rest so the status message lists every value that was sent.
      allValues.append(contentsOf: IteratorSequence(remainingValues))

      let status = GRPCStatus(
        code: .invalidArgument,
        message:
          "'\(GRPCHeaderName.encoding)' must contain no more than one value but was '\(allValues.joined(separator: ", "))'"
      )
      return .invalid(status: status, acceptEncoding: nil)
    }

    let validator = MessageEncodingHeaderValidator(encoding: encoding)

    switch validator.validate(requestEncoding: firstValue.map { String($0) }) {
    case let .supported(algorithm, decompressionLimit, advertised):
      // Request message encoding is valid and supported.
      let reader = LengthPrefixedMessageReader(
        compression: algorithm,
        decompressionLimit: decompressionLimit
      )
      return .valid(
        messageReader: reader,
        acceptEncoding: advertised.isEmpty ? nil : advertised.joined(separator: ",")
      )

    case .noCompression:
      // No message encoding header was present. This means no compression is being used.
      return .valid(messageReader: LengthPrefixedMessageReader(), acceptEncoding: nil)

    case let .unsupported(unsupportedEncoding, advertised):
      // Request encoding is not supported.
      let (status, acceptEncoding) = makeStatusAndTrailersForUnsupportedEncoding(
        unsupportedEncoding,
        advertisedEncoding: advertised
      )
      return .invalid(status: status, acceptEncoding: acceptEncoding)
    }
  }

  /// Extract a suitable message encoding for responses.
  /// - Parameters:
  ///   - headers: The headers to extract the acceptable response message encoding from.
  ///   - encoding: The message encoding configuration for the server.
  ///   - allocator: The allocator handed to the message writer.
  /// - Returns: A message writer and the response encoding header to send back to the client.
  private func extractResponseEncoding(
    from headers: HPACKHeaders,
    encoding: ServerMessageEncoding,
    allocator: ByteBufferAllocator
  ) -> (CoalescingLengthPrefixedMessageWriter, String?) {
    switch encoding {
    case let .enabled(configuration):
      // The encodings the client will accept for response messages.
      let clientAccepted = headers[canonicalForm: GRPCHeaderName.acceptEncoding]

      // Pick the first algorithm we both recognise and have enabled. If there is none,
      // response messages won't be compressed.
      let chosen = clientAccepted.lazy
        .compactMap { value in
          CompressionAlgorithm(rawValue: value)
        }
        .first(where: { configuration.enabledAlgorithms.contains($0) })

      return (
        CoalescingLengthPrefixedMessageWriter(compression: chosen, allocator: allocator),
        chosen?.name
      )

    case .disabled:
      // The server doesn't have compression enabled.
      return (
        CoalescingLengthPrefixedMessageWriter(compression: .none, allocator: allocator),
        nil
      )
    }
  }
}
543 | | |
544 | | // MARK: - Receive Data |
545 | | |
extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
  /// Buffers received request bytes and decides the next state and action.
  ///
  /// - Parameters:
  ///   - buffer: The received bytes; appended to the message reader.
  ///   - endStream: Whether this was the end of the request stream.
  /// - Returns: The closed-request state when `endStream` is set, otherwise this state again,
  ///   paired with `.tryReading` if the pipeline is configured and `.nothing` if it is not.
  mutating func receive(
    buffer: inout ByteBuffer,
    endStream: Bool
  ) -> HTTP2ToRawGRPCStateMachine.StateAndReceiveDataAction {
    // The bytes are always buffered, whatever happens next.
    self.reader.append(buffer: &buffer)

    // End of stream closes the request side. Any 'end' is delivered later, as a result of
    // draining the reader from the next state.
    let nextState: HTTP2ToRawGRPCStateMachine.State
    if endStream {
      nextState = .requestClosedResponseIdle(.init(from: self))
    } else {
      nextState = .requestOpenResponseIdle(self)
    }

    // Until the pipeline is configured there is nowhere to deliver a message, so there is no
    // point reading one yet.
    let action: HTTP2ToRawGRPCStateMachine.ReceiveDataAction
    if self.configurationState.isConfigured {
      action = .tryReading
    } else {
      action = .nothing
    }

    return HTTP2ToRawGRPCStateMachine.StateAndReceiveDataAction(state: nextState, action: action)
  }
}
585 | | |
extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
  /// Buffers received request bytes and always asks the caller to try reading.
  ///
  /// - Parameters:
  ///   - buffer: The received bytes; appended to the message reader.
  ///   - endStream: Whether this was the end of the request stream.
  /// - Returns: The closed-request state when `endStream` is set, otherwise this state again,
  ///   paired with `.tryReading`.
  mutating func receive(
    buffer: inout ByteBuffer,
    endStream: Bool
  ) -> HTTP2ToRawGRPCStateMachine.StateAndReceiveDataAction {
    self.reader.append(buffer: &buffer)

    // On end stream move to the closed state. Any end-of-request-stream events will happen
    // as a result of reading from the closed state.
    let nextState: HTTP2ToRawGRPCStateMachine.State =
      endStream
      ? .requestClosedResponseOpen(.init(from: self))
      : .requestOpenResponseOpen(self)

    return HTTP2ToRawGRPCStateMachine.StateAndReceiveDataAction(
      state: nextState,
      action: .tryReading
    )
  }
}
606 | | |
607 | | // MARK: - Send Headers |
608 | | |
extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
  /// Builds the response headers for this RPC.
  ///
  /// - Parameter userProvidedHeaders: Metadata supplied by the user, passed on together with
  ///   this state's content type, encodings and normalization flag.
  /// - Returns: The headers produced by `makeResponseHeaders`.
  func send(headers userProvidedHeaders: HPACKHeaders) -> HPACKHeaders {
    HTTP2ToRawGRPCStateMachine.makeResponseHeaders(
      contentType: contentType,
      responseEncoding: responseEncoding,
      acceptableRequestEncoding: acceptEncoding,
      userProvidedHeaders: userProvidedHeaders,
      normalizeUserProvidedHeaders: normalizeHeaders
    )
  }
}
620 | | |
extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseIdleState {
  /// Builds the response headers for this RPC after the request stream has closed.
  ///
  /// - Parameter userProvidedHeaders: Metadata supplied by the user, passed on together with
  ///   this state's content type, encodings and normalization flag.
  /// - Returns: The headers produced by `makeResponseHeaders`.
  func send(headers userProvidedHeaders: HPACKHeaders) -> HPACKHeaders {
    HTTP2ToRawGRPCStateMachine.makeResponseHeaders(
      contentType: contentType,
      responseEncoding: responseEncoding,
      acceptableRequestEncoding: acceptEncoding,
      userProvidedHeaders: userProvidedHeaders,
      normalizeUserProvidedHeaders: normalizeHeaders
    )
  }
}
632 | | |
633 | | // MARK: - Send Data |
634 | | |
extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
  /// Hands a response message to the message writer.
  ///
  /// - Parameters:
  ///   - buffer: The message bytes.
  ///   - compress: Forwarded to the writer unchanged.
  ///   - promise: An optional promise, forwarded to the writer unchanged.
  mutating func send(
    buffer: ByteBuffer,
    compress: Bool,
    promise: EventLoopPromise<Void>?
  ) {
    writer.append(buffer: buffer, compress: compress, promise: promise)
  }
}
644 | | |
extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseOpenState {
  /// Hands a response message to the message writer after the request stream has closed.
  ///
  /// - Parameters:
  ///   - buffer: The message bytes.
  ///   - compress: Forwarded to the writer unchanged.
  ///   - promise: An optional promise, forwarded to the writer unchanged.
  mutating func send(
    buffer: ByteBuffer,
    compress: Bool,
    promise: EventLoopPromise<Void>?
  ) {
    writer.append(buffer: buffer, compress: compress, promise: promise)
  }
}
654 | | |
655 | | // MARK: - Send End |
656 | | |
extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
  /// Builds a trailers-only response: no response headers have been sent yet, so the status
  /// goes out via `makeResponseTrailersOnly`.
  ///
  /// - Parameters:
  ///   - status: The status to end the RPC with.
  ///   - userProvidedTrailers: Trailing metadata supplied by the user.
  /// - Returns: The headers produced by `makeResponseTrailersOnly`.
  func send(
    status: GRPCStatus,
    trailers userProvidedTrailers: HPACKHeaders
  ) -> HPACKHeaders {
    HTTP2ToRawGRPCStateMachine.makeResponseTrailersOnly(
      for: status,
      contentType: contentType,
      acceptableRequestEncoding: acceptEncoding,
      userProvidedHeaders: userProvidedTrailers,
      normalizeUserProvidedHeaders: normalizeHeaders
    )
  }
}
671 | | |
extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseIdleState {
  /// Builds a trailers-only response: no response headers have been sent yet, so the status
  /// goes out via `makeResponseTrailersOnly`.
  ///
  /// - Parameters:
  ///   - status: The status to end the RPC with.
  ///   - userProvidedTrailers: Trailing metadata supplied by the user.
  /// - Returns: The headers produced by `makeResponseTrailersOnly`.
  func send(
    status: GRPCStatus,
    trailers userProvidedTrailers: HPACKHeaders
  ) -> HPACKHeaders {
    HTTP2ToRawGRPCStateMachine.makeResponseTrailersOnly(
      for: status,
      contentType: contentType,
      acceptableRequestEncoding: acceptEncoding,
      userProvidedHeaders: userProvidedTrailers,
      normalizeUserProvidedHeaders: normalizeHeaders
    )
  }
}
686 | | |
extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseOpenState {
  /// Builds the response trailers that end the RPC once response headers have been sent.
  ///
  /// - Parameters:
  ///   - status: The status to end the RPC with.
  ///   - userProvidedTrailers: Trailing metadata supplied by the user.
  /// - Returns: The trailers produced by `makeResponseTrailers`.
  func send(
    status: GRPCStatus,
    trailers userProvidedTrailers: HPACKHeaders
  ) -> HPACKHeaders {
    return HTTP2ToRawGRPCStateMachine.makeResponseTrailers(
      for: status,
      userProvidedHeaders: userProvidedTrailers,
      // Honour the configured flag instead of always normalizing, so trailers are treated
      // the same way as headers and trailers-only responses sent from the idle states.
      normalizeUserProvidedHeaders: self.normalizeHeaders
    )
  }
}
699 | | |
extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
  /// Builds the response trailers that end the RPC while the request stream is still open.
  ///
  /// - Parameters:
  ///   - status: The status to end the RPC with.
  ///   - userProvidedTrailers: Trailing metadata supplied by the user.
  /// - Returns: The trailers produced by `makeResponseTrailers`.
  func send(
    status: GRPCStatus,
    trailers userProvidedTrailers: HPACKHeaders
  ) -> HPACKHeaders {
    return HTTP2ToRawGRPCStateMachine.makeResponseTrailers(
      for: status,
      userProvidedHeaders: userProvidedTrailers,
      // Honour the configured flag instead of always normalizing, so trailers are treated
      // the same way as headers and trailers-only responses sent from the idle states.
      normalizeUserProvidedHeaders: self.normalizeHeaders
    )
  }
}
712 | | |
713 | | // MARK: - Pipeline Configured |
714 | | |
extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
  /// Marks the pipeline as configured and says how to deliver the held request headers.
  ///
  /// Traps if the pipeline had already been configured.
  ///
  /// - Returns: `.forwardHeadersAndRead` when the reader already holds unprocessed bytes,
  ///   otherwise `.forwardHeaders`.
  mutating func pipelineConfigured() -> HTTP2ToRawGRPCStateMachine.PipelineConfiguredAction {
    let heldHeaders = self.configurationState.configured()

    // Bytes buffered during configuration have to be read after the metadata is sent.
    guard self.reader.unprocessedBytes != 0 else {
      // The reader is empty: just send the metadata.
      return .forwardHeaders(heldHeaders)
    }

    return .forwardHeadersAndRead(heldHeaders)
  }
}
734 | | |
extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseIdleState {
  /// Marks the pipeline as configured and says how to deliver the held request headers.
  ///
  /// Traps if the pipeline had already been configured.
  ///
  /// - Returns: Always `.forwardHeadersAndRead`: the request stream is already closed, so
  ///   reading has to start straight after the headers are forwarded.
  mutating func pipelineConfigured() -> HTTP2ToRawGRPCStateMachine.PipelineConfiguredAction {
    let heldHeaders = self.configurationState.configured()
    return .forwardHeadersAndRead(heldHeaders)
  }
}
742 | | |
743 | | // MARK: - Read Next Request |
744 | | |
extension HTTP2ToRawGRPCStateMachine {
  /// Tries to read one message from `reader` and says what to do with the outcome.
  ///
  /// - Parameters:
  ///   - reader: The reader holding buffered request bytes.
  ///   - requestStreamClosed: Whether the request stream has already been closed.
  ///   - maxLength: Passed to the reader's `nextMessage(maxLength:)`.
  /// - Returns:
  ///   - `.errorCaught` if the reader throws;
  ///   - `.forwardMessageThenReadNextMessage` if a message was read and either bytes remain
  ///     or the stream is closed;
  ///   - `.forwardMessage` if a message was read and neither is the case;
  ///   - `.forwardEnd` if no message was read and the stream is closed;
  ///   - `.none` otherwise.
  static func read(
    from reader: inout LengthPrefixedMessageReader,
    requestStreamClosed: Bool,
    maxLength: Int
  ) -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction {
    let nextMessage: ByteBuffer?
    do {
      nextMessage = try reader.nextMessage(maxLength: maxLength)
    } catch {
      return .errorCaught(error)
    }

    guard let message = nextMessage else {
      // Nothing to deliver: either the request stream has ended or more bytes are needed.
      return requestStreamClosed ? .forwardEnd : .none
    }

    // With unprocessed bytes left, or the request stream closed, deliver the message and
    // then read again: the subsequent read may yield another message or end stream.
    // Otherwise there is nothing left to process, so just forward the message.
    let shouldReadAgain = reader.unprocessedBytes > 0 || requestStreamClosed
    return shouldReadAgain ? .forwardMessageThenReadNextMessage(message) : .forwardMessage(message)
  }
}
772 | | |
773 | | extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState { |
774 | | mutating func readNextRequest( |
775 | | maxLength: Int |
776 | 0 | ) -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction { |
777 | 0 | return HTTP2ToRawGRPCStateMachine.read( |
778 | 0 | from: &self.reader, |
779 | 0 | requestStreamClosed: false, |
780 | 0 | maxLength: maxLength |
781 | 0 | ) |
782 | 0 | } |
783 | | } |
784 | | |
785 | | extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState { |
786 | | mutating func readNextRequest( |
787 | | maxLength: Int |
788 | 1.26M | ) -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction { |
789 | 1.26M | return HTTP2ToRawGRPCStateMachine.read( |
790 | 1.26M | from: &self.reader, |
791 | 1.26M | requestStreamClosed: false, |
792 | 1.26M | maxLength: maxLength |
793 | 1.26M | ) |
794 | 1.26M | } |
795 | | } |
796 | | |
797 | | extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseIdleState { |
798 | | mutating func readNextRequest( |
799 | | maxLength: Int |
800 | 0 | ) -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction { |
801 | 0 | return HTTP2ToRawGRPCStateMachine.read( |
802 | 0 | from: &self.reader, |
803 | 0 | requestStreamClosed: true, |
804 | 0 | maxLength: maxLength |
805 | 0 | ) |
806 | 0 | } |
807 | | } |
808 | | |
809 | | extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseOpenState { |
810 | | mutating func readNextRequest( |
811 | | maxLength: Int |
812 | 53.7k | ) -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction { |
813 | 53.7k | return HTTP2ToRawGRPCStateMachine.read( |
814 | 53.7k | from: &self.reader, |
815 | 53.7k | requestStreamClosed: true, |
816 | 53.7k | maxLength: maxLength |
817 | 53.7k | ) |
818 | 53.7k | } |
819 | | } |
820 | | |
821 | | // MARK: - Top Level State Changes |
822 | | |
823 | | extension HTTP2ToRawGRPCStateMachine { |
824 | | /// Receive request headers. |
825 | | mutating func receive( |
826 | | headers: HPACKHeaders, |
827 | | eventLoop: EventLoop, |
828 | | errorDelegate: ServerErrorDelegate?, |
829 | | remoteAddress: SocketAddress?, |
830 | | logger: Logger, |
831 | | allocator: ByteBufferAllocator, |
832 | | responseWriter: GRPCServerResponseWriter, |
833 | | closeFuture: EventLoopFuture<Void>, |
834 | | services: [Substring: CallHandlerProvider], |
835 | | encoding: ServerMessageEncoding, |
836 | | normalizeHeaders: Bool |
837 | 283k | ) -> ReceiveHeadersAction { |
838 | 283k | return self.state.receive( |
839 | 283k | headers: headers, |
840 | 283k | eventLoop: eventLoop, |
841 | 283k | errorDelegate: errorDelegate, |
842 | 283k | remoteAddress: remoteAddress, |
843 | 283k | logger: logger, |
844 | 283k | allocator: allocator, |
845 | 283k | responseWriter: responseWriter, |
846 | 283k | closeFuture: closeFuture, |
847 | 283k | services: services, |
848 | 283k | encoding: encoding, |
849 | 283k | normalizeHeaders: normalizeHeaders |
850 | 283k | ) |
851 | 283k | } |
852 | | |
853 | | /// Receive request buffer. |
854 | | /// - Parameters: |
855 | | /// - buffer: The received buffer. |
856 | | /// - endStream: Whether end stream was set. |
857 | | /// - Returns: An action indicating what the caller should do next, e.g. try to read a message from the buffer. |
858 | 1.21M | mutating func receive(buffer: inout ByteBuffer, endStream: Bool) -> ReceiveDataAction { |
859 | 1.21M | self.state.receive(buffer: &buffer, endStream: endStream) |
860 | 1.21M | } |
861 | | |
862 | | /// Send response headers. |
863 | 113k | mutating func send(headers: HPACKHeaders) -> Result<HPACKHeaders, Error> { |
864 | 113k | self.state.send(headers: headers) |
865 | 113k | } |
866 | | |
867 | | /// Send a response buffer. |
868 | | mutating func send( |
869 | | buffer: ByteBuffer, |
870 | | compress: Bool, |
871 | | promise: EventLoopPromise<Void>? |
872 | 1.26M | ) -> Result<Void, Error> { |
873 | 1.26M | self.state.send(buffer: buffer, compress: compress, promise: promise) |
874 | 1.26M | } |
875 | | |
876 | 181k | mutating func nextResponse() -> (Result<ByteBuffer, Error>, EventLoopPromise<Void>?)? { |
877 | 181k | self.state.nextResponse() |
878 | 181k | } |
879 | | |
880 | | /// Send status and trailers. |
881 | | mutating func send( |
882 | | status: GRPCStatus, |
883 | | trailers: HPACKHeaders |
884 | 113k | ) -> HTTP2ToRawGRPCStateMachine.SendEndAction { |
885 | 113k | self.state.send(status: status, trailers: trailers) |
886 | 113k | } |
887 | | |
888 | | /// The pipeline has been configured with a service provider. |
889 | 113k | mutating func pipelineConfigured() -> PipelineConfiguredAction { |
890 | 113k | self.state.pipelineConfigured() |
891 | 113k | } |
892 | | |
893 | | /// Try to read a request message. |
894 | 1.35M | mutating func readNextRequest(maxLength: Int) -> ReadNextMessageAction { |
895 | 1.35M | self.state.readNextRequest(maxLength: maxLength) |
896 | 1.35M | } |
897 | | } |
898 | | |
899 | | extension HTTP2ToRawGRPCStateMachine.State { |
900 | 113k | mutating func pipelineConfigured() -> HTTP2ToRawGRPCStateMachine.PipelineConfiguredAction { |
901 | 113k | switch self { |
902 | 113k | case .requestIdleResponseIdle: |
903 | 0 | preconditionFailure("Invalid state: pipeline configured before receiving request headers") |
904 | 113k | |
905 | 113k | case var .requestOpenResponseIdle(state): |
906 | 113k | let action = state.pipelineConfigured() |
907 | 113k | self = .requestOpenResponseIdle(state) |
908 | 113k | return action |
909 | 113k | |
910 | 113k | case var .requestClosedResponseIdle(state): |
911 | 0 | let action = state.pipelineConfigured() |
912 | 0 | self = .requestClosedResponseIdle(state) |
913 | 0 | return action |
914 | 113k | |
915 | 113k | case .requestOpenResponseOpen, |
916 | 0 | .requestOpenResponseClosed, |
917 | 0 | .requestClosedResponseOpen, |
918 | 0 | .requestClosedResponseClosed: |
919 | 0 | preconditionFailure("Invalid state: response stream opened before pipeline was configured") |
920 | 113k | } |
921 | 0 | } |
922 | | |
923 | | mutating func receive( |
924 | | headers: HPACKHeaders, |
925 | | eventLoop: EventLoop, |
926 | | errorDelegate: ServerErrorDelegate?, |
927 | | remoteAddress: SocketAddress?, |
928 | | logger: Logger, |
929 | | allocator: ByteBufferAllocator, |
930 | | responseWriter: GRPCServerResponseWriter, |
931 | | closeFuture: EventLoopFuture<Void>, |
932 | | services: [Substring: CallHandlerProvider], |
933 | | encoding: ServerMessageEncoding, |
934 | | normalizeHeaders: Bool |
935 | 283k | ) -> HTTP2ToRawGRPCStateMachine.ReceiveHeadersAction { |
936 | 283k | switch self { |
937 | 283k | // These are the only states in which we can receive headers. Everything else is invalid. |
938 | 283k | case .requestIdleResponseIdle, |
939 | 283k | .requestClosedResponseClosed: |
940 | 283k | let stateAndAction = self._receive( |
941 | 283k | headers: headers, |
942 | 283k | eventLoop: eventLoop, |
943 | 283k | errorDelegate: errorDelegate, |
944 | 283k | remoteAddress: remoteAddress, |
945 | 283k | logger: logger, |
946 | 283k | allocator: allocator, |
947 | 283k | responseWriter: responseWriter, |
948 | 283k | closeFuture: closeFuture, |
949 | 283k | services: services, |
950 | 283k | encoding: encoding, |
951 | 283k | normalizeHeaders: normalizeHeaders |
952 | 283k | ) |
953 | 283k | self = stateAndAction.state |
954 | 283k | return stateAndAction.action |
955 | 283k | |
956 | 283k | // We can't receive headers in any of these states. |
957 | 283k | case .requestOpenResponseIdle, |
958 | 0 | .requestOpenResponseOpen, |
959 | 0 | .requestOpenResponseClosed, |
960 | 0 | .requestClosedResponseIdle, |
961 | 0 | .requestClosedResponseOpen: |
962 | 0 | preconditionFailure("Invalid state: \(self)") |
963 | 283k | } |
964 | 0 | } |
965 | | |
966 | | /// Receive a buffer from the client. |
967 | | mutating func receive( |
968 | | buffer: inout ByteBuffer, |
969 | | endStream: Bool |
970 | 1.21M | ) -> HTTP2ToRawGRPCStateMachine.ReceiveDataAction { |
971 | 1.21M | switch self { |
972 | 1.21M | case .requestIdleResponseIdle: |
973 | 0 | /// This isn't allowed: we must receive the request headers first. |
974 | 0 | preconditionFailure("Invalid state") |
975 | 1.21M | |
976 | 1.21M | case var .requestOpenResponseIdle(state): |
977 | 0 | let stateAndAction = state.receive(buffer: &buffer, endStream: endStream) |
978 | 0 | self = stateAndAction.state |
979 | 0 | return stateAndAction.action |
980 | 1.21M | |
981 | 1.21M | case var .requestOpenResponseOpen(state): |
982 | 139k | let stateAndAction = state.receive(buffer: &buffer, endStream: endStream) |
983 | 139k | self = stateAndAction.state |
984 | 139k | return stateAndAction.action |
985 | 1.21M | |
986 | 1.21M | case .requestClosedResponseIdle, |
987 | 0 | .requestClosedResponseOpen: |
988 | 0 | preconditionFailure("Invalid state: the request stream is already closed") |
989 | 1.21M | |
990 | 1.21M | case .requestOpenResponseClosed: |
991 | 58.3k | if endStream { |
992 | 58.1k | // Server has finished responding and this is the end of the request stream; we're done for |
993 | 58.1k | // this RPC now, finish the handler. |
994 | 58.1k | self = .requestClosedResponseClosed |
995 | 58.1k | return .finishHandler |
996 | 58.1k | } else { |
997 | 194 | // Server has finished responding but this isn't the end of the request stream; ignore the |
998 | 194 | // input, we need to wait for end stream before tearing down the handler. |
999 | 194 | return .nothing |
1000 | 194 | } |
1001 | 1.21M | |
1002 | 1.21M | case .requestClosedResponseClosed: |
1003 | 1.02M | return .nothing |
1004 | 1.21M | } |
1005 | 0 | } |
1006 | | |
1007 | | mutating func readNextRequest( |
1008 | | maxLength: Int |
1009 | 1.35M | ) -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction { |
1010 | 1.35M | switch self { |
1011 | 1.35M | case .requestIdleResponseIdle: |
1012 | 0 | preconditionFailure("Invalid state") |
1013 | 1.35M | |
1014 | 1.35M | case var .requestOpenResponseIdle(state): |
1015 | 0 | let action = state.readNextRequest(maxLength: maxLength) |
1016 | 0 | self = .requestOpenResponseIdle(state) |
1017 | 0 | return action |
1018 | 1.35M | |
1019 | 1.35M | case var .requestOpenResponseOpen(state): |
1020 | 1.26M | let action = state.readNextRequest(maxLength: maxLength) |
1021 | 1.26M | self = .requestOpenResponseOpen(state) |
1022 | 1.26M | return action |
1023 | 1.35M | |
1024 | 1.35M | case var .requestClosedResponseIdle(state): |
1025 | 0 | let action = state.readNextRequest(maxLength: maxLength) |
1026 | 0 | self = .requestClosedResponseIdle(state) |
1027 | 0 | return action |
1028 | 1.35M | |
1029 | 1.35M | case var .requestClosedResponseOpen(state): |
1030 | 53.7k | let action = state.readNextRequest(maxLength: maxLength) |
1031 | 53.7k | self = .requestClosedResponseOpen(state) |
1032 | 53.7k | return action |
1033 | 1.35M | |
1034 | 1.35M | case .requestOpenResponseClosed, |
1035 | 37.9k | .requestClosedResponseClosed: |
1036 | 37.9k | return .none |
1037 | 1.35M | } |
1038 | 0 | } |
1039 | | |
1040 | 113k | mutating func send(headers: HPACKHeaders) -> Result<HPACKHeaders, Error> { |
1041 | 113k | switch self { |
1042 | 113k | case .requestIdleResponseIdle: |
1043 | 0 | preconditionFailure("Invalid state: the request stream isn't open") |
1044 | 113k | |
1045 | 113k | case let .requestOpenResponseIdle(state): |
1046 | 113k | let headers = state.send(headers: headers) |
1047 | 113k | self = .requestOpenResponseOpen(.init(from: state)) |
1048 | 113k | return .success(headers) |
1049 | 113k | |
1050 | 113k | case let .requestClosedResponseIdle(state): |
1051 | 0 | let headers = state.send(headers: headers) |
1052 | 0 | self = .requestClosedResponseOpen(.init(from: state)) |
1053 | 0 | return .success(headers) |
1054 | 113k | |
1055 | 113k | case .requestOpenResponseOpen, |
1056 | 0 | .requestOpenResponseClosed, |
1057 | 0 | .requestClosedResponseOpen, |
1058 | 0 | .requestClosedResponseClosed: |
1059 | 0 | return .failure(GRPCError.AlreadyComplete()) |
1060 | 113k | } |
1061 | 0 | } |
1062 | | |
1063 | | mutating func send( |
1064 | | buffer: ByteBuffer, |
1065 | | compress: Bool, |
1066 | | promise: EventLoopPromise<Void>? |
1067 | 1.26M | ) -> Result<Void, Error> { |
1068 | 1.26M | switch self { |
1069 | 1.26M | case .requestIdleResponseIdle: |
1070 | 0 | preconditionFailure("Invalid state: the request stream is still closed") |
1071 | 1.26M | |
1072 | 1.26M | case .requestOpenResponseIdle, |
1073 | 0 | .requestClosedResponseIdle: |
1074 | 0 | let error = GRPCError.InvalidState("Response headers must be sent before response message") |
1075 | 0 | return .failure(error) |
1076 | 1.26M | |
1077 | 1.26M | case var .requestOpenResponseOpen(state): |
1078 | 1.22M | self = .requestClosedResponseClosed |
1079 | 1.22M | state.send(buffer: buffer, compress: compress, promise: promise) |
1080 | 1.22M | self = .requestOpenResponseOpen(state) |
1081 | 1.22M | return .success(()) |
1082 | 1.26M | |
1083 | 1.26M | case var .requestClosedResponseOpen(state): |
1084 | 36.6k | self = .requestClosedResponseClosed |
1085 | 36.6k | state.send(buffer: buffer, compress: compress, promise: promise) |
1086 | 36.6k | self = .requestClosedResponseOpen(state) |
1087 | 36.6k | return .success(()) |
1088 | 1.26M | |
1089 | 1.26M | case .requestOpenResponseClosed, |
1090 | 0 | .requestClosedResponseClosed: |
1091 | 0 | return .failure(GRPCError.AlreadyComplete()) |
1092 | 1.26M | } |
1093 | 0 | } |
1094 | | |
1095 | 181k | mutating func nextResponse() -> (Result<ByteBuffer, Error>, EventLoopPromise<Void>?)? { |
1096 | 181k | switch self { |
1097 | 181k | case .requestIdleResponseIdle: |
1098 | 0 | preconditionFailure("Invalid state: the request stream is still closed") |
1099 | 181k | |
1100 | 181k | case .requestOpenResponseIdle, |
1101 | 0 | .requestClosedResponseIdle: |
1102 | 0 | return nil |
1103 | 181k | |
1104 | 181k | case var .requestOpenResponseOpen(state): |
1105 | 82.3k | self = .requestClosedResponseClosed |
1106 | 82.3k | let result = state.writer.next() |
1107 | 82.3k | self = .requestOpenResponseOpen(state) |
1108 | 82.3k | return result |
1109 | 181k | |
1110 | 181k | case var .requestClosedResponseOpen(state): |
1111 | 96.8k | self = .requestClosedResponseClosed |
1112 | 96.8k | let result = state.writer.next() |
1113 | 96.8k | self = .requestClosedResponseOpen(state) |
1114 | 96.8k | return result |
1115 | 181k | |
1116 | 181k | case .requestOpenResponseClosed, |
1117 | 2.59k | .requestClosedResponseClosed: |
1118 | 2.59k | return nil |
1119 | 181k | } |
1120 | 0 | } |
1121 | | |
1122 | | mutating func send( |
1123 | | status: GRPCStatus, |
1124 | | trailers: HPACKHeaders |
1125 | 113k | ) -> HTTP2ToRawGRPCStateMachine.SendEndAction { |
1126 | 113k | switch self { |
1127 | 113k | case .requestIdleResponseIdle: |
1128 | 0 | preconditionFailure("Invalid state: the request stream is still closed") |
1129 | 113k | |
1130 | 113k | case let .requestOpenResponseIdle(state): |
1131 | 0 | self = .requestOpenResponseClosed |
1132 | 0 | return .sendTrailers(state.send(status: status, trailers: trailers)) |
1133 | 113k | |
1134 | 113k | case let .requestClosedResponseIdle(state): |
1135 | 0 | self = .requestClosedResponseClosed |
1136 | 0 | return .sendTrailersAndFinish(state.send(status: status, trailers: trailers)) |
1137 | 113k | |
1138 | 113k | case let .requestOpenResponseOpen(state): |
1139 | 59.7k | self = .requestOpenResponseClosed |
1140 | 59.7k | return .sendTrailers(state.send(status: status, trailers: trailers)) |
1141 | 113k | |
1142 | 113k | case let .requestClosedResponseOpen(state): |
1143 | 53.7k | self = .requestClosedResponseClosed |
1144 | 53.7k | return .sendTrailersAndFinish(state.send(status: status, trailers: trailers)) |
1145 | 113k | |
1146 | 113k | case .requestOpenResponseClosed, |
1147 | 0 | .requestClosedResponseClosed: |
1148 | 0 | return .failure(GRPCError.AlreadyComplete()) |
1149 | 113k | } |
1150 | 0 | } |
1151 | | } |
1152 | | |
1153 | | // MARK: - Helpers |
1154 | | |
1155 | | extension HTTP2ToRawGRPCStateMachine { |
1156 | | static func makeResponseHeaders( |
1157 | | contentType: ContentType, |
1158 | | responseEncoding: String?, |
1159 | | acceptableRequestEncoding: String?, |
1160 | | userProvidedHeaders: HPACKHeaders, |
1161 | | normalizeUserProvidedHeaders: Bool |
1162 | 113k | ) -> HPACKHeaders { |
1163 | 113k | // 4 because ':status' and 'content-type' are required. We may send back 'grpc-encoding' and |
1164 | 113k | // 'grpc-accept-encoding' as well. |
1165 | 113k | let capacity = 4 + userProvidedHeaders.count |
1166 | 113k | |
1167 | 113k | var headers = HPACKHeaders() |
1168 | 113k | headers.reserveCapacity(capacity) |
1169 | 113k | |
1170 | 113k | headers.add(name: ":status", value: "200") |
1171 | 113k | headers.add(name: GRPCHeaderName.contentType, value: contentType.canonicalValue) |
1172 | 113k | |
1173 | 113k | if let responseEncoding = responseEncoding { |
1174 | 0 | headers.add(name: GRPCHeaderName.encoding, value: responseEncoding) |
1175 | 0 | } |
1176 | 113k | |
1177 | 113k | if let acceptEncoding = acceptableRequestEncoding { |
1178 | 0 | headers.add(name: GRPCHeaderName.acceptEncoding, value: acceptEncoding) |
1179 | 0 | } |
1180 | 113k | |
1181 | 113k | // Add user provided headers, normalizing if required. |
1182 | 113k | headers.add(contentsOf: userProvidedHeaders, normalize: normalizeUserProvidedHeaders) |
1183 | 113k | |
1184 | 113k | return headers |
1185 | 113k | } |
1186 | | |
1187 | | static func makeResponseTrailersOnly( |
1188 | | for status: GRPCStatus, |
1189 | | contentType: ContentType, |
1190 | | acceptableRequestEncoding: String?, |
1191 | | userProvidedHeaders: HPACKHeaders?, |
1192 | | normalizeUserProvidedHeaders: Bool |
1193 | 14.1k | ) -> HPACKHeaders { |
1194 | 14.1k | // 5 because ':status', 'content-type', 'grpc-status' are required. We may also send back |
1195 | 14.1k | // 'grpc-message' and 'grpc-accept-encoding'. |
1196 | 23.0k | let capacity = 5 + (userProvidedHeaders.map { $0.count } ?? 0) |
1197 | 14.1k | |
1198 | 14.1k | var headers = HPACKHeaders() |
1199 | 14.1k | headers.reserveCapacity(capacity) |
1200 | 14.1k | |
1201 | 14.1k | // Add the required trailers. |
1202 | 14.1k | headers.add(name: ":status", value: "200") |
1203 | 14.1k | headers.add(name: GRPCHeaderName.contentType, value: contentType.canonicalValue) |
1204 | 14.1k | headers.add(name: GRPCHeaderName.statusCode, value: String(describing: status.code.rawValue)) |
1205 | 14.1k | |
1206 | 23.0k | if let message = status.message.flatMap(GRPCStatusMessageMarshaller.marshall) { |
1207 | 14.1k | headers.add(name: GRPCHeaderName.statusMessage, value: message) |
1208 | 14.1k | } |
1209 | 14.1k | |
1210 | 14.1k | // We may include this if the requested encoding was not valid. |
1211 | 14.1k | if let acceptEncoding = acceptableRequestEncoding { |
1212 | 0 | headers.add(name: GRPCHeaderName.acceptEncoding, value: acceptEncoding) |
1213 | 0 | } |
1214 | 14.1k | |
1215 | 14.1k | if let userProvided = userProvidedHeaders { |
1216 | 0 | headers.add(contentsOf: userProvided, normalize: normalizeUserProvidedHeaders) |
1217 | 0 | } |
1218 | 14.1k | |
1219 | 14.1k | return headers |
1220 | 14.1k | } |
1221 | | |
1222 | | static func makeResponseTrailers( |
1223 | | for status: GRPCStatus, |
1224 | | userProvidedHeaders: HPACKHeaders, |
1225 | | normalizeUserProvidedHeaders: Bool |
1226 | 113k | ) -> HPACKHeaders { |
1227 | 113k | // Most RPCs should end with status code 'ok' (hopefully!), and if the user didn't provide any |
1228 | 113k | // additional trailers, then we can use a pre-canned set of headers to avoid an extra |
1229 | 113k | // allocation. |
1230 | 113k | if status == .ok, userProvidedHeaders.isEmpty { |
1231 | 65.6k | return Self.gRPCStatusOkTrailers |
1232 | 65.6k | } |
1233 | 47.7k | |
1234 | 47.7k | // 2 because 'grpc-status' is required, we may also send back 'grpc-message'. |
1235 | 47.7k | let capacity = 2 + userProvidedHeaders.count |
1236 | 47.7k | |
1237 | 47.7k | var trailers = HPACKHeaders() |
1238 | 47.7k | trailers.reserveCapacity(capacity) |
1239 | 47.7k | |
1240 | 47.7k | // status code. |
1241 | 47.7k | trailers.add(name: GRPCHeaderName.statusCode, value: String(describing: status.code.rawValue)) |
1242 | 47.7k | |
1243 | 47.7k | // status message, if present. |
1244 | 72.3k | if let message = status.message.flatMap(GRPCStatusMessageMarshaller.marshall) { |
1245 | 47.7k | trailers.add(name: GRPCHeaderName.statusMessage, value: message) |
1246 | 47.7k | } |
1247 | 47.7k | |
1248 | 47.7k | // user provided trailers. |
1249 | 47.7k | trailers.add(contentsOf: userProvidedHeaders, normalize: normalizeUserProvidedHeaders) |
1250 | 47.7k | |
1251 | 47.7k | return trailers |
1252 | 113k | } |
1253 | | |
1254 | | private static let gRPCStatusOkTrailers: HPACKHeaders = [ |
1255 | | GRPCHeaderName.statusCode: String(describing: GRPCStatus.Code.ok.rawValue) |
1256 | | ] |
1257 | | } |
1258 | | |
1259 | | extension HPACKHeaders { |
1260 | 230k | fileprivate mutating func add(contentsOf other: HPACKHeaders, normalize: Bool) { |
1261 | 230k | if normalize { |
1262 | 72.3k | self.add( |
1263 | 72.3k | contentsOf: other.lazy.map { name, value, indexable in |
1264 | 0 | (name: name.lowercased(), value: value, indexable: indexable) |
1265 | 0 | } |
1266 | 72.3k | ) |
1267 | 158k | } else { |
1268 | 158k | self.add(contentsOf: other) |
1269 | 158k | } |
1270 | 230k | } |
1271 | | } |