Coverage Report

Created: 2026-04-29 07:00

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/grpc-swift/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift
Line
Count
Source
1
/*
2
 * Copyright 2021, gRPC Authors All rights reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
import DequeModule
17
import Logging
18
import NIOCore
19
import NIOHPACK
20
21
/// A server handler for RPCs whose implementation is an `async` function.
///
/// This type is a thin value wrapper: every `GRPCServerHandlerProtocol` requirement is forwarded
/// unchanged to the wrapped `AsyncServerHandler`.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public struct GRPCAsyncServerHandler<
  Serializer: MessageSerializer,
  Deserializer: MessageDeserializer,
  Request: Sendable,
  Response: Sendable
>: GRPCServerHandlerProtocol where Serializer.Input == Response, Deserializer.Output == Request {
  /// The handler every call is forwarded to.
  @usableFromInline
  internal let _handler: AsyncServerHandler<Serializer, Deserializer, Request, Response>

  /// Forwards request headers to the wrapped handler.
  public func receiveMetadata(_ metadata: HPACKHeaders) {
    _handler.receiveMetadata(metadata)
  }

  /// Forwards the bytes of a request message to the wrapped handler.
  public func receiveMessage(_ bytes: ByteBuffer) {
    _handler.receiveMessage(bytes)
  }

  /// Tells the wrapped handler that the request stream has ended.
  public func receiveEnd() {
    _handler.receiveEnd()
  }

  /// Forwards an error to the wrapped handler.
  public func receiveError(_ error: Error) {
    _handler.receiveError(error)
  }

  /// Tells the wrapped handler to finish.
  public func finish() {
    _handler.finish()
  }
}
51
52
// MARK: - RPC Adapters
53
54
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
55
extension GRPCAsyncServerHandler {
56
  public typealias Request = Deserializer.Output
57
  public typealias Response = Serializer.Input
58
59
  /// Creates a handler for a unary RPC.
  ///
  /// The `unary` function is adapted to the stream-based user handler: exactly one request is
  /// read from the request stream, `unary` is invoked with it, and the response it returns is
  /// sent on the response stream writer.
  ///
  /// - Parameters:
  ///   - context: The context of the call being handled.
  ///   - requestDeserializer: Deserializes request messages.
  ///   - responseSerializer: Serializes response messages.
  ///   - interceptors: Interceptors for this RPC.
  ///   - unary: The function implementing the RPC.
  @inlinable
  public init(
    context: CallHandlerContext,
    requestDeserializer: Deserializer,
    responseSerializer: Serializer,
    interceptors: [ServerInterceptor<Request, Response>],
    wrapping unary:
      @escaping @Sendable (Request, GRPCAsyncServerCallContext) async throws
      -> Response
  ) {
    self._handler = AsyncServerHandler(
      context: context,
      requestDeserializer: requestDeserializer,
      responseSerializer: responseSerializer,
      callType: .unary,
      interceptors: interceptors,
      userHandler: { requests, writer, callContext in
        var requestIterator = requests.makeAsyncIterator()

        // There must be a first request...
        guard let request = try await requestIterator.next() else {
          throw GRPCError.ProtocolViolation("Unary RPC expects exactly one request")
        }

        // ...and the stream must end immediately after it.
        guard try await requestIterator.next() == nil else {
          throw GRPCError.ProtocolViolation("Unary RPC expects exactly one request")
        }

        let reply = try await unary(request, callContext)
        try await writer.send(reply)
      }
    )
  }
85
86
  /// Creates a handler for a client-streaming RPC.
  ///
  /// The `clientStreaming` function receives the whole request stream; the single response it
  /// returns is sent on the response stream writer.
  ///
  /// - Parameters:
  ///   - context: The context of the call being handled.
  ///   - requestDeserializer: Deserializes request messages.
  ///   - responseSerializer: Serializes response messages.
  ///   - interceptors: Interceptors for this RPC.
  ///   - clientStreaming: The function implementing the RPC.
  @inlinable
  public init(
    context: CallHandlerContext,
    requestDeserializer: Deserializer,
    responseSerializer: Serializer,
    interceptors: [ServerInterceptor<Request, Response>],
    wrapping clientStreaming:
      @escaping @Sendable (
        GRPCAsyncRequestStream<Request>,
        GRPCAsyncServerCallContext
      ) async throws -> Response
  ) {
    self._handler = AsyncServerHandler(
      context: context,
      requestDeserializer: requestDeserializer,
      responseSerializer: responseSerializer,
      callType: .clientStreaming,
      interceptors: interceptors,
      userHandler: { requests, writer, callContext in
        let reply = try await clientStreaming(requests, callContext)
        try await writer.send(reply)
      }
    )
  }
110
111
  /// Creates a handler for a server-streaming RPC.
  ///
  /// Exactly one request is read from the request stream and handed to `serverStreaming`
  /// together with the response stream writer; the function writes its own responses.
  ///
  /// - Parameters:
  ///   - context: The context of the call being handled.
  ///   - requestDeserializer: Deserializes request messages.
  ///   - responseSerializer: Serializes response messages.
  ///   - interceptors: Interceptors for this RPC.
  ///   - serverStreaming: The function implementing the RPC.
  @inlinable
  public init(
    context: CallHandlerContext,
    requestDeserializer: Deserializer,
    responseSerializer: Serializer,
    interceptors: [ServerInterceptor<Request, Response>],
    wrapping serverStreaming:
      @escaping @Sendable (
        Request,
        GRPCAsyncResponseStreamWriter<Response>,
        GRPCAsyncServerCallContext
      ) async throws -> Void
  ) {
    self._handler = AsyncServerHandler(
      context: context,
      requestDeserializer: requestDeserializer,
      responseSerializer: responseSerializer,
      callType: .serverStreaming,
      interceptors: interceptors,
      userHandler: { requests, writer, callContext in
        var requestIterator = requests.makeAsyncIterator()

        // There must be a first request...
        guard let request = try await requestIterator.next() else {
          throw GRPCError.ProtocolViolation("Server-streaming RPC expects exactly one request")
        }

        // ...and the stream must end immediately after it.
        guard try await requestIterator.next() == nil else {
          throw GRPCError.ProtocolViolation("Server-streaming RPC expects exactly one request")
        }

        try await serverStreaming(request, writer, callContext)
      }
    )
  }
139
140
  /// Creates a handler for a bidirectional-streaming RPC.
  ///
  /// The `bidirectional` function already has the shape of the underlying user handler so it is
  /// passed through without any adaptation.
  ///
  /// - Parameters:
  ///   - context: The context of the call being handled.
  ///   - requestDeserializer: Deserializes request messages.
  ///   - responseSerializer: Serializes response messages.
  ///   - interceptors: Interceptors for this RPC.
  ///   - bidirectional: The function implementing the RPC.
  @inlinable
  public init(
    context: CallHandlerContext,
    requestDeserializer: Deserializer,
    responseSerializer: Serializer,
    interceptors: [ServerInterceptor<Request, Response>],
    wrapping bidirectional:
      @escaping @Sendable (
        GRPCAsyncRequestStream<Request>,
        GRPCAsyncResponseStreamWriter<Response>,
        GRPCAsyncServerCallContext
      ) async throws -> Void
  ) {
    self._handler = AsyncServerHandler(
      context: context,
      requestDeserializer: requestDeserializer,
      responseSerializer: responseSerializer,
      callType: .bidirectionalStreaming,
      interceptors: interceptors,
      userHandler: bidirectional
    )
  }
162
}
163
164
// MARK: - Server Handler
165
166
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
167
@usableFromInline
168
internal final class AsyncServerHandler<
169
  Serializer: MessageSerializer,
170
  Deserializer: MessageDeserializer,
171
  Request: Sendable,
172
  Response: Sendable
173
>: GRPCServerHandlerProtocol where Serializer.Input == Response, Deserializer.Output == Request {
174
  /// A response serializer.
175
  @usableFromInline
176
  internal let serializer: Serializer
177
178
  /// A request deserializer.
179
  @usableFromInline
180
  internal let deserializer: Deserializer
181
182
  /// The event loop that this handler executes on.
183
  @usableFromInline
184
  internal let eventLoop: EventLoop
185
186
  /// A `ByteBuffer` allocator provided by the underlying `Channel`.
187
  @usableFromInline
188
  internal let allocator: ByteBufferAllocator
189
190
  /// A user-provided error delegate which, if provided, is used to transform errors and potentially
191
  /// pack errors into trailers.
192
  @usableFromInline
193
  internal let errorDelegate: ServerErrorDelegate?
194
195
  /// A logger.
196
  @usableFromInline
197
  internal let logger: Logger
198
199
  /// A reference to the user info. This is shared with the interceptor pipeline and may be accessed
200
  /// from the async call context. `UserInfo` is _not_ `Sendable` and must always be accessed from
201
  /// an appropriate event loop.
202
  @usableFromInline
203
  internal let userInfoRef: Ref<UserInfo>
204
205
  /// Whether compression is enabled on the server and an algorithm has been negotiated with
206
  /// the client
207
  @usableFromInline
208
  internal let compressionEnabledOnRPC: Bool
209
210
  /// Whether the RPC method would like to compress responses (if possible). Defaults to true.
211
  @usableFromInline
212
  internal var compressResponsesIfPossible: Bool
213
214
  /// The interceptor pipeline does not track flushing as a separate event. The flush decision is
215
  /// included with metadata alongside each message. For the status and trailers the flush is
216
  /// implicit. For headers we track whether to flush here.
217
  ///
218
  /// In most cases the flush will be delayed until the first message is flushed and this will
219
  /// remain unset. However, this may be set when the server handler
220
  /// uses ``GRPCAsyncServerCallContext/sendHeaders(_:)``.
221
  @usableFromInline
222
  internal var flushNextHeaders: Bool
223
224
  /// A state machine for the interceptor pipeline.
225
  @usableFromInline
226
  internal private(set) var interceptorStateMachine: ServerInterceptorStateMachine
227
  /// The interceptor pipeline.
228
  @usableFromInline
229
  internal private(set) var interceptors: Optional<ServerInterceptorPipeline<Request, Response>>
230
  /// An object for writing intercepted responses to the channel.
231
  @usableFromInline
232
  internal private(set) var responseWriter: Optional<GRPCServerResponseWriter>
233
234
  /// A state machine for the user implemented function.
235
  @usableFromInline
236
  internal private(set) var handlerStateMachine: ServerHandlerStateMachine
237
  /// A bag of components used by the user handler.
238
  @usableFromInline
239
  internal private(set) var handlerComponents:
240
    Optional<
241
      ServerHandlerComponents<
242
        Request,
243
        Response,
244
        GRPCAsyncWriterSinkDelegate<(Response, Compression)>
245
      >
246
    >
247
248
  /// The user provided function to execute.
249
  @usableFromInline
250
  internal let userHandler:
251
    @Sendable (
252
      GRPCAsyncRequestStream<Request>,
253
      GRPCAsyncResponseStreamWriter<Response>,
254
      GRPCAsyncServerCallContext
255
    ) async throws -> Void
256
257
  @usableFromInline
258
  internal typealias AsyncSequenceProducer = NIOThrowingAsyncSequenceProducer<
259
    Request,
260
    Error,
261
    NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
262
    GRPCAsyncSequenceProducerDelegate
263
  >
264
265
  /// Creates a handler for a single RPC.
  ///
  /// - Parameters:
  ///   - context: Supplies the event loop, allocator, response writer, error delegate, logger,
  ///     negotiated encoding, path, remote address and close future for the call.
  ///   - requestDeserializer: Deserializes request messages.
  ///   - responseSerializer: Serializes response messages.
  ///   - callType: The type of RPC, passed on to the interceptor pipeline.
  ///   - interceptors: Interceptors to install in the pipeline.
  ///   - userHandler: The user provided function to execute.
  @inlinable
  internal init(
    context: CallHandlerContext,
    requestDeserializer: Deserializer,
    responseSerializer: Serializer,
    callType: GRPCCallType,
    interceptors: [ServerInterceptor<Request, Response>],
    userHandler:
      @escaping @Sendable (
        GRPCAsyncRequestStream<Request>,
        GRPCAsyncResponseStreamWriter<Response>,
        GRPCAsyncServerCallContext
      ) async throws -> Void
  ) {
    // Message coding and the user's function.
    self.deserializer = requestDeserializer
    self.serializer = responseSerializer
    self.userHandler = userHandler

    // Values taken directly from the call context.
    self.eventLoop = context.eventLoop
    self.allocator = context.allocator
    self.logger = context.logger
    self.errorDelegate = context.errorDelegate
    self.responseWriter = context.responseWriter
    self.compressionEnabledOnRPC = context.encoding.isEnabled

    // Per-call defaults.
    self.compressResponsesIfPossible = true
    self.flushNextHeaders = false
    self.userInfoRef = Ref(UserInfo())

    // User handler state; the components are created when the handler is invoked.
    self.handlerStateMachine = .init()
    self.handlerComponents = nil

    // Interceptor state. The pipeline is given callbacks which are methods on `self`, so the
    // property is first set to `nil` to complete initialization before `self` is referenced.
    self.interceptorStateMachine = .init()
    self.interceptors = nil
    self.interceptors = ServerInterceptorPipeline(
      logger: context.logger,
      eventLoop: context.eventLoop,
      path: context.path,
      callType: callType,
      remoteAddress: context.remoteAddress,
      userInfoRef: self.userInfoRef,
      closeFuture: context.closeFuture,
      interceptors: interceptors,
      onRequestPart: self.receiveInterceptedPart(_:),
      onResponsePart: self.sendInterceptedPart(_:promise:)
    )
  }
311
312
  // MARK: - GRPCServerHandlerProtocol conformance
313
314
  /// Handles request headers arriving from the client.
  ///
  /// The interceptor state machine decides whether the headers are passed into the interceptor
  /// pipeline, whether the RPC is cancelled, or whether the headers are dropped.
  @inlinable
  internal func receiveMetadata(_ headers: HPACKHeaders) {
    switch interceptorStateMachine.interceptRequestMetadata() {
    case .drop:
      return
    case .cancel:
      cancel(error: nil)
    case .intercept:
      interceptors?.receive(.metadata(headers))
    }
  }
325
326
  /// Handles the bytes of a request message arriving from the client.
  ///
  /// The bytes are deserialized first; a deserialization failure cancels the RPC with the thrown
  /// error. Otherwise the interceptor state machine decides whether the message is passed into
  /// the interceptor pipeline, whether the RPC is cancelled, or whether the message is dropped.
  @inlinable
  internal func receiveMessage(_ bytes: ByteBuffer) {
    let message: Request
    do {
      message = try deserializer.deserialize(byteBuffer: bytes)
    } catch {
      cancel(error: error)
      return
    }

    switch interceptorStateMachine.interceptRequestMessage() {
    case .drop:
      return
    case .cancel:
      cancel(error: nil)
    case .intercept:
      interceptors?.receive(.message(message))
    }
  }
345
346
  /// Handles the end of the request stream arriving from the client.
  ///
  /// The interceptor state machine decides whether the end is passed into the interceptor
  /// pipeline, whether the RPC is cancelled, or whether the event is dropped.
  @inlinable
  internal func receiveEnd() {
    switch interceptorStateMachine.interceptRequestEnd() {
    case .drop:
      return
    case .cancel:
      cancel(error: nil)
    case .intercept:
      interceptors?.receive(.end)
    }
  }
357
358
  /// Handles an error by cancelling the RPC with that error.
  @inlinable
  internal func receiveError(_ error: Error) {
    cancel(error: error)
  }
362
363
  /// Finishes the handler by cancelling the RPC without an error.
  @inlinable
  internal func finish() {
    cancel(error: nil)
  }
367
368
  /// Cancels the RPC, releasing the user handler components and the interceptor pipeline.
  ///
  /// Must be called on the handler's event loop (asserted).
  ///
  /// - Parameter error: The reason for cancelling, if any. When a status needs to be generated
  ///   and no error was provided `GRPCStatus.processingError` is used in its place.
  @usableFromInline
  internal func cancel(error: Error?) {
    eventLoop.assertInEventLoop()

    // Step one: the user handler.
    switch handlerStateMachine.cancel() {
    case .none:
      ()

    case .cancelAndNilOutHandlerComponents:
      // Tear down everything handed to the user handler (task, request source, writer sink).
      handlerComponents?.cancel()
      handlerComponents = nil

      // Whether or not a status was already sent is not tracked here: ask the interceptor state
      // machine to send one and let it say whether one should actually be generated.
      switch interceptorStateMachine.interceptedResponseStatus() {
      case .drop, .cancel:
        ()
      case .forward:
        let failure = error ?? GRPCStatus.processingError
        let (status, trailers) = ServerErrorProcessor.processLibraryError(
          failure,
          delegate: errorDelegate
        )
        responseWriter?.sendEnd(status: status, trailers: trailers, promise: nil)
      }
    }

    // Step two: the interceptor pipeline.
    let pipelineAction = interceptorStateMachine.cancel()

    if case .sendStatusThenNilOutInterceptorPipeline = pipelineAction {
      responseWriter?.sendEnd(status: .processingError, trailers: [:], promise: nil)
    }

    switch pipelineAction {
    case .sendStatusThenNilOutInterceptorPipeline, .nilOutInterceptorPipeline:
      interceptors?.close()
      interceptors = nil
      responseWriter = nil
    case .none:
      ()
    }
  }
409
410
  // MARK: - Interceptors to User Function
411
412
  /// Receives a request part from the interceptor pipeline and dispatches it to the matching
  /// `receiveIntercepted…` method.
  @inlinable
  internal func receiveInterceptedPart(_ part: GRPCServerRequestPart<Request>) {
    switch part {
    case .end:
      receiveInterceptedEnd()
    case let .message(request):
      receiveInterceptedMessage(request)
    case let .metadata(requestHeaders):
      receiveInterceptedMetadata(requestHeaders)
    }
  }
423
424
  /// Receives request headers from the interceptor pipeline and, if both state machines allow
  /// it, starts the user handler.
  @inlinable
  internal func receiveInterceptedMetadata(_ headers: HPACKHeaders) {
    switch interceptorStateMachine.interceptedRequestMetadata() {
    case .drop:
      return
    case .cancel:
      return cancel(error: nil)
    case .forward:
      break
    }

    switch handlerStateMachine.handleMetadata() {
    case .cancel:
      cancel(error: nil)

    case .invokeHandler:
      // Invoking the handler requires:
      //
      // - A call context, through which the handler sets response headers/trailers and gets a
      //   logger, amongst other things.
      // - A request sequence: request messages are pushed into its source and the handler
      //   consumes them as an async sequence.
      // - An async writer with a delegate: the writer is given to the handler and the delegate
      //   calls back into this object with the responses.
      //
      // These are bundled together ("handler components") and stored outside of the state
      // machine. The bundle is released by `cancel(error:)`, which is called either because of
      // an error or from `finish()`.
      let callContext = GRPCAsyncServerCallContext(
        headers: headers,
        logger: logger,
        contextProvider: self
      )

      let watermarks = NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark(
        lowWatermark: 10,
        highWatermark: 50
      )
      let requests = NIOThrowingAsyncSequenceProducer.makeSequence(
        elementType: Request.self,
        failureType: Error.self,
        backPressureStrategy: watermarks,
        delegate: GRPCAsyncSequenceProducerDelegate()
      )

      let responses = NIOAsyncWriter.makeWriter(
        isWritable: true,
        delegate: GRPCAsyncWriterSinkDelegate<(Response, Compression)>(
          didYield: self.interceptResponseMessages,
          didTerminate: { error in
            self.interceptTermination(error)
          }
        )
      )

      // The state must be updated before the handler is invoked.
      handlerStateMachine.handlerInvoked(requestHeaders: headers)

      // There is no task cancellation handler: cancellation is done in `cancel(error:)`.
      let userTask = Task {
        await self.invokeUserHandler(
          requestSequence: requests,
          responseWriter: responses.writer,
          callContext: callContext
        )
      }

      handlerComponents = ServerHandlerComponents<
        Request,
        Response,
        GRPCAsyncWriterSinkDelegate<(Response, Compression)>
      >(
        requestSource: requests.source,
        responseWriterSink: responses.sink,
        task: userTask
      )
    }
  }
500
501
  /// Runs the user provided function and finishes the response writer with its outcome.
  ///
  /// - Parameters:
  ///   - requestSequence: The producer whose sequence is given to the user function; its source
  ///     is always finished when this method returns.
  ///   - responseWriter: The writer given to the user function. It is finished normally if the
  ///     function returns, or with the thrown error if the function throws.
  ///   - callContext: The call context given to the user function.
  @Sendable
  @usableFromInline
  internal func invokeUserHandler(
    requestSequence: AsyncSequenceProducer.NewSequence,
    responseWriter: NIOAsyncWriter<
      (Response, Compression),
      GRPCAsyncWriterSinkDelegate<(Response, Compression)>
    >,
    callContext: GRPCAsyncServerCallContext
  ) async {
    // The user function may return before reaching the end of the request stream; finishing
    // the source here drops any inbound messages it did not consume.
    defer { requestSequence.source.finish() }

    let requests = GRPCAsyncRequestStream(requestSequence.sequence)
    let responses = GRPCAsyncResponseStreamWriter(wrapping: responseWriter)

    do {
      try await userHandler(requests, responses, callContext)
    } catch {
      responseWriter.finish(error: error)
      return
    }

    responseWriter.finish()
  }
527
528
  /// Receives a request message from the interceptor pipeline and, if both state machines allow
  /// it, yields the message to the user handler's request source.
  @inlinable
  internal func receiveInterceptedMessage(_ request: Request) {
    switch interceptorStateMachine.interceptedRequestMessage() {
    case .drop:
      return
    case .cancel:
      return cancel(error: nil)
    case .forward:
      break
    }

    switch handlerStateMachine.handleMessage() {
    case .cancel:
      cancel(error: nil)
    case .forward:
      _ = handlerComponents?.requestSource.yield(request)
    }
  }
546
547
  /// Receives the end of the request stream from the interceptor pipeline and, if both state
  /// machines allow it, finishes the user handler's request source.
  @inlinable
  internal func receiveInterceptedEnd() {
    switch interceptorStateMachine.interceptedRequestEnd() {
    case .drop:
      return
    case .cancel:
      return cancel(error: nil)
    case .forward:
      break
    }

    switch handlerStateMachine.handleEnd() {
    case .cancel:
      cancel(error: nil)
    case .forward:
      handlerComponents?.requestSource.finish()
    }
  }
563
564
  // MARK: - User Function To Interceptors
565
566
  /// Sends a response message from the user handler into the interceptor pipeline, preceded by
  /// the response headers when the handler state machine provides them.
  ///
  /// Must be called on the handler's event loop (asserted).
  ///
  /// - Parameters:
  ///   - response: The message to send.
  ///   - compression: The compression preference for this message; it is resolved against the
  ///     per-call default and only honoured when compression is enabled on the RPC.
  @inlinable
  internal func _interceptResponseMessage(_ response: Response, compression: Compression) {
    eventLoop.assertInEventLoop()

    switch handlerStateMachine.sendMessage() {
    case .drop:
      return

    case let .intercept(pendingHeaders):
      // Headers, if there are any, go first.
      if let headers = pendingHeaders {
        switch interceptorStateMachine.interceptResponseMetadata() {
        case .intercept:
          interceptors?.send(.metadata(headers), promise: nil)
        case .cancel:
          return cancel(error: nil)
        case .drop:
          ()
        }
      }

      // Then the message itself.
      switch interceptorStateMachine.interceptResponseMessage() {
      case .intercept:
        let senderWantsCompression = compression.isEnabled(
          callDefault: compressResponsesIfPossible
        )
        let metadata = MessageMetadata(
          compress: compressionEnabledOnRPC && senderWantsCompression,
          flush: true
        )
        interceptors?.send(.message(response, metadata), promise: nil)
      case .cancel:
        return cancel(error: nil)
      case .drop:
        ()
      }
    }
  }
604
605
  /// Sends a batch of response messages into the interceptor pipeline, hopping onto the
  /// handler's event loop first if the caller is not already on it.
  @Sendable
  @inlinable
  internal func interceptResponseMessages(_ messages: Deque<(Response, Compression)>) {
    guard eventLoop.inEventLoop else {
      eventLoop.execute {
        for (response, compression) in messages {
          self._interceptResponseMessage(response, compression: compression)
        }
      }
      return
    }

    for (response, compression) in messages {
      _interceptResponseMessage(response, compression: compression)
    }
  }
620
621
  /// Sends the final status for the RPC into the interceptor pipeline.
  ///
  /// Must be called on the handler's event loop (asserted).
  ///
  /// - Parameter error: The error the response writer was finished with, or `nil` if it
  ///   finished normally. A thrown `GRPCStatus` with an 'ok' code is replaced with an
  ///   `.unknown` status.
  @inlinable
  internal func _interceptTermination(_ error: Error?) {
    eventLoop.assertInEventLoop()

    // Throwing an 'ok' status is not a valid way of failing: report it as unknown instead.
    var failure = error
    if let thrownStatus = error as? GRPCStatus, thrownStatus.isOk {
      failure = GRPCStatus(
        code: .unknown,
        message: "Handler threw error with status code 'ok'."
      )
    }

    guard case let .intercept(requestHeaders, trailers) = handlerStateMachine.sendStatus() else {
      // Nothing to send.
      return
    }

    let status: GRPCStatus
    let processedTrailers: HPACKHeaders

    if let failure = failure {
      (status, processedTrailers) = ServerErrorProcessor.processObserverError(
        failure,
        headers: requestHeaders,
        trailers: trailers,
        delegate: errorDelegate
      )
    } else {
      status = GRPCStatus.ok
      processedTrailers = trailers
    }

    switch interceptorStateMachine.interceptResponseStatus() {
    case .drop:
      ()
    case .cancel:
      cancel(error: nil)
    case .intercept:
      interceptors?.send(.end(status, processedTrailers), promise: nil)
    }
  }
665
666
  /// Handles termination of the response writer, hopping onto the handler's event loop first if
  /// the caller is not already on it.
  @Sendable
  @inlinable
  internal func interceptTermination(_ status: Error?) {
    guard eventLoop.inEventLoop else {
      eventLoop.execute {
        self._interceptTermination(status)
      }
      return
    }

    _interceptTermination(status)
  }
677
678
  /// Receives a response part from the interceptor pipeline and sends it towards the channel.
  ///
  /// Messages are serialized here. If serialization fails the promise (if any) is failed with
  /// the thrown error and the RPC is cancelled with that error.
  ///
  /// - Parameters:
  ///   - part: The response part to send.
  ///   - promise: A promise handed on with the part, or failed on serialization failure.
  @inlinable
  internal func sendInterceptedPart(
    _ part: GRPCServerResponsePart<Response>,
    promise: EventLoopPromise<Void>?
  ) {
    switch part {
    case let .metadata(headers):
      self.sendInterceptedMetadata(headers, promise: promise)

    case let .message(message, metadata):
      do {
        // Serialize with the allocator stored from the call context rather than constructing a
        // new default allocator for every message.
        let bytes = try self.serializer.serialize(message, allocator: self.allocator)
        self.sendInterceptedResponse(bytes, metadata: metadata, promise: promise)
      } catch {
        promise?.fail(error)
        self.cancel(error: error)
      }

    case let .end(status, trailers):
      self.sendInterceptedStatus(status, metadata: trailers, promise: promise)
    }
  }
700
701
  /// Writes response headers which have passed through the interceptor pipeline.
  ///
  /// When the headers are forwarded the `flushNextHeaders` flag is consumed (read, then reset
  /// to `false`) and passed to the response writer. If there is no response writer the promise,
  /// if any, is failed with `GRPCStatus.processingError`.
  @inlinable
  internal func sendInterceptedMetadata(
    _ metadata: HPACKHeaders,
    promise: EventLoopPromise<Void>?
  ) {
    switch interceptorStateMachine.interceptedResponseMetadata() {
    case .drop:
      ()

    case .cancel:
      cancel(error: nil)

    case .forward:
      guard let writer = responseWriter else {
        promise?.fail(GRPCStatus.processingError)
        return
      }

      let shouldFlush = flushNextHeaders
      flushNextHeaders = false
      writer.sendMetadata(metadata, flush: shouldFlush, promise: promise)
    }
  }
721
722
  /// Writes a serialized response message which has passed through the interceptor pipeline.
  ///
  /// If the message is forwarded but there is no response writer the promise, if any, is failed
  /// with `GRPCStatus.processingError`.
  @inlinable
  internal func sendInterceptedResponse(
    _ bytes: ByteBuffer,
    metadata: MessageMetadata,
    promise: EventLoopPromise<Void>?
  ) {
    switch interceptorStateMachine.interceptedResponseMessage() {
    case .drop:
      ()

    case .cancel:
      cancel(error: nil)

    case .forward:
      guard let writer = responseWriter else {
        promise?.fail(GRPCStatus.processingError)
        return
      }

      writer.sendMessage(bytes, metadata: metadata, promise: promise)
    }
  }
741
742
  /// Writes the status and trailers which have passed through the interceptor pipeline.
  ///
  /// If the status is forwarded but there is no response writer the promise, if any, is failed
  /// with `GRPCStatus.processingError`.
  ///
  /// - Parameters:
  ///   - status: The final status of the RPC.
  ///   - metadata: The trailers to send with the status.
  ///   - promise: A promise handed to the response writer with the status.
  @inlinable
  internal func sendInterceptedStatus(
    _ status: GRPCStatus,
    metadata: HPACKHeaders,
    promise: EventLoopPromise<Void>?
  ) {
    switch interceptorStateMachine.interceptedResponseStatus() {
    case .drop:
      ()

    case .cancel:
      cancel(error: nil)

    case .forward:
      guard let writer = responseWriter else {
        promise?.fail(GRPCStatus.processingError)
        return
      }

      writer.sendEnd(status: status, trailers: metadata, promise: promise)
    }
  }
761
}
762
763
// Sendability is unchecked as all mutable state is accessed/modified from an appropriate event
764
// loop.
765
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
766
extension AsyncServerHandler: @unchecked Sendable {}
767
768
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension AsyncServerHandler: AsyncServerCallContextProvider {
  /// Stores the response headers in the handler state machine.
  ///
  /// The work is submitted to the handler's event loop.
  ///
  /// - Throws: A `GRPCStatus` with code `.failedPrecondition` if the state machine rejects the
  ///   headers.
  @usableFromInline
  internal func setResponseHeaders(_ headers: HPACKHeaders) async throws {
    try await eventLoop.submit {
      guard self.handlerStateMachine.setResponseHeaders(headers) else {
        throw GRPCStatus(
          code: .failedPrecondition,
          message: "Tried to send response headers in an invalid state"
        )
      }
    }.get()
  }

  /// Stores the response headers and, if both state machines allow it, sends them into the
  /// interceptor pipeline with `flushNextHeaders` set.
  ///
  /// The work is submitted to the handler's event loop; any failure is discarded.
  @usableFromInline
  internal func acceptRPC(_ headers: HPACKHeaders) async {
    let accepted = eventLoop.submit {
      guard self.handlerStateMachine.setResponseHeaders(headers) else { return }

      // Shh, it's a lie! There isn't really a message to send, but the state machine doesn't
      // know (or care) about that. It will, however, say whether the headers can be sent.
      switch self.handlerStateMachine.sendMessage() {
      case .intercept(.none), .drop:
        // intercept(.none) means the headers have already been sent; this should never be hit
        // because of the guard on setting the response headers above.
        ()

      case let .intercept(.some(pendingHeaders)):
        switch self.interceptorStateMachine.interceptResponseMetadata() {
        case .drop:
          ()
        case .cancel:
          self.cancel(error: nil)
        case .intercept:
          self.flushNextHeaders = true
          self.interceptors?.send(.metadata(pendingHeaders), promise: nil)
        }
      }
    }

    try? await accepted.get()
  }

  /// Stores the response trailers in the handler state machine.
  ///
  /// The work is submitted to the handler's event loop.
  @usableFromInline
  internal func setResponseTrailers(_ headers: HPACKHeaders) async throws {
    try await eventLoop.submit {
      self.handlerStateMachine.setResponseTrailers(headers)
    }.get()
  }

  /// Sets whether the RPC would like its responses to be compressed when possible.
  ///
  /// The work is submitted to the handler's event loop.
  @usableFromInline
  internal func setResponseCompression(_ enabled: Bool) async throws {
    try await eventLoop.submit {
      self.compressResponsesIfPossible = enabled
    }.get()
  }

  /// Calls `modify` with the current user info on the handler's event loop and returns its
  /// result.
  @usableFromInline
  func withUserInfo<Result: Sendable>(
    _ modify: @Sendable @escaping (UserInfo) throws -> Result
  ) async throws -> Result {
    return try await eventLoop.submit {
      try modify(self.userInfoRef.value)
    }.get()
  }

  /// Calls `modify` with mutable access to the user info on the handler's event loop and
  /// returns its result.
  @usableFromInline
  func withMutableUserInfo<Result: Sendable>(
    _ modify: @Sendable @escaping (inout UserInfo) throws -> Result
  ) async throws -> Result {
    return try await eventLoop.submit {
      try modify(&self.userInfoRef.value)
    }.get()
  }
}
847
848
/// A type-erasing interface between `GRPCAsyncServerCallContext` and the generic server handler.
///
/// The call context holds a value of this protocol type rather than the generic handler itself.
/// Each requirement updates state on the async handler and does so by first executing onto the
/// correct event loop.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@usableFromInline
protocol AsyncServerCallContextProvider: Sendable {
  /// Sets the response headers for the RPC.
  func setResponseHeaders(_ headers: HPACKHeaders) async throws

  /// Accepts the RPC with the given response headers.
  func acceptRPC(_ headers: HPACKHeaders) async

  /// Sets the response trailers for the RPC.
  func setResponseTrailers(_ trailers: HPACKHeaders) async throws

  /// Sets whether responses should be compressed.
  func setResponseCompression(_ enabled: Bool) async throws

  /// Provides read access to the user info.
  func withUserInfo<Result: Sendable>(
    _ modify: @Sendable @escaping (UserInfo) throws -> Result
  ) async throws -> Result

  /// Provides mutable access to the user info.
  func withMutableUserInfo<Result: Sendable>(
    _ modify: @Sendable @escaping (inout UserInfo) throws -> Result
  ) async throws -> Result
}
869
870
/// The pieces which connect a running user handler to its `AsyncServerHandler`: the task
/// executing the handler, the sink of the response writer and the source of the request
/// sequence.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@usableFromInline
internal struct ServerHandlerComponents<
  Request: Sendable,
  Response: Sendable,
  Delegate: NIOAsyncWriterSinkDelegate
> where Delegate.Element == (Response, Compression) {
  @usableFromInline
  internal typealias AsyncWriterSink = NIOAsyncWriter<(Response, Compression), Delegate>.Sink

  @usableFromInline
  internal typealias AsyncSequenceSource = NIOThrowingAsyncSequenceProducer<
    Request,
    Error,
    NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
    GRPCAsyncSequenceProducerDelegate
  >.Source

  /// The task running the user handler.
  @usableFromInline
  internal let task: Task<Void, Never>
  /// The sink side of the writer given to the user handler.
  @usableFromInline
  internal let responseWriterSink: AsyncWriterSink
  /// The source feeding the request stream given to the user handler.
  @usableFromInline
  internal let requestSource: AsyncSequenceSource

  /// Bundles the given components together.
  @inlinable
  init(
    requestSource: AsyncSequenceSource,
    responseWriterSink: AsyncWriterSink,
    task: Task<Void, Never>
  ) {
    self.requestSource = requestSource
    self.responseWriterSink = responseWriterSink
    self.task = task
  }

  /// Cancels the request stream, the response stream and the task, in that order.
  ///
  /// User handlers are encouraged to check for cancellation but it must be assumed that they
  /// do not. With the request source finished no more requests are delivered to the request
  /// stream, and with the writer sink finished no more responses are written, so the handler
  /// can no longer do anything useful. This should reduce how long it keeps running.
  func cancel() {
    requestSource.finish()
    responseWriterSink.finish(error: CancellationError())
    task.cancel()
  }
}