Coverage Report

Created: 2026-06-15 07:01

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/grpc-swift/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerStreamingCall.swift
Line
Count
Source
1
/*
2
 * Copyright 2021, gRPC Authors All rights reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
17
import NIOCore
18
import NIOHPACK
19
20
/// Async-await variant of ``ServerStreamingCall``.
///
/// Wraps a `Call` that is invoked with a single request and exposes the responses as
/// ``responseStream``, alongside the initial metadata, trailing metadata and final status.
21
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
22
public struct GRPCAsyncServerStreamingCall<Request: Sendable, Response: Sendable> {
23
  /// The underlying call; provides the options and path, and is used to invoke and cancel the RPC.
  private let call: Call<Request, Response>
24
  /// Holder of the response parts from which the initial metadata, trailing metadata and status
  /// are awaited.
  private let responseParts: StreamingResponseParts<Response>
25
  /// The source of the sequence producer whose sequence backs ``responseStream``. It is finished
  /// with an error if the call reports one.
  private let responseSource:
26
    NIOThrowingAsyncSequenceProducer<
27
      Response,
28
      Error,
29
      NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
30
      GRPCAsyncSequenceProducerDelegate
31
    >.Source
32
33
  /// The stream of responses from the server.
34
  public let responseStream: GRPCAsyncResponseStream<Response>
35
36
  /// The options used to make the RPC.
37
0
  public var options: CallOptions {
38
0
    return self.call.options
39
0
  }
40
41
  /// The path used to make the RPC.
42
0
  public var path: String {
43
0
    return self.call.path
44
0
  }
45
46
  /// Cancel this RPC if it hasn't already completed.
  ///
  /// No promise is passed to the underlying call, so the outcome of the cancellation is not
  /// reported back to the caller.
47
0
  public func cancel() {
48
0
    self.call.cancel(promise: nil)
49
0
  }
50
51
  // MARK: - Response Parts
52
53
0
  /// Runs `fn` inside a task cancellation handler which cancels this RPC (via ``cancel()``) if
  /// the surrounding task is cancelled.
  ///
  /// - Parameter fn: The asynchronous operation to run.
  /// - Returns: The value returned by `fn`.
  /// - Throws: Rethrows any error thrown by `fn`.
  private func withRPCCancellation<R: Sendable>(_ fn: () async throws -> R) async rethrows -> R {
54
0
    return try await withTaskCancellationHandler(operation: fn) {
55
0
      self.cancel()
56
0
    }
57
0
  }
58
59
  /// The initial metadata returned from the server.
60
  ///
61
  /// - Important: The initial metadata will only be available when the first response has been
62
  /// received. However, it is not necessary for the response to have been consumed before reading
63
  /// this property.
  ///
  /// Cancelling the task awaiting this property cancels the RPC.
64
  public var initialMetadata: HPACKHeaders {
65
0
    get async throws {
66
0
      try await self.withRPCCancellation {
67
0
        try await self.responseParts.initialMetadata.get()
68
0
      }
69
0
    }
70
  }
71
72
  /// The trailing metadata returned from the server.
73
  ///
74
  /// - Important: Awaiting this property will suspend until the responses have been consumed.
  ///
  /// Cancelling the task awaiting this property cancels the RPC.
75
  public var trailingMetadata: HPACKHeaders {
76
0
    get async throws {
77
0
      try await self.withRPCCancellation {
78
0
        try await self.responseParts.trailingMetadata.get()
79
0
      }
80
0
    }
81
  }
82
83
  /// The final status of the RPC.
84
  ///
85
  /// - Important: Awaiting this property will suspend until the responses have been consumed.
  ///
  /// Cancelling the task awaiting this property cancels the RPC. This getter does not throw.
86
  public var status: GRPCStatus {
87
0
    get async {
88
0
      // force-try acceptable because any error is encapsulated in a successful GRPCStatus future.
89
0
      await self.withRPCCancellation {
90
0
        try! await self.responseParts.status.get()
91
0
      }
92
0
    }
93
  }
94
95
0
  /// Creates the wrapper and its response sequence without invoking the call.
  ///
  /// Private so that instances can only be obtained via `makeAndInvoke(call:_:)`.
  ///
  /// - Parameter call: The underlying call to wrap.
  private init(call: Call<Request, Response>) {
96
0
    self.call = call
97
0
    // We ignore messages in the closure and instead feed them into the response source when we
98
0
    // invoke the `call`.
99
0
    self.responseParts = StreamingResponseParts(on: call.eventLoop) { _ in }
100
0
101
0
    // Back-pressure configuration for the response sequence: low watermark 10, high watermark 50.
    let backpressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark(
102
0
      lowWatermark: 10,
103
0
      highWatermark: 50
104
0
    )
105
0
    let sequenceProducer = NIOThrowingAsyncSequenceProducer.makeSequence(
106
0
      elementType: Response.self,
107
0
      failureType: Error.self,
108
0
      backPressureStrategy: backpressureStrategy,
109
0
      delegate: GRPCAsyncSequenceProducerDelegate()
110
0
    )
111
0
112
0
    // Keep the producing side privately; hand the consuming side out as `responseStream`.
    self.responseSource = sequenceProducer.source
113
0
    self.responseStream = .init(sequenceProducer.sequence)
114
0
  }
115
116
  /// We expose this as the only non-private initializer so that the caller
117
  /// knows that invocation is part of initialisation.
  ///
  /// - Parameters:
  ///   - call: The underlying call to wrap and invoke.
  ///   - request: The single request to invoke the call with.
  /// - Returns: The wrapper for the call, which has already been invoked.
118
  internal static func makeAndInvoke(
119
    call: Call<Request, Response>,
120
    _ request: Request
121
0
  ) -> Self {
122
0
    let asyncCall = Self(call: call)
123
0
124
0
    asyncCall.call.invokeUnaryRequest(
125
0
      request,
126
0
      onStart: {},
127
0
      // On error, pass the error to the response parts and finish the response source with it.
      onError: { error in
128
0
        asyncCall.responseParts.handleError(error)
129
0
        asyncCall.responseSource.finish(error)
130
0
      },
131
0
      // No request stream is passed to the response part handler (`requestStream` is `nil`).
      onResponsePart: AsyncCall.makeResponsePartHandler(
132
0
        responseParts: asyncCall.responseParts,
133
0
        responseSource: asyncCall.responseSource,
134
0
        requestStream: nil,
135
0
        requestType: Request.self
136
0
      )
137
0
    )
138
0
139
0
    return asyncCall
140
0
  }
141
}