Coverage Report

Created: 2026-06-15 07:01

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/grpc-swift/Sources/GRPC/CallHandlers/BidirectionalStreamingServerHandler.swift
Line
Count
Source
1
/*
2
 * Copyright 2021, gRPC Authors All rights reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
import NIOCore
17
import NIOHPACK
18
19
public final class BidirectionalStreamingServerHandler<
20
  Serializer: MessageSerializer,
21
  Deserializer: MessageDeserializer
22
>: GRPCServerHandlerProtocol {
23
  public typealias Request = Deserializer.Output
24
  public typealias Response = Serializer.Input
25
26
  /// A response serializer.
27
  @usableFromInline
28
  internal let serializer: Serializer
29
30
  /// A request deserializer.
31
  @usableFromInline
32
  internal let deserializer: Deserializer
33
34
  /// A pipeline of user provided interceptors.
35
  @usableFromInline
36
  internal var interceptors: ServerInterceptorPipeline<Request, Response>!
37
38
  /// Stream events which have arrived before the stream observer future has been resolved.
39
  @usableFromInline
40
129k
  internal var requestBuffer: CircularBuffer<StreamEvent<Request>> = CircularBuffer()
41
42
  /// The context required in order create the function.
43
  @usableFromInline
44
  internal let context: CallHandlerContext
45
46
  /// A reference to a `UserInfo`.
47
  @usableFromInline
48
  internal let userInfoRef: Ref<UserInfo>
49
50
  /// The user provided function to execute.
51
  @usableFromInline
52
  internal let observerFactory:
53
    (_StreamingResponseCallContext<Request, Response>)
54
      -> EventLoopFuture<(StreamEvent<Request>) -> Void>
55
56
  /// The state of the handler.
57
  @usableFromInline
58
129k
  internal var state: State = .idle
59
60
  @usableFromInline
61
  internal enum State {
62
    // No headers have been received.
63
    case idle
64
    // Headers have been received, a context has been created and the user code has been called to
65
    // make a stream observer with. The observer is yet to see any messages.
66
    case creatingObserver(_StreamingResponseCallContext<Request, Response>)
67
    // The observer future has resolved and the observer may have seen messages.
68
    case observing((StreamEvent<Request>) -> Void, _StreamingResponseCallContext<Request, Response>)
69
    // The observer has completed by completing the status promise.
70
    case completed
71
  }
72
73
  @inlinable
74
  public init(
75
    context: CallHandlerContext,
76
    requestDeserializer: Deserializer,
77
    responseSerializer: Serializer,
78
    interceptors: [ServerInterceptor<Request, Response>],
79
    observerFactory:
80
      @escaping (StreamingResponseCallContext<Response>)
81
      -> EventLoopFuture<(StreamEvent<Request>) -> Void>
82
57.4k
  ) {
83
57.4k
    self.serializer = responseSerializer
84
57.4k
    self.deserializer = requestDeserializer
85
57.4k
    self.context = context
86
57.4k
    self.observerFactory = observerFactory
87
57.4k
88
57.4k
    let userInfoRef = Ref(UserInfo())
89
57.4k
    self.userInfoRef = userInfoRef
90
57.4k
    self.interceptors = ServerInterceptorPipeline(
91
57.4k
      logger: context.logger,
92
57.4k
      eventLoop: context.eventLoop,
93
57.4k
      path: context.path,
94
57.4k
      callType: .bidirectionalStreaming,
95
57.4k
      remoteAddress: context.remoteAddress,
96
57.4k
      userInfoRef: userInfoRef,
97
57.4k
      closeFuture: context.closeFuture,
98
57.4k
      interceptors: interceptors,
99
3.92M
      onRequestPart: self.receiveInterceptedPart(_:),
$s4GRPC35BidirectionalStreamingServerHandlerC7context19requestDeserializer18responseSerializer12interceptors15observerFactoryACyxq_GAA04CallE7ContextV_q_xSayAA0D11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyyAA06StreamT0OyAOGcGAA0c8ResponsenO0CyAQGctcfcyAA21GRPCServerRequestPartOyAOGcAIcfu_
Line
Count
Source
99
57.4k
      onRequestPart: self.receiveInterceptedPart(_:),
$s4GRPC35BidirectionalStreamingServerHandlerC7context19requestDeserializer18responseSerializer12interceptors15observerFactoryACyxq_GAA04CallE7ContextV_q_xSayAA0D11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyyAA06StreamT0OyAOGcGAA0c8ResponsenO0CyAQGctcfcyAA21GRPCServerRequestPartOyAOGcAIcfu_yA4_cfu0_
Line
Count
Source
99
3.86M
      onRequestPart: self.receiveInterceptedPart(_:),
100
3.95M
      onResponsePart: self.sendInterceptedPart(_:promise:)
$s4GRPC35BidirectionalStreamingServerHandlerC7context19requestDeserializer18responseSerializer12interceptors15observerFactoryACyxq_GAA04CallE7ContextV_q_xSayAA0D11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyyAA06StreamT0OyAOGcGAA0c8ResponsenO0CyAQGctcfcyAA010GRPCServerX4PartOyAQG_AT0tU7PromiseVyytGSgtcAIcfu1_
Line
Count
Source
100
57.4k
      onResponsePart: self.sendInterceptedPart(_:promise:)
$s4GRPC35BidirectionalStreamingServerHandlerC7context19requestDeserializer18responseSerializer12interceptors15observerFactoryACyxq_GAA04CallE7ContextV_q_xSayAA0D11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyyAA06StreamT0OyAOGcGAA0c8ResponsenO0CyAQGctcfcyAA010GRPCServerX4PartOyAQG_AT0tU7PromiseVyytGSgtcAIcfu1_yA4__A8_tcfu2_
Line
Count
Source
100
3.89M
      onResponsePart: self.sendInterceptedPart(_:promise:)
101
57.4k
    )
102
57.4k
  }
103
104
  // MARK: - Public API: gRPC to Handler
105
106
  @inlinable
107
14.3k
  public func receiveMetadata(_ headers: HPACKHeaders) {
108
14.3k
    self.interceptors.receive(.metadata(headers))
109
14.3k
  }
110
111
  @inlinable
112
950k
  public func receiveMessage(_ bytes: ByteBuffer) {
113
950k
    do {
114
950k
      let message = try self.deserializer.deserialize(byteBuffer: bytes)
115
944k
      self.interceptors.receive(.message(message))
116
944k
    } catch {
117
5.73k
      self.handleError(error)
118
950k
    }
119
950k
  }
120
121
  @inlinable
122
7.20k
  public func receiveEnd() {
123
7.20k
    self.interceptors.receive(.end)
124
7.20k
  }
125
126
  @inlinable
127
1.68k
  public func receiveError(_ error: Error) {
128
1.68k
    self.handleError(error)
129
1.68k
    self.finish()
130
1.68k
  }
131
132
  @inlinable
133
16.0k
  public func finish() {
134
16.0k
    switch self.state {
135
16.0k
    case .idle:
136
0
      self.interceptors = nil
137
0
      self.state = .completed
138
16.0k
139
16.0k
    case let .creatingObserver(context),
140
0
      let .observing(_, context):
141
0
      context.statusPromise.fail(GRPCStatus(code: .unavailable, message: nil))
142
0
      self.context.eventLoop.execute {
143
0
        self.interceptors = nil
144
0
      }
145
16.0k
146
16.0k
    case .completed:
147
16.0k
      self.interceptors = nil
148
16.0k
    }
149
16.0k
  }
150
151
  // MARK: - Interceptors to User Function
152
153
  @inlinable
154
3.86M
  internal func receiveInterceptedPart(_ part: GRPCServerRequestPart<Request>) {
155
3.86M
    switch part {
156
3.86M
    case let .metadata(headers):
157
57.4k
      self.receiveInterceptedMetadata(headers)
158
3.86M
    case let .message(message):
159
3.77M
      self.receiveInterceptedMessage(message)
160
3.86M
    case .end:
161
28.8k
      self.receiveInterceptedEnd()
162
3.86M
    }
163
3.86M
  }
164
165
  @inlinable
166
57.4k
  internal func receiveInterceptedMetadata(_ headers: HPACKHeaders) {
167
57.4k
    switch self.state {
168
57.4k
    case .idle:
169
57.4k
      // Make a context to invoke the observer block factory with.
170
57.4k
      let context = _StreamingResponseCallContext<Request, Response>(
171
57.4k
        eventLoop: self.context.eventLoop,
172
57.4k
        headers: headers,
173
57.4k
        logger: self.context.logger,
174
57.4k
        userInfoRef: self.userInfoRef,
175
57.4k
        compressionIsEnabled: self.context.encoding.isEnabled,
176
57.4k
        closeFuture: self.context.closeFuture,
177
3.83M
        sendResponse: self.interceptResponse(_:metadata:promise:)
$s4GRPC35BidirectionalStreamingServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFy5InputQz_AA07MessageH0V7NIOCore16EventLoopPromiseVyytGSgtcACyxq_Gcfu_
Line
Count
Source
177
57.4k
        sendResponse: self.interceptResponse(_:metadata:promise:)
$s4GRPC35BidirectionalStreamingServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFy5InputQz_AA07MessageH0V7NIOCore16EventLoopPromiseVyytGSgtcACyxq_Gcfu_yAI_AkPtcfu0_
Line
Count
Source
177
3.77M
        sendResponse: self.interceptResponse(_:metadata:promise:)
178
57.4k
      )
179
57.4k
180
57.4k
      // Move to the next state.
181
57.4k
      self.state = .creatingObserver(context)
182
57.4k
183
57.4k
      // Send response headers back via the interceptors.
184
57.4k
      self.interceptors.send(.metadata([:]), promise: nil)
185
57.4k
186
57.4k
      // Register callbacks on the status future.
187
114k
      context.statusPromise.futureResult.whenComplete(self.userFunctionStatusResolved(_:))
$s4GRPC35BidirectionalStreamingServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFys6ResultOyAA10GRPCStatusVs5Error_pGcACyxq_Gcfu1_
Line
Count
Source
187
57.4k
      context.statusPromise.futureResult.whenComplete(self.userFunctionStatusResolved(_:))
$s4GRPC35BidirectionalStreamingServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFys6ResultOyAA10GRPCStatusVs5Error_pGcACyxq_Gcfu1_yAMcfu2_
Line
Count
Source
187
57.4k
      context.statusPromise.futureResult.whenComplete(self.userFunctionStatusResolved(_:))
188
57.4k
189
57.4k
      // Make an observer block and register a completion block.
190
114k
      self.observerFactory(context).whenComplete(self.userFunctionResolvedWithResult(_:))
$s4GRPC35BidirectionalStreamingServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFys6ResultOyyAA11StreamEventOy6OutputQy_Gcs5Error_pGcACyxq_Gcfu3_
Line
Count
Source
190
57.4k
      self.observerFactory(context).whenComplete(self.userFunctionResolvedWithResult(_:))
$s4GRPC35BidirectionalStreamingServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFys6ResultOyyAA11StreamEventOy6OutputQy_Gcs5Error_pGcACyxq_Gcfu3_yAPcfu4_
Line
Count
Source
190
57.4k
      self.observerFactory(context).whenComplete(self.userFunctionResolvedWithResult(_:))
191
57.4k
192
57.4k
    case .creatingObserver, .observing:
193
0
      self.handleError(GRPCError.ProtocolViolation("Multiple header blocks received on RPC"))
194
57.4k
195
57.4k
    case .completed:
196
0
      // We may receive headers from the interceptor pipeline if we have already finished (i.e. due
197
0
      // to an error or otherwise) and an interceptor doing some async work later emitting headers.
198
0
      // Dropping them is fine.
199
0
      ()
200
57.4k
    }
201
57.4k
  }
202
203
  @inlinable
204
3.77M
  internal func receiveInterceptedMessage(_ request: Request) {
205
3.77M
    switch self.state {
206
3.77M
    case .idle:
207
0
      self.handleError(GRPCError.ProtocolViolation("Message received before headers"))
208
3.77M
    case .creatingObserver:
209
0
      self.requestBuffer.append(.message(request))
210
3.77M
    case let .observing(observer, _):
211
3.77M
      observer(.message(request))
212
3.77M
    case .completed:
213
0
      // We received a message but we're already done: this may happen if we terminate the RPC
214
0
      // due to a channel error, for example.
215
0
      ()
216
3.77M
    }
217
3.77M
  }
218
219
  @inlinable
220
28.8k
  internal func receiveInterceptedEnd() {
221
28.8k
    switch self.state {
222
28.8k
    case .idle:
223
0
      self.handleError(GRPCError.ProtocolViolation("End of stream received before headers"))
224
28.8k
    case .creatingObserver:
225
0
      self.requestBuffer.append(.end)
226
28.8k
    case let .observing(observer, _):
227
28.8k
      observer(.end)
228
28.8k
    case .completed:
229
0
      // We received a message but we're already done: this may happen if we terminate the RPC
230
0
      // due to a channel error, for example.
231
0
      ()
232
28.8k
    }
233
28.8k
  }
234
235
  // MARK: - User Function To Interceptors
236
237
  @inlinable
238
  internal func userFunctionResolvedWithResult(
239
    _ result: Result<(StreamEvent<Request>) -> Void, Error>
240
57.4k
  ) {
241
57.4k
    switch self.state {
242
57.4k
    case .idle, .observing:
243
0
      // The observer block can't resolve if it hasn't been created ('idle') and it can't be
244
0
      // resolved more than once ('observing').
245
0
      preconditionFailure()
246
57.4k
247
57.4k
    case let .creatingObserver(context):
248
57.4k
      switch result {
249
57.4k
      case let .success(observer):
250
57.4k
        // We have an observer block now; unbuffer any requests.
251
57.4k
        self.state = .observing(observer, context)
252
57.4k
        while let request = self.requestBuffer.popFirst() {
253
0
          observer(request)
254
0
        }
255
57.4k
256
57.4k
      case let .failure(error):
257
0
        self.handleError(error, thrownFromHandler: true)
258
57.4k
      }
259
57.4k
260
57.4k
    case .completed:
261
0
      // We've already completed. That's fine.
262
0
      ()
263
57.4k
    }
264
57.4k
  }
265
266
  @inlinable
267
  internal func interceptResponse(
268
    _ response: Response,
269
    metadata: MessageMetadata,
270
    promise: EventLoopPromise<Void>?
271
3.77M
  ) {
272
3.77M
    switch self.state {
273
3.77M
    case .idle:
274
0
      // The observer block can't end responses if it doesn't exist!
275
0
      preconditionFailure()
276
3.77M
277
3.77M
    case .creatingObserver, .observing:
278
3.77M
      // The user has access to the response context before returning a future observer,
279
3.77M
      // so 'creatingObserver' is valid here (if a little strange).
280
3.77M
      self.interceptors.send(.message(response, metadata), promise: promise)
281
3.77M
282
3.77M
    case .completed:
283
0
      promise?.fail(GRPCError.AlreadyComplete())
284
3.77M
    }
285
3.77M
  }
286
287
  @inlinable
288
57.4k
  internal func userFunctionStatusResolved(_ result: Result<GRPCStatus, Error>) {
289
57.4k
    switch self.state {
290
57.4k
    case .idle:
291
0
      // The promise can't fail before we create it.
292
0
      preconditionFailure()
293
57.4k
294
57.4k
    // Making is possible, the user can complete the status before returning a stream handler.
295
57.4k
    case let .creatingObserver(context), let .observing(_, context):
296
28.8k
      switch result {
297
28.8k
      case let .success(status):
298
28.8k
        // We're sending end back, we're done.
299
28.8k
        self.state = .completed
300
28.8k
        self.interceptors.send(.end(status, context.trailers), promise: nil)
301
28.8k
302
28.8k
      case let .failure(error):
303
0
        self.handleError(error, thrownFromHandler: true)
304
28.8k
      }
305
57.4k
306
57.4k
    case .completed:
307
28.6k
      ()
308
57.4k
    }
309
57.4k
  }
310
311
  @inlinable
312
29.6k
  internal func handleError(_ error: Error, thrownFromHandler isHandlerError: Bool = false) {
313
29.6k
    switch self.state {
314
29.6k
    case .idle:
315
0
      assert(!isHandlerError)
316
0
      self.state = .completed
317
0
      // We don't have a promise to fail. Just send back end.
318
0
      let (status, trailers) = ServerErrorProcessor.processLibraryError(
319
0
        error,
320
0
        delegate: self.context.errorDelegate
321
0
      )
322
0
      self.interceptors.send(.end(status, trailers), promise: nil)
323
29.6k
324
29.6k
    case let .creatingObserver(context),
325
28.6k
      let .observing(_, context):
326
28.6k
      // We don't have a promise to fail. Just send back end.
327
28.6k
      self.state = .completed
328
28.6k
329
28.6k
      let status: GRPCStatus
330
28.6k
      let trailers: HPACKHeaders
331
28.6k
332
28.6k
      if isHandlerError {
333
0
        (status, trailers) = ServerErrorProcessor.processObserverError(
334
0
          error,
335
0
          headers: context.headers,
336
0
          trailers: context.trailers,
337
0
          delegate: self.context.errorDelegate
338
0
        )
339
28.6k
      } else {
340
28.6k
        (status, trailers) = ServerErrorProcessor.processLibraryError(
341
28.6k
          error,
342
28.6k
          delegate: self.context.errorDelegate
343
28.6k
        )
344
28.6k
      }
345
28.6k
346
28.6k
      self.interceptors.send(.end(status, trailers), promise: nil)
347
28.6k
      // We're already in the 'completed' state so failing the promise will be a no-op in the
348
28.6k
      // callback to 'userHandlerCompleted' (but we also need to avoid leaking the promise.)
349
28.6k
      context.statusPromise.fail(error)
350
29.6k
351
29.6k
    case .completed:
352
1.02k
      ()
353
29.6k
    }
354
29.6k
  }
355
356
  @inlinable
357
  internal func sendInterceptedPart(
358
    _ part: GRPCServerResponsePart<Response>,
359
    promise: EventLoopPromise<Void>?
360
3.89M
  ) {
361
3.89M
    switch part {
362
3.89M
    case let .metadata(headers):
363
57.4k
      self.context.responseWriter.sendMetadata(headers, flush: true, promise: promise)
364
3.89M
365
3.89M
    case let .message(message, metadata):
366
3.77M
      do {
367
3.77M
        let bytes = try self.serializer.serialize(message, allocator: ByteBufferAllocator())
368
3.77M
        self.context.responseWriter.sendMessage(bytes, metadata: metadata, promise: promise)
369
3.77M
      } catch {
370
0
        // Serialization failed: fail the promise and send end.
371
0
        promise?.fail(error)
372
0
        let (status, trailers) = ServerErrorProcessor.processLibraryError(
373
0
          error,
374
0
          delegate: self.context.errorDelegate
375
0
        )
376
0
        // Loop back via the interceptors.
377
0
        self.interceptors.send(.end(status, trailers), promise: nil)
378
0
      }
379
3.89M
380
3.89M
    case let .end(status, trailers):
381
57.4k
      self.context.responseWriter.sendEnd(status: status, trailers: trailers, promise: promise)
382
3.89M
    }
383
3.89M
  }
384
}