Coverage Report

Created: 2025-09-04 06:32

/src/grpc-swift/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncResponseStreamWriter.swift
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright 2021, gRPC Authors All rights reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
import NIOCore
17
18
/// Writer for server-streaming RPC handlers to provide responses.
///
/// A writer forwards every response, paired with its `Compression` setting, to one of two
/// backings: a `NIOAsyncWriter` or a plain `async` closure.
///
/// To enable testability this type provides a static
/// ``GRPCAsyncResponseStreamWriter/makeTestingResponseStreamWriter()`` method which creates a
/// writer together with a stream that you can drive and observe.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public struct GRPCAsyncResponseStreamWriter<Response: Sendable>: Sendable {
  // MARK: - Backing storage

  /// The `NIOAsyncWriter` specialization accepted by ``init(wrapping:)``.
  @usableFromInline
  internal typealias AsyncWriter = NIOAsyncWriter<
    (Response, Compression),
    GRPCAsyncWriterSinkDelegate<(Response, Compression)>
  >

  /// The destination that written responses are forwarded to.
  @usableFromInline
  enum Backing: Sendable {
    /// Responses are yielded to a `NIOAsyncWriter`.
    case asyncWriter(AsyncWriter)
    /// Responses are handed to a closure, one `(response, compression)` pair per call.
    case closure(@Sendable ((Response, Compression)) async -> Void)
  }

  /// Where this writer sends its responses; fixed at initialization.
  @usableFromInline
  internal let backing: Backing

  // MARK: - Initializers

  /// Creates a writer which yields every response to `asyncWriter`.
  @inlinable
  internal init(wrapping asyncWriter: AsyncWriter) {
    self.backing = Backing.asyncWriter(asyncWriter)
  }

  /// Creates a writer which invokes `onWrite` for every response.
  @inlinable
  internal init(onWrite: @escaping @Sendable ((Response, Compression)) async -> Void) {
    self.backing = Backing.closure(onWrite)
  }

  // MARK: - Sending responses

  /// Sends a single response.
  ///
  /// - Parameters:
  ///   - response: The response to send.
  ///   - compression: The compression setting forwarded alongside the response. Defaults to
  ///     `.deferToCallDefault`.
  /// - Throws: Any error thrown by the underlying `NIOAsyncWriter` when yielding. The
  ///   closure-backed writer does not throw.
  @inlinable
  public func send(
    _ response: Response,
    compression: Compression = .deferToCallDefault
  ) async throws {
    let element = (response, compression)
    switch self.backing {
    case .asyncWriter(let asyncWriter):
      try await asyncWriter.yield(element)

    case .closure(let onWrite):
      await onWrite(element)
    }
  }

  /// Sends every response of a sequence, in order, all with the same compression setting.
  ///
  /// - Parameters:
  ///   - responses: The responses to send.
  ///   - compression: The compression setting forwarded alongside each response. Defaults to
  ///     `.deferToCallDefault`.
  /// - Throws: Any error thrown by the underlying `NIOAsyncWriter` when yielding. The
  ///   closure-backed writer does not throw.
  @inlinable
  public func send<S: Sequence>(
    contentsOf responses: S,
    compression: Compression = .deferToCallDefault
  ) async throws where S.Element == Response {
    switch self.backing {
    case .asyncWriter(let asyncWriter):
      // Pair each response with the compression setting lazily, so no intermediate
      // collection is built before handing the sequence to the writer.
      let elements = responses.lazy.map { ($0, compression) }
      try await asyncWriter.yield(contentsOf: elements)

    case .closure(let onWrite):
      for response in responses {
        await onWrite((response, compression))
      }
    }
  }

  // MARK: - Testing support

  /// Creates a new `GRPCAsyncResponseStreamWriter` backed by a ``ResponseStream``.
  /// This is mostly useful for testing purposes where one wants to observe the written responses.
  ///
  /// - Note: For most tests it is useful to call ``ResponseStream/finish()`` after the async
  ///   method under test has returned. This allows you to easily collect all written responses.
  /// - Returns: The writer to pass to the code under test, paired with the stream on which its
  ///   responses can be observed.
  @inlinable
  public static func makeTestingResponseStreamWriter() -> TestingStreamWriter {
    // `AsyncStream` hands out its continuation only inside the build closure, so it is
    // stashed in a local in order to be usable once the initializer has returned.
    var escapedContinuation: AsyncStream<(Response, Compression)>.Continuation!
    let responses = AsyncStream<(Response, Compression)> { escapedContinuation = $0 }

    // Every write is yielded into the stream; the yield result is intentionally ignored.
    let writer = Self(onWrite: { [escapedContinuation] element in
      escapedContinuation!.yield(element)
    })

    return TestingStreamWriter(
      writer: writer,
      stream: ResponseStream(stream: responses, continuation: escapedContinuation)
    )
  }

  /// Simple struct for the return type of
  /// ``GRPCAsyncResponseStreamWriter/makeTestingResponseStreamWriter()``.
  ///
  /// This struct contains two properties:
  /// 1. The ``writer`` which is the actual ``GRPCAsyncResponseStreamWriter`` and should be
  ///    passed to the method under test.
  /// 2. The ``stream`` which can be used to observe the written responses.
  public struct TestingStreamWriter {
    /// The actual writer.
    public let writer: GRPCAsyncResponseStreamWriter<Response>

    /// The written responses in a stream.
    ///
    /// - Important: This `AsyncSequence` does not finish on its own; call
    ///   ``ResponseStream/finish()`` to end it.
    public let stream: ResponseStream

    @inlinable
    init(writer: GRPCAsyncResponseStreamWriter<Response>, stream: ResponseStream) {
      self.writer = writer
      self.stream = stream
    }
  }

  /// An `AsyncSequence` backing a ``GRPCAsyncResponseStreamWriter`` for testing purposes.
  ///
  /// - Important: This `AsyncSequence` does not finish on its own; call ``finish()`` to end it.
  public struct ResponseStream: AsyncSequence {
    public typealias Element = (Response, Compression)

    /// The stream that written responses are read from.
    @usableFromInline
    internal let stream: AsyncStream<Element>

    /// The continuation feeding ``stream``; kept so that ``finish()`` can end the stream.
    @usableFromInline
    internal let continuation: AsyncStream<Element>.Continuation

    @inlinable
    init(
      stream: AsyncStream<Element>,
      continuation: AsyncStream<Element>.Continuation
    ) {
      self.stream = stream
      self.continuation = continuation
    }

    /// Creates an iterator over the written responses by wrapping the underlying stream's
    /// iterator.
    public func makeAsyncIterator() -> AsyncIterator {
      return AsyncIterator(iterator: stream.makeAsyncIterator())
    }

    /// Finishes the response stream.
    ///
    /// This is useful in tests to finish the stream after the async method has finished, and
    /// allows you to collect all written responses.
    public func finish() {
      continuation.finish()
    }

    /// Iterator over the written responses; forwards to the underlying `AsyncStream` iterator.
    public struct AsyncIterator: AsyncIteratorProtocol {
      @usableFromInline
      internal var iterator: AsyncStream<(Response, Compression)>.AsyncIterator

      @inlinable
      init(iterator: AsyncStream<(Response, Compression)>.AsyncIterator) {
        self.iterator = iterator
      }

      /// Returns the next written response, or `nil` once the underlying stream has ended.
      public mutating func next() async -> Element? {
        return await iterator.next()
      }
    }
  }
}