/src/grpc-swift/Sources/GRPC/Interceptor/ServerInterceptorPipeline.swift
Line | Count | Source |
1 | | /* |
2 | | * Copyright 2020, gRPC Authors All rights reserved. |
3 | | * |
4 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | | * you may not use this file except in compliance with the License. |
6 | | * You may obtain a copy of the License at |
7 | | * |
8 | | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | | * |
10 | | * Unless required by applicable law or agreed to in writing, software |
11 | | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | | * See the License for the specific language governing permissions and |
14 | | * limitations under the License. |
15 | | */ |
16 | | import Logging |
17 | | import NIOCore |
18 | | |
19 | | @usableFromInline |
20 | | internal final class ServerInterceptorPipeline<Request, Response> { |
21 | | /// The `EventLoop` this RPC is being executed on. |
22 | | @usableFromInline |
23 | | internal let eventLoop: EventLoop |
24 | | |
25 | | /// The path of the RPC in the format "/Service/Method", e.g. "/echo.Echo/Get". |
26 | | @usableFromInline |
27 | | internal let path: String |
28 | | |
29 | | /// The type of the RPC, e.g. "unary". |
30 | | @usableFromInline |
31 | | internal let type: GRPCCallType |
32 | | |
33 | | /// The remote peer's address. |
34 | | @usableFromInline |
35 | | internal let remoteAddress: SocketAddress? |
36 | | |
37 | | /// A logger. |
38 | | @usableFromInline |
39 | | internal let logger: Logger |
40 | | |
41 | | /// A reference to a 'UserInfo'. |
42 | | @usableFromInline |
43 | | internal let userInfoRef: Ref<UserInfo> |
44 | | |
45 | | /// A future which completes when the call closes. This may be used to register callbacks which |
46 | | /// free up resources used by the interceptor. |
47 | | @usableFromInline |
48 | | internal let closeFuture: EventLoopFuture<Void> |
49 | | |
50 | | /// Called when a response part has traversed the interceptor pipeline. |
51 | | @usableFromInline |
52 | | internal var _onResponsePart: |
53 | | Optional< |
54 | | ( |
55 | | GRPCServerResponsePart<Response>, |
56 | | EventLoopPromise<Void>? |
57 | | ) -> Void |
58 | | > |
59 | | |
60 | | /// Called when a request part has traversed the interceptor pipeline. |
61 | | @usableFromInline |
62 | | internal var _onRequestPart: Optional<(GRPCServerRequestPart<Request>) -> Void> |
63 | | |
64 | | /// The index before the first user interceptor context index. (always -1). |
65 | | @usableFromInline |
66 | | internal let _headIndex: Int |
67 | | |
68 | | /// The index after the last user interceptor context index (i.e. 'userContext.endIndex'). |
69 | | @usableFromInline |
70 | | internal let _tailIndex: Int |
71 | | |
72 | | /// Contexts for user provided interceptors. |
73 | | @usableFromInline |
74 | | internal var _userContexts: [ServerInterceptorContext<Request, Response>] |
75 | | |
76 | | /// Whether the interceptor pipeline is still open. It becomes closed after an 'end' response |
77 | | /// part has traversed the pipeline. |
78 | | @usableFromInline |
79 | 1.06M | internal var _isOpen = true |
80 | | |
81 | | /// The index of the next context on the inbound side of the context at the given index. |
82 | | @inlinable |
83 | 1.80M | internal func _nextInboundIndex(after index: Int) -> Int { |
84 | 1.80M | // Unchecked arithmetic is okay here: our greatest inbound index is '_tailIndex' but we will |
85 | 1.80M | // never ask for the inbound index after the tail. |
86 | 1.80M | assert(self._indexIsValid(index)) |
87 | 1.80M | return index &+ 1 |
88 | 1.80M | } |
89 | | |
90 | | /// The index of the next context on the outbound side of the context at the given index. |
91 | | @inlinable |
92 | 6.42M | internal func _nextOutboundIndex(after index: Int) -> Int { |
93 | 6.42M | // Unchecked arithmetic is okay here: our lowest outbound index is '_headIndex' but we will |
94 | 6.42M | // never ask for the outbound index after the head. |
95 | 6.42M | assert(self._indexIsValid(index)) |
96 | 6.42M | return index &- 1 |
97 | 6.42M | } |
98 | | |
99 | | /// Returns true of the index is in the range `_headIndex ... _tailIndex`. |
100 | | @inlinable |
101 | 1.77M | internal func _indexIsValid(_ index: Int) -> Bool { |
102 | 1.77M | return self._headIndex <= index && index <= self._tailIndex |
103 | 1.77M | } |
104 | | |
105 | | @inlinable |
106 | | internal init( |
107 | | logger: Logger, |
108 | | eventLoop: EventLoop, |
109 | | path: String, |
110 | | callType: GRPCCallType, |
111 | | remoteAddress: SocketAddress?, |
112 | | userInfoRef: Ref<UserInfo>, |
113 | | closeFuture: EventLoopFuture<Void>, |
114 | | interceptors: [ServerInterceptor<Request, Response>], |
115 | | onRequestPart: @escaping (GRPCServerRequestPart<Request>) -> Void, |
116 | | onResponsePart: @escaping (GRPCServerResponsePart<Response>, EventLoopPromise<Void>?) -> Void |
117 | 498k | ) { |
118 | 498k | self.logger = logger |
119 | 498k | self.eventLoop = eventLoop |
120 | 498k | self.path = path |
121 | 498k | self.type = callType |
122 | 498k | self.remoteAddress = remoteAddress |
123 | 498k | self.userInfoRef = userInfoRef |
124 | 498k | self.closeFuture = closeFuture |
125 | 498k | |
126 | 498k | self._onResponsePart = onResponsePart |
127 | 498k | self._onRequestPart = onRequestPart |
128 | 498k | |
129 | 498k | // Head comes before user interceptors. |
130 | 498k | self._headIndex = -1 |
131 | 498k | // Tail comes just after. |
132 | 498k | self._tailIndex = interceptors.endIndex |
133 | 498k | |
134 | 498k | // Make some contexts. |
135 | 498k | self._userContexts = [] |
136 | 498k | self._userContexts.reserveCapacity(interceptors.count) |
137 | 498k | |
138 | 498k | for index in 0 ..< interceptors.count { |
139 | 0 | let context = ServerInterceptorContext(for: interceptors[index], atIndex: index, in: self) |
140 | 0 | self._userContexts.append(context) |
141 | 498k | } |
142 | 498k | } |
143 | | |
144 | | /// Emit a request part message into the interceptor pipeline. |
145 | | /// |
146 | | /// - Parameter part: The part to emit into the pipeline. |
147 | | /// - Important: This *must* to be called from the `eventLoop`. |
148 | | @inlinable |
149 | 1.80M | internal func receive(_ part: GRPCServerRequestPart<Request>) { |
150 | 1.80M | self.invokeReceive(part, fromContextAtIndex: self._headIndex) |
151 | 1.80M | } |
152 | | |
153 | | /// Invoke receive on the appropriate context when called from the context at the given index. |
154 | | @inlinable |
155 | | internal func invokeReceive( |
156 | | _ part: GRPCServerRequestPart<Request>, |
157 | | fromContextAtIndex index: Int |
158 | 1.80M | ) { |
159 | 1.80M | self._invokeReceive(part, onContextAtIndex: self._nextInboundIndex(after: index)) |
160 | 1.80M | } |
161 | | |
162 | | /// Invoke receive on the context at the given index, if doing so is safe. |
163 | | @inlinable |
164 | | internal func _invokeReceive( |
165 | | _ part: GRPCServerRequestPart<Request>, |
166 | | onContextAtIndex index: Int |
167 | 1.80M | ) { |
168 | 1.80M | self.eventLoop.assertInEventLoop() |
169 | 1.80M | assert(self._indexIsValid(index)) |
170 | 1.80M | guard self._isOpen else { |
171 | 0 | return |
172 | 1.80M | } |
173 | 1.80M | |
174 | 1.80M | // We've checked the index. |
175 | 1.80M | self._invokeReceive(part, onContextAtUncheckedIndex: index) |
176 | 1.80M | } |
177 | | |
178 | | /// Invoke receive on the context at the given index, assuming that the index is valid and the |
179 | | /// pipeline is still open. |
180 | | @inlinable |
181 | | internal func _invokeReceive( |
182 | | _ part: GRPCServerRequestPart<Request>, |
183 | | onContextAtUncheckedIndex index: Int |
184 | 1.80M | ) { |
185 | 1.80M | switch index { |
186 | 1.80M | case self._headIndex: |
187 | 0 | // The next inbound index must exist, either for the tail or a user interceptor. |
188 | 0 | self._invokeReceive( |
189 | 0 | part, |
190 | 0 | onContextAtUncheckedIndex: self._nextInboundIndex(after: self._headIndex) |
191 | 0 | ) |
192 | 1.80M | |
193 | 1.80M | case self._tailIndex: |
194 | 1.80M | self._onRequestPart?(part) |
195 | 1.80M | |
196 | 1.80M | default: |
197 | 0 | self._userContexts[index].invokeReceive(part) |
198 | 1.80M | } |
199 | 1.80M | } |
200 | | |
201 | | /// Write a response message into the interceptor pipeline. |
202 | | /// |
203 | | /// - Parameters: |
204 | | /// - part: The response part to sent. |
205 | | /// - promise: A promise to complete when the response part has been successfully written. |
206 | | /// - Important: This *must* to be called from the `eventLoop`. |
207 | | @inlinable |
208 | 6.42M | internal func send(_ part: GRPCServerResponsePart<Response>, promise: EventLoopPromise<Void>?) { |
209 | 6.42M | self.invokeSend(part, promise: promise, fromContextAtIndex: self._tailIndex) |
210 | 6.42M | } |
211 | | |
212 | | /// Invoke send on the appropriate context when called from the context at the given index. |
213 | | @inlinable |
214 | | internal func invokeSend( |
215 | | _ part: GRPCServerResponsePart<Response>, |
216 | | promise: EventLoopPromise<Void>?, |
217 | | fromContextAtIndex index: Int |
218 | 6.42M | ) { |
219 | 6.42M | self._invokeSend( |
220 | 6.42M | part, |
221 | 6.42M | promise: promise, |
222 | 6.42M | onContextAtIndex: self._nextOutboundIndex(after: index) |
223 | 6.42M | ) |
224 | 6.42M | } |
225 | | |
226 | | /// Invoke send on the context at the given index, if doing so is safe. Fails the `promise` if it |
227 | | /// is not safe to do so. |
228 | | @inlinable |
229 | | internal func _invokeSend( |
230 | | _ part: GRPCServerResponsePart<Response>, |
231 | | promise: EventLoopPromise<Void>?, |
232 | | onContextAtIndex index: Int |
233 | 6.42M | ) { |
234 | 6.42M | self.eventLoop.assertInEventLoop() |
235 | 6.42M | assert(self._indexIsValid(index)) |
236 | 6.42M | guard self._isOpen else { |
237 | 0 | promise?.fail(GRPCError.AlreadyComplete()) |
238 | 0 | return |
239 | 6.42M | } |
240 | 6.42M | |
241 | 6.42M | self._invokeSend(uncheckedIndex: index, part, promise: promise) |
242 | 6.42M | } |
243 | | |
244 | | /// Invoke send on the context at the given index, assuming that the index is valid and the |
245 | | /// pipeline is still open. |
246 | | @inlinable |
247 | | internal func _invokeSend( |
248 | | uncheckedIndex index: Int, |
249 | | _ part: GRPCServerResponsePart<Response>, |
250 | | promise: EventLoopPromise<Void>? |
251 | 6.42M | ) { |
252 | 6.42M | switch index { |
253 | 6.42M | case self._headIndex: |
254 | 6.42M | let onResponsePart = self._onResponsePart |
255 | 6.42M | if part.isEnd { |
256 | 498k | self.close() |
257 | 498k | } |
258 | 6.42M | onResponsePart?(part, promise) |
259 | 6.42M | |
260 | 6.42M | case self._tailIndex: |
261 | 0 | // The next outbound index must exist: it will be the head or a user interceptor. |
262 | 0 | self._invokeSend( |
263 | 0 | uncheckedIndex: self._nextOutboundIndex(after: self._tailIndex), |
264 | 0 | part, |
265 | 0 | promise: promise |
266 | 0 | ) |
267 | 6.42M | |
268 | 6.42M | default: |
269 | 0 | self._userContexts[index].invokeSend(part, promise: promise) |
270 | 6.42M | } |
271 | 6.42M | } |
272 | | |
273 | | @inlinable |
274 | 498k | internal func close() { |
275 | 498k | // We're no longer open. |
276 | 498k | self._isOpen = false |
277 | 498k | // Each context hold a ref to the pipeline; break the retain cycle. |
278 | 498k | self._userContexts.removeAll() |
279 | 498k | // Drop the refs to the server handler. |
280 | 498k | self._onRequestPart = nil |
281 | 498k | self._onResponsePart = nil |
282 | 498k | } |
283 | | } |
284 | | |
285 | | extension ServerInterceptorContext { |
286 | | @inlinable |
287 | 0 | internal func invokeReceive(_ part: GRPCServerRequestPart<Request>) { |
288 | 0 | self.interceptor.receive(part, context: self) |
289 | 0 | } |
290 | | |
291 | | @inlinable |
292 | | internal func invokeSend( |
293 | | _ part: GRPCServerResponsePart<Response>, |
294 | | promise: EventLoopPromise<Void>? |
295 | 0 | ) { |
296 | 0 | self.interceptor.send(part, promise: promise, context: self) |
297 | 0 | } |
298 | | } |