Coverage Report

Created: 2026-02-11 07:03

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/grpc-swift/Sources/GRPC/HTTP2ToRawGRPCStateMachine.swift
Line
Count
Source
1
/*
2
 * Copyright 2020, gRPC Authors All rights reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
import Logging
17
import NIOCore
18
import NIOHPACK
19
import NIOHTTP2
20
21
struct HTTP2ToRawGRPCStateMachine {
  /// The state the machine is currently in. A new machine starts in `.requestIdleResponseIdle`.
  private var state = State.requestIdleResponseIdle
}
25
26
extension HTTP2ToRawGRPCStateMachine {
  /// The combined state of the request and response halves of the stream.
  enum State {
    /// Nothing has happened on the stream; both peers are idle.
    case requestIdleResponseIdle

    /// Valid request headers were received. No response part has been sent.
    case requestOpenResponseIdle(RequestOpenResponseIdleState)

    /// Valid request headers (and possibly request messages) were received and the response
    /// headers have been sent.
    case requestOpenResponseOpen(RequestOpenResponseOpenState)

    /// The response stream has been closed but the end of the request stream has not been
    /// received.
    case requestOpenResponseClosed

    /// The request stream is closed. No response part has been sent.
    case requestClosedResponseIdle(RequestClosedResponseIdleState)

    /// The request stream is closed and the response headers have been sent.
    case requestClosedResponseOpen(RequestClosedResponseOpenState)

    /// Both streams are closed. This is the terminal state.
    case requestClosedResponseClosed
  }

  /// Data carried while the request stream is open and no response part has been sent.
  struct RequestOpenResponseIdleState {
    /// Reads length prefixed request messages.
    var reader: LengthPrefixedMessageReader

    /// Writes length prefixed response messages.
    var writer: CoalescingLengthPrefixedMessageWriter

    /// The content type of the RPC.
    var contentType: ContentType

    /// The accept encoding header value to send in the response headers, advertising the message
    /// encodings the server supports.
    var acceptEncoding: String?

    /// The message encoding header value to send in the response headers, naming the encoding
    /// used for responses.
    var responseEncoding: String?

    /// Whether user-provided metadata should be normalized.
    var normalizeHeaders: Bool

    /// Whether the pipeline is still being configured.
    var configurationState: ConfigurationState
  }

  /// Data carried while the request stream is closed and no response part has been sent.
  struct RequestClosedResponseIdleState {
    /// Reads length prefixed request messages.
    var reader: LengthPrefixedMessageReader

    /// Writes length prefixed response messages.
    var writer: CoalescingLengthPrefixedMessageWriter

    /// The content type of the RPC.
    var contentType: ContentType

    /// The accept encoding header value to send in the response headers, advertising the message
    /// encodings the server supports.
    var acceptEncoding: String?

    /// The message encoding header value to send in the response headers, naming the encoding
    /// used for responses.
    var responseEncoding: String?

    /// Whether user-provided metadata should be normalized.
    var normalizeHeaders: Bool

    /// Whether the pipeline is still being configured.
    var configurationState: ConfigurationState

    /// Carries every field of the request-open state over unchanged.
    init(from open: RequestOpenResponseIdleState) {
      self.configurationState = open.configurationState
      self.normalizeHeaders = open.normalizeHeaders
      self.responseEncoding = open.responseEncoding
      self.acceptEncoding = open.acceptEncoding
      self.contentType = open.contentType
      self.writer = open.writer
      self.reader = open.reader
    }
  }

  /// Data carried while the request stream is open and response headers have been sent.
  struct RequestOpenResponseOpenState {
    /// Reads length prefixed request messages.
    var reader: LengthPrefixedMessageReader

    /// Writes length prefixed response messages.
    var writer: CoalescingLengthPrefixedMessageWriter

    /// Whether user-provided metadata should be normalized.
    var normalizeHeaders: Bool

    /// Keeps the reader, writer and normalization flag of the response-idle state.
    init(from idle: RequestOpenResponseIdleState) {
      self.normalizeHeaders = idle.normalizeHeaders
      self.writer = idle.writer
      self.reader = idle.reader
    }
  }

  /// Data carried while the request stream is closed and response headers have been sent.
  struct RequestClosedResponseOpenState {
    /// Reads length prefixed request messages.
    var reader: LengthPrefixedMessageReader

    /// Writes length prefixed response messages.
    var writer: CoalescingLengthPrefixedMessageWriter

    /// Whether user-provided metadata should be normalized.
    var normalizeHeaders: Bool

    /// Keeps the reader, writer and normalization flag of the request-open state.
    init(from open: RequestOpenResponseOpenState) {
      self.normalizeHeaders = open.normalizeHeaders
      self.writer = open.writer
      self.reader = open.reader
    }

    /// Keeps the reader, writer and normalization flag of the response-idle state.
    init(from idle: RequestClosedResponseIdleState) {
      self.normalizeHeaders = idle.normalizeHeaders
      self.writer = idle.writer
      self.reader = idle.reader
    }
  }

  /// The pipeline configuration state.
  enum ConfigurationState {
    /// The pipeline is being configured; the associated value holds the request headers until
    /// configuration completes. Message data received meanwhile is buffered in a message reader.
    case configuring(HPACKHeaders)

    /// The pipeline is configured.
    case configured

    /// `true` only in the `.configured` state.
    var isConfigured: Bool {
      if case .configured = self {
        return true
      }
      return false
    }

    /// Marks configuration as complete.
    ///
    /// - Precondition: The state must be `.configuring`.
    /// - Returns: The headers that were held while configuring.
    mutating func configured() -> HPACKHeaders {
      guard case let .configuring(pendingHeaders) = self else {
        preconditionFailure("Invalid state: already configured")
      }
      self = .configured
      return pendingHeaders
    }
  }
}
183
184
extension HTTP2ToRawGRPCStateMachine {
  /// What to do once the pipeline has been configured.
  enum PipelineConfiguredAction {
    /// Forward the given headers.
    case forwardHeaders(HPACKHeaders)
    /// Forward the given headers, then try to read the next message.
    case forwardHeadersAndRead(HPACKHeaders)
  }

  /// What to do after receiving request headers.
  enum ReceiveHeadersAction {
    /// Configure the RPC with the given server handler.
    case configure(GRPCServerHandlerProtocol)
    /// Reject the RPC: write out the given headers with end-stream set.
    case rejectRPC(HPACKHeaders)
  }

  /// What to do after attempting to read the next request message.
  enum ReadNextMessageAction {
    /// Do nothing.
    case none
    /// Forward the buffer.
    case forwardMessage(ByteBuffer)
    /// Forward the buffer, then try to read the next message.
    case forwardMessageThenReadNextMessage(ByteBuffer)
    /// Forward the 'end' of stream request part.
    case forwardEnd
    /// Throw an error down the pipeline.
    case errorCaught(Error)
  }

  /// A next state paired with the action to take after receiving headers.
  struct StateAndReceiveHeadersAction {
    /// The state to move to.
    var state: State
    /// The action the caller should take.
    var action: ReceiveHeadersAction
  }

  /// A next state paired with the action to take after receiving data.
  struct StateAndReceiveDataAction {
    /// The state to move to.
    var state: State
    /// The action the caller should take.
    var action: ReceiveDataAction
  }

  /// What to do after receiving request data.
  enum ReceiveDataAction: Hashable {
    /// Try to read the next message from the state machine.
    case tryReading
    /// Invoke 'finish' on the RPC handler.
    case finishHandler
    /// Do nothing.
    case nothing
  }

  /// What to do when the response stream is being ended.
  enum SendEndAction {
    /// Send trailers to the client.
    case sendTrailers(HPACKHeaders)
    /// Send trailers to the client, then invoke 'finish' on the RPC handler.
    case sendTrailersAndFinish(HPACKHeaders)
    /// Fail any promise associated with this send.
    case failure(Error)
  }
}
244
245
// MARK: Receive Headers
246
247
// This is the only state in which we can receive headers.
248
extension HTTP2ToRawGRPCStateMachine.State {
249
  /// Validates request headers and decides whether the RPC can be handled.
  ///
  /// The checks run in order: content type, request message encoding, `:path` and service lookup,
  /// then method lookup. The first failing check rejects the RPC and moves to
  /// `.requestClosedResponseClosed`; if all pass, the state moves to `.requestOpenResponseIdle`
  /// and the handler is returned for configuration.
  ///
  /// - Returns: The next state and the action to take.
  private func _receive(
    headers: HPACKHeaders,
    eventLoop: EventLoop,
    errorDelegate: ServerErrorDelegate?,
    remoteAddress: SocketAddress?,
    logger: Logger,
    allocator: ByteBufferAllocator,
    responseWriter: GRPCServerResponseWriter,
    closeFuture: EventLoopFuture<Void>,
    services: [Substring: CallHandlerProvider],
    encoding: ServerMessageEncoding,
    normalizeHeaders: Bool
  ) -> HTTP2ToRawGRPCStateMachine.StateAndReceiveHeadersAction {
    // Without a valid content type the RPC cannot proceed.
    guard let contentType = self.extractContentType(from: headers) else {
      return self.unsupportedContentType()
    }

    // Validate the request message encoding. On success this yields the reader for request
    // messages and, possibly, the list of acceptable request encodings to send back.
    let requestReader: LengthPrefixedMessageReader
    let requestAcceptEncoding: String?

    switch self.extractRequestEncoding(from: headers, encoding: encoding) {
    case let .invalid(status, acceptEncoding):
      return self.invalidRequestEncoding(
        status: status,
        acceptableRequestEncoding: acceptEncoding,
        contentType: contentType
      )

    case let .valid(messageReader, acceptEncoding):
      requestReader = messageReader
      requestAcceptEncoding = acceptEncoding
    }

    // Choose the encoding for response messages.
    let (responseWriterForMessages, responseEncoding) = self.extractResponseEncoding(
      from: headers,
      encoding: encoding,
      allocator: allocator
    )

    // No ':path' at all: reject, reporting an empty path.
    guard let path = headers.first(name: ":path") else {
      return self.methodNotImplemented("", contentType: contentType)
    }

    // The path must parse and name a service we know about.
    guard let callPath = CallPath(requestURI: path),
      let service = services[Substring(callPath.service)]
    else {
      return self.methodNotImplemented(path, contentType: contentType)
    }

    // Everything the handler needs in order to be created; some of it is exposed to service
    // providers.
    let context = CallHandlerContext(
      errorDelegate: errorDelegate,
      logger: logger,
      encoding: encoding,
      eventLoop: eventLoop,
      path: path,
      remoteAddress: remoteAddress,
      responseWriter: responseWriter,
      allocator: allocator,
      closeFuture: closeFuture
    )

    // The service matched; it may still not provide the requested method.
    guard let handler = service.handle(method: Substring(callPath.method), context: context) else {
      return self.methodNotImplemented(path, contentType: contentType)
    }

    let openState = HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState(
      reader: requestReader,
      writer: responseWriterForMessages,
      contentType: contentType,
      acceptEncoding: requestAcceptEncoding,
      responseEncoding: responseEncoding,
      normalizeHeaders: normalizeHeaders,
      configurationState: .configuring(headers)
    )

    return HTTP2ToRawGRPCStateMachine.StateAndReceiveHeadersAction(
      state: .requestOpenResponseIdle(openState),
      action: .configure(handler)
    )
  }
339
340
  /// Rejects the RPC with HTTP status 415 because the 'content-type' is not supported.
  ///
  /// - Returns: The terminal state and a `.rejectRPC` action carrying only `:status: 415`.
  private func unsupportedContentType() -> HTTP2ToRawGRPCStateMachine.StateAndReceiveHeadersAction {
    // From: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
    //
    //   If 'content-type' does not begin with "application/grpc", gRPC servers SHOULD respond
    //   with HTTP status of 415 (Unsupported Media Type). This will prevent other HTTP/2
    //   clients from interpreting a gRPC error response, which uses status 200 (OK), as
    //   successful.
    return HTTP2ToRawGRPCStateMachine.StateAndReceiveHeadersAction(
      state: .requestClosedResponseClosed,
      action: .rejectRPC(HPACKHeaders([(":status", "415")]))
    )
  }
354
355
  /// Rejects the RPC with an 'unimplemented' status because nothing handles `path`.
  ///
  /// - Parameters:
  ///   - path: The requested path; it is quoted in the status message.
  ///   - contentType: The content type of the RPC.
  /// - Returns: The terminal state and a `.rejectRPC` action carrying trailers-only headers.
  private func methodNotImplemented(
    _ path: String,
    contentType: ContentType
  ) -> HTTP2ToRawGRPCStateMachine.StateAndReceiveHeadersAction {
    let status = GRPCStatus(code: .unimplemented, message: "'\(path)' is not implemented")

    let rejection = HTTP2ToRawGRPCStateMachine.makeResponseTrailersOnly(
      for: status,
      contentType: contentType,
      acceptableRequestEncoding: nil,
      userProvidedHeaders: nil,
      normalizeUserProvidedHeaders: false
    )

    return HTTP2ToRawGRPCStateMachine.StateAndReceiveHeadersAction(
      state: .requestClosedResponseClosed,
      action: .rejectRPC(rejection)
    )
  }
373
374
  /// Rejects the RPC because the request message encoding sent by the client is not acceptable.
  ///
  /// - Parameters:
  ///   - status: The status to close the RPC with.
  ///   - acceptableRequestEncoding: The accept encoding value to include in the trailers, if any.
  ///   - contentType: The content type of the RPC.
  /// - Returns: The terminal state and a `.rejectRPC` action carrying trailers-only headers.
  private func invalidRequestEncoding(
    status: GRPCStatus,
    acceptableRequestEncoding: String?,
    contentType: ContentType
  ) -> HTTP2ToRawGRPCStateMachine.StateAndReceiveHeadersAction {
    let rejection = HTTP2ToRawGRPCStateMachine.makeResponseTrailersOnly(
      for: status,
      contentType: contentType,
      acceptableRequestEncoding: acceptableRequestEncoding,
      userProvidedHeaders: nil,
      normalizeUserProvidedHeaders: false
    )

    return HTTP2ToRawGRPCStateMachine.StateAndReceiveHeadersAction(
      state: .requestClosedResponseClosed,
      action: .rejectRPC(rejection)
    )
  }
394
395
  /// Makes the 'GRPCStatus' and accept encoding value to return to the client when the request
  /// message encoding is not supported.
  ///
  /// - Parameters:
  ///   - encoding: The unsupported request message encoding sent by the client.
  ///   - advertisedEncoding: The request message encodings the server advertises.
  /// - Returns: The status and, when any encodings are advertised, their comma-joined list.
  private func makeStatusAndTrailersForUnsupportedEncoding(
    _ encoding: String,
    advertisedEncoding: [String]
  ) -> (GRPCStatus, acceptEncoding: String?) {
    guard !advertisedEncoding.isEmpty else {
      // No compression is supported, so there is nothing to advertise to the client.
      return (GRPCStatus(code: .unimplemented, message: "compression is not supported"), nil)
    }

    // Tell the client which encodings we advertise. (This may be a subset of the encodings we
    // support.)
    let advertised = advertisedEncoding.joined(separator: ",")
    let explanation =
      "\(encoding) compression is not supported, supported algorithms are "
      + "listed in '\(GRPCHeaderName.acceptEncoding)'"

    return (GRPCStatus(code: .unimplemented, message: explanation), advertised)
  }
426
427
  /// Reads the 'content-type' header sent by the client and converts it to a `ContentType`.
  ///
  /// - Parameter headers: The headers to read the 'content-type' from.
  /// - Returns: The content type, or `nil` if the header is missing or cannot be converted.
  private func extractContentType(from headers: HPACKHeaders) -> ContentType? {
    let rawContentType = headers.first(name: GRPCHeaderName.contentType)
    return rawContentType.flatMap(ContentType.init)
  }
432
433
  /// The outcome of validating the request message encoding header.
  private enum RequestEncodingValidation {
    /// The encoding is acceptable: a reader for request messages plus an optional accept
    /// encoding value for the response headers.
    case valid(messageReader: LengthPrefixedMessageReader, acceptEncoding: String?)
    /// The encoding is not acceptable: the RPC should be terminated with this status, along with
    /// an optional accept encoding value.
    case invalid(status: GRPCStatus, acceptEncoding: String?)
  }
440
441
  /// Extracts and validates the request message encoding header.
  ///
  /// - Parameters:
  ///   - headers: The headers to extract the message encoding header from.
  ///   - encoding: The server's message encoding configuration.
  /// - Returns: `.valid` with a message reader suitable for decoding requests and an optional
  ///   accept encoding response header, or `.invalid` with the `GRPCStatus` to close the RPC
  ///   with and an optional accept encoding value.
  private func extractRequestEncoding(
    from headers: HPACKHeaders,
    encoding: ServerMessageEncoding
  ) -> RequestEncodingValidation {
    var remainingValues = headers
      .values(forHeader: GRPCHeaderName.encoding, canonicalForm: true)
      .makeIterator()
    let requestEncoding = remainingValues.next()

    // At most one encoding value is allowed; report all of them if there are more.
    if let first = requestEncoding, let second = remainingValues.next() {
      var allValues: [Substring] = [first, second]
      while let value = remainingValues.next() {
        allValues.append(value)
      }

      let tooManyValues = GRPCStatus(
        code: .invalidArgument,
        message:
          "'\(GRPCHeaderName.encoding)' must contain no more than one value but was '\(allValues.joined(separator: ", "))'"
      )
      return .invalid(status: tooManyValues, acceptEncoding: nil)
    }

    let validator = MessageEncodingHeaderValidator(encoding: encoding)

    switch validator.validate(requestEncoding: requestEncoding.map { String($0) }) {
    case .noCompression:
      // No message encoding header was present, so no compression is being used.
      return .valid(messageReader: LengthPrefixedMessageReader(), acceptEncoding: nil)

    case let .supported(algorithm, decompressionLimit, advertised):
      // The request message encoding is valid and supported.
      let decompressingReader = LengthPrefixedMessageReader(
        compression: algorithm,
        decompressionLimit: decompressionLimit
      )
      return .valid(
        messageReader: decompressingReader,
        acceptEncoding: advertised.isEmpty ? nil : advertised.joined(separator: ",")
      )

    case let .unsupported(unsupportedEncoding, advertised):
      // The request encoding is not supported.
      let (status, acceptEncoding) = self.makeStatusAndTrailersForUnsupportedEncoding(
        unsupportedEncoding,
        advertisedEncoding: advertised
      )
      return .invalid(status: status, acceptEncoding: acceptEncoding)
    }
  }
504
505
  /// Picks the message encoding to use for responses.
  ///
  /// - Parameters:
  ///   - headers: The headers to extract the acceptable response message encodings from.
  ///   - encoding: The server's message encoding configuration.
  ///   - allocator: The allocator handed to the message writer.
  /// - Returns: A message writer and the response encoding header value to send back to the
  ///   client, if any.
  private func extractResponseEncoding(
    from headers: HPACKHeaders,
    encoding: ServerMessageEncoding,
    allocator: ByteBufferAllocator
  ) -> (CoalescingLengthPrefixedMessageWriter, String?) {
    switch encoding {
    case .disabled:
      // Compression is not enabled on the server.
      let plainWriter = CoalescingLengthPrefixedMessageWriter(
        compression: .none,
        allocator: allocator
      )
      return (plainWriter, nil)

    case let .enabled(configuration):
      // The encodings the client will accept for response messages.
      let clientAccepts = headers[canonicalForm: GRPCHeaderName.acceptEncoding]

      // Use the first one which we both recognise and have enabled. If there is none then
      // response messages are not compressed.
      let chosen = clientAccepts.lazy.compactMap { name in
        CompressionAlgorithm(rawValue: name)
      }.first { candidate in
        configuration.enabledAlgorithms.contains(candidate)
      }

      let compressingWriter = CoalescingLengthPrefixedMessageWriter(
        compression: chosen,
        allocator: allocator
      )
      return (compressingWriter, chosen?.name)
    }
  }
542
}
543
544
// MARK: - Receive Data
545
546
extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
  /// Buffers received request bytes and works out the next state and action.
  ///
  /// - Parameters:
  ///   - buffer: The received bytes; they are appended to the reader.
  ///   - endStream: Whether end stream was set.
  /// - Returns: The next state and the action to take.
  mutating func receive(
    buffer: inout ByteBuffer,
    endStream: Bool
  ) -> HTTP2ToRawGRPCStateMachine.StateAndReceiveDataAction {
    self.reader.append(buffer: &buffer)

    // Until the pipeline is configured there is nowhere to deliver messages, so reading is
    // pointless. Once configured, try to read; if the request stream has ended, 'end' is
    // produced as a result of draining the reader in the next state.
    let action: HTTP2ToRawGRPCStateMachine.ReceiveDataAction =
      self.configurationState.isConfigured ? .tryReading : .nothing

    // End stream closes the request side; otherwise stay where we are.
    let nextState: HTTP2ToRawGRPCStateMachine.State =
      endStream ? .requestClosedResponseIdle(.init(from: self)) : .requestOpenResponseIdle(self)

    return HTTP2ToRawGRPCStateMachine.StateAndReceiveDataAction(
      state: nextState,
      action: action
    )
  }
}
585
586
extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
  /// Buffers received request bytes and works out the next state; the action is always to try
  /// reading.
  ///
  /// - Parameters:
  ///   - buffer: The received bytes; they are appended to the reader.
  ///   - endStream: Whether end stream was set.
  /// - Returns: The next state and the `.tryReading` action.
  mutating func receive(
    buffer: inout ByteBuffer,
    endStream: Bool
  ) -> HTTP2ToRawGRPCStateMachine.StateAndReceiveDataAction {
    self.reader.append(buffer: &buffer)

    // On end stream move to the closed state: any end-of-request-stream events happen as a
    // result of reading from that state. Otherwise remain open.
    let nextState: HTTP2ToRawGRPCStateMachine.State =
      endStream ? .requestClosedResponseOpen(.init(from: self)) : .requestOpenResponseOpen(self)

    return HTTP2ToRawGRPCStateMachine.StateAndReceiveDataAction(
      state: nextState,
      action: .tryReading
    )
  }
}
606
607
// MARK: - Send Headers
608
609
extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
  /// Builds the response headers from this state's content type, encodings and normalization
  /// flag, together with the user-provided headers.
  ///
  /// - Parameter userProvidedHeaders: Headers supplied by the user.
  /// - Returns: The response headers.
  func send(headers userProvidedHeaders: HPACKHeaders) -> HPACKHeaders {
    let responseHeaders = HTTP2ToRawGRPCStateMachine.makeResponseHeaders(
      contentType: self.contentType,
      responseEncoding: self.responseEncoding,
      acceptableRequestEncoding: self.acceptEncoding,
      userProvidedHeaders: userProvidedHeaders,
      normalizeUserProvidedHeaders: self.normalizeHeaders
    )
    return responseHeaders
  }
}
620
621
extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseIdleState {
  /// Builds the response headers from this state's content type, encodings and normalization
  /// flag, together with the user-provided headers.
  ///
  /// - Parameter userProvidedHeaders: Headers supplied by the user.
  /// - Returns: The response headers.
  func send(headers userProvidedHeaders: HPACKHeaders) -> HPACKHeaders {
    let responseHeaders = HTTP2ToRawGRPCStateMachine.makeResponseHeaders(
      contentType: self.contentType,
      responseEncoding: self.responseEncoding,
      acceptableRequestEncoding: self.acceptEncoding,
      userProvidedHeaders: userProvidedHeaders,
      normalizeUserProvidedHeaders: self.normalizeHeaders
    )
    return responseHeaders
  }
}
632
633
// MARK: - Send Data
634
635
extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
  /// Appends a response message to the writer.
  ///
  /// - Parameters:
  ///   - buffer: The message to append.
  ///   - compress: Passed through to the writer's `compress` argument.
  ///   - promise: Passed through to the writer along with the message.
  mutating func send(
    buffer: ByteBuffer,
    compress: Bool,
    promise: EventLoopPromise<Void>?
  ) {
    writer.append(buffer: buffer, compress: compress, promise: promise)
  }
}
644
645
extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseOpenState {
  /// Appends a response message to the writer.
  ///
  /// - Parameters:
  ///   - buffer: The message to append.
  ///   - compress: Passed through to the writer's `compress` argument.
  ///   - promise: Passed through to the writer along with the message.
  mutating func send(
    buffer: ByteBuffer,
    compress: Bool,
    promise: EventLoopPromise<Void>?
  ) {
    writer.append(buffer: buffer, compress: compress, promise: promise)
  }
}
654
655
// MARK: - Send End
656
657
extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
  /// Builds trailers-only response headers for ending the RPC before any response headers have
  /// been sent.
  ///
  /// - Parameters:
  ///   - status: The status to end the RPC with.
  ///   - userProvidedTrailers: Trailers supplied by the user.
  /// - Returns: The headers to send.
  func send(
    status: GRPCStatus,
    trailers userProvidedTrailers: HPACKHeaders
  ) -> HPACKHeaders {
    let trailersOnly = HTTP2ToRawGRPCStateMachine.makeResponseTrailersOnly(
      for: status,
      contentType: self.contentType,
      acceptableRequestEncoding: self.acceptEncoding,
      userProvidedHeaders: userProvidedTrailers,
      normalizeUserProvidedHeaders: self.normalizeHeaders
    )
    return trailersOnly
  }
}
671
672
extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseIdleState {
  /// Builds trailers-only response headers for ending the RPC before any response headers have
  /// been sent.
  ///
  /// - Parameters:
  ///   - status: The status to end the RPC with.
  ///   - userProvidedTrailers: Trailers supplied by the user.
  /// - Returns: The headers to send.
  func send(
    status: GRPCStatus,
    trailers userProvidedTrailers: HPACKHeaders
  ) -> HPACKHeaders {
    let trailersOnly = HTTP2ToRawGRPCStateMachine.makeResponseTrailersOnly(
      for: status,
      contentType: self.contentType,
      acceptableRequestEncoding: self.acceptEncoding,
      userProvidedHeaders: userProvidedTrailers,
      normalizeUserProvidedHeaders: self.normalizeHeaders
    )
    return trailersOnly
  }
}
686
687
extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseOpenState {
  /// Builds the response trailers for ending the RPC after response headers have been sent.
  ///
  /// User-provided trailers are normalized only when this state's `normalizeHeaders` flag is set,
  /// matching how the response headers and the trailers-only paths treat user-provided metadata.
  /// (Previously normalization was always applied here, ignoring the flag.)
  ///
  /// - Parameters:
  ///   - status: The status to end the RPC with.
  ///   - userProvidedTrailers: Trailers supplied by the user.
  /// - Returns: The trailers to send.
  func send(
    status: GRPCStatus,
    trailers userProvidedTrailers: HPACKHeaders
  ) -> HPACKHeaders {
    return HTTP2ToRawGRPCStateMachine.makeResponseTrailers(
      for: status,
      userProvidedHeaders: userProvidedTrailers,
      normalizeUserProvidedHeaders: self.normalizeHeaders
    )
  }
}
699
700
extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
  /// Builds the response trailers for ending the RPC after response headers have been sent.
  ///
  /// User-provided trailers are normalized only when this state's `normalizeHeaders` flag is set,
  /// matching how the response headers and the trailers-only paths treat user-provided metadata.
  /// (Previously normalization was always applied here, ignoring the flag.)
  ///
  /// - Parameters:
  ///   - status: The status to end the RPC with.
  ///   - userProvidedTrailers: Trailers supplied by the user.
  /// - Returns: The trailers to send.
  func send(
    status: GRPCStatus,
    trailers userProvidedTrailers: HPACKHeaders
  ) -> HPACKHeaders {
    return HTTP2ToRawGRPCStateMachine.makeResponseTrailers(
      for: status,
      userProvidedHeaders: userProvidedTrailers,
      normalizeUserProvidedHeaders: self.normalizeHeaders
    )
  }
}
712
713
// MARK: - Pipeline Configured
714
715
extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
  /// Marks the pipeline as configured and decides how to forward the request headers.
  ///
  /// - Returns: `.forwardHeadersAndRead` if the reader holds unprocessed bytes, otherwise
  ///   `.forwardHeaders`.
  mutating func pipelineConfigured() -> HTTP2ToRawGRPCStateMachine.PipelineConfiguredAction {
    let requestHeaders = self.configurationState.configured()

    // An empty reader means there is nothing to read yet: just forward the metadata.
    guard self.reader.unprocessedBytes != 0 else {
      return .forwardHeaders(requestHeaders)
    }

    // Bytes arrived while configuring, so try to read messages after forwarding the metadata.
    return .forwardHeadersAndRead(requestHeaders)
  }
}
734
735
extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseIdleState {
  /// Marks the pipeline as configured.
  ///
  /// - Returns: Always `.forwardHeadersAndRead`: the request stream is already closed, so the
  ///   headers are forwarded and reading starts straight away.
  mutating func pipelineConfigured() -> HTTP2ToRawGRPCStateMachine.PipelineConfiguredAction {
    let requestHeaders = self.configurationState.configured()
    return HTTP2ToRawGRPCStateMachine.PipelineConfiguredAction.forwardHeadersAndRead(requestHeaders)
  }
}
742
743
// MARK: - Read Next Request
744
745
extension HTTP2ToRawGRPCStateMachine {
  /// Tries to read the next request message from `reader`.
  ///
  /// - Parameters:
  ///   - reader: The reader to take the next message from.
  ///   - requestStreamClosed: Whether the request stream has been closed.
  ///   - maxLength: Passed to the reader's `nextMessage(maxLength:)`.
  /// - Returns: A message to forward (optionally followed by another read), `.forwardEnd` when
  ///   there is no message and the stream is closed, `.none` when there is no message and the
  ///   stream is still open, or `.errorCaught` if the reader threw.
  static func read(
    from reader: inout LengthPrefixedMessageReader,
    requestStreamClosed: Bool,
    maxLength: Int
  ) -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction {
    do {
      guard let message = try reader.nextMessage(maxLength: maxLength) else {
        // No message is available: that is the end if the stream is closed, otherwise there is
        // nothing to do.
        return requestStreamClosed ? .forwardEnd : .none
      }

      // Read again after delivering the message if bytes remain or the request stream is
      // closed; the subsequent read may yield another message or the end of the stream.
      let shouldReadAgain = reader.unprocessedBytes > 0 || requestStreamClosed
      return shouldReadAgain
        ? .forwardMessageThenReadNextMessage(message)
        : .forwardMessage(message)
    } catch {
      return .errorCaught(error)
    }
  }
}
772
773
extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseIdleState {
  /// Tries to read the next request message; the request stream is treated as still open.
  ///
  /// - Parameter maxLength: Passed through to `HTTP2ToRawGRPCStateMachine.read`.
  /// - Returns: The action produced by the read.
  mutating func readNextRequest(
    maxLength: Int
  ) -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction {
    let action = HTTP2ToRawGRPCStateMachine.read(
      from: &self.reader,
      requestStreamClosed: false,
      maxLength: maxLength
    )
    return action
  }
}
784
785
extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
  /// Tries to read the next request message; the request stream is treated as still open.
  ///
  /// - Parameter maxLength: Passed through to `HTTP2ToRawGRPCStateMachine.read`.
  /// - Returns: The action produced by the read.
  mutating func readNextRequest(
    maxLength: Int
  ) -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction {
    let action = HTTP2ToRawGRPCStateMachine.read(
      from: &self.reader,
      requestStreamClosed: false,
      maxLength: maxLength
    )
    return action
  }
}
796
797
extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseIdleState {
  /// Tries to read the next request message; the request stream is treated as closed.
  ///
  /// - Parameter maxLength: Passed through to `HTTP2ToRawGRPCStateMachine.read`.
  /// - Returns: The action produced by the read.
  mutating func readNextRequest(
    maxLength: Int
  ) -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction {
    let action = HTTP2ToRawGRPCStateMachine.read(
      from: &self.reader,
      requestStreamClosed: true,
      maxLength: maxLength
    )
    return action
  }
}
808
809
extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseOpenState {
  /// Tries to read the next request message; the request stream is treated as closed.
  ///
  /// - Parameter maxLength: Passed through to `HTTP2ToRawGRPCStateMachine.read`.
  /// - Returns: The action produced by the read.
  mutating func readNextRequest(
    maxLength: Int
  ) -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction {
    let action = HTTP2ToRawGRPCStateMachine.read(
      from: &self.reader,
      requestStreamClosed: true,
      maxLength: maxLength
    )
    return action
  }
}
820
821
// MARK: - Top Level State Changes
822
823
extension HTTP2ToRawGRPCStateMachine {
  /// Receive request headers.
  ///
  /// All arguments are handed, unmodified, to the current state.
  ///
  /// - Returns: The action the current state produced for the received headers.
  mutating func receive(
    headers: HPACKHeaders,
    eventLoop: EventLoop,
    errorDelegate: ServerErrorDelegate?,
    remoteAddress: SocketAddress?,
    logger: Logger,
    allocator: ByteBufferAllocator,
    responseWriter: GRPCServerResponseWriter,
    closeFuture: EventLoopFuture<Void>,
    services: [Substring: CallHandlerProvider],
    encoding: ServerMessageEncoding,
    normalizeHeaders: Bool
  ) -> ReceiveHeadersAction {
    let action = self.state.receive(
      headers: headers,
      eventLoop: eventLoop,
      errorDelegate: errorDelegate,
      remoteAddress: remoteAddress,
      logger: logger,
      allocator: allocator,
      responseWriter: responseWriter,
      closeFuture: closeFuture,
      services: services,
      encoding: encoding,
      normalizeHeaders: normalizeHeaders
    )
    return action
  }

  /// Receive request buffer.
  /// - Parameters:
  ///   - buffer: The received buffer.
  ///   - endStream: Whether end stream was set.
  /// - Returns: The action the current state produced for the received data.
  mutating func receive(buffer: inout ByteBuffer, endStream: Bool) -> ReceiveDataAction {
    return self.state.receive(buffer: &buffer, endStream: endStream)
  }

  /// Send response headers.
  /// - Parameter headers: The headers to hand to the current state.
  /// - Returns: The result the current state produced: headers on success, an error otherwise.
  mutating func send(headers: HPACKHeaders) -> Result<HPACKHeaders, Error> {
    return self.state.send(headers: headers)
  }

  /// Send a response buffer.
  /// - Parameters:
  ///   - buffer: The response buffer.
  ///   - compress: Forwarded to the current state together with the buffer.
  ///   - promise: An optional promise forwarded to the current state together with the buffer.
  /// - Returns: The result the current state produced.
  mutating func send(
    buffer: ByteBuffer,
    compress: Bool,
    promise: EventLoopPromise<Void>?
  ) -> Result<Void, Error> {
    return self.state.send(buffer: buffer, compress: compress, promise: promise)
  }

  /// Ask the current state for the next response, if it has one.
  /// - Returns: A result paired with an optional promise, or `nil` if the state had nothing to
  ///   offer.
  mutating func nextResponse() -> (Result<ByteBuffer, Error>, EventLoopPromise<Void>?)? {
    return self.state.nextResponse()
  }

  /// Send status and trailers.
  /// - Parameters:
  ///   - status: The status to end the response with.
  ///   - trailers: The trailers to send alongside the status.
  /// - Returns: The action the current state produced.
  mutating func send(
    status: GRPCStatus,
    trailers: HPACKHeaders
  ) -> HTTP2ToRawGRPCStateMachine.SendEndAction {
    return self.state.send(status: status, trailers: trailers)
  }

  /// The pipeline has been configured with a service provider.
  /// - Returns: The action the current state produced.
  mutating func pipelineConfigured() -> PipelineConfiguredAction {
    return self.state.pipelineConfigured()
  }

  /// Try to read a request message.
  /// - Parameter maxLength: Forwarded unchanged to the current state.
  /// - Returns: The action the current state produced.
  mutating func readNextRequest(maxLength: Int) -> ReadNextMessageAction {
    return self.state.readNextRequest(maxLength: maxLength)
  }
}
898
899
extension HTTP2ToRawGRPCStateMachine.State {
  /// The pipeline has been configured.
  ///
  /// Only the two "response idle" states which carry associated state handle this event; they
  /// forward it to that associated state and store the (possibly mutated) state back. Every other
  /// state traps.
  mutating func pipelineConfigured() -> HTTP2ToRawGRPCStateMachine.PipelineConfiguredAction {
    switch self {
    case .requestOpenResponseIdle(var idle):
      let configuredAction = idle.pipelineConfigured()
      self = .requestOpenResponseIdle(idle)
      return configuredAction

    case .requestClosedResponseIdle(var idle):
      let configuredAction = idle.pipelineConfigured()
      self = .requestClosedResponseIdle(idle)
      return configuredAction

    case .requestIdleResponseIdle:
      preconditionFailure("Invalid state: pipeline configured before receiving request headers")

    case .requestOpenResponseOpen,
      .requestOpenResponseClosed,
      .requestClosedResponseOpen,
      .requestClosedResponseClosed:
      preconditionFailure("Invalid state: response stream opened before pipeline was configured")
    }
  }

  /// Receive request headers.
  ///
  /// Headers are only processed when both sides are idle or both sides are closed; in those
  /// states the work is delegated to `_receive`, whose resulting state replaces `self`. Receiving
  /// headers in any other state traps.
  ///
  /// - Returns: The action produced by `_receive`.
  mutating func receive(
    headers: HPACKHeaders,
    eventLoop: EventLoop,
    errorDelegate: ServerErrorDelegate?,
    remoteAddress: SocketAddress?,
    logger: Logger,
    allocator: ByteBufferAllocator,
    responseWriter: GRPCServerResponseWriter,
    closeFuture: EventLoopFuture<Void>,
    services: [Substring: CallHandlerProvider],
    encoding: ServerMessageEncoding,
    normalizeHeaders: Bool
  ) -> HTTP2ToRawGRPCStateMachine.ReceiveHeadersAction {
    switch self {
    // Headers are not acceptable once a request stream is in flight.
    case .requestOpenResponseIdle,
      .requestOpenResponseOpen,
      .requestOpenResponseClosed,
      .requestClosedResponseIdle,
      .requestClosedResponseOpen:
      preconditionFailure("Invalid state: \(self)")

    // The only states in which headers may arrive.
    case .requestIdleResponseIdle,
      .requestClosedResponseClosed:
      let outcome = self._receive(
        headers: headers,
        eventLoop: eventLoop,
        errorDelegate: errorDelegate,
        remoteAddress: remoteAddress,
        logger: logger,
        allocator: allocator,
        responseWriter: responseWriter,
        closeFuture: closeFuture,
        services: services,
        encoding: encoding,
        normalizeHeaders: normalizeHeaders
      )
      self = outcome.state
      return outcome.action
    }
  }

  /// Receive a buffer from the client.
  ///
  /// - Parameters:
  ///   - buffer: The received buffer; passed on to the associated state while the request stream
  ///     is open and the response stream has not been closed.
  ///   - endStream: Whether end stream was set.
  /// - Returns: The action to take as a result of receiving the buffer.
  mutating func receive(
    buffer: inout ByteBuffer,
    endStream: Bool
  ) -> HTTP2ToRawGRPCStateMachine.ReceiveDataAction {
    switch self {
    case .requestIdleResponseIdle:
      // This isn't allowed: we must receive the request headers first.
      preconditionFailure("Invalid state")

    case .requestClosedResponseIdle,
      .requestClosedResponseOpen:
      preconditionFailure("Invalid state: the request stream is already closed")

    case .requestOpenResponseIdle(var open):
      let outcome = open.receive(buffer: &buffer, endStream: endStream)
      self = outcome.state
      return outcome.action

    case .requestOpenResponseOpen(var open):
      let outcome = open.receive(buffer: &buffer, endStream: endStream)
      self = outcome.state
      return outcome.action

    case .requestOpenResponseClosed:
      guard endStream else {
        // The server has finished responding but this isn't the end of the request stream;
        // ignore the input, we need to wait for end stream before tearing down the handler.
        return .nothing
      }
      // The server has finished responding and this is the end of the request stream; we're done
      // with this RPC now, finish the handler.
      self = .requestClosedResponseClosed
      return .finishHandler

    case .requestClosedResponseClosed:
      return .nothing
    }
  }

  /// Try to read the next request message.
  ///
  /// States carrying associated state forward the call to it and store the result back. Once the
  /// response stream is closed there is nothing to read and `.none` is returned. Reading before
  /// any headers were received traps.
  mutating func readNextRequest(
    maxLength: Int
  ) -> HTTP2ToRawGRPCStateMachine.ReadNextMessageAction {
    switch self {
    case .requestIdleResponseIdle:
      preconditionFailure("Invalid state")

    case .requestOpenResponseClosed,
      .requestClosedResponseClosed:
      return .none

    case .requestOpenResponseIdle(var inner):
      let readAction = inner.readNextRequest(maxLength: maxLength)
      self = .requestOpenResponseIdle(inner)
      return readAction

    case .requestOpenResponseOpen(var inner):
      let readAction = inner.readNextRequest(maxLength: maxLength)
      self = .requestOpenResponseOpen(inner)
      return readAction

    case .requestClosedResponseIdle(var inner):
      let readAction = inner.readNextRequest(maxLength: maxLength)
      self = .requestClosedResponseIdle(inner)
      return readAction

    case .requestClosedResponseOpen(var inner):
      let readAction = inner.readNextRequest(maxLength: maxLength)
      self = .requestClosedResponseOpen(inner)
      return readAction
    }
  }

  /// Send response headers.
  ///
  /// In a "response idle" state the headers are produced by the associated state and the response
  /// side moves to "open". If the response stream is already open or closed the call fails with
  /// `GRPCError.AlreadyComplete`. Sending headers before the request stream was opened traps.
  mutating func send(headers: HPACKHeaders) -> Result<HPACKHeaders, Error> {
    switch self {
    case .requestIdleResponseIdle:
      preconditionFailure("Invalid state: the request stream isn't open")

    case .requestOpenResponseOpen,
      .requestOpenResponseClosed,
      .requestClosedResponseOpen,
      .requestClosedResponseClosed:
      return .failure(GRPCError.AlreadyComplete())

    case .requestOpenResponseIdle(let idle):
      let responseHeaders = idle.send(headers: headers)
      self = .requestOpenResponseOpen(.init(from: idle))
      return .success(responseHeaders)

    case .requestClosedResponseIdle(let idle):
      let responseHeaders = idle.send(headers: headers)
      self = .requestClosedResponseOpen(.init(from: idle))
      return .success(responseHeaders)
    }
  }

  /// Send a response buffer.
  ///
  /// Only valid once response headers have been sent: before that the call fails with
  /// `GRPCError.InvalidState`, and after the response stream has been closed it fails with
  /// `GRPCError.AlreadyComplete`.
  mutating func send(
    buffer: ByteBuffer,
    compress: Bool,
    promise: EventLoopPromise<Void>?
  ) -> Result<Void, Error> {
    switch self {
    case .requestIdleResponseIdle:
      preconditionFailure("Invalid state: the request stream is still closed")

    case .requestOpenResponseClosed,
      .requestClosedResponseClosed:
      return .failure(GRPCError.AlreadyComplete())

    case .requestOpenResponseIdle,
      .requestClosedResponseIdle:
      return .failure(
        GRPCError.InvalidState("Response headers must be sent before response message")
      )

    case .requestOpenResponseOpen(var open):
      // `self` is parked in a payload-free case while the associated state is mutated and then
      // restored. NOTE(review): presumably this keeps the associated state uniquely referenced
      // to avoid a copy -- confirm before reordering.
      self = .requestClosedResponseClosed
      open.send(buffer: buffer, compress: compress, promise: promise)
      self = .requestOpenResponseOpen(open)
      return .success(())

    case .requestClosedResponseOpen(var open):
      // Same park-mutate-restore sequence as above.
      self = .requestClosedResponseClosed
      open.send(buffer: buffer, compress: compress, promise: promise)
      self = .requestClosedResponseOpen(open)
      return .success(())
    }
  }

  /// Ask the response writer for the next response, if any.
  ///
  /// - Returns: `nil` unless the response stream is open, otherwise whatever the associated
  ///   state's writer returned from `next()`.
  mutating func nextResponse() -> (Result<ByteBuffer, Error>, EventLoopPromise<Void>?)? {
    switch self {
    case .requestIdleResponseIdle:
      preconditionFailure("Invalid state: the request stream is still closed")

    case .requestOpenResponseIdle,
      .requestClosedResponseIdle,
      .requestOpenResponseClosed,
      .requestClosedResponseClosed:
      return nil

    case .requestOpenResponseOpen(var open):
      // Park `self` in a payload-free case while the writer is advanced, then restore it.
      self = .requestClosedResponseClosed
      let next = open.writer.next()
      self = .requestOpenResponseOpen(open)
      return next

    case .requestClosedResponseOpen(var open):
      // Park `self` in a payload-free case while the writer is advanced, then restore it.
      self = .requestClosedResponseClosed
      let next = open.writer.next()
      self = .requestClosedResponseOpen(open)
      return next
    }
  }

  /// Send status and trailers, closing the response stream.
  ///
  /// If the request stream is still open the result is `.sendTrailers`; if it is already closed
  /// both sides are now done and the result is `.sendTrailersAndFinish`. If the response stream
  /// was already closed the result is a failure wrapping `GRPCError.AlreadyComplete`.
  mutating func send(
    status: GRPCStatus,
    trailers: HPACKHeaders
  ) -> HTTP2ToRawGRPCStateMachine.SendEndAction {
    switch self {
    case .requestIdleResponseIdle:
      preconditionFailure("Invalid state: the request stream is still closed")

    case .requestOpenResponseClosed,
      .requestClosedResponseClosed:
      return .failure(GRPCError.AlreadyComplete())

    // Request stream still open: only the response side closes.
    case .requestOpenResponseIdle(let current):
      self = .requestOpenResponseClosed
      return .sendTrailers(current.send(status: status, trailers: trailers))

    case .requestOpenResponseOpen(let current):
      self = .requestOpenResponseClosed
      return .sendTrailers(current.send(status: status, trailers: trailers))

    // Request stream already closed: both sides are now closed.
    case .requestClosedResponseIdle(let current):
      self = .requestClosedResponseClosed
      return .sendTrailersAndFinish(current.send(status: status, trailers: trailers))

    case .requestClosedResponseOpen(let current):
      self = .requestClosedResponseClosed
      return .sendTrailersAndFinish(current.send(status: status, trailers: trailers))
    }
  }
}
1152
1153
// MARK: - Helpers
1154
1155
extension HTTP2ToRawGRPCStateMachine {
  /// Builds the headers used to open a response stream.
  ///
  /// - Parameters:
  ///   - contentType: The content type whose canonical value is sent back.
  ///   - responseEncoding: If non-nil, added under `GRPCHeaderName.encoding`.
  ///   - acceptableRequestEncoding: If non-nil, added under `GRPCHeaderName.acceptEncoding`.
  ///   - userProvidedHeaders: Additional headers appended after the ones above.
  ///   - normalizeUserProvidedHeaders: Whether the user provided header names are lowercased
  ///     before being appended.
  /// - Returns: The assembled response headers.
  static func makeResponseHeaders(
    contentType: ContentType,
    responseEncoding: String?,
    acceptableRequestEncoding: String?,
    userProvidedHeaders: HPACKHeaders,
    normalizeUserProvidedHeaders: Bool
  ) -> HPACKHeaders {
    var responseHeaders = HPACKHeaders()
    // Up to four headers of our own (':status' and 'content-type' are always sent,
    // 'grpc-encoding' and 'grpc-accept-encoding' only sometimes) plus the user's.
    responseHeaders.reserveCapacity(4 + userProvidedHeaders.count)

    responseHeaders.add(name: ":status", value: "200")
    responseHeaders.add(name: GRPCHeaderName.contentType, value: contentType.canonicalValue)

    if let encoding = responseEncoding {
      responseHeaders.add(name: GRPCHeaderName.encoding, value: encoding)
    }

    if let accepted = acceptableRequestEncoding {
      responseHeaders.add(name: GRPCHeaderName.acceptEncoding, value: accepted)
    }

    // User provided headers go last, normalized if requested.
    responseHeaders.add(
      contentsOf: userProvidedHeaders,
      normalize: normalizeUserProvidedHeaders
    )

    return responseHeaders
  }

  /// Builds a "trailers only" response: a single block carrying both the response headers and
  /// the final status.
  ///
  /// - Parameters:
  ///   - status: The status whose code (and message, when one can be marshalled) is sent.
  ///   - contentType: The content type whose canonical value is sent back.
  ///   - acceptableRequestEncoding: If non-nil, added under `GRPCHeaderName.acceptEncoding`.
  ///   - userProvidedHeaders: Optional additional headers appended last.
  ///   - normalizeUserProvidedHeaders: Whether the user provided header names are lowercased
  ///     before being appended.
  /// - Returns: The assembled headers.
  static func makeResponseTrailersOnly(
    for status: GRPCStatus,
    contentType: ContentType,
    acceptableRequestEncoding: String?,
    userProvidedHeaders: HPACKHeaders?,
    normalizeUserProvidedHeaders: Bool
  ) -> HPACKHeaders {
    var trailersOnly = HPACKHeaders()
    // Up to five headers of our own (':status', 'content-type' and 'grpc-status' are always
    // sent, 'grpc-message' and 'grpc-accept-encoding' only sometimes) plus the user's, if any.
    trailersOnly.reserveCapacity(5 + (userProvidedHeaders?.count ?? 0))

    // The required trailers.
    trailersOnly.add(name: ":status", value: "200")
    trailersOnly.add(name: GRPCHeaderName.contentType, value: contentType.canonicalValue)
    trailersOnly.add(
      name: GRPCHeaderName.statusCode,
      value: String(describing: status.code.rawValue)
    )

    let marshalledMessage = status.message.flatMap(GRPCStatusMessageMarshaller.marshall)
    if let message = marshalledMessage {
      trailersOnly.add(name: GRPCHeaderName.statusMessage, value: message)
    }

    // We may include this if the requested encoding was not valid.
    if let accepted = acceptableRequestEncoding {
      trailersOnly.add(name: GRPCHeaderName.acceptEncoding, value: accepted)
    }

    if let extraHeaders = userProvidedHeaders {
      trailersOnly.add(contentsOf: extraHeaders, normalize: normalizeUserProvidedHeaders)
    }

    return trailersOnly
  }

  /// Builds the trailers which end a response stream.
  ///
  /// - Parameters:
  ///   - status: The status whose code (and message, when one can be marshalled) is sent.
  ///   - userProvidedHeaders: Additional trailers appended after the status.
  ///   - normalizeUserProvidedHeaders: Whether the user provided trailer names are lowercased
  ///     before being appended.
  /// - Returns: The assembled trailers, or a shared pre-built value when `status` is `.ok` and
  ///   there are no user provided trailers.
  static func makeResponseTrailers(
    for status: GRPCStatus,
    userProvidedHeaders: HPACKHeaders,
    normalizeUserProvidedHeaders: Bool
  ) -> HPACKHeaders {
    // Most RPCs should end with status code 'ok' (hopefully!); when the user also provided no
    // trailers of their own a pre-canned set of headers avoids an extra allocation.
    let canUsePrecannedTrailers = status == .ok && userProvidedHeaders.isEmpty
    if canUsePrecannedTrailers {
      return Self.gRPCStatusOkTrailers
    }

    var responseTrailers = HPACKHeaders()
    // Up to two trailers of our own ('grpc-status' is always sent, 'grpc-message' only
    // sometimes) plus the user's.
    responseTrailers.reserveCapacity(2 + userProvidedHeaders.count)

    // Status code.
    responseTrailers.add(
      name: GRPCHeaderName.statusCode,
      value: String(describing: status.code.rawValue)
    )

    // Status message, if present.
    let marshalledMessage = status.message.flatMap(GRPCStatusMessageMarshaller.marshall)
    if let message = marshalledMessage {
      responseTrailers.add(name: GRPCHeaderName.statusMessage, value: message)
    }

    // User provided trailers.
    responseTrailers.add(
      contentsOf: userProvidedHeaders,
      normalize: normalizeUserProvidedHeaders
    )

    return responseTrailers
  }

  /// Pre-built trailers holding only the status code for `GRPCStatus.Code.ok`.
  private static let gRPCStatusOkTrailers: HPACKHeaders = [
    GRPCHeaderName.statusCode: String(describing: GRPCStatus.Code.ok.rawValue)
  ]
}
1258
1259
extension HPACKHeaders {
  /// Appends every header in `other` to `self`, optionally lowercasing the header names.
  ///
  /// - Parameters:
  ///   - other: The headers to append.
  ///   - normalize: When `true` each header name is lowercased before being appended; values and
  ///     the third, indexing-related element of each header are passed through unchanged. When
  ///     `false` the headers are appended exactly as they are.
  fileprivate mutating func add(contentsOf other: HPACKHeaders, normalize: Bool) {
    guard normalize else {
      self.add(contentsOf: other)
      return
    }

    self.add(
      contentsOf: other.lazy.map { headerName, headerValue, indexing in
        (name: headerName.lowercased(), value: headerValue, indexable: indexing)
      }
    )
  }
}