Coverage Report

Created: 2025-09-04 06:32

/src/grpc-swift/Sources/GRPC/CallHandlers/BidirectionalStreamingServerHandler.swift
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright 2021, gRPC Authors All rights reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
import NIOCore
17
import NIOHPACK
18
19
public final class BidirectionalStreamingServerHandler<
20
  Serializer: MessageSerializer,
21
  Deserializer: MessageDeserializer
22
>: GRPCServerHandlerProtocol {
23
  public typealias Request = Deserializer.Output
24
  public typealias Response = Serializer.Input
25
26
  /// A response serializer.
27
  @usableFromInline
28
  internal let serializer: Serializer
29
30
  /// A request deserializer.
31
  @usableFromInline
32
  internal let deserializer: Deserializer
33
34
  /// A pipeline of user provided interceptors.
35
  @usableFromInline
36
  internal var interceptors: ServerInterceptorPipeline<Request, Response>!
37
38
  /// Stream events which have arrived before the stream observer future has been resolved.
39
  @usableFromInline
40
314k
  internal var requestBuffer: CircularBuffer<StreamEvent<Request>> = CircularBuffer()
41
42
  /// The context required in order to create the function.
43
  @usableFromInline
44
  internal let context: CallHandlerContext
45
46
  /// A reference to a `UserInfo`.
47
  @usableFromInline
48
  internal let userInfoRef: Ref<UserInfo>
49
50
  /// The user provided function to execute.
51
  @usableFromInline
52
  internal let observerFactory:
53
    (_StreamingResponseCallContext<Request, Response>)
54
      -> EventLoopFuture<(StreamEvent<Request>) -> Void>
55
56
  /// The state of the handler.
57
  @usableFromInline
58
314k
  internal var state: State = .idle
59
60
  @usableFromInline
61
  internal enum State {
62
    // No headers have been received.
63
    case idle
64
    // Headers have been received, a context has been created and the user code has been called to
65
    // make a stream observer with. The observer is yet to see any messages.
66
    case creatingObserver(_StreamingResponseCallContext<Request, Response>)
67
    // The observer future has resolved and the observer may have seen messages.
68
    case observing((StreamEvent<Request>) -> Void, _StreamingResponseCallContext<Request, Response>)
69
    // The observer has completed by completing the status promise.
70
    case completed
71
  }
72
73
  @inlinable
74
  public init(
75
    context: CallHandlerContext,
76
    requestDeserializer: Deserializer,
77
    responseSerializer: Serializer,
78
    interceptors: [ServerInterceptor<Request, Response>],
79
    observerFactory: @escaping (StreamingResponseCallContext<Response>)
80
      -> EventLoopFuture<(StreamEvent<Request>) -> Void>
81
144k
  ) {
82
144k
    self.serializer = responseSerializer
83
144k
    self.deserializer = requestDeserializer
84
144k
    self.context = context
85
144k
    self.observerFactory = observerFactory
86
144k
87
144k
    let userInfoRef = Ref(UserInfo())
88
144k
    self.userInfoRef = userInfoRef
89
144k
    self.interceptors = ServerInterceptorPipeline(
90
144k
      logger: context.logger,
91
144k
      eventLoop: context.eventLoop,
92
144k
      path: context.path,
93
144k
      callType: .bidirectionalStreaming,
94
144k
      remoteAddress: context.remoteAddress,
95
144k
      userInfoRef: userInfoRef,
96
144k
      closeFuture: context.closeFuture,
97
144k
      interceptors: interceptors,
98
4.63M
      onRequestPart: self.receiveInterceptedPart(_:),
$s4GRPC35BidirectionalStreamingServerHandlerC7context19requestDeserializer18responseSerializer12interceptors15observerFactoryACyxq_GAA04CallE7ContextV_q_xSayAA0D11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyyAA06StreamT0OyAOGcGAA0c8ResponsenO0CyAQGctcfcyAA21GRPCServerRequestPartOyAOGcAIcfu_
Line
Count
Source
98
144k
      onRequestPart: self.receiveInterceptedPart(_:),
$s4GRPC35BidirectionalStreamingServerHandlerC7context19requestDeserializer18responseSerializer12interceptors15observerFactoryACyxq_GAA04CallE7ContextV_q_xSayAA0D11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyyAA06StreamT0OyAOGcGAA0c8ResponsenO0CyAQGctcfcyAA21GRPCServerRequestPartOyAOGcAIcfu_yA4_cfu0_
Line
Count
Source
98
4.49M
      onRequestPart: self.receiveInterceptedPart(_:),
99
4.74M
      onResponsePart: self.sendInterceptedPart(_:promise:)
$s4GRPC35BidirectionalStreamingServerHandlerC7context19requestDeserializer18responseSerializer12interceptors15observerFactoryACyxq_GAA04CallE7ContextV_q_xSayAA0D11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyyAA06StreamT0OyAOGcGAA0c8ResponsenO0CyAQGctcfcyAA010GRPCServerX4PartOyAQG_AT0tU7PromiseVyytGSgtcAIcfu1_
Line
Count
Source
99
144k
      onResponsePart: self.sendInterceptedPart(_:promise:)
$s4GRPC35BidirectionalStreamingServerHandlerC7context19requestDeserializer18responseSerializer12interceptors15observerFactoryACyxq_GAA04CallE7ContextV_q_xSayAA0D11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyyAA06StreamT0OyAOGcGAA0c8ResponsenO0CyAQGctcfcyAA010GRPCServerX4PartOyAQG_AT0tU7PromiseVyytGSgtcAIcfu1_yA4__A8_tcfu2_
Line
Count
Source
99
4.60M
      onResponsePart: self.sendInterceptedPart(_:promise:)
100
144k
    )
101
144k
  }
102
103
  // MARK: - Public API: gRPC to Handler
104
105
  @inlinable
106
41.9k
  public func receiveMetadata(_ headers: HPACKHeaders) {
107
41.9k
    self.interceptors.receive(.metadata(headers))
108
41.9k
  }
109
110
  @inlinable
111
1.39M
  public func receiveMessage(_ bytes: ByteBuffer) {
112
1.39M
    do {
113
1.39M
      let message = try self.deserializer.deserialize(byteBuffer: bytes)
114
1.39M
      self.interceptors.receive(.message(message))
115
1.39M
    } catch {
116
31.5k
      self.handleError(error)
117
1.39M
    }
118
1.39M
  }
119
120
  @inlinable
121
8.42k
  public func receiveEnd() {
122
8.42k
    self.interceptors.receive(.end)
123
8.42k
  }
124
125
  @inlinable
126
2.51k
  public func receiveError(_ error: Error) {
127
2.51k
    self.handleError(error)
128
2.51k
    self.finish()
129
2.51k
  }
130
131
  @inlinable
132
44.4k
  public func finish() {
133
44.4k
    switch self.state {
134
44.4k
    case .idle:
135
0
      self.interceptors = nil
136
0
      self.state = .completed
137
44.4k
138
44.4k
    case let .creatingObserver(context),
139
0
      let .observing(_, context):
140
0
      context.statusPromise.fail(GRPCStatus(code: .unavailable, message: nil))
141
0
      self.context.eventLoop.execute {
142
0
        self.interceptors = nil
143
0
      }
144
44.4k
145
44.4k
    case .completed:
146
44.4k
      self.interceptors = nil
147
44.4k
    }
148
44.4k
  }
149
150
  // MARK: - Interceptors to User Function
151
152
  @inlinable
153
4.49M
  internal func receiveInterceptedPart(_ part: GRPCServerRequestPart<Request>) {
154
4.49M
    switch part {
155
4.49M
    case let .metadata(headers):
156
144k
      self.receiveInterceptedMetadata(headers)
157
4.49M
    case let .message(message):
158
4.31M
      self.receiveInterceptedMessage(message)
159
4.49M
    case .end:
160
31.4k
      self.receiveInterceptedEnd()
161
4.49M
    }
162
4.49M
  }
163
164
  @inlinable
165
144k
  internal func receiveInterceptedMetadata(_ headers: HPACKHeaders) {
166
144k
    switch self.state {
167
144k
    case .idle:
168
144k
      // Make a context to invoke the observer block factory with.
169
144k
      let context = _StreamingResponseCallContext<Request, Response>(
170
144k
        eventLoop: self.context.eventLoop,
171
144k
        headers: headers,
172
144k
        logger: self.context.logger,
173
144k
        userInfoRef: self.userInfoRef,
174
144k
        compressionIsEnabled: self.context.encoding.isEnabled,
175
144k
        closeFuture: self.context.closeFuture,
176
4.46M
        sendResponse: self.interceptResponse(_:metadata:promise:)
$s4GRPC35BidirectionalStreamingServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFy5InputQz_AA07MessageH0V7NIOCore16EventLoopPromiseVyytGSgtcACyxq_Gcfu_
Line
Count
Source
176
144k
        sendResponse: self.interceptResponse(_:metadata:promise:)
$s4GRPC35BidirectionalStreamingServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFy5InputQz_AA07MessageH0V7NIOCore16EventLoopPromiseVyytGSgtcACyxq_Gcfu_yAI_AkPtcfu0_
Line
Count
Source
176
4.31M
        sendResponse: self.interceptResponse(_:metadata:promise:)
177
144k
      )
178
144k
179
144k
      // Move to the next state.
180
144k
      self.state = .creatingObserver(context)
181
144k
182
144k
      // Send response headers back via the interceptors.
183
144k
      self.interceptors.send(.metadata([:]), promise: nil)
184
144k
185
144k
      // Register callbacks on the status future.
186
288k
      context.statusPromise.futureResult.whenComplete(self.userFunctionStatusResolved(_:))
$s4GRPC35BidirectionalStreamingServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFys6ResultOyAA10GRPCStatusVs5Error_pGcACyxq_Gcfu1_
Line
Count
Source
186
144k
      context.statusPromise.futureResult.whenComplete(self.userFunctionStatusResolved(_:))
$s4GRPC35BidirectionalStreamingServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFys6ResultOyAA10GRPCStatusVs5Error_pGcACyxq_Gcfu1_yAMcfu2_
Line
Count
Source
186
144k
      context.statusPromise.futureResult.whenComplete(self.userFunctionStatusResolved(_:))
187
144k
188
144k
      // Make an observer block and register a completion block.
189
288k
      self.observerFactory(context).whenComplete(self.userFunctionResolvedWithResult(_:))
$s4GRPC35BidirectionalStreamingServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFys6ResultOyyAA11StreamEventOy6OutputQy_Gcs5Error_pGcACyxq_Gcfu3_
Line
Count
Source
189
144k
      self.observerFactory(context).whenComplete(self.userFunctionResolvedWithResult(_:))
$s4GRPC35BidirectionalStreamingServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFys6ResultOyyAA11StreamEventOy6OutputQy_Gcs5Error_pGcACyxq_Gcfu3_yAPcfu4_
Line
Count
Source
189
144k
      self.observerFactory(context).whenComplete(self.userFunctionResolvedWithResult(_:))
190
144k
191
144k
    case .creatingObserver, .observing:
192
0
      self.handleError(GRPCError.ProtocolViolation("Multiple header blocks received on RPC"))
193
144k
194
144k
    case .completed:
195
0
      // We may receive headers from the interceptor pipeline if we have already finished (i.e. due
196
0
      // to an error or otherwise) and an interceptor doing some async work later emitting headers.
197
0
      // Dropping them is fine.
198
0
      ()
199
144k
    }
200
144k
  }
201
202
  @inlinable
203
4.31M
  internal func receiveInterceptedMessage(_ request: Request) {
204
4.31M
    switch self.state {
205
4.31M
    case .idle:
206
0
      self.handleError(GRPCError.ProtocolViolation("Message received before headers"))
207
4.31M
    case .creatingObserver:
208
0
      self.requestBuffer.append(.message(request))
209
4.31M
    case let .observing(observer, _):
210
4.31M
      observer(.message(request))
211
4.31M
    case .completed:
212
0
      // We received a message but we're already done: this may happen if we terminate the RPC
213
0
      // due to a channel error, for example.
214
0
      ()
215
4.31M
    }
216
4.31M
  }
217
218
  @inlinable
219
31.4k
  internal func receiveInterceptedEnd() {
220
31.4k
    switch self.state {
221
31.4k
    case .idle:
222
0
      self.handleError(GRPCError.ProtocolViolation("End of stream received before headers"))
223
31.4k
    case .creatingObserver:
224
0
      self.requestBuffer.append(.end)
225
31.4k
    case let .observing(observer, _):
226
31.4k
      observer(.end)
227
31.4k
    case .completed:
228
0
      // We received end of stream but we're already done: this may happen if we terminate the RPC
229
0
      // due to a channel error, for example.
230
0
      ()
231
31.4k
    }
232
31.4k
  }
233
234
  // MARK: - User Function To Interceptors
235
236
  @inlinable
237
  internal func userFunctionResolvedWithResult(
238
    _ result: Result<(StreamEvent<Request>) -> Void, Error>
239
144k
  ) {
240
144k
    switch self.state {
241
144k
    case .idle, .observing:
242
0
      // The observer block can't resolve if it hasn't been created ('idle') and it can't be
243
0
      // resolved more than once ('observing').
244
0
      preconditionFailure()
245
144k
246
144k
    case let .creatingObserver(context):
247
144k
      switch result {
248
144k
      case let .success(observer):
249
144k
        // We have an observer block now; unbuffer any requests.
250
144k
        self.state = .observing(observer, context)
251
144k
        while let request = self.requestBuffer.popFirst() {
252
0
          observer(request)
253
144k
        }
254
144k
255
144k
      case let .failure(error):
256
0
        self.handleError(error, thrownFromHandler: true)
257
144k
      }
258
144k
259
144k
    case .completed:
260
0
      // We've already completed. That's fine.
261
0
      ()
262
144k
    }
263
144k
  }
264
265
  @inlinable
266
  internal func interceptResponse(
267
    _ response: Response,
268
    metadata: MessageMetadata,
269
    promise: EventLoopPromise<Void>?
270
4.31M
  ) {
271
4.31M
    switch self.state {
272
4.31M
    case .idle:
273
0
      // The observer block can't send responses if it doesn't exist!
274
0
      preconditionFailure()
275
4.31M
276
4.31M
    case .creatingObserver, .observing:
277
4.31M
      // The user has access to the response context before returning a future observer,
278
4.31M
      // so 'creatingObserver' is valid here (if a little strange).
279
4.31M
      self.interceptors.send(.message(response, metadata), promise: promise)
280
4.31M
281
4.31M
    case .completed:
282
0
      promise?.fail(GRPCError.AlreadyComplete())
283
4.31M
    }
284
4.31M
  }
285
286
  @inlinable
287
144k
  internal func userFunctionStatusResolved(_ result: Result<GRPCStatus, Error>) {
288
144k
    switch self.state {
289
144k
    case .idle:
290
0
      // The promise can't fail before we create it.
291
0
      preconditionFailure()
292
144k
293
144k
    // 'creatingObserver' is possible: the user can complete the status before returning a stream handler.
294
144k
    case let .creatingObserver(context), let .observing(_, context):
295
31.4k
      switch result {
296
31.4k
      case let .success(status):
297
31.4k
        // We're sending end back, we're done.
298
31.4k
        self.state = .completed
299
31.4k
        self.interceptors.send(.end(status, context.trailers), promise: nil)
300
31.4k
301
31.4k
      case let .failure(error):
302
0
        self.handleError(error, thrownFromHandler: true)
303
144k
      }
304
144k
305
144k
    case .completed:
306
112k
      ()
307
144k
    }
308
144k
  }
309
310
  @inlinable
311
114k
  internal func handleError(_ error: Error, thrownFromHandler isHandlerError: Bool = false) {
312
114k
    switch self.state {
313
114k
    case .idle:
314
0
      assert(!isHandlerError)
315
0
      self.state = .completed
316
0
      // We don't have a promise to fail. Just send back end.
317
0
      let (status, trailers) = ServerErrorProcessor.processLibraryError(
318
0
        error,
319
0
        delegate: self.context.errorDelegate
320
0
      )
321
0
      self.interceptors.send(.end(status, trailers), promise: nil)
322
114k
323
114k
    case let .creatingObserver(context),
324
112k
      let .observing(_, context):
325
112k
      // We're done: move to 'completed' before sending end and failing the status promise.
326
112k
      self.state = .completed
327
112k
328
112k
      let status: GRPCStatus
329
112k
      let trailers: HPACKHeaders
330
112k
331
112k
      if isHandlerError {
332
0
        (status, trailers) = ServerErrorProcessor.processObserverError(
333
0
          error,
334
0
          headers: context.headers,
335
0
          trailers: context.trailers,
336
0
          delegate: self.context.errorDelegate
337
0
        )
338
112k
      } else {
339
112k
        (status, trailers) = ServerErrorProcessor.processLibraryError(
340
112k
          error,
341
112k
          delegate: self.context.errorDelegate
342
112k
        )
343
112k
      }
344
112k
345
112k
      self.interceptors.send(.end(status, trailers), promise: nil)
346
112k
      // We're already in the 'completed' state so failing the promise will be a no-op in the
347
112k
      // callback to 'userFunctionStatusResolved' (but we also need to avoid leaking the promise).
348
112k
      context.statusPromise.fail(error)
349
114k
350
114k
    case .completed:
351
1.54k
      ()
352
114k
    }
353
114k
  }
354
355
  @inlinable
356
  internal func sendInterceptedPart(
357
    _ part: GRPCServerResponsePart<Response>,
358
    promise: EventLoopPromise<Void>?
359
4.60M
  ) {
360
4.60M
    switch part {
361
4.60M
    case let .metadata(headers):
362
144k
      self.context.responseWriter.sendMetadata(headers, flush: true, promise: promise)
363
4.60M
364
4.60M
    case let .message(message, metadata):
365
4.31M
      do {
366
4.31M
        let bytes = try self.serializer.serialize(message, allocator: ByteBufferAllocator())
367
4.31M
        self.context.responseWriter.sendMessage(bytes, metadata: metadata, promise: promise)
368
4.31M
      } catch {
369
0
        // Serialization failed: fail the promise and send end.
370
0
        promise?.fail(error)
371
0
        let (status, trailers) = ServerErrorProcessor.processLibraryError(
372
0
          error,
373
0
          delegate: self.context.errorDelegate
374
0
        )
375
0
        // Loop back via the interceptors.
376
0
        self.interceptors.send(.end(status, trailers), promise: nil)
377
4.60M
      }
378
4.60M
379
4.60M
    case let .end(status, trailers):
380
144k
      self.context.responseWriter.sendEnd(status: status, trailers: trailers, promise: promise)
381
4.60M
    }
382
4.60M
  }
383
}