Coverage Report

Created: 2025-06-24 06:59

/src/grpc-swift/Sources/GRPC/HTTP2ToRawGRPCServerCodec.swift
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright 2020, gRPC Authors All rights reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
import Logging
17
import NIOCore
18
import NIOHPACK
19
import NIOHTTP2
20
21
internal final class HTTP2ToRawGRPCServerCodec: ChannelInboundHandler, GRPCServerResponseWriter {
22
  typealias InboundIn = HTTP2Frame.FramePayload
23
  typealias OutboundOut = HTTP2Frame.FramePayload
24
25
  private var logger: Logger
26
  private var state: HTTP2ToRawGRPCStateMachine
27
  private let errorDelegate: ServerErrorDelegate?
28
  private var context: ChannelHandlerContext!
29
30
  private let servicesByName: [Substring: CallHandlerProvider]
31
  private let encoding: ServerMessageEncoding
32
  private let normalizeHeaders: Bool
33
  private let maxReceiveMessageLength: Int
34
35
  /// The configuration state of the handler.
36
49.0k
  private var configurationState: Configuration = .notConfigured
37
38
  /// Whether we are currently reading data from the `Channel`. Should be set to `false` once a
39
  /// burst of reading has completed.
40
49.0k
  private var isReading = false
41
42
  /// Indicates whether a flush event is pending. If a flush is received while `isReading` is `true`
43
  /// then it is held until the read completes in order to elide unnecessary flushes.
44
49.0k
  private var flushPending = false
45
46
  private enum Configuration {
47
    case notConfigured
48
    case configured(GRPCServerHandlerProtocol)
49
50
132k
    var isConfigured: Bool {
51
132k
      switch self {
52
132k
      case .configured:
53
0
        return true
54
132k
      case .notConfigured:
55
132k
        return false
56
132k
      }
57
132k
    }
58
59
207k
    mutating func tearDown() -> GRPCServerHandlerProtocol? {
60
207k
      switch self {
61
207k
      case .notConfigured:
62
46.2k
        return nil
63
207k
      case let .configured(handler):
64
161k
        self = .notConfigured
65
161k
        return handler
66
207k
      }
67
207k
    }
68
  }
69
70
  init(
71
    servicesByName: [Substring: CallHandlerProvider],
72
    encoding: ServerMessageEncoding,
73
    errorDelegate: ServerErrorDelegate?,
74
    normalizeHeaders: Bool,
75
    maximumReceiveMessageLength: Int,
76
    logger: Logger
77
32.2k
  ) {
78
32.2k
    self.logger = logger
79
32.2k
    self.errorDelegate = errorDelegate
80
32.2k
    self.servicesByName = servicesByName
81
32.2k
    self.encoding = encoding
82
32.2k
    self.normalizeHeaders = normalizeHeaders
83
32.2k
    self.maxReceiveMessageLength = maximumReceiveMessageLength
84
32.2k
    self.state = HTTP2ToRawGRPCStateMachine()
85
32.2k
  }
86
87
32.2k
  internal func handlerAdded(context: ChannelHandlerContext) {
88
32.2k
    self.context = context
89
32.2k
  }
90
91
32.2k
  internal func handlerRemoved(context: ChannelHandlerContext) {
92
32.2k
    self.context = nil
93
32.2k
    self.configurationState = .notConfigured
94
32.2k
  }
95
96
10.4k
  internal func errorCaught(context: ChannelHandlerContext, error: Error) {
97
10.4k
    switch self.configurationState {
98
10.4k
    case .notConfigured:
99
8.89k
      context.close(mode: .all, promise: nil)
100
10.4k
    case let .configured(hander):
101
1.60k
      hander.receiveError(error)
102
10.4k
    }
103
10.4k
  }
104
105
32.2k
  internal func channelInactive(context: ChannelHandlerContext) {
106
32.2k
    if let handler = self.configurationState.tearDown() {
107
1.60k
      handler.finish()
108
32.2k
    } else {
109
30.6k
      context.fireChannelInactive()
110
32.2k
    }
111
32.2k
  }
112
113
939k
  internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
114
939k
    self.isReading = true
115
939k
    let payload = self.unwrapInboundIn(data)
116
939k
117
939k
    switch payload {
118
939k
    case let .headers(payload):
119
254k
      let receiveHeaders = self.state.receive(
120
254k
        headers: payload.headers,
121
254k
        eventLoop: context.eventLoop,
122
254k
        errorDelegate: self.errorDelegate,
123
254k
        remoteAddress: context.channel.remoteAddress,
124
254k
        logger: self.logger,
125
254k
        allocator: context.channel.allocator,
126
254k
        responseWriter: self,
127
254k
        closeFuture: context.channel.closeFuture,
128
254k
        services: self.servicesByName,
129
254k
        encoding: self.encoding,
130
254k
        normalizeHeaders: self.normalizeHeaders
131
254k
      )
132
254k
133
254k
      switch receiveHeaders {
134
254k
      case let .configure(handler):
135
105k
        assert(!self.configurationState.isConfigured)
136
105k
        self.configurationState = .configured(handler)
137
105k
        self.configured()
138
254k
139
254k
      case let .rejectRPC(trailers):
140
148k
        assert(!self.configurationState.isConfigured)
141
148k
        // We're not handling this request: write headers and end stream.
142
148k
        let payload = HTTP2Frame.FramePayload.headers(.init(headers: trailers, endStream: true))
143
148k
        context.writeAndFlush(self.wrapOutboundOut(payload), promise: nil)
144
939k
      }
145
939k
146
939k
    case let .data(payload):
147
685k
      switch payload.data {
148
685k
      case var .byteBuffer(buffer):
149
685k
        let action = self.state.receive(buffer: &buffer, endStream: payload.endStream)
150
685k
        switch action {
151
685k
        case .tryReading:
152
129k
          self.tryReadingMessage()
153
685k
154
685k
        case .finishHandler:
155
30.4k
          let handler = self.configurationState.tearDown()
156
30.4k
          handler?.finish()
157
685k
158
685k
        case .nothing:
159
525k
          ()
160
685k
        }
161
685k
162
685k
      case .fileRegion:
163
0
        preconditionFailure("Unexpected IOData.fileRegion")
164
939k
      }
165
939k
166
939k
    // Ignored.
167
939k
    case .alternativeService,
168
403
      .goAway,
169
403
      .origin,
170
403
      .ping,
171
403
      .priority,
172
403
      .pushPromise,
173
403
      .rstStream,
174
403
      .settings,
175
403
      .windowUpdate:
176
403
      ()
177
939k
    }
178
939k
  }
