/src/grpc-swift/Sources/GRPC/ClientCalls/BidirectionalStreamingCall.swift
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Copyright 2019, gRPC Authors All rights reserved. |
3 | | * |
4 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | | * you may not use this file except in compliance with the License. |
6 | | * You may obtain a copy of the License at |
7 | | * |
8 | | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | | * |
10 | | * Unless required by applicable law or agreed to in writing, software |
11 | | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | | * See the License for the specific language governing permissions and |
14 | | * limitations under the License. |
15 | | */ |
16 | | import Logging |
17 | | import NIOCore |
18 | | import NIOHPACK |
19 | | import NIOHTTP2 |
20 | | |
21 | | /// A bidirectional-streaming gRPC call. Each response is passed to the provided observer block. |
22 | | /// |
23 | | /// Messages should be sent via the ``sendMessage(_:compression:)`` and ``sendMessages(_:compression:)`` methods; the stream of messages |
24 | | /// must be terminated by calling ``sendEnd()`` to indicate the final message has been sent. |
25 | | /// |
26 | | /// Note: while this object is a `struct`, its implementation delegates to ``Call``. It therefore |
27 | | /// has reference semantics. |
28 | | public struct BidirectionalStreamingCall< |
29 | | RequestPayload, |
30 | | ResponsePayload |
31 | | >: StreamingRequestClientCall { |
32 | | private let call: Call<RequestPayload, ResponsePayload> |
33 | | private let responseParts: StreamingResponseParts<ResponsePayload> |
34 | | |
35 | | /// The options used to make the RPC. |
36 | 0 | public var options: CallOptions { |
37 | 0 | return self.call.options |
38 | 0 | } |
39 | | |
40 | | /// The path used to make the RPC. |
41 | 0 | public var path: String { |
42 | 0 | return self.call.path |
43 | 0 | } |
44 | | |
45 | | /// The `Channel` used to transport messages for this RPC. |
46 | 0 | public var subchannel: EventLoopFuture<Channel> { |
47 | 0 | return self.call.channel |
48 | 0 | } |
49 | | |
50 | | /// The `EventLoop` this call is running on. |
51 | 0 | public var eventLoop: EventLoop { |
52 | 0 | return self.call.eventLoop |
53 | 0 | } |
54 | | |
55 | | /// Cancel this RPC if it hasn't already completed. |
56 | 0 | public func cancel(promise: EventLoopPromise<Void>?) { |
57 | 0 | self.call.cancel(promise: promise) |
58 | 0 | } |
59 | | |
60 | | // MARK: - Response Parts |
61 | | |
62 | | /// The initial metadata returned from the server. |
63 | 0 | public var initialMetadata: EventLoopFuture<HPACKHeaders> { |
64 | 0 | return self.responseParts.initialMetadata |
65 | 0 | } |
66 | | |
67 | | /// The trailing metadata returned from the server. |
68 | 0 | public var trailingMetadata: EventLoopFuture<HPACKHeaders> { |
69 | 0 | return self.responseParts.trailingMetadata |
70 | 0 | } |
71 | | |
72 | | /// The final status of the the RPC. |
73 | 0 | public var status: EventLoopFuture<GRPCStatus> { |
74 | 0 | return self.responseParts.status |
75 | 0 | } |
76 | | |
77 | | internal init( |
78 | | call: Call<RequestPayload, ResponsePayload>, |
79 | | callback: @escaping (ResponsePayload) -> Void |
80 | 0 | ) { |
81 | 0 | self.call = call |
82 | 0 | self.responseParts = StreamingResponseParts(on: call.eventLoop, callback) |
83 | 0 | } |
84 | | |
85 | 0 | internal func invoke() { |
86 | 0 | self.call.invokeStreamingRequests( |
87 | 0 | onStart: {}, |
88 | 0 | onError: self.responseParts.handleError(_:), Unexecuted instantiation: $s4GRPC26BidirectionalStreamingCallV6invokeyyFys5Error_pcAA0C13ResponsePartsCyq_Gcfu_ Unexecuted instantiation: $s4GRPC26BidirectionalStreamingCallV6invokeyyFys5Error_pcAA0C13ResponsePartsCyq_Gcfu_ysAE_pcfu0_ |
89 | 0 | onResponsePart: self.responseParts.handle(_:) Unexecuted instantiation: $s4GRPC26BidirectionalStreamingCallV6invokeyyFyAA22GRPCClientResponsePartOyq_GcAA0cG5PartsCyq_Gcfu1_ Unexecuted instantiation: $s4GRPC26BidirectionalStreamingCallV6invokeyyFyAA22GRPCClientResponsePartOyq_GcAA0cG5PartsCyq_Gcfu1_yAGcfu2_ |
90 | 0 | ) |
91 | 0 | } |
92 | | |
93 | | // MARK: - Requests |
94 | | |
95 | | /// Sends a message to the service. |
96 | | /// |
97 | | /// - Important: Callers must terminate the stream of messages by calling ``sendEnd()`` or |
98 | | /// ``sendEnd(promise:)``. |
99 | | /// |
100 | | /// - Parameters: |
101 | | /// - message: The message to send. |
102 | | /// - compression: Whether compression should be used for this message. Ignored if compression |
103 | | /// was not enabled for the RPC. |
104 | | /// - promise: A promise to fulfill with the outcome of the send operation. |
105 | | public func sendMessage( |
106 | | _ message: RequestPayload, |
107 | | compression: Compression = .deferToCallDefault, |
108 | | promise: EventLoopPromise<Void>? |
109 | 0 | ) { |
110 | 0 | let compress = self.call.compress(compression) |
111 | 0 | self.call.send(.message(message, .init(compress: compress, flush: true)), promise: promise) |
112 | 0 | } |
113 | | |
114 | | /// Sends a sequence of messages to the service. |
115 | | /// |
116 | | /// - Important: Callers must terminate the stream of messages by calling ``sendEnd()`` or |
117 | | /// ``sendEnd(promise:)``. |
118 | | /// |
119 | | /// - Parameters: |
120 | | /// - messages: The sequence of messages to send. |
121 | | /// - compression: Whether compression should be used for this message. Ignored if compression |
122 | | /// was not enabled for the RPC. |
123 | | /// - promise: A promise to fulfill with the outcome of the send operation. It will only succeed |
124 | | /// if all messages were written successfully. |
125 | | public func sendMessages<S>( |
126 | | _ messages: S, |
127 | | compression: Compression = .deferToCallDefault, |
128 | | promise: EventLoopPromise<Void>? |
129 | 0 | ) where S: Sequence, S.Element == RequestPayload { |
130 | 0 | self.call.sendMessages(messages, compression: compression, promise: promise) |
131 | 0 | } |
132 | | |
133 | | /// Terminates a stream of messages sent to the service. |
134 | | /// |
135 | | /// - Important: This should only ever be called once. |
136 | | /// - Parameter promise: A promise to be fulfilled when the end has been sent. |
137 | 0 | public func sendEnd(promise: EventLoopPromise<Void>?) { |
138 | 0 | self.call.send(.end, promise: promise) |
139 | 0 | } |
140 | | } |