Coverage Report

Created: 2025-06-24 06:59

/src/grpc-swift/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncRequestStream.swift
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright 2021, gRPC Authors All rights reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
import NIOCore
17
18
/// A type for the stream of request messages send to a gRPC server method.
19
///
20
/// To enable testability this type provides a static ``GRPCAsyncRequestStream/makeTestingRequestStream()``
21
/// method which allows you to create a stream that you can drive.
22
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
23
public struct GRPCAsyncRequestStream<Element: Sendable>: AsyncSequence {
24
  @usableFromInline
25
  internal typealias _AsyncSequenceProducer = NIOThrowingAsyncSequenceProducer<
26
    Element,
27
    Error,
28
    NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
29
    GRPCAsyncSequenceProducerDelegate
30
  >
31
32
  /// A source used for driving a ``GRPCAsyncRequestStream`` during tests.
33
  public struct Source {
34
    @usableFromInline
35
    internal let continuation: AsyncThrowingStream<Element, Error>.Continuation
36
37
    @inlinable
38
0
    init(continuation: AsyncThrowingStream<Element, Error>.Continuation) {
39
0
      self.continuation = continuation
40
0
    }
41
42
    /// Yields the element to the request stream.
43
    ///
44
    /// - Parameter element: The element to yield to the request stream.
45
    @inlinable
46
0
    public func yield(_ element: Element) {
47
0
      self.continuation.yield(element)
48
0
    }
49
50
    /// Finished the request stream.
51
    @inlinable
52
0
    public func finish() {
53
0
      self.continuation.finish()
54
0
    }
55
56
    /// Finished the request stream.
57
    ///
58
    /// - Parameter error: An optional `Error` to finish the request stream with.
59
    @inlinable
60
0
    public func finish(throwing error: Error?) {
61
0
      self.continuation.finish(throwing: error)
62
0
    }
63
  }
64
65
  /// Simple struct for the return type of ``GRPCAsyncRequestStream/makeTestingRequestStream()``.
66
  ///
67
  /// This struct contains two properties:
68
  /// 1. The ``stream`` which is the actual ``GRPCAsyncRequestStream`` and should be passed to the method under testing.
69
  /// 2. The ``source`` which can be used to drive the stream.
70
  public struct TestingStream {
71
    /// The actual stream.
72
    public let stream: GRPCAsyncRequestStream<Element>
73
    /// The source used to drive the stream.
74
    public let source: Source
75
76
    @inlinable
77
0
    init(stream: GRPCAsyncRequestStream<Element>, source: Source) {
78
0
      self.stream = stream
79
0
      self.source = source
80
0
    }
81
  }
82
83
  @usableFromInline
84
  enum Backing: Sendable {
85
    case asyncStream(AsyncThrowingStream<Element, Error>)
86
    case throwingAsyncSequenceProducer(_AsyncSequenceProducer)
87
  }
88
89
  @usableFromInline
90
  internal let backing: Backing
91
92
  @inlinable
93
0
  internal init(_ sequence: _AsyncSequenceProducer) {
94
0
    self.backing = .throwingAsyncSequenceProducer(sequence)
95
0
  }
96
97
  @inlinable
98
0
  internal init(_ stream: AsyncThrowingStream<Element, Error>) {
99
0
    self.backing = .asyncStream(stream)
100
0
  }
101
102
  /// Creates a new testing stream.
103
  ///
104
  /// This is useful for writing unit tests for your gRPC method implementations since it allows you to drive the stream passed
105
  /// to your method.
106
  ///
107
  /// - Returns: A new ``TestingStream`` containing the actual ``GRPCAsyncRequestStream`` and a ``Source``.
108
  @inlinable
109
0
  public static func makeTestingRequestStream() -> TestingStream {
110
0
    var continuation: AsyncThrowingStream<Element, Error>.Continuation!
111
0
    let stream = AsyncThrowingStream<Element, Error> { continuation = $0 }
112
0
    let source = Source(continuation: continuation)
113
0
    let requestStream = Self(stream)
114
0
    return TestingStream(stream: requestStream, source: source)
115
0
  }
116
117
  @inlinable
118
0
  public func makeAsyncIterator() -> Iterator {
119
0
    switch self.backing {
120
0
    case let .asyncStream(stream):
121
0
      return Self.AsyncIterator(.asyncStream(stream.makeAsyncIterator()))
122
0
    case let .throwingAsyncSequenceProducer(sequence):
123
0
      return Self.AsyncIterator(.throwingAsyncSequenceProducer(sequence.makeAsyncIterator()))
124
0
    }
125
0
  }
126
127
  public struct Iterator: AsyncIteratorProtocol {
128
    @usableFromInline
129
    enum BackingIterator {
130
      case asyncStream(AsyncThrowingStream<Element, Error>.Iterator)
131
      case throwingAsyncSequenceProducer(_AsyncSequenceProducer.AsyncIterator)
132
    }
133
134
    @usableFromInline
135
    internal var iterator: BackingIterator
136
137
    @usableFromInline
138
0
    internal init(_ iterator: BackingIterator) {
139
0
      self.iterator = iterator
140
0
    }
141
142
    @inlinable
143
0
    public mutating func next() async throws -> Element? {
144
0
      if Task.isCancelled { throw GRPCStatus(code: .cancelled) }
145
0
      switch self.iterator {
146
0
      case var .asyncStream(iterator):
147
0
        let element = try await iterator.next()
148
0
        self.iterator = .asyncStream(iterator)
149
0
        return element
150
0
      case let .throwingAsyncSequenceProducer(iterator):
151
0
        return try await iterator.next()
152
0
      }
153
0
    }
154
  }
155
}
156
157
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
158
extension GRPCAsyncRequestStream: Sendable where Element: Sendable {}