179
180
33.2k
  internal func channelReadComplete(context: ChannelHandlerContext) {
181
33.2k
    self.isReading = false
182
33.2k
183
33.2k
    if self.flushPending {
184
2.64k
      self.deliverPendingResponses()
185
2.64k
      self.flushPending = false
186
2.64k
      context.flush()
187
33.2k
    }
188
33.2k
189
33.2k
    context.fireChannelReadComplete()
190
33.2k
  }
191
192
166k
  private func deliverPendingResponses() {
193
166k
    while let (result, promise) = self.state.nextResponse() {
194
106k
      switch result {
195
106k
      case let .success(buffer):
196
106k
        let payload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
197
106k
        self.context.write(self.wrapOutboundOut(payload), promise: promise)
198
106k
      case let .failure(error):
199
0
        promise?.fail(error)
200
106k
      }
201
166k
    }
202
166k
  }
203
204
  /// Called when the pipeline has finished configuring.
205
161k
  private func configured() {
206
161k
    switch self.state.pipelineConfigured() {
207
161k
    case let .forwardHeaders(headers):
208
161k
      switch self.configurationState {
209
161k
      case .notConfigured:
210
0
        preconditionFailure()
211
161k
      case let .configured(handler):
212
161k
        handler.receiveMetadata(headers)
213
161k
      }
214
161k
215
161k
    case let .forwardHeadersAndRead(headers):
216
0
      switch self.configurationState {
217
0
      case .notConfigured:
218
0
        preconditionFailure()
219
0
      case let .configured(handler):
220
0
        handler.receiveMetadata(headers)
221
0
      }
222
0
      self.tryReadingMessage()
223
161k
    }
224
161k
  }
225
226
  /// Try to read a request message from the buffer.
