Coverage Report

Created: 2026-02-14 07:10

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/grpc-swift/Sources/GRPC/HTTP2ToRawGRPCServerCodec.swift
Line
Count
Source
1
/*
2
 * Copyright 2020, gRPC Authors All rights reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
import Logging
17
import NIOCore
18
import NIOHPACK
19
import NIOHTTP2
20
21
internal final class HTTP2ToRawGRPCServerCodec: ChannelInboundHandler, GRPCServerResponseWriter {
22
  typealias InboundIn = HTTP2Frame.FramePayload
23
  typealias OutboundOut = HTTP2Frame.FramePayload
24
25
  private var logger: Logger
26
  private var state: HTTP2ToRawGRPCStateMachine
27
  private let errorDelegate: ServerErrorDelegate?
28
  private var context: ChannelHandlerContext!
29
30
  private let servicesByName: [Substring: CallHandlerProvider]
31
  private let encoding: ServerMessageEncoding
32
  private let normalizeHeaders: Bool
33
  private let maxReceiveMessageLength: Int
34
35
  /// The configuration state of the handler.
36
46.7k
  private var configurationState: Configuration = .notConfigured
37
38
  /// Whether we are currently reading data from the `Channel`. Should be set to `false` once a
39
  /// burst of reading has completed.
40
46.7k
  private var isReading = false
41
42
  /// Indicates whether a flush event is pending. If a flush is received while `isReading` is `true`
43
  /// then it is held until the read completes in order to elide unnecessary flushes.
44
46.7k
  private var flushPending = false
45
46
  private enum Configuration {
47
    case notConfigured
48
    case configured(GRPCServerHandlerProtocol)
49
50
144k
    var isConfigured: Bool {
51
144k
      switch self {
52
144k
      case .configured:
53
0
        return true
54
144k
      case .notConfigured:
55
144k
        return false
56
144k
      }
57
144k
    }
58
59
218k
    mutating func tearDown() -> GRPCServerHandlerProtocol? {
60
218k
      switch self {
61
218k
      case .notConfigured:
62
44.0k
        return nil
63
218k
      case let .configured(handler):
64
174k
        self = .notConfigured
65
174k
        return handler
66
218k
      }
67
218k
    }
68
  }
69
70
  init(
71
    servicesByName: [Substring: CallHandlerProvider],
72
    encoding: ServerMessageEncoding,
73
    errorDelegate: ServerErrorDelegate?,
74
    normalizeHeaders: Bool,
75
    maximumReceiveMessageLength: Int,
76
    logger: Logger
77
28.3k
  ) {
78
28.3k
    self.logger = logger
79
28.3k
    self.errorDelegate = errorDelegate
80
28.3k
    self.servicesByName = servicesByName
81
28.3k
    self.encoding = encoding
82
28.3k
    self.normalizeHeaders = normalizeHeaders
83
28.3k
    self.maxReceiveMessageLength = maximumReceiveMessageLength
84
28.3k
    self.state = HTTP2ToRawGRPCStateMachine()
85
28.3k
  }
86
87
28.3k
  internal func handlerAdded(context: ChannelHandlerContext) {
88
28.3k
    self.context = context
89
28.3k
  }
90
91
28.3k
  internal func handlerRemoved(context: ChannelHandlerContext) {
92
28.3k
    self.context = nil
93
28.3k
    self.configurationState = .notConfigured
94
28.3k
  }
95
96
12.3k
  internal func errorCaught(context: ChannelHandlerContext, error: Error) {
97
12.3k
    switch self.configurationState {
98
12.3k
    case .notConfigured:
99
10.7k
      context.close(mode: .all, promise: nil)
100
12.3k
    case let .configured(hander):
101
1.57k
      hander.receiveError(error)
102
12.3k
    }
103
12.3k
  }
104
105
28.3k
  internal func channelInactive(context: ChannelHandlerContext) {
106
28.3k
    if let handler = self.configurationState.tearDown() {
107
1.57k
      handler.finish()
108
26.7k
    } else {
109
26.7k
      context.fireChannelInactive()
110
26.7k
    }
111
28.3k
  }
112
113
1.49M
  internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
114
1.49M
    self.isReading = true
115
1.49M
    let payload = self.unwrapInboundIn(data)
116
1.49M
117
1.49M
    switch payload {
118
1.49M
    case let .headers(payload):
119
300k
      let receiveHeaders = self.state.receive(
120
300k
        headers: payload.headers,
121
300k
        eventLoop: context.eventLoop,
122
300k
        errorDelegate: self.errorDelegate,
123
300k
        remoteAddress: context.channel.remoteAddress,
124
300k
        logger: self.logger,
125
300k
        allocator: context.channel.allocator,
126
300k
        responseWriter: self,
127
300k
        closeFuture: context.channel.closeFuture,
128
300k
        services: self.servicesByName,
129
300k
        encoding: self.encoding,
130
300k
        normalizeHeaders: self.normalizeHeaders
131
300k
      )
132
300k
133
300k
      switch receiveHeaders {
134
300k
      case let .configure(handler):
135
121k
        assert(!self.configurationState.isConfigured)
136
121k
        self.configurationState = .configured(handler)
137
121k
        self.configured()
138
300k
139
300k
      case let .rejectRPC(trailers):
140
179k
        assert(!self.configurationState.isConfigured)
141
179k
        // We're not handling this request: write headers and end stream.
142
179k
        let payload = HTTP2Frame.FramePayload.headers(.init(headers: trailers, endStream: true))
143
179k
        context.writeAndFlush(self.wrapOutboundOut(payload), promise: nil)
144
300k
      }
145
1.49M
146
1.49M
    case let .data(payload):
147
1.19M
      switch payload.data {
148
1.19M
      case var .byteBuffer(buffer):
149
1.19M
        let action = self.state.receive(buffer: &buffer, endStream: payload.endStream)
150
1.19M
        switch action {
151
1.19M
        case .tryReading:
152
151k
          self.tryReadingMessage()
153
1.19M
154
1.19M
        case .finishHandler:
155
61.0k
          let handler = self.configurationState.tearDown()
156
61.0k
          handler?.finish()
157
1.19M
158
1.19M
        case .nothing:
159
981k
          ()
160
1.19M
        }
161
1.19M
162
1.19M
      case .fileRegion:
163
0
        preconditionFailure("Unexpected IOData.fileRegion")
164
1.19M
      }
165
1.49M
166
1.49M
    // Ignored.
167
1.49M
    case .alternativeService,
168
393
      .goAway,
169
393
      .origin,
170
393
      .ping,
171
393
      .priority,
172
393
      .pushPromise,
173
393
      .rstStream,
174
393
      .settings,
175
393
      .windowUpdate:
176
393
      ()
177
1.49M
    }
178
1.49M
  }
179
180
29.3k
  internal func channelReadComplete(context: ChannelHandlerContext) {
181
29.3k
    self.isReading = false
182
29.3k
183
29.3k
    if self.flushPending {
184
2.65k
      self.deliverPendingResponses()
185
2.65k
      self.flushPending = false
186
2.65k
      context.flush()
187
2.65k
    }
188
29.3k
189
29.3k
    context.fireChannelReadComplete()
190
29.3k
  }
191
192
179k
  private func deliverPendingResponses() {
193
179k
    while let (result, promise) = self.state.nextResponse() {
194
97.1k
      switch result {
195
97.1k
      case let .success(buffer):
196
97.1k
        let payload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
197
97.1k
        self.context.write(self.wrapOutboundOut(payload), promise: promise)
198
97.1k
      case let .failure(error):
199
0
        promise?.fail(error)
200
97.1k
      }
201
179k
    }
202
179k
  }
203
204
  /// Called when the pipeline has finished configuring.
205
174k
  private func configured() {
206
174k
    switch self.state.pipelineConfigured() {
207
174k
    case let .forwardHeaders(headers):
208
174k
      switch self.configurationState {
209
174k
      case .notConfigured:
210
0
        preconditionFailure()
211
174k
      case let .configured(handler):
212
174k
        handler.receiveMetadata(headers)
213
174k
      }
214
174k
215
174k
    case let .forwardHeadersAndRead(headers):
216
0
      switch self.configurationState {
217
0
      case .notConfigured:
218
0
        preconditionFailure()
219
0
      case let .configured(handler):
220
0
        handler.receiveMetadata(headers)
221
0
      }
222
0
      self.tryReadingMessage()
223
174k
    }
224
174k
  }
225
226
  /// Try to read a request message from the buffer.
227
228k
  private func tryReadingMessage() {
228
228k
    // This while loop exists to break the recursion in `.forwardMessageThenReadNextMessage`.
229
228k
    // Almost all cases return directly out of the loop.
230
1.94M
    while true {
231
1.94M
      let action = self.state.readNextRequest(
232
1.94M
        maxLength: self.maxReceiveMessageLength
233
1.94M
      )
234
1.94M
      switch action {
235
1.94M
      case .none:
236
101k
        return
237
1.94M
238
1.94M
      case let .forwardMessage(buffer):
239
43.2k
        switch self.configurationState {
240
43.2k
        case .notConfigured:
241
0
          preconditionFailure()
242
43.2k
        case let .configured(handler):
243
43.2k
          handler.receiveMessage(buffer)
244
43.2k
        }
245
43.2k
246
43.2k
        return
247
1.94M
248
1.94M
      case let .forwardMessageThenReadNextMessage(buffer):
249
1.71M
        switch self.configurationState {
250
1.71M
        case .notConfigured:
251
0
          preconditionFailure()
252
1.71M
        case let .configured(handler):
253
1.71M
          handler.receiveMessage(buffer)
254
1.71M
        }
255
1.71M
256
1.71M
        continue
257
1.94M
258
1.94M
      case .forwardEnd:
259
76.3k
        switch self.configurationState {
260
76.3k
        case .notConfigured:
261
0
          preconditionFailure()
262
76.3k
        case let .configured(handler):
263
76.3k
          handler.receiveEnd()
264
76.3k
        }
265
76.3k
266
76.3k
        return
267
1.94M
268
1.94M
      case let .errorCaught(error):
269
7.88k
        switch self.configurationState {
270
7.88k
        case .notConfigured:
271
0
          preconditionFailure()
272
7.88k
        case let .configured(handler):
273
7.88k
          handler.receiveError(error)
274
7.88k
        }
275
7.88k
276
7.88k
        return
277
1.94M
      }
278
1.94M
    }
279
0
  }
280
281
  internal func sendMetadata(
282
    _ headers: HPACKHeaders,
283
    flush: Bool,
284
    promise: EventLoopPromise<Void>?
285
121k
  ) {
286
121k
    switch self.state.send(headers: headers) {
287
121k
    case let .success(headers):
288
121k
      let payload = HTTP2Frame.FramePayload.headers(.init(headers: headers))
289
121k
      self.context.write(self.wrapOutboundOut(payload), promise: promise)
290
121k
      if flush {
291
91.4k
        self.markFlushPoint()
292
91.4k
      }
293
121k
294
121k
    case let .failure(error):
295
0
      promise?.fail(error)
296
121k
    }
297
121k
  }
298
299
  internal func sendMessage(
300
    _ buffer: ByteBuffer,
301
    metadata: MessageMetadata,
302
    promise: EventLoopPromise<Void>?
303
1.33M
  ) {
304
1.33M
    let result = self.state.send(
305
1.33M
      buffer: buffer,
306
1.33M
      compress: metadata.compress,
307
1.33M
      promise: promise
308
1.33M
    )
309
1.33M
310
1.33M
    switch result {
311
1.33M
    case .success:
312
1.33M
      if metadata.flush {
313
1.21M
        self.markFlushPoint()
314
1.21M
      }
315
1.33M
316
1.33M
    case let .failure(error):
317
0
      promise?.fail(error)
318
1.33M
    }
319
1.33M
  }
320
321
  internal func sendEnd(
322
    status: GRPCStatus,
323
    trailers: HPACKHeaders,
324
    promise: EventLoopPromise<Void>?
325
121k
  ) {
326
121k
    // About to end the stream: send any pending responses.
327
121k
    self.deliverPendingResponses()
328
121k
329
121k
    switch self.state.send(status: status, trailers: trailers) {
330
121k
    case let .sendTrailers(trailers):
331
62.6k
      self.sendTrailers(trailers, promise: promise)
332
121k
333
121k
    case let .sendTrailersAndFinish(trailers):
334
58.5k
      self.sendTrailers(trailers, promise: promise)
335
58.5k
336
58.5k
      // 'finish' the handler.
337
58.5k
      let handler = self.configurationState.tearDown()
338
58.5k
      handler?.finish()
339
121k
340
121k
    case let .failure(error):
341
0
      promise?.fail(error)
342
121k
    }
343
121k
  }
344
345
174k
  private func sendTrailers(_ trailers: HPACKHeaders, promise: EventLoopPromise<Void>?) {
346
174k
    // Always end stream for status and trailers.
347
174k
    let payload = HTTP2Frame.FramePayload.headers(.init(headers: trailers, endStream: true))
348
174k
    self.context.write(self.wrapOutboundOut(payload), promise: promise)
349
174k
    // We'll always flush on end.
350
174k
    self.markFlushPoint()
351
174k
  }
352
353
  /// Mark a flush as pending - to be emitted once the read completes - if we're currently reading,
354
  /// or emit a flush now if we are not.
355
1.87M
  private func markFlushPoint() {
356
1.87M
    if self.isReading {
357
1.87M
      self.flushPending = true
358
1.87M
    } else {
359
742
      // About to flush: send any pending responses.
360
742
      self.deliverPendingResponses()
361
742
      self.flushPending = false
362
742
      self.context.flush()
363
742
    }
364
1.87M
  }
365
}