Coverage Report

Created: 2026-02-11 07:03

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/grpc-swift/Sources/GRPC/GRPCWebToHTTP2ServerCodec.swift
Line
Count
Source
1
/*
2
 * Copyright 2020, gRPC Authors All rights reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
17
import NIOCore
18
import NIOHPACK
19
import NIOHTTP1
20
import NIOHTTP2
21
22
import struct Foundation.Data
23
24
/// A channel handler which bridges gRPC Web, carried as HTTP/1 request and response parts, and
/// HTTP/2 frame payloads.
///
/// Inbound `HTTPServerRequestPart`s and outbound `HTTP2Frame.FramePayload`s are both handed to a
/// state machine; the handler then carries out whichever action the state machine returns.
internal final class GRPCWebToHTTP2ServerCodec: ChannelDuplexHandler {
  internal typealias InboundIn = HTTPServerRequestPart
  internal typealias InboundOut = HTTP2Frame.FramePayload

  internal typealias OutboundIn = HTTP2Frame.FramePayload
  internal typealias OutboundOut = HTTPServerResponsePart

  /// Decides what should happen for each inbound and outbound part.
  private var stateMachine: StateMachine

  /// Create a gRPC Web to server HTTP/2 codec.
  ///
  /// - Parameter scheme: The value of the ':scheme' pseudo header to insert when converting the
  ///   request headers.
  init(scheme: String) {
    self.stateMachine = StateMachine(scheme: scheme)
  }

  /// Feeds an inbound HTTP/1 request part to the state machine and acts on the outcome.
  internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
    let requestPart = self.unwrapInboundIn(data)
    let action = self.stateMachine.processInbound(
      serverRequestPart: requestPart,
      allocator: context.channel.allocator
    )
    self.act(on: action, context: context)
  }

  /// Feeds an outbound HTTP/2 frame payload to the state machine and acts on the outcome.
  internal func write(
    context: ChannelHandlerContext,
    data: NIOAny,
    promise: EventLoopPromise<Void>?
  ) {
    let framePayload = self.unwrapOutboundIn(data)
    let action = self.stateMachine.processOutbound(
      framePayload: framePayload,
      promise: promise,
      allocator: context.channel.allocator
    )
    self.act(on: action, context: context)
  }

  /// Carries out an action returned by the state machine.
  private func act(on action: StateMachine.Action, context: ChannelHandlerContext) {
    switch action {
    case .none:
      break

    case let .fireChannelRead(payload):
      context.fireChannelRead(self.wrapInboundOut(payload))

    case let .write(pending):
      // The promise is attached to whichever part is written last; if there is a second part
      // then the first one is written without a promise.
      let firstPartPromise = pending.additionalPart == nil ? pending.promise : nil
      context.write(self.wrapOutboundOut(pending.part), promise: firstPartPromise)

      if let finalPart = pending.additionalPart {
        context.write(self.wrapOutboundOut(finalPart), promise: pending.promise)
      }

      if pending.closeChannel {
        context.close(mode: .all, promise: nil)
      }

    case let .completePromise(promise, result):
      promise?.completeWith(result)
    }
  }
}
89
90
extension GRPCWebToHTTP2ServerCodec {
  /// Tracks the request and response streams of a gRPC Web call and turns each inbound or
  /// outbound part into an `Action` for the channel handler to perform.
  internal struct StateMachine {
    /// The current state.
    private var state: State
    /// The value used for the ':scheme' pseudo header of converted request headers.
    private let scheme: String

    internal init(scheme: String) {
      self.scheme = scheme
      self.state = .idle
    }

    /// Process the inbound `HTTPServerRequestPart`.
    internal mutating func processInbound(
      serverRequestPart: HTTPServerRequestPart,
      allocator: ByteBufferAllocator
    ) -> Action {
      let action = self.state.processInbound(
        serverRequestPart: serverRequestPart,
        scheme: self.scheme,
        allocator: allocator
      )
      return action
    }

    /// Process the outbound `HTTP2Frame.FramePayload`.
    internal mutating func processOutbound(
      framePayload: HTTP2Frame.FramePayload,
      promise: EventLoopPromise<Void>?,
      allocator: ByteBufferAllocator
    ) -> Action {
      let action = self.state.processOutbound(
        framePayload: framePayload,
        promise: promise,
        allocator: allocator
      )
      return action
    }

    /// An action to take as a result of interaction with the state machine.
    internal enum Action {
      /// Nothing to do.
      case none
      /// Forward the payload to the next inbound handler.
      case fireChannelRead(HTTP2Frame.FramePayload)
      /// Write one or two response parts, optionally closing the channel afterwards.
      case write(Write)
      /// Complete the promise (if there is one) with the given result.
      case completePromise(EventLoopPromise<Void>?, Result<Void, Error>)

      /// The details of a write to perform.
      internal struct Write {
        /// The part to write first.
        internal var part: HTTPServerResponsePart
        /// A second part to write straight after `part`, if any.
        internal var additionalPart: HTTPServerResponsePart?
        /// The promise associated with the write.
        internal var promise: EventLoopPromise<Void>?
        /// Whether the channel should be closed once the parts have been written.
        internal var closeChannel: Bool

        internal init(
          part: HTTPServerResponsePart,
          additionalPart: HTTPServerResponsePart? = nil,
          promise: EventLoopPromise<Void>?,
          closeChannel: Bool
        ) {
          self.part = part
          self.additionalPart = additionalPart
          self.promise = promise
          self.closeChannel = closeChannel
        }
      }
    }

    fileprivate enum State {
      /// Nothing has been received or sent yet. Receiving request headers moves the state to
      /// 'fullyOpen'; it is the only valid transition out of this state.
      case idle

      /// Request headers have been received; neither the request stream nor the response stream
      /// has ended.
      case fullyOpen(InboundState, OutboundState)

      /// The server has closed the response stream; the client may still send request parts.
      case clientOpenServerClosed(InboundState)

      /// The client has sent everything; the server has yet to close the response stream.
      case clientClosedServerOpen(OutboundState)

      /// Not a real state: a placeholder held only while a transition is in progress.
      case _modifying

      /// Whether the state is the `_modifying` placeholder.
      private var isModifying: Bool {
        if case ._modifying = self {
          return true
        }
        return false
      }

      /// Replaces `self` with the `_modifying` placeholder and then runs `body`, which is
      /// expected to assign the next state. In debug builds this asserts that the placeholder
      /// has been replaced by the time `body` returns.
      private mutating func withStateAvoidingCoWs(_ body: (inout State) -> Action) -> Action {
        self = ._modifying
        let action = body(&self)
        assert(!self.isModifying)
        return action
      }
    }

    fileprivate struct InboundState {
      /// A `ByteBuffer` containing the base64 encoded bytes of the request stream if gRPC Web Text
      /// is being used, `nil` otherwise.
      var requestBuffer: ByteBuffer?

      init(isTextEncoded: Bool, allocator: ByteBufferAllocator) {
        if isTextEncoded {
          self.requestBuffer = allocator.buffer(capacity: 0)
        } else {
          self.requestBuffer = nil
        }
      }
    }

    fileprivate struct OutboundState {
      /// A `CircularBuffer` holding any response messages if gRPC Web Text is being used, `nil`
      /// otherwise.
      var responseBuffer: CircularBuffer<ByteBuffer>?

      /// True if the response headers have been sent.
      var responseHeadersSent: Bool

      /// True if the server should close the connection when this request is done.
      var closeConnection: Bool

      init(isTextEncoded: Bool, closeConnection: Bool) {
        self.closeConnection = closeConnection
        self.responseHeadersSent = false
        if isTextEncoded {
          self.responseBuffer = CircularBuffer<ByteBuffer>()
        } else {
          self.responseBuffer = nil
        }
      }
    }
  }
}
217
218
extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
219
  /// Routes an inbound request part to the handler for its kind (head, body or end) and returns
  /// the action that handler produced.
  fileprivate mutating func processInbound(
    serverRequestPart: HTTPServerRequestPart,
    scheme: String,
    allocator: ByteBufferAllocator
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
    let action: GRPCWebToHTTP2ServerCodec.StateMachine.Action

    switch serverRequestPart {
    case .head(let requestHead):
      action = self.processRequestHead(requestHead, scheme: scheme, allocator: allocator)

    case .body(var chunk):
      action = self.processRequestBody(&chunk)

    case .end:
      action = self.processRequestEnd(allocator: allocator)
    }

    return action
  }
233
234
  /// Routes an outbound frame payload to the handler for its kind.
  ///
  /// Only HEADERS and DATA payloads are supported; any other payload is a programmer error and
  /// traps.
  fileprivate mutating func processOutbound(
    framePayload: HTTP2Frame.FramePayload,
    promise: EventLoopPromise<Void>?,
    allocator: ByteBufferAllocator
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
    switch framePayload {
    case .headers(let headersPayload):
      return self.processResponseHeaders(headersPayload, promise: promise, allocator: allocator)

    case .data(let dataPayload):
      return self.processResponseData(dataPayload, promise: promise)

    case .priority, .rstStream, .settings, .pushPromise, .ping, .goAway, .windowUpdate,
      .alternativeService, .origin:
      preconditionFailure("Unsupported frame payload")
    }
  }
258
}
259
260
// MARK: - Inbound
261
262
extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
263
  /// Converts an HTTP/1 request head into HTTP/2 request headers and opens both streams.
  ///
  /// Only valid in the `idle` state; receiving a second request head traps.
  ///
  /// - Parameters:
  ///   - head: The HTTP/1 request head.
  ///   - scheme: The value to use for the ':scheme' pseudo header.
  ///   - allocator: Used to create the request buffer when gRPC Web Text is in use.
  /// - Returns: An action firing the converted headers down the pipeline.
  private mutating func processRequestHead(
    _ head: HTTPRequestHead,
    scheme: String,
    allocator: ByteBufferAllocator
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
    switch self {
    case .idle:
      return self.withStateAvoidingCoWs { state in
        let normalized = HPACKHeaders(httpHeaders: head.headers, normalizeHTTPHeaders: true)

        // Regular headers need to come after the pseudo headers. Unfortunately, this means we need to
        // allocate a second headers block to use the normalization provided by NIO HTTP/2.
        //
        // TODO: Use API provided by https://github.com/apple/swift-nio-http2/issues/254 to avoid the
        // extra copy.
        var headers = HPACKHeaders()
        headers.reserveCapacity(normalized.count + 4)
        headers.add(name: ":path", value: head.uri)
        headers.add(name: ":method", value: head.method.rawValue)
        headers.add(name: ":scheme", value: scheme)
        if let host = head.headers.first(name: "host") {
          headers.add(name: ":authority", value: host)
        }
        headers.add(contentsOf: normalized)

        // Check whether we're dealing with gRPC Web Text. No need to fully validate the content-type
        // that will be done at the HTTP/2 level.
        let contentType = headers.first(name: GRPCHeaderName.contentType).flatMap(ContentType.init)
        let isWebText = contentType == .some(.webTextProtobuf)

        // Connection options are case-insensitive, so 'Close' and 'CLOSE' must be honoured in the
        // same way as 'close'.
        let closeConnection = head.headers[canonicalForm: "connection"].contains { option in
          option.lowercased() == "close"
        }

        state = .fullyOpen(
          .init(isTextEncoded: isWebText, allocator: allocator),
          .init(isTextEncoded: isWebText, closeConnection: closeConnection)
        )
        return .fireChannelRead(.headers(.init(headers: headers)))
      }

    case .fullyOpen, .clientOpenServerClosed, .clientClosedServerOpen:
      preconditionFailure("Invalid state: already received request head")

    case ._modifying:
      preconditionFailure("Left in modifying state")
    }
  }
309
310
  /// Handles a chunk of the request body.
  ///
  /// The chunk is passed to the inbound state, which decides what (if anything) to forward, and
  /// the updated inbound state is stored back without changing which state we are in. Body parts
  /// are only valid once the request head has been received and before the request end.
  private mutating func processRequestBody(
    _ buffer: inout ByteBuffer
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
    switch self {
    case .fullyOpen(var inbound, let outbound):
      return self.withStateAvoidingCoWs { next in
        // Store the (possibly updated) inbound state once the data has been processed.
        defer { next = .fullyOpen(inbound, outbound) }
        return inbound.processInboundData(buffer: &buffer)
      }

    case .clientOpenServerClosed(var inbound):
      // The server is already done, but it's not our place to drop the request.
      return self.withStateAvoidingCoWs { next in
        defer { next = .clientOpenServerClosed(inbound) }
        return inbound.processInboundData(buffer: &buffer)
      }

    case .idle:
      preconditionFailure("Invalid state: haven't received request head")

    case .clientClosedServerOpen:
      preconditionFailure("End of request stream already received")

    case ._modifying:
      preconditionFailure("Left in modifying state")
    }
  }
339
340
  /// Handles the end of the request stream.
  ///
  /// The inbound state is discarded and an empty DATA frame with end stream set is fired down the
  /// pipeline. If the server has already closed the response stream the state returns to `idle`,
  /// otherwise it becomes `clientClosedServerOpen`.
  private mutating func processRequestEnd(
    allocator: ByteBufferAllocator
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
    // Work out where we're going first; the frame we emit is the same either way.
    let next: Self

    switch self {
    case .idle:
      preconditionFailure("Invalid state: haven't received request head")

    case .fullyOpen(_, let outbound):
      // We're done with inbound state.
      next = .clientClosedServerOpen(outbound)

    case .clientClosedServerOpen:
      preconditionFailure("End of request stream already received")

    case .clientOpenServerClosed:
      // Both sides are closed now, back to idle. The end of the request still has to be passed
      // on, which happens below.
      next = .idle

    case ._modifying:
      preconditionFailure("Left in modifying state")
    }

    return self.withStateAvoidingCoWs { state in
      state = next

      // Send an empty DATA frame with the end stream flag set.
      let emptyBuffer = allocator.buffer(capacity: 0)
      return .fireChannelRead(.data(.init(data: .byteBuffer(emptyBuffer), endStream: true)))
    }
  }
375
}
376
377
// MARK: - Outbound
378
379
extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
380
  /// Handles response trailers, i.e. headers which arrive after the response headers were sent.
  ///
  /// The response stream is closed: `fullyOpen` becomes `clientOpenServerClosed` and
  /// `clientClosedServerOpen` becomes `idle`. Any buffered response messages are taken out of the
  /// outbound state and handed, together with the trailers, to `processTrailers`.
  private mutating func processResponseTrailers(
    _ trailers: HPACKHeaders,
    promise: EventLoopPromise<Void>?,
    allocator: ByteBufferAllocator
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
    switch self {
    case .idle:
      preconditionFailure("Invalid state: haven't received request head")

    case .fullyOpen(let inbound, var outbound):
      return self.withStateAvoidingCoWs { next in
        // Double check these are trailers.
        assert(outbound.responseHeadersSent)

        // We haven't seen the end of the request stream yet.
        next = .clientOpenServerClosed(inbound)

        // Avoid CoW-ing the buffers: move them out of the outbound state, leaving nil behind.
        var bufferedResponses: CircularBuffer<ByteBuffer>? = nil
        swap(&bufferedResponses, &outbound.responseBuffer)

        return Self.processTrailers(
          responseBuffers: bufferedResponses,
          trailers: trailers,
          promise: promise,
          allocator: allocator,
          closeChannel: outbound.closeConnection
        )
      }

    case .clientClosedServerOpen(var outbound):
      return self.withStateAvoidingCoWs { next in
        // Client is closed and now so is the server.
        next = .idle

        // Avoid CoW-ing the buffers: move them out of the outbound state, leaving nil behind.
        var bufferedResponses: CircularBuffer<ByteBuffer>? = nil
        swap(&bufferedResponses, &outbound.responseBuffer)

        return Self.processTrailers(
          responseBuffers: bufferedResponses,
          trailers: trailers,
          promise: promise,
          allocator: allocator,
          closeChannel: outbound.closeConnection
        )
      }

    case .clientOpenServerClosed:
      preconditionFailure("Already seen end of response stream")

    case ._modifying:
      preconditionFailure("Left in modifying state")
    }
  }
435
436
  /// Builds the write which ends a response: a body part holding the encoded trailers followed
  /// by an 'end' part.
  ///
  /// - Parameters:
  ///   - responseBuffers: The buffered response messages when gRPC Web Text is in use, `nil` for
  ///     plain gRPC Web.
  ///   - trailers: The trailers to encode into the body.
  ///   - promise: The promise to associate with the write.
  ///   - allocator: Used to allocate the buffer holding the encoded trailers.
  ///   - closeChannel: Whether the channel should be closed after the write.
  private static func processTrailers(
    responseBuffers: CircularBuffer<ByteBuffer>?,
    trailers: HPACKHeaders,
    promise: EventLoopPromise<Void>?,
    allocator: ByteBufferAllocator,
    closeChannel: Bool
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
    let body: ByteBuffer

    if var pendingResponses = responseBuffers {
      // gRPC Web Text: the buffered responses and the trailers are encoded together.
      body = GRPCWebToHTTP2ServerCodec.encodeResponsesAndTrailers(
        &pendingResponses,
        trailers: trailers,
        allocator: allocator
      )
    } else {
      // No response buffer; plain gRPC Web. Trailers are encoded into the body as a regular
      // length-prefixed message.
      body = GRPCWebToHTTP2ServerCodec.formatTrailers(trailers, allocator: allocator)
    }

    let write = GRPCWebToHTTP2ServerCodec.StateMachine.Action.Write(
      part: .body(.byteBuffer(body)),
      additionalPart: .end(nil),
      promise: promise,
      closeChannel: closeChannel
    )
    return .write(write)
  }
471
472
  /// Handles a 'trailers only' response: headers carrying end stream when no response headers
  /// have been sent.
  ///
  /// The trailers become the HTTP/1 response head, which is written along with an 'end' part. The
  /// response stream is closed: `fullyOpen` becomes `clientOpenServerClosed` and
  /// `clientClosedServerOpen` becomes `idle`.
  private mutating func processResponseTrailersOnly(
    _ trailers: HPACKHeaders,
    promise: EventLoopPromise<Void>?
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
    // The write is the same whichever state we come from; only the next state differs.
    let next: Self
    let closeConnection: Bool

    switch self {
    case .idle:
      preconditionFailure("Invalid state: haven't received request head")

    case .fullyOpen(let inbound, let outbound):
      // We still haven't seen the end of the request stream.
      next = .clientOpenServerClosed(inbound)
      closeConnection = outbound.closeConnection

    case .clientClosedServerOpen(let outbound):
      // We're done, back to idle.
      next = .idle
      closeConnection = outbound.closeConnection

    case .clientOpenServerClosed:
      preconditionFailure("Already seen end of response stream")

    case ._modifying:
      preconditionFailure("Left in modifying state")
    }

    return self.withStateAvoidingCoWs { state in
      state = next

      let responseHead = GRPCWebToHTTP2ServerCodec.makeResponseHead(
        hpackHeaders: trailers,
        closeConnection: closeConnection
      )

      return .write(
        .init(
          part: .head(responseHead),
          additionalPart: .end(nil),
          promise: promise,
          closeChannel: closeConnection
        )
      )
    }
  }
527
528
  /// Handles the initial response headers.
  ///
  /// Records that the response headers have been sent (the state otherwise stays as it is) and
  /// writes the headers as an HTTP/1 response head. The channel is never closed by this write.
  private mutating func processResponseHeaders(
    _ headers: HPACKHeaders,
    promise: EventLoopPromise<Void>?
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
    let next: Self
    let closeConnection: Bool

    switch self {
    case .idle:
      preconditionFailure("Invalid state: haven't received request head")

    case .fullyOpen(let inbound, var outbound):
      outbound.responseHeadersSent = true
      closeConnection = outbound.closeConnection
      next = .fullyOpen(inbound, outbound)

    case .clientClosedServerOpen(var outbound):
      outbound.responseHeadersSent = true
      closeConnection = outbound.closeConnection
      next = .clientClosedServerOpen(outbound)

    case .clientOpenServerClosed:
      preconditionFailure("Already seen end of response stream")

    case ._modifying:
      preconditionFailure("Left in modifying state")
    }

    return self.withStateAvoidingCoWs { state in
      state = next

      let responseHead = GRPCWebToHTTP2ServerCodec.makeResponseHead(
        hpackHeaders: headers,
        closeConnection: closeConnection
      )
      return .write(.init(part: .head(responseHead), promise: promise, closeChannel: false))
    }
  }
567
568
  /// Classifies an outbound HEADERS payload and forwards it to the matching handler.
  ///
  /// - Response headers already sent: the payload is treated as trailers.
  /// - Not yet sent and end stream is set: a trailers only response.
  /// - Not yet sent and end stream is not set: the initial response headers.
  ///
  /// If the response stream has already ended the promise is failed with
  /// `GRPCError.AlreadyComplete`.
  private mutating func processResponseHeaders(
    _ payload: HTTP2Frame.FramePayload.Headers,
    promise: EventLoopPromise<Void>?,
    allocator: ByteBufferAllocator
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
    switch self {
    case .idle:
      preconditionFailure("Invalid state: haven't received request head")

    case .fullyOpen(_, let outbound), .clientClosedServerOpen(let outbound):
      switch (outbound.responseHeadersSent, payload.endStream) {
      case (true, let isEndOfStream):
        // Headers have been sent, these must be trailers, so end stream must be set.
        assert(isEndOfStream)
        return self.processResponseTrailers(
          payload.headers,
          promise: promise,
          allocator: allocator
        )

      case (false, true):
        // Headers haven't been sent yet and end stream is set: this is a trailers only response
        // so we need to send 'end' as well.
        return self.processResponseTrailersOnly(payload.headers, promise: promise)

      case (false, false):
        return self.processResponseHeaders(payload.headers, promise: promise)
      }

    case .clientOpenServerClosed:
      // We've already sent end.
      return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))

    case ._modifying:
      preconditionFailure("Left in modifying state")
    }
  }
599
600
  /// Handles a response DATA payload for the given outbound state.
  ///
  /// Without a response buffer (i.e. not gRPC Web Text) the data is written straight away as a
  /// body part. Otherwise the data is appended to the response buffer and the promise is
  /// succeeded immediately. A `fileRegion` is not supported when buffering and traps.
  private static func processResponseData(
    _ payload: HTTP2Frame.FramePayload.Data,
    promise: EventLoopPromise<Void>?,
    state: inout GRPCWebToHTTP2ServerCodec.StateMachine.OutboundState
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
    guard state.responseBuffer != nil else {
      // Not gRPC Web Text; just write the body.
      return .write(.init(part: .body(payload.data), promise: promise, closeChannel: false))
    }

    guard case .byteBuffer(let message) = payload.data else {
      preconditionFailure("Unexpected IOData.fileRegion")
    }

    // '!' is fine, the first guard checked for nil.
    state.responseBuffer!.append(message)

    // The response is buffered, we can consider it dealt with.
    return .completePromise(promise, .success(()))
  }
622
623
  /// Handles a response DATA payload.
  ///
  /// While the response stream is open the payload is processed against the outbound state and
  /// the updated outbound state is stored back. If the response stream has already ended the
  /// promise is failed with `GRPCError.AlreadyComplete`.
  private mutating func processResponseData(
    _ payload: HTTP2Frame.FramePayload.Data,
    promise: EventLoopPromise<Void>?
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
    switch self {
    case .fullyOpen(let inbound, var outbound):
      return self.withStateAvoidingCoWs { next in
        // Store the (possibly updated) outbound state once the data has been processed.
        defer { next = .fullyOpen(inbound, outbound) }
        return Self.processResponseData(payload, promise: promise, state: &outbound)
      }

    case .clientClosedServerOpen(var outbound):
      return self.withStateAvoidingCoWs { next in
        defer { next = .clientClosedServerOpen(outbound) }
        return Self.processResponseData(payload, promise: promise, state: &outbound)
      }

    case .clientOpenServerClosed:
      return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))

    case .idle:
      preconditionFailure("Invalid state: haven't received request head")

    case ._modifying:
      preconditionFailure("Left in modifying state")
    }
  }
652
}
653
654
// MARK: - Helpers
655
656
extension GRPCWebToHTTP2ServerCodec {
657
  /// Makes an HTTP/1.1 response head from HTTP/2 response headers.
  ///
  /// Pseudo headers are not carried over as regular headers; the status code is read from
  /// ':status'. Traps if ':status' is missing or isn't an integer.
  ///
  /// - Parameters:
  ///   - hpackHeaders: The HTTP/2 headers to convert.
  ///   - closeConnection: Whether a 'connection: close' header should be added.
  private static func makeResponseHead(
    hpackHeaders: HPACKHeaders,
    closeConnection: Bool
  ) -> HTTPResponseHead {
    // Grab the status, if this is missing we've messed up in another handler.
    guard let rawStatus = hpackHeaders.first(name: ":status"),
      let statusCode = Int(rawStatus)
    else {
      preconditionFailure("Invalid state: missing ':status' pseudo header")
    }

    var httpHeaders = HTTPHeaders(hpackHeaders: hpackHeaders)
    if closeConnection {
      httpHeaders.add(name: "connection", value: "close")
    }

    let version = HTTPVersion(major: 1, minor: 1)
    let status = HTTPResponseStatus(statusCode: statusCode)
    return HTTPResponseHead(version: version, status: status, headers: httpHeaders)
  }
678
679
  /// Encodes trailers as a length-prefixed message: one byte of 0x80, a four byte length, then
  /// each trailer written as "name: value\r\n".
  ///
  /// See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md
  private static func formatTrailers(
    _ trailers: HPACKHeaders,
    allocator: ByteBufferAllocator
  ) -> ByteBuffer {
    // Work out how much space the trailers need so the buffer can be allocated up front.
    var expectedLength = 0
    for trailer in trailers {
      // +4 for: ":", " ", "\r", "\n"
      expectedLength += trailer.name.utf8.count + trailer.value.utf8.count + 4
    }

    // 5 bytes of prefix: the marker byte and the length.
    var buffer = allocator.buffer(capacity: 5 + expectedLength)

    // Uncompressed trailer byte.
    buffer.writeInteger(UInt8(0x80))

    // Length: write a placeholder now and fill it in once the trailers have been written.
    let lengthIndex = buffer.writerIndex
    buffer.writeInteger(UInt32(0))

    let trailersStart = buffer.writerIndex
    for (name, value, _) in trailers {
      buffer.writeString(name)
      buffer.writeString(": ")
      buffer.writeString(value)
      buffer.writeString("\r\n")
    }
    let trailersLength = buffer.writerIndex - trailersStart

    buffer.setInteger(UInt32(trailersLength), at: lengthIndex)
    return buffer
  }
707
708
  /// Base64 encodes the buffered responses followed by the formatted trailers into one buffer.
  ///
  /// - Parameters:
  ///   - responses: The buffered response messages; empty when this function returns.
  ///   - trailers: The trailers to append after the responses.
  ///   - allocator: Used to allocate the buffer holding the formatted trailers.
  /// - Returns: A buffer containing the base64 encoded responses and trailers.
  private static func encodeResponsesAndTrailers(
    _ responses: inout CircularBuffer<ByteBuffer>,
    trailers: HPACKHeaders,
    allocator: ByteBufferAllocator
  ) -> ByteBuffer {
    // The trailers are encoded along with the responses we're holding, so queue them up last.
    let formattedTrailers = self.formatTrailers(trailers, allocator: allocator)
    responses.append(formattedTrailers)

    let totalBytes = responses.reduce(0) { sum, response in sum + response.readableBytes }

    // '!' is fine: responses isn't empty, we just appended the trailers.
    var output = responses.popFirst()!

    // Gather everything into a single 'Data'. Ideally we wouldn't copy back and forth but this
    // is fine for now.
    var unencoded = output.readData(length: output.readableBytes)!
    unencoded.reserveCapacity(totalBytes)
    while let next = responses.popFirst() {
      unencoded.append(contentsOf: next.readableBytesView)
    }

    // The first buffer has been fully read so it can be reused for the encoded output.
    let encoded = unencoded.base64EncodedString()
    output.clear(minimumCapacity: encoded.utf8.count)
    output.writeString(encoded)

    return output
  }
735
}
736
737
extension GRPCWebToHTTP2ServerCodec.StateMachine.InboundState {
738
  /// Processes a chunk of the request body.
  ///
  /// Without a request buffer (i.e. not gRPC Web Text) the chunk is forwarded untouched.
  /// Otherwise the chunk is added to the request buffer and the longest readable prefix whose
  /// length is a multiple of four is read and base64 decoded; any remaining bytes stay buffered
  /// for the next chunk.
  ///
  /// - Parameter buffer: The received bytes. When text is decoded this buffer is cleared and
  ///   reused to hold the decoded bytes.
  /// - Returns: An action firing the data down the pipeline, or `.none` if there were fewer than
  ///   four bytes available or the bytes read could not be base64 decoded. In the latter case
  ///   the bytes have already been read from the request buffer and are not retained.
  fileprivate mutating func processInboundData(
    buffer: inout ByteBuffer
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
    guard self.requestBuffer != nil else {
      // We're not dealing with gRPC Web Text: just forward the buffer.
      return .fireChannelRead(.data(.init(data: .byteBuffer(buffer))))
    }

    // Nothing is pending: adopt the new buffer rather than copying its bytes.
    if self.requestBuffer!.readableBytes == 0 {
      self.requestBuffer = buffer
    } else {
      self.requestBuffer!.writeBuffer(&buffer)
    }

    // The length of base64 encoded data must be a multiple of 4.
    let available = self.requestBuffer!.readableBytes
    let decodableLength = available - (available % 4)

    guard decodableLength > 0,
      let encoded = self.requestBuffer!.readString(length: decodableLength),
      let decoded = Data(base64Encoded: encoded)
    else {
      return .none
    }

    // Reuse the input buffer to carry the decoded bytes.
    buffer.clear()
    buffer.writeContiguousBytes(decoded)
    return .fireChannelRead(.data(.init(data: .byteBuffer(buffer))))
  }
772
}
773
774
extension HTTPHeaders {
  /// Creates HTTP/1 headers from HTTP/2 headers, leaving out the leading pseudo-headers.
  ///
  /// Headers are skipped from the start of the block for as long as their name begins with ':';
  /// every header from the first regular one onwards is added in order.
  fileprivate init(hpackHeaders headers: HPACKHeaders) {
    self.init()
    self.reserveCapacity(headers.count)

    // Pseudo-headers are at the start of the block, so skip past them.
    let colon = UInt8(ascii: ":")
    let withoutPseudoHeaders = headers.drop(while: { header in
      header.name.utf8.first == colon
    })

    for (name, value, _) in withoutPseudoHeaders {
      self.add(name: name, value: value)
    }
  }
}