/src/grpc-swift/Sources/GRPC/GRPCWebToHTTP2ServerCodec.swift
Line | Count | Source |
1 | | /* |
2 | | * Copyright 2020, gRPC Authors All rights reserved. |
3 | | * |
4 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | | * you may not use this file except in compliance with the License. |
6 | | * You may obtain a copy of the License at |
7 | | * |
8 | | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | | * |
10 | | * Unless required by applicable law or agreed to in writing, software |
11 | | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | | * See the License for the specific language governing permissions and |
14 | | * limitations under the License. |
15 | | */ |
16 | | |
17 | | import NIOCore |
18 | | import NIOHPACK |
19 | | import NIOHTTP1 |
20 | | import NIOHTTP2 |
21 | | |
22 | | import struct Foundation.Data |
23 | | |
24 | | /// A codec for translating between gRPC Web (as HTTP/1) and HTTP/2 frame payloads. |
25 | | internal final class GRPCWebToHTTP2ServerCodec: ChannelDuplexHandler { |
26 | | internal typealias InboundIn = HTTPServerRequestPart |
27 | | internal typealias InboundOut = HTTP2Frame.FramePayload |
28 | | |
29 | | internal typealias OutboundIn = HTTP2Frame.FramePayload |
30 | | internal typealias OutboundOut = HTTPServerResponsePart |
31 | | |
32 | | private var stateMachine: StateMachine |
33 | | |
34 | | /// Create a gRPC Web to server HTTP/2 codec. |
35 | | /// |
36 | | /// - Parameter scheme: The value of the ':scheme' pseudo header to insert when converting the |
37 | | /// request headers. |
38 | 11.3k | init(scheme: String) { |
39 | 11.3k | self.stateMachine = StateMachine(scheme: scheme) |
40 | 11.3k | } |
41 | | |
42 | 661k | internal func channelRead(context: ChannelHandlerContext, data: NIOAny) { |
43 | 661k | let action = self.stateMachine.processInbound( |
44 | 661k | serverRequestPart: self.unwrapInboundIn(data), |
45 | 661k | allocator: context.channel.allocator |
46 | 661k | ) |
47 | 661k | self.act(on: action, context: context) |
48 | 661k | } |
49 | | |
50 | | internal func write( |
51 | | context: ChannelHandlerContext, |
52 | | data: NIOAny, |
53 | | promise: EventLoopPromise<Void>? |
54 | 445k | ) { |
55 | 445k | let action = self.stateMachine.processOutbound( |
56 | 445k | framePayload: self.unwrapOutboundIn(data), |
57 | 445k | promise: promise, |
58 | 445k | allocator: context.channel.allocator |
59 | 445k | ) |
60 | 445k | self.act(on: action, context: context) |
61 | 445k | } |
62 | | |
63 | | /// Acts on an action returned by the state machine. |
64 | 1.60M | private func act(on action: StateMachine.Action, context: ChannelHandlerContext) { |
65 | 1.60M | switch action { |
66 | 1.60M | case .none: |
67 | 4.87k | () |
68 | 1.60M | |
69 | 1.60M | case let .fireChannelRead(payload): |
70 | 966k | context.fireChannelRead(self.wrapInboundOut(payload)) |
71 | 1.60M | |
72 | 1.60M | case let .write(write): |
73 | 612k | if let additionalPart = write.additionalPart { |
74 | 386k | context.write(self.wrapOutboundOut(write.part), promise: nil) |
75 | 386k | context.write(self.wrapOutboundOut(additionalPart), promise: write.promise) |
76 | 386k | } else { |
77 | 226k | context.write(self.wrapOutboundOut(write.part), promise: write.promise) |
78 | 226k | } |
79 | 612k | |
80 | 612k | if write.closeChannel { |
81 | 4.07k | context.close(mode: .all, promise: nil) |
82 | 4.07k | } |
83 | 1.60M | |
84 | 1.60M | case let .completePromise(promise, result): |
85 | 18.3k | promise?.completeWith(result) |
86 | 1.60M | } |
87 | 1.60M | } |
88 | | } |
89 | | |
90 | | extension GRPCWebToHTTP2ServerCodec { |
91 | | internal struct StateMachine { |
92 | | /// The current state. |
93 | | private var state: State |
94 | | private let scheme: String |
95 | | |
96 | 11.3k | internal init(scheme: String) { |
97 | 11.3k | self.state = .idle |
98 | 11.3k | self.scheme = scheme |
99 | 11.3k | } |
100 | | |
101 | | /// Process the inbound `HTTPServerRequestPart`. |
102 | | internal mutating func processInbound( |
103 | | serverRequestPart: HTTPServerRequestPart, |
104 | | allocator: ByteBufferAllocator |
105 | 661k | ) -> Action { |
106 | 661k | return self.state.processInbound( |
107 | 661k | serverRequestPart: serverRequestPart, |
108 | 661k | scheme: self.scheme, |
109 | 661k | allocator: allocator |
110 | 661k | ) |
111 | 661k | } |
112 | | |
113 | | /// Process the outbound `HTTP2Frame.FramePayload`. |
114 | | internal mutating func processOutbound( |
115 | | framePayload: HTTP2Frame.FramePayload, |
116 | | promise: EventLoopPromise<Void>?, |
117 | | allocator: ByteBufferAllocator |
118 | 445k | ) -> Action { |
119 | 445k | return self.state.processOutbound( |
120 | 445k | framePayload: framePayload, |
121 | 445k | promise: promise, |
122 | 445k | allocator: allocator |
123 | 445k | ) |
124 | 445k | } |
125 | | |
126 | | /// An action to take as a result of interaction with the state machine. |
127 | | internal enum Action { |
128 | | case none |
129 | | case fireChannelRead(HTTP2Frame.FramePayload) |
130 | | case write(Write) |
131 | | case completePromise(EventLoopPromise<Void>?, Result<Void, Error>) |
132 | | |
133 | | internal struct Write { |
134 | | internal var part: HTTPServerResponsePart |
135 | | internal var additionalPart: HTTPServerResponsePart? |
136 | | internal var promise: EventLoopPromise<Void>? |
137 | | internal var closeChannel: Bool |
138 | | |
139 | | internal init( |
140 | | part: HTTPServerResponsePart, |
141 | | additionalPart: HTTPServerResponsePart? = nil, |
142 | | promise: EventLoopPromise<Void>?, |
143 | | closeChannel: Bool |
144 | 428k | ) { |
145 | 428k | self.part = part |
146 | 428k | self.additionalPart = additionalPart |
147 | 428k | self.promise = promise |
148 | 428k | self.closeChannel = closeChannel |
149 | 428k | } |
150 | | } |
151 | | } |
152 | | |
153 | | fileprivate enum State { |
154 | | /// Idle; nothing has been received or sent. The only valid transition is to 'fullyOpen' when |
155 | | /// receiving request headers. |
156 | | case idle |
157 | | |
158 | | /// Received request headers. Waiting for the end of request and response streams. |
159 | | case fullyOpen(InboundState, OutboundState) |
160 | | |
161 | | /// The server has closed the response stream, we may receive other request parts from the client. |
162 | | case clientOpenServerClosed(InboundState) |
163 | | |
164 | | /// The client has sent everything, the server still needs to close the response stream. |
165 | | case clientClosedServerOpen(OutboundState) |
166 | | |
167 | | /// Not a real state. |
168 | | case _modifying |
169 | | |
170 | 496k | private var isModifying: Bool { |
171 | 496k | switch self { |
172 | 496k | case ._modifying: |
173 | 0 | return true |
174 | 496k | case .idle, .fullyOpen, .clientClosedServerOpen, .clientOpenServerClosed: |
175 | 496k | return false |
176 | 496k | } |
177 | 496k | } |
178 | | |
179 | 1.60M | private mutating func withStateAvoidingCoWs(_ body: (inout State) -> Action) -> Action { |
180 | 1.60M | self = ._modifying |
181 | 1.60M | defer { |
182 | 1.60M | assert(!self.isModifying) |
183 | 1.60M | } |
184 | 1.60M | return body(&self) |
185 | 1.60M | } |
186 | | } |
187 | | |
188 | | fileprivate struct InboundState { |
189 | | /// A `ByteBuffer` containing the base64 encoded bytes of the request stream if gRPC Web Text |
190 | | /// is being used, `nil` otherwise. |
191 | | var requestBuffer: ByteBuffer? |
192 | | |
193 | 386k | init(isTextEncoded: Bool, allocator: ByteBufferAllocator) { |
194 | 386k | self.requestBuffer = isTextEncoded ? allocator.buffer(capacity: 0) : nil |
195 | 386k | } |
196 | | } |
197 | | |
198 | | fileprivate struct OutboundState { |
199 | | /// A `CircularBuffer` holding any response messages if gRPC Web Text is being used, `nil` |
200 | | /// otherwise. |
201 | | var responseBuffer: CircularBuffer<ByteBuffer>? |
202 | | |
203 | | /// True if the response headers have been sent. |
204 | | var responseHeadersSent: Bool |
205 | | |
206 | | /// True if the server should close the connection when this request is done. |
207 | | var closeConnection: Bool |
208 | | |
209 | 386k | init(isTextEncoded: Bool, closeConnection: Bool) { |
210 | 386k | self.responseHeadersSent = false |
211 | 386k | self.responseBuffer = isTextEncoded ? CircularBuffer() : nil |
212 | 386k | self.closeConnection = closeConnection |
213 | 386k | } |
214 | | } |
215 | | } |
216 | | } |
217 | | |
218 | | extension GRPCWebToHTTP2ServerCodec.StateMachine.State { |
219 | | fileprivate mutating func processInbound( |
220 | | serverRequestPart: HTTPServerRequestPart, |
221 | | scheme: String, |
222 | | allocator: ByteBufferAllocator |
223 | 971k | ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action { |
224 | 971k | switch serverRequestPart { |
225 | 971k | case let .head(head): |
226 | 386k | return self.processRequestHead(head, scheme: scheme, allocator: allocator) |
227 | 971k | case var .body(buffer): |
228 | 203k | return self.processRequestBody(&buffer) |
229 | 971k | case .end: |
230 | 381k | return self.processRequestEnd(allocator: allocator) |
231 | 971k | } |
232 | 971k | } |
233 | | |
234 | | fileprivate mutating func processOutbound( |
235 | | framePayload: HTTP2Frame.FramePayload, |
236 | | promise: EventLoopPromise<Void>?, |
237 | | allocator: ByteBufferAllocator |
238 | 631k | ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action { |
239 | 631k | switch framePayload { |
240 | 631k | case let .headers(payload): |
241 | 544k | return self.processResponseHeaders(payload, promise: promise, allocator: allocator) |
242 | 631k | |
243 | 631k | case let .data(payload): |
244 | 86.0k | return self.processResponseData(payload, promise: promise) |
245 | 631k | |
246 | 631k | case .priority, |
247 | 0 | .rstStream, |
248 | 0 | .settings, |
249 | 0 | .pushPromise, |
250 | 0 | .ping, |
251 | 0 | .goAway, |
252 | 0 | .windowUpdate, |
253 | 0 | .alternativeService, |
254 | 0 | .origin: |
255 | 0 | preconditionFailure("Unsupported frame payload") |
256 | 631k | } |
257 | 0 | } |
258 | | } |
259 | | |
260 | | // MARK: - Inbound |
261 | | |
262 | | extension GRPCWebToHTTP2ServerCodec.StateMachine.State { |
263 | | private mutating func processRequestHead( |
264 | | _ head: HTTPRequestHead, |
265 | | scheme: String, |
266 | | allocator: ByteBufferAllocator |
267 | 386k | ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action { |
268 | 386k | switch self { |
269 | 386k | case .idle: |
270 | 386k | return self.withStateAvoidingCoWs { state in |
271 | 386k | let normalized = HPACKHeaders(httpHeaders: head.headers, normalizeHTTPHeaders: true) |
272 | 386k | |
273 | 386k | // Regular headers need to come after the pseudo headers. Unfortunately, this means we need to |
274 | 386k | // allocate a second headers block to use the normalization provided by NIO HTTP/2. |
275 | 386k | // |
276 | 386k | // TODO: Use API provided by https://github.com/apple/swift-nio-http2/issues/254 to avoid the |
277 | 386k | // extra copy. |
278 | 386k | var headers = HPACKHeaders() |
279 | 386k | headers.reserveCapacity(normalized.count + 4) |
280 | 386k | headers.add(name: ":path", value: head.uri) |
281 | 386k | headers.add(name: ":method", value: head.method.rawValue) |
282 | 386k | headers.add(name: ":scheme", value: scheme) |
283 | 386k | if let host = head.headers.first(name: "host") { |
284 | 701 | headers.add(name: ":authority", value: host) |
285 | 701 | } |
286 | 386k | headers.add(contentsOf: normalized) |
287 | 386k | |
288 | 386k | // Check whether we're dealing with gRPC Web Text. No need to fully validate the content-type |
289 | 386k | // that will be done at the HTTP/2 level. |
290 | 386k | let contentType = headers.first(name: GRPCHeaderName.contentType).flatMap(ContentType.init) |
291 | 386k | let isWebText = contentType == .some(.webTextProtobuf) |
292 | 386k | |
293 | 386k | let closeConnection = head.headers[canonicalForm: "connection"].contains("close") |
294 | 386k | |
295 | 386k | state = .fullyOpen( |
296 | 386k | .init(isTextEncoded: isWebText, allocator: allocator), |
297 | 386k | .init(isTextEncoded: isWebText, closeConnection: closeConnection) |
298 | 386k | ) |
299 | 386k | return .fireChannelRead(.headers(.init(headers: headers))) |
300 | 386k | } |
301 | 386k | |
302 | 386k | case .fullyOpen, .clientOpenServerClosed, .clientClosedServerOpen: |
303 | 0 | preconditionFailure("Invalid state: already received request head") |
304 | 386k | |
305 | 386k | case ._modifying: |
306 | 0 | preconditionFailure("Left in modifying state") |
307 | 386k | } |
308 | 0 | } |
309 | | |
310 | | private mutating func processRequestBody( |
311 | | _ buffer: inout ByteBuffer |
312 | 203k | ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action { |
313 | 203k | switch self { |
314 | 203k | case .idle: |
315 | 0 | preconditionFailure("Invalid state: haven't received request head") |
316 | 203k | |
317 | 203k | case .fullyOpen(var inbound, let outbound): |
318 | 136k | return self.withStateAvoidingCoWs { state in |
319 | 136k | let action = inbound.processInboundData(buffer: &buffer) |
320 | 136k | state = .fullyOpen(inbound, outbound) |
321 | 136k | return action |
322 | 136k | } |
323 | 203k | |
324 | 203k | case var .clientOpenServerClosed(inbound): |
325 | 66.9k | // The server is already done, but it's not our place to drop the request. |
326 | 66.9k | return self.withStateAvoidingCoWs { state in |
327 | 66.9k | let action = inbound.processInboundData(buffer: &buffer) |
328 | 66.9k | state = .clientOpenServerClosed(inbound) |
329 | 66.9k | return action |
330 | 66.9k | } |
331 | 203k | |
332 | 203k | case .clientClosedServerOpen: |
333 | 0 | preconditionFailure("End of request stream already received") |
334 | 203k | |
335 | 203k | case ._modifying: |
336 | 0 | preconditionFailure("Left in modifying state") |
337 | 203k | } |
338 | 0 | } |
339 | | |
340 | | private mutating func processRequestEnd( |
341 | | allocator: ByteBufferAllocator |
342 | 381k | ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action { |
343 | 381k | switch self { |
344 | 381k | case .idle: |
345 | 0 | preconditionFailure("Invalid state: haven't received request head") |
346 | 381k | |
347 | 381k | case let .fullyOpen(_, outbound): |
348 | 69.6k | return self.withStateAvoidingCoWs { state in |
349 | 69.6k | // We're done with inbound state. |
350 | 69.6k | state = .clientClosedServerOpen(outbound) |
351 | 69.6k | |
352 | 69.6k | // Send an empty DATA frame with the end stream flag set. |
353 | 69.6k | let empty = allocator.buffer(capacity: 0) |
354 | 69.6k | return .fireChannelRead(.data(.init(data: .byteBuffer(empty), endStream: true))) |
355 | 69.6k | } |
356 | 381k | |
357 | 381k | case .clientClosedServerOpen: |
358 | 0 | preconditionFailure("End of request stream already received") |
359 | 381k | |
360 | 381k | case .clientOpenServerClosed: |
361 | 311k | return self.withStateAvoidingCoWs { state in |
362 | 311k | // Both sides are closed now, back to idle. Don't forget to pass on the .end, as |
363 | 311k | // it's necessary to communicate to the other peers that the response is done. |
364 | 311k | state = .idle |
365 | 311k | |
366 | 311k | // Send an empty DATA frame with the end stream flag set. |
367 | 311k | let empty = allocator.buffer(capacity: 0) |
368 | 311k | return .fireChannelRead(.data(.init(data: .byteBuffer(empty), endStream: true))) |
369 | 311k | } |
370 | 381k | |
371 | 381k | case ._modifying: |
372 | 0 | preconditionFailure("Left in modifying state") |
373 | 381k | } |
374 | 0 | } |
375 | | } |
376 | | |
377 | | // MARK: - Outbound |
378 | | |
379 | | extension GRPCWebToHTTP2ServerCodec.StateMachine.State { |
380 | | private mutating func processResponseTrailers( |
381 | | _ trailers: HPACKHeaders, |
382 | | promise: EventLoopPromise<Void>?, |
383 | | allocator: ByteBufferAllocator |
384 | 158k | ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action { |
385 | 158k | switch self { |
386 | 158k | case .idle: |
387 | 0 | preconditionFailure("Invalid state: haven't received request head") |
388 | 158k | |
389 | 158k | case .fullyOpen(let inbound, var outbound): |
390 | 88.9k | return self.withStateAvoidingCoWs { state in |
391 | 88.9k | // Double check these are trailers. |
392 | 88.9k | assert(outbound.responseHeadersSent) |
393 | 88.9k | |
394 | 88.9k | // We haven't seen the end of the request stream yet. |
395 | 88.9k | state = .clientOpenServerClosed(inbound) |
396 | 88.9k | |
397 | 88.9k | // Avoid CoW-ing the buffers. |
398 | 88.9k | let responseBuffers = outbound.responseBuffer |
399 | 88.9k | outbound.responseBuffer = nil |
400 | 88.9k | |
401 | 88.9k | return Self.processTrailers( |
402 | 88.9k | responseBuffers: responseBuffers, |
403 | 88.9k | trailers: trailers, |
404 | 88.9k | promise: promise, |
405 | 88.9k | allocator: allocator, |
406 | 88.9k | closeChannel: outbound.closeConnection |
407 | 88.9k | ) |
408 | 88.9k | } |
409 | 158k | |
410 | 158k | case var .clientClosedServerOpen(state): |
411 | 69.6k | return self.withStateAvoidingCoWs { nextState in |
412 | 69.6k | // Client is closed and now so is the server. |
413 | 69.6k | nextState = .idle |
414 | 69.6k | |
415 | 69.6k | // Avoid CoW-ing the buffers. |
416 | 69.6k | let responseBuffers = state.responseBuffer |
417 | 69.6k | state.responseBuffer = nil |
418 | 69.6k | |
419 | 69.6k | return Self.processTrailers( |
420 | 69.6k | responseBuffers: responseBuffers, |
421 | 69.6k | trailers: trailers, |
422 | 69.6k | promise: promise, |
423 | 69.6k | allocator: allocator, |
424 | 69.6k | closeChannel: state.closeConnection |
425 | 69.6k | ) |
426 | 69.6k | } |
427 | 158k | |
428 | 158k | case .clientOpenServerClosed: |
429 | 0 | preconditionFailure("Already seen end of response stream") |
430 | 158k | |
431 | 158k | case ._modifying: |
432 | 0 | preconditionFailure("Left in modifying state") |
433 | 158k | } |
434 | 0 | } |
435 | | |
436 | | private static func processTrailers( |
437 | | responseBuffers: CircularBuffer<ByteBuffer>?, |
438 | | trailers: HPACKHeaders, |
439 | | promise: EventLoopPromise<Void>?, |
440 | | allocator: ByteBufferAllocator, |
441 | | closeChannel: Bool |
442 | 158k | ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action { |
443 | 158k | if var responseBuffers = responseBuffers { |
444 | 20.0k | let buffer = GRPCWebToHTTP2ServerCodec.encodeResponsesAndTrailers( |
445 | 20.0k | &responseBuffers, |
446 | 20.0k | trailers: trailers, |
447 | 20.0k | allocator: allocator |
448 | 20.0k | ) |
449 | 20.0k | return .write( |
450 | 20.0k | .init( |
451 | 20.0k | part: .body(.byteBuffer(buffer)), |
452 | 20.0k | additionalPart: .end(nil), |
453 | 20.0k | promise: promise, |
454 | 20.0k | closeChannel: closeChannel |
455 | 20.0k | ) |
456 | 20.0k | ) |
457 | 138k | } else { |
458 | 138k | // No response buffer; plain gRPC Web. Trailers are encoded into the body as a regular |
459 | 138k | // length-prefixed message. |
460 | 138k | let buffer = GRPCWebToHTTP2ServerCodec.formatTrailers(trailers, allocator: allocator) |
461 | 138k | return .write( |
462 | 138k | .init( |
463 | 138k | part: .body(.byteBuffer(buffer)), |
464 | 138k | additionalPart: .end(nil), |
465 | 138k | promise: promise, |
466 | 138k | closeChannel: closeChannel |
467 | 138k | ) |
468 | 138k | ) |
469 | 138k | } |
470 | 158k | } |
471 | | |
472 | | private mutating func processResponseTrailersOnly( |
473 | | _ trailers: HPACKHeaders, |
474 | | promise: EventLoopPromise<Void>? |
475 | 227k | ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action { |
476 | 227k | switch self { |
477 | 227k | case .idle: |
478 | 0 | preconditionFailure("Invalid state: haven't received request head") |
479 | 227k | |
480 | 227k | case let .fullyOpen(inbound, outbound): |
481 | 227k | return self.withStateAvoidingCoWs { state in |
482 | 227k | // We still haven't seen the end of the request stream. |
483 | 227k | state = .clientOpenServerClosed(inbound) |
484 | 227k | |
485 | 227k | let head = GRPCWebToHTTP2ServerCodec.makeResponseHead( |
486 | 227k | hpackHeaders: trailers, |
487 | 227k | closeConnection: outbound.closeConnection |
488 | 227k | ) |
489 | 227k | |
490 | 227k | return .write( |
491 | 227k | .init( |
492 | 227k | part: .head(head), |
493 | 227k | additionalPart: .end(nil), |
494 | 227k | promise: promise, |
495 | 227k | closeChannel: outbound.closeConnection |
496 | 227k | ) |
497 | 227k | ) |
498 | 227k | } |
499 | 227k | |
500 | 227k | case let .clientClosedServerOpen(outbound): |
501 | 0 | return self.withStateAvoidingCoWs { state in |
502 | 0 | // We're done, back to idle. |
503 | 0 | state = .idle |
504 | 0 |
|
505 | 0 | let head = GRPCWebToHTTP2ServerCodec.makeResponseHead( |
506 | 0 | hpackHeaders: trailers, |
507 | 0 | closeConnection: outbound.closeConnection |
508 | 0 | ) |
509 | 0 |
|
510 | 0 | return .write( |
511 | 0 | .init( |
512 | 0 | part: .head(head), |
513 | 0 | additionalPart: .end(nil), |
514 | 0 | promise: promise, |
515 | 0 | closeChannel: outbound.closeConnection |
516 | 0 | ) |
517 | 0 | ) |
518 | 0 | } |
519 | 227k | |
520 | 227k | case .clientOpenServerClosed: |
521 | 0 | preconditionFailure("Already seen end of response stream") |
522 | 227k | |
523 | 227k | case ._modifying: |
524 | 0 | preconditionFailure("Left in modifying state") |
525 | 227k | } |
526 | 0 | } |
527 | | |
528 | | private mutating func processResponseHeaders( |
529 | | _ headers: HPACKHeaders, |
530 | | promise: EventLoopPromise<Void>? |
531 | 158k | ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action { |
532 | 158k | switch self { |
533 | 158k | case .idle: |
534 | 0 | preconditionFailure("Invalid state: haven't received request head") |
535 | 158k | |
536 | 158k | case .fullyOpen(let inbound, var outbound): |
537 | 158k | return self.withStateAvoidingCoWs { state in |
538 | 158k | outbound.responseHeadersSent = true |
539 | 158k | state = .fullyOpen(inbound, outbound) |
540 | 158k | |
541 | 158k | let head = GRPCWebToHTTP2ServerCodec.makeResponseHead( |
542 | 158k | hpackHeaders: headers, |
543 | 158k | closeConnection: outbound.closeConnection |
544 | 158k | ) |
545 | 158k | return .write(.init(part: .head(head), promise: promise, closeChannel: false)) |
546 | 158k | } |
547 | 158k | |
548 | 158k | case var .clientClosedServerOpen(outbound): |
549 | 0 | return self.withStateAvoidingCoWs { state in |
550 | 0 | outbound.responseHeadersSent = true |
551 | 0 | state = .clientClosedServerOpen(outbound) |
552 | 0 |
|
553 | 0 | let head = GRPCWebToHTTP2ServerCodec.makeResponseHead( |
554 | 0 | hpackHeaders: headers, |
555 | 0 | closeConnection: outbound.closeConnection |
556 | 0 | ) |
557 | 0 | return .write(.init(part: .head(head), promise: promise, closeChannel: false)) |
558 | 0 | } |
559 | 158k | |
560 | 158k | case .clientOpenServerClosed: |
561 | 0 | preconditionFailure("Already seen end of response stream") |
562 | 158k | |
563 | 158k | case ._modifying: |
564 | 0 | preconditionFailure("Left in modifying state") |
565 | 158k | } |
566 | 0 | } |
567 | | |
568 | | private mutating func processResponseHeaders( |
569 | | _ payload: HTTP2Frame.FramePayload.Headers, |
570 | | promise: EventLoopPromise<Void>?, |
571 | | allocator: ByteBufferAllocator |
572 | 544k | ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action { |
573 | 544k | switch self { |
574 | 544k | case .idle: |
575 | 0 | preconditionFailure("Invalid state: haven't received request head") |
576 | 544k | |
577 | 544k | case let .fullyOpen(_, outbound), |
578 | 544k | let .clientClosedServerOpen(outbound): |
579 | 544k | if outbound.responseHeadersSent { |
580 | 158k | // Headers have been sent, these must be trailers, so end stream must be set. |
581 | 158k | assert(payload.endStream) |
582 | 158k | return self.processResponseTrailers(payload.headers, promise: promise, allocator: allocator) |
583 | 386k | } else if payload.endStream { |
584 | 227k | // Headers haven't been sent yet and end stream is set: this is a trailers only response |
585 | 227k | // so we need to send 'end' as well. |
586 | 227k | return self.processResponseTrailersOnly(payload.headers, promise: promise) |
587 | 227k | } else { |
588 | 158k | return self.processResponseHeaders(payload.headers, promise: promise) |
589 | 158k | } |
590 | 544k | |
591 | 544k | case .clientOpenServerClosed: |
592 | 0 | // We've already sent end. |
593 | 0 | return .completePromise(promise, .failure(GRPCError.AlreadyComplete())) |
594 | 544k | |
595 | 544k | case ._modifying: |
596 | 0 | preconditionFailure("Left in modifying state") |
597 | 544k | } |
598 | 0 | } |
599 | | |
600 | | private static func processResponseData( |
601 | | _ payload: HTTP2Frame.FramePayload.Data, |
602 | | promise: EventLoopPromise<Void>?, |
603 | | state: inout GRPCWebToHTTP2ServerCodec.StateMachine.OutboundState |
604 | 86.0k | ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action { |
605 | 86.0k | if state.responseBuffer == nil { |
606 | 67.7k | // Not gRPC Web Text; just write the body. |
607 | 67.7k | return .write(.init(part: .body(payload.data), promise: promise, closeChannel: false)) |
608 | 67.7k | } else { |
609 | 18.3k | switch payload.data { |
610 | 18.3k | case let .byteBuffer(buffer): |
611 | 18.3k | // '!' is fine, we checked above. |
612 | 18.3k | state.responseBuffer!.append(buffer) |
613 | 18.3k | |
614 | 18.3k | case .fileRegion: |
615 | 0 | preconditionFailure("Unexpected IOData.fileRegion") |
616 | 18.3k | } |
617 | 18.3k | |
618 | 18.3k | // The response is buffered, we can consider it dealt with. |
619 | 18.3k | return .completePromise(promise, .success(())) |
620 | 18.3k | } |
621 | 86.0k | } |
622 | | |
623 | | private mutating func processResponseData( |
624 | | _ payload: HTTP2Frame.FramePayload.Data, |
625 | | promise: EventLoopPromise<Void>? |
626 | 86.0k | ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action { |
627 | 86.0k | switch self { |
628 | 86.0k | case .idle: |
629 | 0 | preconditionFailure("Invalid state: haven't received request head") |
630 | 86.0k | |
631 | 86.0k | case .fullyOpen(let inbound, var outbound): |
632 | 34.4k | return self.withStateAvoidingCoWs { state in |
633 | 34.4k | let action = Self.processResponseData(payload, promise: promise, state: &outbound) |
634 | 34.4k | state = .fullyOpen(inbound, outbound) |
635 | 34.4k | return action |
636 | 34.4k | } |
637 | 86.0k | |
638 | 86.0k | case var .clientClosedServerOpen(outbound): |
639 | 51.6k | return self.withStateAvoidingCoWs { state in |
640 | 51.6k | let action = Self.processResponseData(payload, promise: promise, state: &outbound) |
641 | 51.6k | state = .clientClosedServerOpen(outbound) |
642 | 51.6k | return action |
643 | 51.6k | } |
644 | 86.0k | |
645 | 86.0k | case .clientOpenServerClosed: |
646 | 0 | return .completePromise(promise, .failure(GRPCError.AlreadyComplete())) |
647 | 86.0k | |
648 | 86.0k | case ._modifying: |
649 | 0 | preconditionFailure("Left in modifying state") |
650 | 86.0k | } |
651 | 0 | } |
652 | | } |
653 | | |
654 | | // MARK: - Helpers |
655 | | |
656 | | extension GRPCWebToHTTP2ServerCodec { |
657 | | private static func makeResponseHead( |
658 | | hpackHeaders: HPACKHeaders, |
659 | | closeConnection: Bool |
660 | 386k | ) -> HTTPResponseHead { |
661 | 386k | var headers = HTTPHeaders(hpackHeaders: hpackHeaders) |
662 | 386k | |
663 | 386k | if closeConnection { |
664 | 4.07k | headers.add(name: "connection", value: "close") |
665 | 4.07k | } |
666 | 386k | |
667 | 386k | // Grab the status, if this is missing we've messed up in another handler. |
668 | 386k | guard let statusCode = hpackHeaders.first(name: ":status").flatMap(Int.init) else { |
669 | 0 | preconditionFailure("Invalid state: missing ':status' pseudo header") |
670 | 386k | } |
671 | 386k | |
672 | 386k | return HTTPResponseHead( |
673 | 386k | version: .init(major: 1, minor: 1), |
674 | 386k | status: .init(statusCode: statusCode), |
675 | 386k | headers: headers |
676 | 386k | ) |
677 | 386k | } |
678 | | |
679 | | private static func formatTrailers( |
680 | | _ trailers: HPACKHeaders, |
681 | | allocator: ByteBufferAllocator |
682 | 158k | ) -> ByteBuffer { |
683 | 158k | // See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md |
684 | 230k | let length = trailers.reduce(0) { partial, trailer in |
685 | 230k | // +4 for: ":", " ", "\r", "\n" |
686 | 230k | return partial + trailer.name.utf8.count + trailer.value.utf8.count + 4 |
687 | 230k | } |
688 | 158k | var buffer = allocator.buffer(capacity: 5 + length) |
689 | 158k | |
690 | 158k | // Uncompressed trailer byte. |
691 | 158k | buffer.writeInteger(UInt8(0x80)) |
692 | 158k | // Length. |
693 | 158k | let lengthIndex = buffer.writerIndex |
694 | 158k | buffer.writeInteger(UInt32(0)) |
695 | 158k | |
696 | 158k | var bytesWritten = 0 |
697 | 230k | for (name, value, _) in trailers { |
698 | 230k | bytesWritten += buffer.writeString(name) |
699 | 230k | bytesWritten += buffer.writeString(": ") |
700 | 230k | bytesWritten += buffer.writeString(value) |
701 | 230k | bytesWritten += buffer.writeString("\r\n") |
702 | 230k | } |
703 | 158k | |
704 | 158k | buffer.setInteger(UInt32(bytesWritten), at: lengthIndex) |
705 | 158k | return buffer |
706 | 158k | } |
707 | | |
708 | | private static func encodeResponsesAndTrailers( |
709 | | _ responses: inout CircularBuffer<ByteBuffer>, |
710 | | trailers: HPACKHeaders, |
711 | | allocator: ByteBufferAllocator |
712 | 20.0k | ) -> ByteBuffer { |
713 | 20.0k | // We need to encode the trailers along with any responses we're holding. |
714 | 20.0k | responses.append(self.formatTrailers(trailers, allocator: allocator)) |
715 | 20.0k | |
716 | 38.4k | let capacity = responses.lazy.map { $0.readableBytes }.reduce(0, +) |
717 | 20.0k | // '!' is fine: responses isn't empty, we just appended the trailers. |
718 | 20.0k | var buffer = responses.popFirst()! |
719 | 20.0k | |
720 | 20.0k | // Accumulate all the buffers into a single 'Data'. Ideally we wouldn't copy back and forth |
721 | 20.0k | // but this is fine for now. |
722 | 20.0k | var accumulatedData = buffer.readData(length: buffer.readableBytes)! |
723 | 20.0k | accumulatedData.reserveCapacity(capacity) |
724 | 20.0k | while let buffer = responses.popFirst() { |
725 | 18.3k | accumulatedData.append(contentsOf: buffer.readableBytesView) |
726 | 20.0k | } |
727 | 20.0k | |
728 | 20.0k | // We can reuse the popped buffer. |
729 | 20.0k | let base64Encoded = accumulatedData.base64EncodedString() |
730 | 20.0k | buffer.clear(minimumCapacity: base64Encoded.utf8.count) |
731 | 20.0k | buffer.writeString(base64Encoded) |
732 | 20.0k | |
733 | 20.0k | return buffer |
734 | 20.0k | } |
735 | | } |
736 | | |
737 | | extension GRPCWebToHTTP2ServerCodec.StateMachine.InboundState { |
738 | | fileprivate mutating func processInboundData( |
739 | | buffer: inout ByteBuffer |
740 | 203k | ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action { |
741 | 203k | if self.requestBuffer == nil { |
742 | 189k | // We're not dealing with gRPC Web Text: just forward the buffer. |
743 | 189k | return .fireChannelRead(.data(.init(data: .byteBuffer(buffer)))) |
744 | 189k | } |
745 | 14.7k | |
746 | 14.7k | if self.requestBuffer!.readableBytes == 0 { |
747 | 11.4k | self.requestBuffer = buffer |
748 | 11.4k | } else { |
749 | 3.22k | self.requestBuffer!.writeBuffer(&buffer) |
750 | 3.22k | } |
751 | 14.7k | |
752 | 14.7k | let readableBytes = self.requestBuffer!.readableBytes |
753 | 14.7k | // The length of base64 encoded data must be a multiple of 4. |
754 | 14.7k | let bytesToRead = readableBytes - (readableBytes % 4) |
755 | 14.7k | |
756 | 14.7k | let action: GRPCWebToHTTP2ServerCodec.StateMachine.Action |
757 | 14.7k | |
758 | 14.7k | if bytesToRead > 0, |
759 | 14.7k | let base64Encoded = self.requestBuffer!.readString(length: bytesToRead), |
760 | 14.7k | let base64Decoded = Data(base64Encoded: base64Encoded) |
761 | 14.7k | { |
762 | 9.84k | // Recycle the input buffer and restore the request buffer. |
763 | 9.84k | buffer.clear() |
764 | 9.84k | buffer.writeContiguousBytes(base64Decoded) |
765 | 9.84k | action = .fireChannelRead(.data(.init(data: .byteBuffer(buffer)))) |
766 | 9.84k | } else { |
767 | 4.87k | action = .none |
768 | 4.87k | } |
769 | 14.7k | |
770 | 14.7k | return action |
771 | 203k | } |
772 | | } |
773 | | |
774 | | extension HTTPHeaders { |
775 | 386k | fileprivate init(hpackHeaders headers: HPACKHeaders) { |
776 | 386k | self.init() |
777 | 386k | self.reserveCapacity(headers.count) |
778 | 386k | |
779 | 386k | // Pseudo-headers are at the start of the block, so drop them and then add the remaining. |
780 | 567k | let regularHeaders = headers.drop { name, _, _ in |
781 | 567k | name.utf8.first == .some(UInt8(ascii: ":")) |
782 | 567k | }.lazy.map { name, value, _ in |
783 | 227k | (name, value) |
784 | 227k | } |
785 | 386k | |
786 | 386k | self.add(contentsOf: regularHeaders) |
787 | 386k | } |
788 | | } |