Coverage Report

Created: 2025-06-24 06:59

/src/grpc-swift/Sources/GRPC/CallHandlers/BidirectionalStreamingServerHandler.swift
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright 2021, gRPC Authors All rights reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
import NIOCore
17
import NIOHPACK
18
19
public final class BidirectionalStreamingServerHandler<
20
  Serializer: MessageSerializer,
21
  Deserializer: MessageDeserializer
22
>: GRPCServerHandlerProtocol {
23
  public typealias Request = Deserializer.Output
24
  public typealias Response = Serializer.Input
25
26
  /// A response serializer.
27
  @usableFromInline
28
  internal let serializer: Serializer
29
30
  /// A request deserializer.
31
  @usableFromInline
32
  internal let deserializer: Deserializer
33
34
  /// A pipeline of user provided interceptors.
35
  @usableFromInline
36
  internal var interceptors: ServerInterceptorPipeline<Request, Response>!
37
38
  /// Stream events which have arrived before the stream observer future has been resolved.
39
  @usableFromInline
40
216k
  internal var requestBuffer: CircularBuffer<StreamEvent<Request>> = CircularBuffer()
41
42
  /// The context required in order create the function.
43
  @usableFromInline
44
  internal let context: CallHandlerContext
45
46
  /// A reference to a `UserInfo`.
47
  @usableFromInline
48
  internal let userInfoRef: Ref<UserInfo>
49
50
  /// The user provided function to execute.
51
  @usableFromInline
52
  internal let observerFactory:
53
    (_StreamingResponseCallContext<Request, Response>)
54
      -> EventLoopFuture<(StreamEvent<Request>) -> Void>
55
56
  /// The state of the handler.
57
  @usableFromInline
58
216k
  internal var state: State = .idle
59
60
  @usableFromInline
61
  internal enum State {
62
    // No headers have been received.
63
    case idle
64
    // Headers have been received, a context has been created and the user code has been called to
65
    // make a stream observer with. The observer is yet to see any messages.
66
    case creatingObserver(_StreamingResponseCallContext<Request, Response>)
67
    // The observer future has resolved and the observer may have seen messages.
68
    case observing((StreamEvent<Request>) -> Void, _StreamingResponseCallContext<Request, Response>)
69
    // The observer has completed by completing the status promise.
70
    case completed
71
  }
72
73
  @inlinable
74
  public init(
75
    context: CallHandlerContext,
76
    requestDeserializer: Deserializer,
77
    responseSerializer: Serializer,
78
    interceptors: [ServerInterceptor<Request, Response>],
79
    observerFactory: @escaping (StreamingResponseCallContext<Response>)
80
      -> EventLoopFuture<(StreamEvent<Request>) -> Void>
81
99.8k
  ) {
82
99.8k
    self.serializer = responseSerializer
83
99.8k
    self.deserializer = requestDeserializer
84
99.8k
    self.context = context
85
99.8k
    self.observerFactory = observerFactory
86
99.8k
87
99.8k
    let userInfoRef = Ref(UserInfo())
88
99.8k
    self.userInfoRef = userInfoRef
89
99.8k
    self.interceptors = ServerInterceptorPipeline(
90
99.8k
      logger: context.logger,
91
99.8k
      eventLoop: context.eventLoop,
92
99.8k
      path: context.path,
93
99.8k
      callType: .bidirectionalStreaming,
94
99.8k
      remoteAddress: context.remoteAddress,
95
99.8k
      userInfoRef: userInfoRef,
96
99.8k
      closeFuture: context.closeFuture,
97
99.8k
      interceptors: interceptors,
98
3.57M
      onRequestPart: self.receiveInterceptedPart(_:),
$s4GRPC35BidirectionalStreamingServerHandlerC7context19requestDeserializer18responseSerializer12interceptors15observerFactoryACyxq_GAA04CallE7ContextV_q_xSayAA0D11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyyAA06StreamT0OyAOGcGAA0c8ResponsenO0CyAQGctcfcyAA21GRPCServerRequestPartOyAOGcAIcfu_
Line
Count
Source
98
99.8k
      onRequestPart: self.receiveInterceptedPart(_:),
$s4GRPC35BidirectionalStreamingServerHandlerC7context19requestDeserializer18responseSerializer12interceptors15observerFactoryACyxq_GAA04CallE7ContextV_q_xSayAA0D11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyyAA06StreamT0OyAOGcGAA0c8ResponsenO0CyAQGctcfcyAA21GRPCServerRequestPartOyAOGcAIcfu_yA4_cfu0_
Line
Count
Source
98
3.47M
      onRequestPart: self.receiveInterceptedPart(_:),
99
3.65M
      onResponsePart: self.sendInterceptedPart(_:promise:)
$s4GRPC35BidirectionalStreamingServerHandlerC7context19requestDeserializer18responseSerializer12interceptors15observerFactoryACyxq_GAA04CallE7ContextV_q_xSayAA0D11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyyAA06StreamT0OyAOGcGAA0c8ResponsenO0CyAQGctcfcyAA010GRPCServerX4PartOyAQG_AT0tU7PromiseVyytGSgtcAIcfu1_
Line
Count
Source
99
99.8k
      onResponsePart: self.sendInterceptedPart(_:promise:)
$s4GRPC35BidirectionalStreamingServerHandlerC7context19requestDeserializer18responseSerializer12interceptors15observerFactoryACyxq_GAA04CallE7ContextV_q_xSayAA0D11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyyAA06StreamT0OyAOGcGAA0c8ResponsenO0CyAQGctcfcyAA010GRPCServerX4PartOyAQG_AT0tU7PromiseVyytGSgtcAIcfu1_yA4__A8_tcfu2_
Line
Count
Source
99
3.55M
      onResponsePart: self.sendInterceptedPart(_:promise:)
100
99.8k
    )
101
99.8k
  }
102
103
  // MARK: - Public API: gRPC to Handler
104
105
  @inlinable
106
30.1k
  public func receiveMetadata(_ headers: HPACKHeaders) {
107
30.1k
    self.interceptors.receive(.metadata(headers))
108
30.1k
  }
109
110
  @inlinable
111
1.11M
  public func receiveMessage(_ bytes: ByteBuffer) {
112
1.11M
    do {
113
1.11M
      let message = try self.deserializer.deserialize(byteBuffer: bytes)
114
1.11M
      self.interceptors.receive(.message(message))
115
1.11M
    } catch {
116
21.7k
      self.handleError(error)
117
1.11M
    }
118
1.11M
  }
119
120
  @inlinable
121
5.45k
  public func receiveEnd() {
122
5.45k
    self.interceptors.receive(.end)
123
5.45k
  }
124
125
  @inlinable
126
3.43k
  public func receiveError(_ error: Error) {
127
3.43k
    self.handleError(error)
128
3.43k
    self.finish()
129
3.43k
  }
130
131
  @inlinable
132
33.5k
  public func finish() {
133
33.5k
    switch self.state {
134
33.5k
    case .idle:
135
0
      self.interceptors = nil
136
0
      self.state = .completed
137
33.5k
138
33.5k
    case let .creatingObserver(context),
139
0
      let .observing(_, context):
140
0
      context.statusPromise.fail(GRPCStatus(code: .unavailable, message: nil))
141
0
      self.context.eventLoop.execute {
142
0
        self.interceptors = nil
143
0
      }
144
33.5k
145
33.5k
    case .completed:
146
33.5k
      self.interceptors = nil
147
33.5k
    }
148
33.5k
  }
149
150
  // MARK: - Interceptors to User Function
151
152
  @inlinable
153
3.47M
  internal func receiveInterceptedPart(_ part: GRPCServerRequestPart<Request>) {
154
3.47M
    switch part {
155
3.47M
    case let .metadata(headers):
156
99.8k
      self.receiveInterceptedMetadata(headers)
157
3.47M
    case let .message(message):
158
3.35M
      self.receiveInterceptedMessage(message)
159
3.47M
    case .end:
160
19.8k
      self.receiveInterceptedEnd()
161
3.47M
    }
162
3.47M
  }
163
164
  @inlinable
165
99.8k
  internal func receiveInterceptedMetadata(_ headers: HPACKHeaders) {
166
99.8k
    switch self.state {
167
99.8k
    case .idle:
168
99.8k
      // Make a context to invoke the observer block factory with.
169
99.8k
      let context = _StreamingResponseCallContext<Request, Response>(
170
99.8k
        eventLoop: self.context.eventLoop,
171
99.8k
        headers: headers,
172
99.8k
        logger: self.context.logger,
173
99.8k
        userInfoRef: self.userInfoRef,
174
99.8k
        compressionIsEnabled: self.context.encoding.isEnabled,
175
99.8k
        closeFuture: self.context.closeFuture,
176
3.45M
        sendResponse: self.interceptResponse(_:metadata:promise:)
$s4GRPC35BidirectionalStreamingServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFy5InputQz_AA07MessageH0V7NIOCore16EventLoopPromiseVyytGSgtcACyxq_Gcfu_
Line
Count
Source
176
99.8k
        sendResponse: self.interceptResponse(_:metadata:promise:)
$s4GRPC35BidirectionalStreamingServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFy5InputQz_AA07MessageH0V7NIOCore16EventLoopPromiseVyytGSgtcACyxq_Gcfu_yAI_AkPtcfu0_
Line
Count
Source
176
3.35M
        sendResponse: self.interceptResponse(_:metadata:promise:)
177
99.8k
      )
178
99.8k
179
99.8k
      // Move to the next state.
180
99.8k
      self.state = .creatingObserver(context)
181
99.8k
182
99.8k
      // Send response headers back via the interceptors.
183
99.8k
      self.interceptors.send(.metadata([:]), promise: nil)
184
99.8k
185
99.8k
      // Register callbacks on the status future.
186
199k
      context.statusPromise.futureResult.whenComplete(self.userFunctionStatusResolved(_:))
$s4GRPC35BidirectionalStreamingServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFys6ResultOyAA10GRPCStatusVs5Error_pGcACyxq_Gcfu1_
Line
Count
Source
186
99.8k
      context.statusPromise.futureResult.whenComplete(self.userFunctionStatusResolved(_:))
$s4GRPC35BidirectionalStreamingServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFys6ResultOyAA10GRPCStatusVs5Error_pGcACyxq_Gcfu1_yAMcfu2_
Line
Count
Source
186
99.8k
      context.statusPromise.futureResult.whenComplete(self.userFunctionStatusResolved(_:))
187
99.8k
188
99.8k
      // Make an observer block and register a completion block.
189
199k
      self.observerFactory(context).whenComplete(self.userFunctionResolvedWithResult(_:))
$s4GRPC35BidirectionalStreamingServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFys6ResultOyyAA11StreamEventOy6OutputQy_Gcs5Error_pGcACyxq_Gcfu3_
Line
Count
Source
189
99.8k
      self.observerFactory(context).whenComplete(self.userFunctionResolvedWithResult(_:))
$s4GRPC35BidirectionalStreamingServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFys6ResultOyyAA11StreamEventOy6OutputQy_Gcs5Error_pGcACyxq_Gcfu3_yAPcfu4_
Line
Count
Source
189
99.8k
      self.observerFactory(context).whenComplete(self.userFunctionResolvedWithResult(_:))
190
99.8k
191
99.8k
    case .creatingObserver, .observing:
192
0
      self.handleError(GRPCError.ProtocolViolation("Multiple header blocks received on RPC"))
193
99.8k
194
99.8k
    case .completed:
195
0
      // We may receive headers from the interceptor pipeline if we have already finished (i.e. due
196
0
      // to an error or otherwise) and an interceptor doing some async work later emitting headers.
197
0
      // Dropping them is fine.
198
0
      ()
199
99.8k
    }
200
99.8k
  }
201
202
  @inlinable
203
3.35M
  internal func receiveInterceptedMessage(_ request: Request) {
204
3.35M
    switch self.state {
205
3.35M
    case .idle:
206
0
      self.handleError(GRPCError.ProtocolViolation("Message received before headers"))
207
3.35M
    case .creatingObserver:
208
0
      self.requestBuffer.append(.message(request))
209
3.35M
    case let .observing(observer, _):
210
3.35M
      observer(.message(request))
211
3.35M
    case .completed:
212
0
      // We received a message but we're already done: this may happen if we terminate the RPC
213
0
      // due to a channel error, for example.
214
0
      ()
215
3.35M
    }
216
3.35M
  }
217
218
  @inlinable
219
19.8k
  internal func receiveInterceptedEnd() {
220
19.8k
    switch self.state {
221
19.8k
    case .idle:
222
0
      self.handleError(GRPCError.ProtocolViolation("End of stream received before headers"))
223
19.8k
    case .creatingObserver:
224
0
      self.requestBuffer.append(.end)
225
19.8k
    case let .observing(observer, _):
226
19.8k
      observer(.end)
227
19.8k
    case .completed:
228
0
      // We received a message but we're already done: this may happen if we terminate the RPC
229
0
      // due to a channel error, for example.
230
0
      ()
231
19.8k
    }
232
19.8k
  }
233
234
  // MARK: - User Function To Interceptors
235
236
  @inlinable
237
  internal func userFunctionResolvedWithResult(
238
    _ result: Result<(StreamEvent<Request>) -> Void, Error>
239
99.8k
  ) {
240
99.8k
    switch self.state {
241
99.8k
    case .idle, .observing:
242
0
      // The observer block can't resolve if it hasn't been created ('idle') and it can't be
243
0
      // resolved more than once ('observing').
244
0
      preconditionFailure()
245
99.8k
246
99.8k
    case let .creatingObserver(context):
247
99.8k
      switch result {
248
99.8k
      case let .success(observer):
249
99.8k
        // We have an observer block now; unbuffer any requests.
250
99.8k
        self.state = .observing(observer, context)
251
99.8k
        while let request = self.requestBuffer.popFirst() {
252
0
          observer(request)
253
99.8k
        }
254
99.8k
255
99.8k
      case let .failure(error):
256
0
        self.handleError(error, thrownFromHandler: true)
257
99.8k
      }
258
99.8k
259
99.8k
    case .completed:
260
0
      // We've already completed. That's fine.
261
0
      ()
262
99.8k
    }
263
99.8k
  }
264
265
  @inlinable
266
  internal func interceptResponse(
267
    _ response: Response,
268
    metadata: MessageMetadata,
269
    promise: EventLoopPromise<Void>?
270
3.35M
  ) {
271
3.35M
    switch self.state {
272
3.35M
    case .idle:
273
0
      // The observer block can't end responses if it doesn't exist!
274
0
      preconditionFailure()
275
3.35M
276
3.35M
    case .creatingObserver, .observing:
277
3.35M
      // The user has access to the response context before returning a future observer,
278
3.35M
      // so 'creatingObserver' is valid here (if a little strange).
279
3.35M
      self.interceptors.send(.message(response, metadata), promise: promise)
280
3.35M
281
3.35M
    case .completed:
282
0
      promise?.fail(GRPCError.AlreadyComplete())
283
3.35M
    }
284
3.35M
  }
285
286
  @inlinable
287
99.8k
  internal func userFunctionStatusResolved(_ result: Result<GRPCStatus, Error>) {
288
99.8k
    switch self.state {
289
99.8k
    case .idle:
290
0
      // The promise can't fail before we create it.
291
0
      preconditionFailure()
292
99.8k
293
99.8k
    // Making is possible, the user can complete the status before returning a stream handler.
294
99.8k
    case let .creatingObserver(context), let .observing(_, context):
295
19.8k
      switch result {
296
19.8k
      case let .success(status):
297
19.8k
        // We're sending end back, we're done.
298
19.8k
        self.state = .completed
299
19.8k
        self.interceptors.send(.end(status, context.trailers), promise: nil)
300
19.8k
301
19.8k
      case let .failure(error):
302
0
        self.handleError(error, thrownFromHandler: true)
303
99.8k
      }
304
99.8k
305
99.8k
    case .completed:
306
80.0k
      ()
307
99.8k
    }
308
99.8k
  }
309
310
  @inlinable
311
81.4k
  internal func handleError(_ error: Error, thrownFromHandler isHandlerError: Bool = false) {
312
81.4k
    switch self.state {
313
81.4k
    case .idle:
314
0
      assert(!isHandlerError)
315
0
      self.state = .completed
316
0
      // We don't have a promise to fail. Just send back end.
317
0
      let (status, trailers) = ServerErrorProcessor.processLibraryError(
318
0
        error,
319
0
        delegate: self.context.errorDelegate
320
0
      )
321
0
      self.interceptors.send(.end(status, trailers), promise: nil)
322
81.4k
323
81.4k
    case let .creatingObserver(context),
324
80.0k
      let .observing(_, context):
325
80.0k
      // We don't have a promise to fail. Just send back end.
326
80.0k
      self.state = .completed
327
80.0k
328
80.0k
      let status: GRPCStatus
329
80.0k
      let trailers: HPACKHeaders
330
80.0k
331
80.0k
      if isHandlerError {
332
0
        (status, trailers) = ServerErrorProcessor.processObserverError(
333
0
          error,
334
0
          headers: context.headers,
335
0
          trailers: context.trailers,
336
0
          delegate: self.context.errorDelegate
337
0
        )
338
80.0k
      } else {
339
80.0k
        (status, trailers) = ServerErrorProcessor.processLibraryError(
340
80.0k
          error,
341
80.0k
          delegate: self.context.errorDelegate
342
80.0k
        )
343
80.0k
      }
344
80.0k
345
80.0k
      self.interceptors.send(.end(status, trailers), promise: nil)
346
80.0k
      // We're already in the 'completed' state so failing the promise will be a no-op in the
347
80.0k
      // callback to 'userHandlerCompleted' (but we also need to avoid leaking the promise.)
348
80.0k
      context.statusPromise.fail(error)
349
81.4k
350
81.4k
    case .completed:
351
1.39k
      ()
352
81.4k
    }
353
81.4k
  }
354
355
  @inlinable
356
  internal func sendInterceptedPart(
357
    _ part: GRPCServerResponsePart<Response>,
358
    promise: EventLoopPromise<Void>?
359
3.55M
  ) {
360
3.55M
    switch part {
361
3.55M
    case let .metadata(headers):
362
99.8k
      self.context.responseWriter.sendMetadata(headers, flush: true, promise: promise)
363
3.55M
364
3.55M
    case let .message(message, metadata):
365
3.35M
      do {
366
3.35M
        let bytes = try self.serializer.serialize(message, allocator: ByteBufferAllocator())
367
3.35M
        self.context.responseWriter.sendMessage(bytes, metadata: metadata, promise: promise)
368
3.35M
      } catch {
369
0
        // Serialization failed: fail the promise and send end.
370
0
        promise?.fail(error)
371
0
        let (status, trailers) = ServerErrorProcessor.processLibraryError(
372
0
          error,
373
0
          delegate: self.context.errorDelegate
374
0
        )
375
0
        // Loop back via the interceptors.
376
0
        self.interceptors.send(.end(status, trailers), promise: nil)
377
3.55M
      }
378
3.55M
379
3.55M
    case let .end(status, trailers):
380
99.8k
      self.context.responseWriter.sendEnd(status: status, trailers: trailers, promise: promise)
381
3.55M
    }
382
3.55M
  }
383
}