Coverage Report

Created: 2025-06-24 06:59

/src/grpc-swift/Sources/GRPC/ClientCalls/BidirectionalStreamingCall.swift
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright 2019, gRPC Authors All rights reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
import Logging
17
import NIOCore
18
import NIOHPACK
19
import NIOHTTP2
20
21
/// A bidirectional-streaming gRPC call. Each response is passed to the provided observer block.
22
///
23
/// Messages should be sent via the ``sendMessage(_:compression:)`` and ``sendMessages(_:compression:)`` methods; the stream of messages
24
/// must be terminated by calling ``sendEnd()`` to indicate the final message has been sent.
25
///
26
/// Note: while this object is a `struct`, its implementation delegates to ``Call``. It therefore
27
/// has reference semantics.
28
public struct BidirectionalStreamingCall<
29
  RequestPayload,
30
  ResponsePayload
31
>: StreamingRequestClientCall {
32
  private let call: Call<RequestPayload, ResponsePayload>
33
  private let responseParts: StreamingResponseParts<ResponsePayload>
34
35
  /// The options used to make the RPC.
36
0
  public var options: CallOptions {
37
0
    return self.call.options
38
0
  }
39
40
  /// The path used to make the RPC.
41
0
  public var path: String {
42
0
    return self.call.path
43
0
  }
44
45
  /// The `Channel` used to transport messages for this RPC.
46
0
  public var subchannel: EventLoopFuture<Channel> {
47
0
    return self.call.channel
48
0
  }
49
50
  /// The `EventLoop` this call is running on.
51
0
  public var eventLoop: EventLoop {
52
0
    return self.call.eventLoop
53
0
  }
54
55
  /// Cancel this RPC if it hasn't already completed.
56
0
  public func cancel(promise: EventLoopPromise<Void>?) {
57
0
    self.call.cancel(promise: promise)
58
0
  }
59
60
  // MARK: - Response Parts
61
62
  /// The initial metadata returned from the server.
63
0
  public var initialMetadata: EventLoopFuture<HPACKHeaders> {
64
0
    return self.responseParts.initialMetadata
65
0
  }
66
67
  /// The trailing metadata returned from the server.
68
0
  public var trailingMetadata: EventLoopFuture<HPACKHeaders> {
69
0
    return self.responseParts.trailingMetadata
70
0
  }
71
72
  /// The final status of the RPC.
73
0
  public var status: EventLoopFuture<GRPCStatus> {
74
0
    return self.responseParts.status
75
0
  }
76
77
  internal init(
78
    call: Call<RequestPayload, ResponsePayload>,
79
    callback: @escaping (ResponsePayload) -> Void
80
0
  ) {
81
0
    self.call = call
82
0
    self.responseParts = StreamingResponseParts(on: call.eventLoop, callback)
83
0
  }
84
85
0
  internal func invoke() {
86
0
    self.call.invokeStreamingRequests(
87
0
      onStart: {},
88
0
      onError: self.responseParts.handleError(_:),
Unexecuted instantiation: $s4GRPC26BidirectionalStreamingCallV6invokeyyFys5Error_pcAA0C13ResponsePartsCyq_Gcfu_
Unexecuted instantiation: $s4GRPC26BidirectionalStreamingCallV6invokeyyFys5Error_pcAA0C13ResponsePartsCyq_Gcfu_ysAE_pcfu0_
89
0
      onResponsePart: self.responseParts.handle(_:)
Unexecuted instantiation: $s4GRPC26BidirectionalStreamingCallV6invokeyyFyAA22GRPCClientResponsePartOyq_GcAA0cG5PartsCyq_Gcfu1_
Unexecuted instantiation: $s4GRPC26BidirectionalStreamingCallV6invokeyyFyAA22GRPCClientResponsePartOyq_GcAA0cG5PartsCyq_Gcfu1_yAGcfu2_
90
0
    )
91
0
  }
92
93
  // MARK: - Requests
94
95
  /// Sends a message to the service.
96
  ///
97
  /// - Important: Callers must terminate the stream of messages by calling ``sendEnd()`` or
98
  ///   ``sendEnd(promise:)``.
99
  ///
100
  /// - Parameters:
101
  ///   - message: The message to send.
102
  ///   - compression: Whether compression should be used for this message. Ignored if compression
103
  ///     was not enabled for the RPC.
104
  ///   - promise: A promise to fulfill with the outcome of the send operation.
105
  public func sendMessage(
106
    _ message: RequestPayload,
107
    compression: Compression = .deferToCallDefault,
108
    promise: EventLoopPromise<Void>?
109
0
  ) {
110
0
    let compress = self.call.compress(compression)
111
0
    self.call.send(.message(message, .init(compress: compress, flush: true)), promise: promise)
112
0
  }
113
114
  /// Sends a sequence of messages to the service.
115
  ///
116
  /// - Important: Callers must terminate the stream of messages by calling ``sendEnd()`` or
117
  ///   ``sendEnd(promise:)``.
118
  ///
119
  /// - Parameters:
120
  ///   - messages: The sequence of messages to send.
121
  ///   - compression: Whether compression should be used for these messages. Ignored if compression
122
  ///     was not enabled for the RPC.
123
  ///   - promise: A promise to fulfill with the outcome of the send operation. It will only succeed
124
  ///     if all messages were written successfully.
125
  public func sendMessages<S>(
126
    _ messages: S,
127
    compression: Compression = .deferToCallDefault,
128
    promise: EventLoopPromise<Void>?
129
0
  ) where S: Sequence, S.Element == RequestPayload {
130
0
    self.call.sendMessages(messages, compression: compression, promise: promise)
131
0
  }
132
133
  /// Terminates a stream of messages sent to the service.
134
  ///
135
  /// - Important: This should only ever be called once.
136
  /// - Parameter promise: A promise to be fulfilled when the end has been sent.
137
0
  public func sendEnd(promise: EventLoopPromise<Void>?) {
138
0
    self.call.send(.end, promise: promise)
139
0
  }
140
}