227
207k
  private func tryReadingMessage() {
228
207k
    // This while loop exists to break the recursion in `.forwardMessageThenReadNextMessage`.
229
207k
    // Almost all cases return directly out of the loop.
230
1.93M
    while true {
231
1.93M
      let action = self.state.readNextRequest(
232
1.93M
        maxLength: self.maxReceiveMessageLength
233
1.93M
      )
234
1.93M
      switch action {
235
1.93M
      case .none:
236
75.2k
        return
237
1.93M
238
1.93M
      case let .forwardMessage(buffer):
239
16.6k
        switch self.configurationState {
240
16.6k
        case .notConfigured:
241
0
          preconditionFailure()
242
16.6k
        case let .configured(handler):
243
16.6k
          handler.receiveMessage(buffer)
244
16.6k
        }
245
16.6k
246
16.6k
        return
247
1.93M
248
1.93M
      case let .forwardMessageThenReadNextMessage(buffer):
249
1.73M
        switch self.configurationState {
250
1.73M
        case .notConfigured:
251
0
          preconditionFailure()
252
1.73M
        case let .configured(handler):
253
1.73M
          handler.receiveMessage(buffer)
254
1.73M
        }
255
1.73M
256
1.73M
        continue
257
1.93M
258
1.93M
      case .forwardEnd:
259
108k
        switch self.configurationState {
260
108k
        case .notConfigured:
261
0
          preconditionFailure()
262
108k
        case let .configured(handler):
263
108k
          handler.receiveEnd()
264
108k
        }
265
108k
266
108k
        return
267
1.93M
268
1.93M
      case let .errorCaught(error):
269
7.08k
        switch self.configurationState {
270
7.08k
        case .notConfigured:
271
0
          preconditionFailure()
272
7.08k
        case let .configured(handler):
273
7.08k
          handler.receiveError(error)
274
7.08k
        }
275
7.08k
276
7.08k
        return
277
1.93M
      }
278
1.93M
    }
279
0
  }
280
281
  internal func sendMetadata(
282
    _ headers: HPACKHeaders,
283
    flush: Bool,
284
    promise: EventLoopPromise<Void>?
285
105k
  ) {
286
105k
    switch self.state.send(headers: headers) {
287
105k
    case let .success(headers):
288
105k
      let payload = HTTP2Frame.FramePayload.headers(.init(headers: headers))
289
105k
      self.context.write(self.wrapOutboundOut(payload), promise: promise)
290
105k
      if flush {
291
93.1k
        self.markFlushPoint()
292
105k
      }
293
105k
294
105k
    case let .failure(error):
295
0
      promise?.fail(error)
296
105k
    }
297
105k
  }
298
299
  internal func sendMessage(
300
    _ buffer: ByteBuffer,
301
    metadata: MessageMetadata,
302
    promise: EventLoopPromise<Void>?
303
859k
  ) {
304
859k
    let result = self.state.send(
305
859k
      buffer: buffer,
306
859k
      compress: metadata.compress,
307
859k
      promise: promise
308
859k
    )
309
859k
310
859k
    switch result {
311
859k
    case .success:
312
859k
      if metadata.flush {
313
756k
        self.markFlushPoint()
314
859k
      }
315
859k
316
859k
    case let .failure(error):
317
0
      promise?.fail(error)
318
859k
    }
319
859k
  }
320
321
  internal func sendEnd(
322
    status: GRPCStatus,
323
    trailers: HPACKHeaders,
324
    promise: EventLoopPromise<Void>?
325
105k
  ) {
326
105k
    // About to end the stream: send any pending responses.
327
105k
    self.deliverPendingResponses()
328
105k
329
105k
    switch self.state.send(status: status, trailers: trailers) {
330
105k
    case let .sendTrailers(trailers):
331
32.0k
      self.sendTrailers(trailers, promise: promise)
332
105k
333
105k
    case let .sendTrailersAndFinish(trailers):
334
73.3k
      self.sendTrailers(trailers, promise: promise)
335
73.3k
336
73.3k
      // 'finish' the handler.
337
73.3k
      let handler = self.configurationState.tearDown()
338
73.3k
      handler?.finish()
339
105k
340
105k
    case let .failure(error):
341
0
      promise?.fail(error)
342
105k
    }
343
105k
  }
344
345
161k
  private func sendTrailers(_ trailers: HPACKHeaders, promise: EventLoopPromise<Void>?) {
346
161k
    // Always end stream for status and trailers.
347
161k
    let payload = HTTP2Frame.FramePayload.headers(.init(headers: trailers, endStream: true))
348
161k
    self.context.write(self.wrapOutboundOut(payload), promise: promise)
349
161k
    // We'll always flush on end.
350
161k
    self.markFlushPoint()
351
161k
  }
352
353
  /// Mark a flush as pending - to be emitted once the read completes - if we're currently reading,
354
  /// or emit a flush now if we are not.
355
1.39M
  private func markFlushPoint() {
356
1.39M
    if self.isReading {
357
1.39M
      self.flushPending = true
358
1.39M
    } else {
359
770
      // About to flush: send any pending responses.
360
770
      self.deliverPendingResponses()
361
770
      self.flushPending = false
362
770
      self.context.flush()
363
1.39M
    }
364
1.39M
  }
365
}