Coverage Report

Created: 2026-06-07 06:20

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/grpc-swift/Sources/GRPC/GRPCWebToHTTP2ServerCodec.swift
Line
Count
Source
1
/*
2
 * Copyright 2020, gRPC Authors All rights reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
17
import NIOCore
18
import NIOHPACK
19
import NIOHTTP1
20
import NIOHTTP2
21
22
import struct Foundation.Data
23
24
/// A codec for translating between gRPC Web (as HTTP/1) and HTTP/2 frame payloads.
25
internal final class GRPCWebToHTTP2ServerCodec: ChannelDuplexHandler {
26
  internal typealias InboundIn = HTTPServerRequestPart
27
  internal typealias InboundOut = HTTP2Frame.FramePayload
28
29
  internal typealias OutboundIn = HTTP2Frame.FramePayload
30
  internal typealias OutboundOut = HTTPServerResponsePart
31
32
  private var stateMachine: StateMachine
33
34
  /// Create a gRPC Web to server HTTP/2 codec.
35
  ///
36
  /// - Parameter scheme: The value of the ':scheme' pseudo header to insert when converting the
37
  ///   request headers.
38
9.91k
  init(scheme: String) {
39
9.91k
    self.stateMachine = StateMachine(scheme: scheme)
40
9.91k
  }
41
42
366k
  internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
43
366k
    let action = self.stateMachine.processInbound(
44
366k
      serverRequestPart: self.unwrapInboundIn(data),
45
366k
      allocator: context.channel.allocator
46
366k
    )
47
366k
    self.act(on: action, context: context)
48
366k
  }
49
50
  internal func write(
51
    context: ChannelHandlerContext,
52
    data: NIOAny,
53
    promise: EventLoopPromise<Void>?
54
270k
  ) {
55
270k
    let action = self.stateMachine.processOutbound(
56
270k
      framePayload: self.unwrapOutboundIn(data),
57
270k
      promise: promise,
58
270k
      allocator: context.channel.allocator
59
270k
    )
60
270k
    self.act(on: action, context: context)
61
270k
  }
62
63
  /// Acts on an action returned by the state machine.
64
636k
  private func act(on action: StateMachine.Action, context: ChannelHandlerContext) {
65
636k
    switch action {
66
636k
    case .none:
67
2.57k
      ()
68
636k
69
636k
    case let .fireChannelRead(payload):
70
363k
      context.fireChannelRead(self.wrapInboundOut(payload))
71
636k
72
636k
    case let .write(write):
73
257k
      if let additionalPart = write.additionalPart {
74
150k
        context.write(self.wrapOutboundOut(write.part), promise: nil)
75
150k
        context.write(self.wrapOutboundOut(additionalPart), promise: write.promise)
76
150k
      } else {
77
106k
        context.write(self.wrapOutboundOut(write.part), promise: write.promise)
78
106k
      }
79
257k
80
257k
      if write.closeChannel {
81
1.89k
        context.close(mode: .all, promise: nil)
82
1.89k
      }
83
636k
84
636k
    case let .completePromise(promise, result):
85
12.7k
      promise?.completeWith(result)
86
636k
    }
87
636k
  }
88
}
89
90
extension GRPCWebToHTTP2ServerCodec {
91
  internal struct StateMachine {
92
    /// The current state.
93
    private var state: State
94
    private let scheme: String
95
96
9.91k
    internal init(scheme: String) {
97
9.91k
      self.state = .idle
98
9.91k
      self.scheme = scheme
99
9.91k
    }
100
101
    /// Process the inbound `HTTPServerRequestPart`.
102
    internal mutating func processInbound(
103
      serverRequestPart: HTTPServerRequestPart,
104
      allocator: ByteBufferAllocator
105
366k
    ) -> Action {
106
366k
      return self.state.processInbound(
107
366k
        serverRequestPart: serverRequestPart,
108
366k
        scheme: self.scheme,
109
366k
        allocator: allocator
110
366k
      )
111
366k
    }
112
113
    /// Process the outbound `HTTP2Frame.FramePayload`.
114
    internal mutating func processOutbound(
115
      framePayload: HTTP2Frame.FramePayload,
116
      promise: EventLoopPromise<Void>?,
117
      allocator: ByteBufferAllocator
118
270k
    ) -> Action {
119
270k
      return self.state.processOutbound(
120
270k
        framePayload: framePayload,
121
270k
        promise: promise,
122
270k
        allocator: allocator
123
270k
      )
124
270k
    }
125
126
    /// An action to take as a result of interaction with the state machine.
127
    internal enum Action {
128
      case none
129
      case fireChannelRead(HTTP2Frame.FramePayload)
130
      case write(Write)
131
      case completePromise(EventLoopPromise<Void>?, Result<Void, Error>)
132
133
      internal struct Write {
134
        internal var part: HTTPServerResponsePart
135
        internal var additionalPart: HTTPServerResponsePart?
136
        internal var promise: EventLoopPromise<Void>?
137
        internal var closeChannel: Bool
138
139
        internal init(
140
          part: HTTPServerResponsePart,
141
          additionalPart: HTTPServerResponsePart? = nil,
142
          promise: EventLoopPromise<Void>?,
143
          closeChannel: Bool
144
257k
        ) {
145
257k
          self.part = part
146
257k
          self.additionalPart = additionalPart
147
257k
          self.promise = promise
148
257k
          self.closeChannel = closeChannel
149
257k
        }
150
      }
151
    }
152
153
    fileprivate enum State {
154
      /// Idle; nothing has been received or sent. The only valid transition is to 'fullyOpen' when
155
      /// receiving request headers.
156
      case idle
157
158
      /// Received request headers. Waiting for the end of request and response streams.
159
      case fullyOpen(InboundState, OutboundState)
160
161
      /// The server has closed the response stream, we may receive other request parts from the client.
162
      case clientOpenServerClosed(InboundState)
163
164
      /// The client has sent everything, the server still needs to close the response stream.
165
      case clientClosedServerOpen(OutboundState)
166
167
      /// Not a real state.
168
      case _modifying
169
170
0
      private var isModifying: Bool {
171
0
        switch self {
172
0
        case ._modifying:
173
0
          return true
174
0
        case .idle, .fullyOpen, .clientClosedServerOpen, .clientOpenServerClosed:
175
0
          return false
176
0
        }
177
0
      }
178
179
636k
      private mutating func withStateAvoidingCoWs(_ body: (inout State) -> Action) -> Action {
180
636k
        self = ._modifying
181
636k
        defer {
182
636k
          assert(!self.isModifying)
183
636k
        }
184
636k
        return body(&self)
185
636k
      }
186
    }
187
188
    fileprivate struct InboundState {
189
      /// A `ByteBuffer` containing the base64 encoded bytes of the request stream if gRPC Web Text
190
      /// is being used, `nil` otherwise.
191
      var requestBuffer: ByteBuffer?
192
193
150k
      init(isTextEncoded: Bool, allocator: ByteBufferAllocator) {
194
150k
        self.requestBuffer = isTextEncoded ? allocator.buffer(capacity: 0) : nil
195
150k
      }
196
    }
197
198
    fileprivate struct OutboundState {
199
      /// A `CircularBuffer` holding any response messages if gRPC Web Text is being used, `nil`
200
      /// otherwise.
201
      var responseBuffer: CircularBuffer<ByteBuffer>?
202
203
      /// True if the response headers have been sent.
204
      var responseHeadersSent: Bool
205
206
      /// True if the server should close the connection when this request is done.
207
      var closeConnection: Bool
208
209
150k
      init(isTextEncoded: Bool, closeConnection: Bool) {
210
150k
        self.responseHeadersSent = false
211
150k
        self.responseBuffer = isTextEncoded ? CircularBuffer() : nil
212
150k
        self.closeConnection = closeConnection
213
150k
      }
214
    }
215
  }
216
}
217
218
extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
219
  fileprivate mutating func processInbound(
220
    serverRequestPart: HTTPServerRequestPart,
221
    scheme: String,
222
    allocator: ByteBufferAllocator
223
366k
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
224
366k
    switch serverRequestPart {
225
366k
    case let .head(head):
226
150k
      return self.processRequestHead(head, scheme: scheme, allocator: allocator)
227
366k
    case var .body(buffer):
228
67.8k
      return self.processRequestBody(&buffer)
229
366k
    case .end:
230
147k
      return self.processRequestEnd(allocator: allocator)
231
366k
    }
232
366k
  }
233
234
  fileprivate mutating func processOutbound(
235
    framePayload: HTTP2Frame.FramePayload,
236
    promise: EventLoopPromise<Void>?,
237
    allocator: ByteBufferAllocator
238
270k
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
239
270k
    switch framePayload {
240
270k
    case let .headers(payload):
241
219k
      return self.processResponseHeaders(payload, promise: promise, allocator: allocator)
242
270k
243
270k
    case let .data(payload):
244
50.3k
      return self.processResponseData(payload, promise: promise)
245
270k
246
270k
    case .priority,
247
0
      .rstStream,
248
0
      .settings,
249
0
      .pushPromise,
250
0
      .ping,
251
0
      .goAway,
252
0
      .windowUpdate,
253
0
      .alternativeService,
254
0
      .origin:
255
0
      preconditionFailure("Unsupported frame payload")
256
270k
    }
257
0
  }
258
}
259
260
// MARK: - Inbound
261
262
extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
263
  private mutating func processRequestHead(
264
    _ head: HTTPRequestHead,
265
    scheme: String,
266
    allocator: ByteBufferAllocator
267
150k
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
268
150k
    switch self {
269
150k
    case .idle:
270
150k
      return self.withStateAvoidingCoWs { state in
271
150k
        let normalized = HPACKHeaders(httpHeaders: head.headers, normalizeHTTPHeaders: true)
272
150k
273
150k
        // Regular headers need to come after the pseudo headers. Unfortunately, this means we need to
274
150k
        // allocate a second headers block to use the normalization provided by NIO HTTP/2.
275
150k
        //
276
150k
        // TODO: Use API provided by https://github.com/apple/swift-nio-http2/issues/254 to avoid the
277
150k
        // extra copy.
278
150k
        var headers = HPACKHeaders()
279
150k
        headers.reserveCapacity(normalized.count + 4)
280
150k
        headers.add(name: ":path", value: head.uri)
281
150k
        headers.add(name: ":method", value: head.method.rawValue)
282
150k
        headers.add(name: ":scheme", value: scheme)
283
150k
        if let host = head.headers.first(name: "host") {
284
591
          headers.add(name: ":authority", value: host)
285
591
        }
286
150k
        headers.add(contentsOf: normalized)
287
150k
288
150k
        // Check whether we're dealing with gRPC Web Text. No need to fully validate the content-type
289
150k
        // that will be done at the HTTP/2 level.
290
150k
        let contentType = headers.first(name: GRPCHeaderName.contentType).flatMap(ContentType.init)
291
150k
        let isWebText = contentType == .some(.webTextProtobuf)
292
150k
293
150k
        let closeConnection = head.headers[canonicalForm: "connection"].contains("close")
294
150k
295
150k
        state = .fullyOpen(
296
150k
          .init(isTextEncoded: isWebText, allocator: allocator),
297
150k
          .init(isTextEncoded: isWebText, closeConnection: closeConnection)
298
150k
        )
299
150k
        return .fireChannelRead(.headers(.init(headers: headers)))
300
150k
      }
301
150k
302
150k
    case .fullyOpen, .clientOpenServerClosed, .clientClosedServerOpen:
303
0
      preconditionFailure("Invalid state: already received request head")
304
150k
305
150k
    case ._modifying:
306
0
      preconditionFailure("Left in modifying state")
307
150k
    }
308
0
  }
309
310
  private mutating func processRequestBody(
311
    _ buffer: inout ByteBuffer
312
67.8k
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
313
67.8k
    switch self {
314
67.8k
    case .idle:
315
0
      preconditionFailure("Invalid state: haven't received request head")
316
67.8k
317
67.8k
    case .fullyOpen(var inbound, let outbound):
318
45.1k
      return self.withStateAvoidingCoWs { state in
319
45.1k
        let action = inbound.processInboundData(buffer: &buffer)
320
45.1k
        state = .fullyOpen(inbound, outbound)
321
45.1k
        return action
322
45.1k
      }
323
67.8k
324
67.8k
    case var .clientOpenServerClosed(inbound):
325
22.7k
      // The server is already done, but it's not our place to drop the request.
326
22.7k
      return self.withStateAvoidingCoWs { state in
327
22.7k
        let action = inbound.processInboundData(buffer: &buffer)
328
22.7k
        state = .clientOpenServerClosed(inbound)
329
22.7k
        return action
330
22.7k
      }
331
67.8k
332
67.8k
    case .clientClosedServerOpen:
333
0
      preconditionFailure("End of request stream already received")
334
67.8k
335
67.8k
    case ._modifying:
336
0
      preconditionFailure("Left in modifying state")
337
67.8k
    }
338
0
  }
339
340
  private mutating func processRequestEnd(
341
    allocator: ByteBufferAllocator
342
147k
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
343
147k
    switch self {
344
147k
    case .idle:
345
0
      preconditionFailure("Invalid state: haven't received request head")
346
147k
347
147k
    case let .fullyOpen(_, outbound):
348
44.3k
      return self.withStateAvoidingCoWs { state in
349
44.3k
        // We're done with inbound state.
350
44.3k
        state = .clientClosedServerOpen(outbound)
351
44.3k
352
44.3k
        // Send an empty DATA frame with the end stream flag set.
353
44.3k
        let empty = allocator.buffer(capacity: 0)
354
44.3k
        return .fireChannelRead(.data(.init(data: .byteBuffer(empty), endStream: true)))
355
44.3k
      }
356
147k
357
147k
    case .clientClosedServerOpen:
358
0
      preconditionFailure("End of request stream already received")
359
147k
360
147k
    case .clientOpenServerClosed:
361
103k
      return self.withStateAvoidingCoWs { state in
362
103k
        // Both sides are closed now, back to idle. Don't forget to pass on the .end, as
363
103k
        // it's necessary to communicate to the other peers that the response is done.
364
103k
        state = .idle
365
103k
366
103k
        // Send an empty DATA frame with the end stream flag set.
367
103k
        let empty = allocator.buffer(capacity: 0)
368
103k
        return .fireChannelRead(.data(.init(data: .byteBuffer(empty), endStream: true)))
369
103k
      }
370
147k
371
147k
    case ._modifying:
372
0
      preconditionFailure("Left in modifying state")
373
147k
    }
374
0
  }
375
}
376
377
// MARK: - Outbound
378
379
extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
380
  private mutating func processResponseTrailers(
381
    _ trailers: HPACKHeaders,
382
    promise: EventLoopPromise<Void>?,
383
    allocator: ByteBufferAllocator
384
69.3k
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
385
69.3k
    switch self {
386
69.3k
    case .idle:
387
0
      preconditionFailure("Invalid state: haven't received request head")
388
69.3k
389
69.3k
    case .fullyOpen(let inbound, var outbound):
390
24.9k
      return self.withStateAvoidingCoWs { state in
391
24.9k
        // Double check these are trailers.
392
24.9k
        assert(outbound.responseHeadersSent)
393
24.9k
394
24.9k
        // We haven't seen the end of the request stream yet.
395
24.9k
        state = .clientOpenServerClosed(inbound)
396
24.9k
397
24.9k
        // Avoid CoW-ing the buffers.
398
24.9k
        let responseBuffers = outbound.responseBuffer
399
24.9k
        outbound.responseBuffer = nil
400
24.9k
401
24.9k
        return Self.processTrailers(
402
24.9k
          responseBuffers: responseBuffers,
403
24.9k
          trailers: trailers,
404
24.9k
          promise: promise,
405
24.9k
          allocator: allocator,
406
24.9k
          closeChannel: outbound.closeConnection
407
24.9k
        )
408
24.9k
      }
409
69.3k
410
69.3k
    case var .clientClosedServerOpen(state):
411
44.3k
      return self.withStateAvoidingCoWs { nextState in
412
44.3k
        // Client is closed and now so is the server.
413
44.3k
        nextState = .idle
414
44.3k
415
44.3k
        // Avoid CoW-ing the buffers.
416
44.3k
        let responseBuffers = state.responseBuffer
417
44.3k
        state.responseBuffer = nil
418
44.3k
419
44.3k
        return Self.processTrailers(
420
44.3k
          responseBuffers: responseBuffers,
421
44.3k
          trailers: trailers,
422
44.3k
          promise: promise,
423
44.3k
          allocator: allocator,
424
44.3k
          closeChannel: state.closeConnection
425
44.3k
        )
426
44.3k
      }
427
69.3k
428
69.3k
    case .clientOpenServerClosed:
429
0
      preconditionFailure("Already seen end of response stream")
430
69.3k
431
69.3k
    case ._modifying:
432
0
      preconditionFailure("Left in modifying state")
433
69.3k
    }
434
0
  }
435
436
  private static func processTrailers(
437
    responseBuffers: CircularBuffer<ByteBuffer>?,
438
    trailers: HPACKHeaders,
439
    promise: EventLoopPromise<Void>?,
440
    allocator: ByteBufferAllocator,
441
    closeChannel: Bool
442
69.3k
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
443
69.3k
    if var responseBuffers = responseBuffers {
444
13.4k
      let buffer = GRPCWebToHTTP2ServerCodec.encodeResponsesAndTrailers(
445
13.4k
        &responseBuffers,
446
13.4k
        trailers: trailers,
447
13.4k
        allocator: allocator
448
13.4k
      )
449
13.4k
      return .write(
450
13.4k
        .init(
451
13.4k
          part: .body(.byteBuffer(buffer)),
452
13.4k
          additionalPart: .end(nil),
453
13.4k
          promise: promise,
454
13.4k
          closeChannel: closeChannel
455
13.4k
        )
456
13.4k
      )
457
55.9k
    } else {
458
55.9k
      // No response buffer; plain gRPC Web. Trailers are encoded into the body as a regular
459
55.9k
      // length-prefixed message.
460
55.9k
      let buffer = GRPCWebToHTTP2ServerCodec.formatTrailers(trailers, allocator: allocator)
461
55.9k
      return .write(
462
55.9k
        .init(
463
55.9k
          part: .body(.byteBuffer(buffer)),
464
55.9k
          additionalPart: .end(nil),
465
55.9k
          promise: promise,
466
55.9k
          closeChannel: closeChannel
467
55.9k
        )
468
55.9k
      )
469
55.9k
    }
470
69.3k
  }
471
472
  private mutating func processResponseTrailersOnly(
473
    _ trailers: HPACKHeaders,
474
    promise: EventLoopPromise<Void>?
475
81.0k
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
476
81.0k
    switch self {
477
81.0k
    case .idle:
478
0
      preconditionFailure("Invalid state: haven't received request head")
479
81.0k
480
81.0k
    case let .fullyOpen(inbound, outbound):
481
81.0k
      return self.withStateAvoidingCoWs { state in
482
81.0k
        // We still haven't seen the end of the request stream.
483
81.0k
        state = .clientOpenServerClosed(inbound)
484
81.0k
485
81.0k
        let head = GRPCWebToHTTP2ServerCodec.makeResponseHead(
486
81.0k
          hpackHeaders: trailers,
487
81.0k
          closeConnection: outbound.closeConnection
488
81.0k
        )
489
81.0k
490
81.0k
        return .write(
491
81.0k
          .init(
492
81.0k
            part: .head(head),
493
81.0k
            additionalPart: .end(nil),
494
81.0k
            promise: promise,
495
81.0k
            closeChannel: outbound.closeConnection
496
81.0k
          )
497
81.0k
        )
498
81.0k
      }
499
81.0k
500
81.0k
    case let .clientClosedServerOpen(outbound):
501
0
      return self.withStateAvoidingCoWs { state in
502
0
        // We're done, back to idle.
503
0
        state = .idle
504
0
505
0
        let head = GRPCWebToHTTP2ServerCodec.makeResponseHead(
506
0
          hpackHeaders: trailers,
507
0
          closeConnection: outbound.closeConnection
508
0
        )
509
0
510
0
        return .write(
511
0
          .init(
512
0
            part: .head(head),
513
0
            additionalPart: .end(nil),
514
0
            promise: promise,
515
0
            closeChannel: outbound.closeConnection
516
0
          )
517
0
        )
518
0
      }
519
81.0k
520
81.0k
    case .clientOpenServerClosed:
521
0
      preconditionFailure("Already seen end of response stream")
522
81.0k
523
81.0k
    case ._modifying:
524
0
      preconditionFailure("Left in modifying state")
525
81.0k
    }
526
0
  }
527
528
  private mutating func processResponseHeaders(
529
    _ headers: HPACKHeaders,
530
    promise: EventLoopPromise<Void>?
531
69.3k
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
532
69.3k
    switch self {
533
69.3k
    case .idle:
534
0
      preconditionFailure("Invalid state: haven't received request head")
535
69.3k
536
69.3k
    case .fullyOpen(let inbound, var outbound):
537
69.3k
      return self.withStateAvoidingCoWs { state in
538
69.3k
        outbound.responseHeadersSent = true
539
69.3k
        state = .fullyOpen(inbound, outbound)
540
69.3k
541
69.3k
        let head = GRPCWebToHTTP2ServerCodec.makeResponseHead(
542
69.3k
          hpackHeaders: headers,
543
69.3k
          closeConnection: outbound.closeConnection
544
69.3k
        )
545
69.3k
        return .write(.init(part: .head(head), promise: promise, closeChannel: false))
546
69.3k
      }
547
69.3k
548
69.3k
    case var .clientClosedServerOpen(outbound):
549
0
      return self.withStateAvoidingCoWs { state in
550
0
        outbound.responseHeadersSent = true
551
0
        state = .clientClosedServerOpen(outbound)
552
0
553
0
        let head = GRPCWebToHTTP2ServerCodec.makeResponseHead(
554
0
          hpackHeaders: headers,
555
0
          closeConnection: outbound.closeConnection
556
0
        )
557
0
        return .write(.init(part: .head(head), promise: promise, closeChannel: false))
558
0
      }
559
69.3k
560
69.3k
    case .clientOpenServerClosed:
561
0
      preconditionFailure("Already seen end of response stream")
562
69.3k
563
69.3k
    case ._modifying:
564
0
      preconditionFailure("Left in modifying state")
565
69.3k
    }
566
0
  }
567
568
  private mutating func processResponseHeaders(
569
    _ payload: HTTP2Frame.FramePayload.Headers,
570
    promise: EventLoopPromise<Void>?,
571
    allocator: ByteBufferAllocator
572
219k
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
573
219k
    switch self {
574
219k
    case .idle:
575
0
      preconditionFailure("Invalid state: haven't received request head")
576
219k
577
219k
    case let .fullyOpen(_, outbound),
578
219k
      let .clientClosedServerOpen(outbound):
579
219k
      if outbound.responseHeadersSent {
580
69.3k
        // Headers have been sent, these must be trailers, so end stream must be set.
581
69.3k
        assert(payload.endStream)
582
69.3k
        return self.processResponseTrailers(payload.headers, promise: promise, allocator: allocator)
583
150k
      } else if payload.endStream {
584
81.0k
        // Headers haven't been sent yet and end stream is set: this is a trailers only response
585
81.0k
        // so we need to send 'end' as well.
586
81.0k
        return self.processResponseTrailersOnly(payload.headers, promise: promise)
587
81.0k
      } else {
588
69.3k
        return self.processResponseHeaders(payload.headers, promise: promise)
589
69.3k
      }
590
219k
591
219k
    case .clientOpenServerClosed:
592
0
      // We've already sent end.
593
0
      return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))
594
219k
595
219k
    case ._modifying:
596
0
      preconditionFailure("Left in modifying state")
597
219k
    }
598
0
  }
599
600
  private static func processResponseData(
601
    _ payload: HTTP2Frame.FramePayload.Data,
602
    promise: EventLoopPromise<Void>?,
603
    state: inout GRPCWebToHTTP2ServerCodec.StateMachine.OutboundState
604
50.3k
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
605
50.3k
    if state.responseBuffer == nil {
606
37.5k
      // Not gRPC Web Text; just write the body.
607
37.5k
      return .write(.init(part: .body(payload.data), promise: promise, closeChannel: false))
608
37.5k
    } else {
609
12.7k
      switch payload.data {
610
12.7k
      case let .byteBuffer(buffer):
611
12.7k
        // '!' is fine, we checked above.
612
12.7k
        state.responseBuffer!.append(buffer)
613
12.7k
614
12.7k
      case .fileRegion:
615
0
        preconditionFailure("Unexpected IOData.fileRegion")
616
12.7k
      }
617
12.7k
618
12.7k
      // The response is buffered, we can consider it dealt with.
619
12.7k
      return .completePromise(promise, .success(()))
620
12.7k
    }
621
50.3k
  }
622
623
  private mutating func processResponseData(
624
    _ payload: HTTP2Frame.FramePayload.Data,
625
    promise: EventLoopPromise<Void>?
626
50.3k
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
627
50.3k
    switch self {
628
50.3k
    case .idle:
629
0
      preconditionFailure("Invalid state: haven't received request head")
630
50.3k
631
50.3k
    case .fullyOpen(let inbound, var outbound):
632
12.5k
      return self.withStateAvoidingCoWs { state in
633
12.5k
        let action = Self.processResponseData(payload, promise: promise, state: &outbound)
634
12.5k
        state = .fullyOpen(inbound, outbound)
635
12.5k
        return action
636
12.5k
      }
637
50.3k
638
50.3k
    case var .clientClosedServerOpen(outbound):
639
37.7k
      return self.withStateAvoidingCoWs { state in
640
37.7k
        let action = Self.processResponseData(payload, promise: promise, state: &outbound)
641
37.7k
        state = .clientClosedServerOpen(outbound)
642
37.7k
        return action
643
37.7k
      }
644
50.3k
645
50.3k
    case .clientOpenServerClosed:
646
0
      return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))
647
50.3k
648
50.3k
    case ._modifying:
649
0
      preconditionFailure("Left in modifying state")
650
50.3k
    }
651
0
  }
652
}
653
654
// MARK: - Helpers
655
656
extension GRPCWebToHTTP2ServerCodec {
657
  private static func makeResponseHead(
658
    hpackHeaders: HPACKHeaders,
659
    closeConnection: Bool
660
150k
  ) -> HTTPResponseHead {
661
150k
    var headers = HTTPHeaders(hpackHeaders: hpackHeaders)
662
150k
663
150k
    if closeConnection {
664
1.89k
      headers.add(name: "connection", value: "close")
665
1.89k
    }
666
150k
667
150k
    // Grab the status, if this is missing we've messed up in another handler.
668
150k
    guard let statusCode = hpackHeaders.first(name: ":status").flatMap(Int.init) else {
669
0
      preconditionFailure("Invalid state: missing ':status' pseudo header")
670
150k
    }
671
150k
672
150k
    return HTTPResponseHead(
673
150k
      version: .init(major: 1, minor: 1),
674
150k
      status: .init(statusCode: statusCode),
675
150k
      headers: headers
676
150k
    )
677
150k
  }
678
679
  private static func formatTrailers(
680
    _ trailers: HPACKHeaders,
681
    allocator: ByteBufferAllocator
682
69.3k
  ) -> ByteBuffer {
683
69.3k
    // See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md
684
88.6k
    let length = trailers.reduce(0) { partial, trailer in
685
88.6k
      // +4 for: ":", " ", "\r", "\n"
686
88.6k
      return partial + trailer.name.utf8.count + trailer.value.utf8.count + 4
687
88.6k
    }
688
69.3k
    var buffer = allocator.buffer(capacity: 5 + length)
689
69.3k
690
69.3k
    // Uncompressed trailer byte.
691
69.3k
    buffer.writeInteger(UInt8(0x80))
692
69.3k
    // Length.
693
69.3k
    let lengthIndex = buffer.writerIndex
694
69.3k
    buffer.writeInteger(UInt32(0))
695
69.3k
696
69.3k
    var bytesWritten = 0
697
88.6k
    for (name, value, _) in trailers {
698
88.6k
      bytesWritten += buffer.writeString(name)
699
88.6k
      bytesWritten += buffer.writeString(": ")
700
88.6k
      bytesWritten += buffer.writeString(value)
701
88.6k
      bytesWritten += buffer.writeString("\r\n")
702
88.6k
    }
703
69.3k
704
69.3k
    buffer.setInteger(UInt32(bytesWritten), at: lengthIndex)
705
69.3k
    return buffer
706
69.3k
  }
707
708
  private static func encodeResponsesAndTrailers(
709
    _ responses: inout CircularBuffer<ByteBuffer>,
710
    trailers: HPACKHeaders,
711
    allocator: ByteBufferAllocator
712
13.4k
  ) -> ByteBuffer {
713
13.4k
    // We need to encode the trailers along with any responses we're holding.
714
13.4k
    responses.append(self.formatTrailers(trailers, allocator: allocator))
715
13.4k
716
26.1k
    let capacity = responses.lazy.map { $0.readableBytes }.reduce(0, +)
717
13.4k
    // '!' is fine: responses isn't empty, we just appended the trailers.
718
13.4k
    var buffer = responses.popFirst()!
719
13.4k
720
13.4k
    // Accumulate all the buffers into a single 'Data'. Ideally we wouldn't copy back and forth
721
13.4k
    // but this is fine for now.
722
13.4k
    var accumulatedData = buffer.readData(length: buffer.readableBytes)!
723
13.4k
    accumulatedData.reserveCapacity(capacity)
724
13.4k
    while let buffer = responses.popFirst() {
725
12.7k
      accumulatedData.append(contentsOf: buffer.readableBytesView)
726
13.4k
    }
727
13.4k
728
13.4k
    // We can reuse the popped buffer.
729
13.4k
    let base64Encoded = accumulatedData.base64EncodedString()
730
13.4k
    buffer.clear(minimumCapacity: base64Encoded.utf8.count)
731
13.4k
    buffer.writeString(base64Encoded)
732
13.4k
733
13.4k
    return buffer
734
13.4k
  }
735
}
736
737
extension GRPCWebToHTTP2ServerCodec.StateMachine.InboundState {
738
  fileprivate mutating func processInboundData(
739
    buffer: inout ByteBuffer
740
67.8k
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
741
67.8k
    if self.requestBuffer == nil {
742
60.3k
      // We're not dealing with gRPC Web Text: just forward the buffer.
743
60.3k
      return .fireChannelRead(.data(.init(data: .byteBuffer(buffer))))
744
60.3k
    }
745
7.48k
746
7.48k
    if self.requestBuffer!.readableBytes == 0 {
747
5.74k
      self.requestBuffer = buffer
748
5.74k
    } else {
749
1.74k
      self.requestBuffer!.writeBuffer(&buffer)
750
1.74k
    }
751
7.48k
752
7.48k
    let readableBytes = self.requestBuffer!.readableBytes
753
7.48k
    // The length of base64 encoded data must be a multiple of 4.
754
7.48k
    let bytesToRead = readableBytes - (readableBytes % 4)
755
7.48k
756
7.48k
    let action: GRPCWebToHTTP2ServerCodec.StateMachine.Action
757
7.48k
758
7.48k
    if bytesToRead > 0,
759
7.48k
      let base64Encoded = self.requestBuffer!.readString(length: bytesToRead),
760
7.48k
      let base64Decoded = Data(base64Encoded: base64Encoded)
761
7.48k
    {
762
4.91k
      // Recycle the input buffer and restore the request buffer.
763
4.91k
      buffer.clear()
764
4.91k
      buffer.writeContiguousBytes(base64Decoded)
765
4.91k
      action = .fireChannelRead(.data(.init(data: .byteBuffer(buffer))))
766
4.91k
    } else {
767
2.57k
      action = .none
768
2.57k
    }
769
7.48k
770
7.48k
    return action
771
67.8k
  }
772
}
773
774
extension HTTPHeaders {
775
150k
  fileprivate init(hpackHeaders headers: HPACKHeaders) {
776
150k
    self.init()
777
150k
    self.reserveCapacity(headers.count)
778
150k
779
150k
    // Pseudo-headers are at the start of the block, so drop them and then add the remaining.
780
230k
    let regularHeaders = headers.drop { name, _, _ in
781
230k
      name.utf8.first == .some(UInt8(ascii: ":"))
782
230k
    }.lazy.map { name, value, _ in
783
100k
      (name, value)
784
100k
    }
785
150k
786
150k
    self.add(contentsOf: regularHeaders)
787
150k
  }
788
}