Coverage Report

Created: 2026-03-11 06:25

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/grpc-swift/Sources/GRPC/GRPCServerPipelineConfigurator.swift
Line
Count
Source
1
/*
2
 * Copyright 2020, gRPC Authors All rights reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
import Logging
17
import NIOCore
18
import NIOHPACK
19
import NIOHTTP1
20
import NIOHTTP2
21
import NIOTLS
22
23
/// Configures a server pipeline for gRPC with the appropriate handlers depending on the HTTP
24
/// version used for transport.
25
///
26
/// If TLS is enabled then the handler listens for a 'TLSUserEvent.handshakeCompleted' event and
27
/// configures the pipeline appropriately for the protocol negotiated via ALPN. If TLS is not
28
/// configured then the HTTP version is determined by parsing the inbound byte stream.
29
final class GRPCServerPipelineConfigurator: ChannelInboundHandler, RemovableChannelHandler {
30
  internal typealias InboundIn = ByteBuffer
31
  internal typealias InboundOut = ByteBuffer
32
33
  /// The server configuration.
34
  private let configuration: Server.Configuration
35
36
  /// A buffer containing the buffered bytes.
37
  private var buffer: ByteBuffer?
38
39
  /// The current state.
40
  private var state: State
41
42
  private enum ALPN {
43
    /// ALPN is expected. It may or may not be required, however.
44
    case expected(required: Bool)
45
46
    /// ALPN was expected but not required and no protocol was negotiated in the handshake. We may
47
    /// now fall back to parsing bytes on the connection.
48
    case expectedButFallingBack
49
50
    /// ALPN is not expected; this is a cleartext connection.
51
    case notExpected
52
  }
53
54
  private enum State {
55
    /// The pipeline isn't configured yet.
56
    case notConfigured(alpn: ALPN)
57
    /// We're configuring the pipeline.
58
    case configuring
59
  }
60
61
72.5k
  init(configuration: Server.Configuration) {
62
72.5k
    if let tls = configuration.tlsConfiguration {
63
0
      self.state = .notConfigured(alpn: .expected(required: tls.requireALPN))
64
72.5k
    } else {
65
72.5k
      self.state = .notConfigured(alpn: .notExpected)
66
72.5k
    }
67
72.5k
68
72.5k
    self.configuration = configuration
69
72.5k
  }
70
71
  /// Makes a gRPC idle handler for the server.
72
11.7k
  private func makeIdleHandler() -> GRPCIdleHandler {
73
11.7k
    return .init(
74
11.7k
      idleTimeout: self.configuration.connectionIdleTimeout,
75
11.7k
      keepalive: self.configuration.connectionKeepalive,
76
11.7k
      logger: self.configuration.logger
77
11.7k
    )
78
11.7k
  }
79
80
  /// Makes an HTTP/2 handler.
81
11.7k
  private func makeHTTP2Handler() -> NIOHTTP2Handler {
82
11.7k
    var configuration = NIOHTTP2Handler.ConnectionConfiguration()
83
11.7k
    configuration.initialSettings = [
84
11.7k
      HTTP2Setting(
85
11.7k
        parameter: .maxConcurrentStreams,
86
11.7k
        value: self.configuration.httpMaxConcurrentStreams
87
11.7k
      ),
88
11.7k
      HTTP2Setting(
89
11.7k
        parameter: .maxHeaderListSize,
90
11.7k
        value: HPACKDecoder.defaultMaxHeaderListSize
91
11.7k
      ),
92
11.7k
      HTTP2Setting(
93
11.7k
        parameter: .maxFrameSize,
94
11.7k
        value: self.configuration.httpMaxFrameSize
95
11.7k
      ),
96
11.7k
      HTTP2Setting(
97
11.7k
        parameter: .initialWindowSize,
98
11.7k
        value: self.configuration.httpTargetWindowSize
99
11.7k
      ),
100
11.7k
    ]
101
11.7k
    configuration.maximumRecentlyResetStreams = self.configuration.httpMaxResetStreams
102
11.7k
    return NIOHTTP2Handler(mode: .server, connectionConfiguration: configuration)
103
11.7k
  }
104
105
  /// Makes an HTTP/2 multiplexer suitable for handling gRPC requests.
106
11.7k
  private func makeHTTP2Multiplexer(for channel: Channel) -> HTTP2StreamMultiplexer {
107
11.7k
    return .init(
108
11.7k
      mode: .server,
109
11.7k
      channel: channel,
110
11.7k
      targetWindowSize: self.configuration.httpTargetWindowSize
111
26.1k
    ) { [logger = self.configuration.logger] stream in
112
26.1k
      // Sync options were added to the HTTP/2 stream channel in 1.17.0 (we require at least this)
113
26.1k
      // so this shouldn't be `nil`, but it's not a problem if it is.
114
26.1k
      let http2StreamID = try? stream.syncOptions?.getOption(HTTP2StreamChannelOptions.streamID)
115
26.1k
      let streamID =
116
26.1k
        http2StreamID.map { streamID in
117
26.1k
          return String(Int(streamID))
118
26.1k
        } ?? "<unknown>"
119
26.1k
120
26.1k
      var logger = logger
121
26.1k
      logger[metadataKey: MetadataKey.h2StreamID] = "\(streamID)"
122
26.1k
123
26.1k
      do {
124
26.1k
        // TODO: provide user configuration for header normalization.
125
26.1k
        let handler = self.makeHTTP2ToRawGRPCHandler(normalizeHeaders: true, logger: logger)
126
26.1k
        try stream.pipeline.syncOperations.addHandler(handler)
127
26.1k
        return stream.eventLoop.makeSucceededVoidFuture()
128
26.1k
      } catch {
129
0
        return stream.eventLoop.makeFailedFuture(error)
130
0
      }
131
26.1k
    }
132
11.7k
  }
133
134
  /// Makes an HTTP/2 to raw gRPC server handler.
135
  private func makeHTTP2ToRawGRPCHandler(
136
    normalizeHeaders: Bool,
137
    logger: Logger
138
45.1k
  ) -> HTTP2ToRawGRPCServerCodec {
139
45.1k
    return HTTP2ToRawGRPCServerCodec(
140
45.1k
      servicesByName: self.configuration.serviceProvidersByName,
141
45.1k
      encoding: self.configuration.messageEncoding,
142
45.1k
      errorDelegate: self.configuration.errorDelegate,
143
45.1k
      normalizeHeaders: normalizeHeaders,
144
45.1k
      maximumReceiveMessageLength: self.configuration.maximumReceiveMessageLength,
145
45.1k
      logger: logger
146
45.1k
    )
147
45.1k
  }
148
149
  /// The pipeline finished configuring.
150
30.6k
  private func configurationCompleted(result: Result<Void, Error>, context: ChannelHandlerContext) {
151
30.6k
    switch result {
152
30.6k
    case .success:
153
30.6k
      context.pipeline.removeHandler(context: context, promise: nil)
154
30.6k
    case let .failure(error):
155
0
      self.errorCaught(context: context, error: error)
156
30.6k
    }
157
30.6k
  }
158
159
  /// Configures the pipeline to handle gRPC requests on an HTTP/2 connection.
160
11.7k
  private func configureHTTP2(context: ChannelHandlerContext) {
161
11.7k
    // We're now configuring the pipeline.
162
11.7k
    self.state = .configuring
163
11.7k
164
11.7k
    // We could use 'Channel.configureHTTP2Pipeline', but then we'd have to find the right handlers
165
11.7k
    // to then insert our keepalive and idle handlers between. We can just add everything together.
166
11.7k
    let result: Result<Void, Error>
167
11.7k
168
11.7k
    do {
169
11.7k
      // This is only ever called as a result of reading a user inbound event or reading inbound so
170
11.7k
      // we'll be on the right event loop and sync operations are fine.
171
11.7k
      let sync = context.pipeline.syncOperations
172
11.7k
      try sync.addHandler(self.makeHTTP2Handler())
173
11.7k
      try sync.addHandler(self.makeIdleHandler())
174
11.7k
      try sync.addHandler(self.makeHTTP2Multiplexer(for: context.channel))
175
11.7k
      result = .success(())
176
11.7k
    } catch {
177
0
      result = .failure(error)
178
11.7k
    }
179
11.7k
180
11.7k
    self.configurationCompleted(result: result, context: context)
181
11.7k
  }
182
183
  /// Configures the pipeline to handle gRPC-Web requests on an HTTP/1 connection.
184
18.9k
  private func configureHTTP1(context: ChannelHandlerContext) {
185
18.9k
    // We're now configuring the pipeline.
186
18.9k
    self.state = .configuring
187
18.9k
188
18.9k
    let result: Result<Void, Error>
189
18.9k
    do {
190
18.9k
      // This is only ever called as a result of reading a user inbound event or reading inbound so
191
18.9k
      // we'll be on the right event loop and sync operations are fine.
192
18.9k
      let sync = context.pipeline.syncOperations
193
18.9k
      try sync.configureHTTPServerPipeline(withErrorHandling: true)
194
18.9k
      try sync.addHandler(WebCORSHandler(configuration: self.configuration.webCORS))
195
18.9k
      let scheme = self.configuration.tlsConfiguration == nil ? "http" : "https"
196
18.9k
      try sync.addHandler(GRPCWebToHTTP2ServerCodec(scheme: scheme))
197
18.9k
      // There's no need to normalize headers for HTTP/1.
198
18.9k
      try sync.addHandler(
199
18.9k
        self.makeHTTP2ToRawGRPCHandler(normalizeHeaders: false, logger: self.configuration.logger)
200
18.9k
      )
201
18.9k
      result = .success(())
202
18.9k
    } catch {
203
0
      result = .failure(error)
204
18.9k
    }
205
18.9k
206
18.9k
    self.configurationCompleted(result: result, context: context)
207
18.9k
  }
208
209
  /// Attempts to determine the HTTP version from the buffer and then configure the pipeline
210
  /// appropriately. Closes the connection if the HTTP version could not be determined.
211
  private func determineHTTPVersionAndConfigurePipeline(
212
    buffer: ByteBuffer,
213
    context: ChannelHandlerContext
214
31.2k
  ) {
215
31.2k
    switch HTTPVersionParser.determineHTTPVersion(buffer) {
216
31.2k
    case .http2:
217
11.7k
      self.configureHTTP2(context: context)
218
31.2k
    case .http1:
219
18.9k
      self.configureHTTP1(context: context)
220
31.2k
    case .unknown:
221
96
      // Either this is neither H2 nor H1, or the length limit has been exceeded.
222
96
      self.configuration.logger.error("Unable to determine http version, closing")
223
96
      context.close(mode: .all, promise: nil)
224
31.2k
    case .notEnoughBytes:
225
410
      ()  // Try again with more bytes.
226
31.2k
    }
227
31.2k
  }
228
229
  /// Handles a 'TLSUserEvent.handshakeCompleted' event and configures the pipeline to handle gRPC
230
  /// requests.
231
  private func handleHandshakeCompletedEvent(
232
    _ event: TLSUserEvent,
233
    alpnIsRequired: Bool,
234
    context: ChannelHandlerContext
235
0
  ) {
236
0
    switch event {
237
0
    case let .handshakeCompleted(negotiatedProtocol):
238
0
      let tlsVersion = try? context.channel.getTLSVersionSync()
239
0
      self.configuration.logger.debug(
240
0
        "TLS handshake completed",
241
0
        metadata: [
242
0
          "alpn": "\(negotiatedProtocol ?? "nil")",
243
0
          "tls_version": "\(tlsVersion.map(String.init(describing:)) ?? "nil")",
244
0
        ]
245
0
      )
246
0
247
0
      switch negotiatedProtocol {
248
0
      case let .some(negotiated):
249
0
        if GRPCApplicationProtocolIdentifier.isHTTP2Like(negotiated) {
250
0
          self.configureHTTP2(context: context)
251
0
        } else if GRPCApplicationProtocolIdentifier.isHTTP1(negotiated) {
252
0
          self.configureHTTP1(context: context)
253
0
        } else {
254
0
          self.configuration.logger.warning("Unsupported ALPN identifier '\(negotiated)', closing")
255
0
          context.close(mode: .all, promise: nil)
256
0
        }
257
0
258
0
      case .none:
259
0
        if alpnIsRequired {
260
0
          self.configuration.logger.warning("No ALPN protocol negotiated, closing'")
261
0
          context.close(mode: .all, promise: nil)
262
0
        } else {
263
0
          self.configuration.logger.warning("No ALPN protocol negotiated'")
264
0
          // We're now falling back to parsing bytes.
265
0
          self.state = .notConfigured(alpn: .expectedButFallingBack)
266
0
          self.tryParsingBufferedData(context: context)
267
0
        }
268
0
      }
269
0
270
0
    case .shutdownCompleted:
271
0
      // We don't care about this here.
272
0
      ()
273
0
    }
274
0
  }
275
276
  /// Try to parse the buffered data to determine whether HTTP/2 or HTTP/1 should be used.
277
31.2k
  private func tryParsingBufferedData(context: ChannelHandlerContext) {
278
31.2k
    if let buffer = self.buffer {
279
31.2k
      self.determineHTTPVersionAndConfigurePipeline(buffer: buffer, context: context)
280
31.2k
    }
281
31.2k
  }
282
283
  // MARK: - Channel Handler
284
285
0
  internal func errorCaught(context: ChannelHandlerContext, error: Error) {
286
0
    if let delegate = self.configuration.errorDelegate {
287
0
      let baseError: Error
288
0
289
0
      if let errorWithContext = error as? GRPCError.WithContext {
290
0
        baseError = errorWithContext.error
291
0
      } else {
292
0
        baseError = error
293
0
      }
294
0
295
0
      delegate.observeLibraryError(baseError)
296
0
    }
297
0
298
0
    context.close(mode: .all, promise: nil)
299
0
  }
300
301
0
  internal func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
302
0
    switch self.state {
303
0
    case let .notConfigured(alpn: .expected(required)):
304
0
      if let event = event as? TLSUserEvent {
305
0
        self.handleHandshakeCompletedEvent(event, alpnIsRequired: required, context: context)
306
0
      }
307
0
308
0
    case .notConfigured(alpn: .expectedButFallingBack),
309
0
      .notConfigured(alpn: .notExpected),
310
0
      .configuring:
311
0
      ()
312
0
    }
313
0
314
0
    context.fireUserInboundEventTriggered(event)
315
0
  }
316
317
18.1k
  internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
318
18.1k
    var buffer = self.unwrapInboundIn(data)
319
18.1k
    self.buffer.setOrWriteBuffer(&buffer)
320
18.1k
321
18.1k
    switch self.state {
322
18.1k
    case .notConfigured(alpn: .notExpected),
323
18.1k
      .notConfigured(alpn: .expectedButFallingBack):
324
18.1k
      // If ALPN isn't expected, or we didn't negotiate via ALPN and we don't require it then we
325
18.1k
      // can try parsing the data we just buffered.
326
18.1k
      self.tryParsingBufferedData(context: context)
327
18.1k
328
18.1k
    case .notConfigured(alpn: .expected),
329
0
      .configuring:
330
0
      // We expect ALPN or we're being configured, just buffer the data, we'll forward it later.
331
0
      ()
332
18.1k
    }
333
18.1k
334
18.1k
    // Don't forward the reads: we'll do so when we have configured the pipeline.
335
18.1k
  }
336
337
  internal func removeHandler(
338
    context: ChannelHandlerContext,
339
    removalToken: ChannelHandlerContext.RemovalToken
340
17.9k
  ) {
341
17.9k
    // Forward any buffered reads.
342
17.9k
    if let buffer = self.buffer {
343
17.9k
      self.buffer = nil
344
17.9k
      context.fireChannelRead(self.wrapInboundOut(buffer))
345
17.9k
    }
346
17.9k
    context.leavePipeline(removalToken: removalToken)
347
17.9k
  }
348
}
349
350
// MARK: - HTTP Version Parser
351
352
struct HTTPVersionParser {
353
  /// HTTP/2 connection preface bytes. See RFC 7540 § 3.5.
354
  private static let http2ClientMagic = [
355
    UInt8(ascii: "P"),
356
    UInt8(ascii: "R"),
357
    UInt8(ascii: "I"),
358
    UInt8(ascii: " "),
359
    UInt8(ascii: "*"),
360
    UInt8(ascii: " "),
361
    UInt8(ascii: "H"),
362
    UInt8(ascii: "T"),
363
    UInt8(ascii: "T"),
364
    UInt8(ascii: "P"),
365
    UInt8(ascii: "/"),
366
    UInt8(ascii: "2"),
367
    UInt8(ascii: "."),
368
    UInt8(ascii: "0"),
369
    UInt8(ascii: "\r"),
370
    UInt8(ascii: "\n"),
371
    UInt8(ascii: "\r"),
372
    UInt8(ascii: "\n"),
373
    UInt8(ascii: "S"),
374
    UInt8(ascii: "M"),
375
    UInt8(ascii: "\r"),
376
    UInt8(ascii: "\n"),
377
    UInt8(ascii: "\r"),
378
    UInt8(ascii: "\n"),
379
  ]
380
381
  /// Determines whether the bytes in the `ByteBuffer` are prefixed with the HTTP/2 client
382
  /// connection preface.
383
18.1k
  static func prefixedWithHTTP2ConnectionPreface(_ buffer: ByteBuffer) -> SubParseResult {
384
18.1k
    let view = buffer.readableBytesView
385
18.1k
386
18.1k
    guard view.count >= HTTPVersionParser.http2ClientMagic.count else {
387
1.24k
      // Not enough bytes.
388
1.24k
      return .notEnoughBytes
389
16.9k
    }
390
16.9k
391
16.9k
    let slice = view[view.startIndex ..< view.startIndex.advanced(by: self.http2ClientMagic.count)]
392
16.9k
    return slice.elementsEqual(HTTPVersionParser.http2ClientMagic) ? .accepted : .rejected
393
18.1k
  }
394
395
  enum ParseResult: Hashable {
396
    case http1
397
    case http2
398
    case unknown
399
    case notEnoughBytes
400
  }
401
402
  enum SubParseResult: Hashable {
403
    case accepted
404
    case rejected
405
    case notEnoughBytes
406
  }
407
408
  private static let maxLengthToCheck = 1024
409
410
18.1k
  static func determineHTTPVersion(_ buffer: ByteBuffer) -> ParseResult {
411
18.1k
    switch Self.prefixedWithHTTP2ConnectionPreface(buffer) {
412
18.1k
    case .accepted:
413
6.86k
      return .http2
414
18.1k
415
18.1k
    case .notEnoughBytes:
416
1.24k
      switch Self.prefixedWithHTTP1RequestLine(buffer) {
417
1.24k
      case .accepted:
418
1.10k
        // Not enough bytes to check H2, but enough to confirm H1.
419
1.10k
        return .http1
420
1.24k
      case .notEnoughBytes:
421
117
        // Not enough bytes to check H2 or H1.
422
117
        return .notEnoughBytes
423
1.24k
      case .rejected:
424
28
        // Not enough bytes to check H2 and definitely not H1.
425
28
        return .notEnoughBytes
426
1.24k
      }
427
18.1k
428
18.1k
    case .rejected:
429
10.0k
      switch Self.prefixedWithHTTP1RequestLine(buffer) {
430
10.0k
      case .accepted:
431
9.97k
        // Not H2, but H1 is confirmed.
432
9.97k
        return .http1
433
10.0k
      case .notEnoughBytes:
434
22
        // Not H2, but not enough bytes to reject H1 yet.
435
22
        return .notEnoughBytes
436
10.0k
      case .rejected:
437
40
        // Not H2 or H1.
438
40
        return .unknown
439
10.0k
      }
440
18.1k
    }
441
18.1k
  }
442
443
  private static let http1_1 = [
444
    UInt8(ascii: "H"),
445
    UInt8(ascii: "T"),
446
    UInt8(ascii: "T"),
447
    UInt8(ascii: "P"),
448
    UInt8(ascii: "/"),
449
    UInt8(ascii: "1"),
450
    UInt8(ascii: "."),
451
    UInt8(ascii: "1"),
452
  ]
453
454
  /// Determines whether the bytes in the `ByteBuffer` are prefixed with an HTTP/1.1 request line.
455
11.2k
  static func prefixedWithHTTP1RequestLine(_ buffer: ByteBuffer) -> SubParseResult {
456
11.2k
    var readableBytesView = buffer.readableBytesView
457
11.2k
458
11.2k
    // We don't need to validate the request line, only determine whether we think it's an HTTP1
459
11.2k
    // request line. Another handler will parse it properly.
460
11.2k
461
11.2k
    // From RFC 2616 § 5.1:
462
11.2k
    //   Request-Line = Method SP Request-URI SP HTTP-Version CRLF
463
11.2k
464
11.2k
    // Get through the first space.
465
11.2k
    guard readableBytesView.dropPrefix(through: UInt8(ascii: " ")) != nil else {
466
86
      let tooLong = buffer.readableBytes > Self.maxLengthToCheck
467
86
      return tooLong ? .rejected : .notEnoughBytes
468
11.1k
    }
469
11.1k
470
11.1k
    // Get through the second space.
471
11.1k
    guard readableBytesView.dropPrefix(through: UInt8(ascii: " ")) != nil else {
472
39
      let tooLong = buffer.readableBytes > Self.maxLengthToCheck
473
39
      return tooLong ? .rejected : .notEnoughBytes
474
11.1k
    }
475
11.1k
476
11.1k
    // +2 for \r\n
477
11.1k
    guard readableBytesView.count >= (Self.http1_1.count + 2) else {
478
24
      return .notEnoughBytes
479
11.1k
    }
480
11.1k
481
11.1k
    guard let version = readableBytesView.dropPrefix(through: UInt8(ascii: "\r")),
482
11.1k
      readableBytesView.first == UInt8(ascii: "\n")
483
11.1k
    else {
484
28
      // If we didn't drop the prefix OR we did and the next byte wasn't '\n', then we had enough
485
28
      // bytes but the '\r\n' wasn't present: reject this as being HTTP1.
486
28
      return .rejected
487
11.1k
    }
488
11.1k
489
11.1k
    return version.elementsEqual(Self.http1_1) ? .accepted : .rejected
490
11.2k
  }
491
}
492
493
extension Collection where Self == Self.SubSequence, Self.Element: Equatable {
494
  /// Drops the prefix off the collection up to and including the first `separator`
495
  /// only if that separator appears in the collection.
496
  ///
497
  /// Returns the prefix up to but not including the separator if it was found, nil otherwise.
498
33.6k
  mutating func dropPrefix(through separator: Element) -> SubSequence? {
499
33.6k
    if self.isEmpty {
500
22
      return nil
501
33.5k
    }
502
33.5k
503
33.5k
    guard let separatorIndex = self.firstIndex(of: separator) else {
504
114
      return nil
505
33.4k
    }
506
33.4k
507
33.4k
    let prefix = self[..<separatorIndex]
508
33.4k
    self = self[self.index(after: separatorIndex)...]
509
33.4k
    return prefix
510
33.6k
  }
511
}