Coverage Report

Created: 2026-02-11 07:03

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/grpc-swift/Sources/GRPC/Interceptor/ServerInterceptorPipeline.swift
Line
Count
Source
1
/*
2
 * Copyright 2020, gRPC Authors All rights reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
import Logging
17
import NIOCore
18
19
@usableFromInline
20
internal final class ServerInterceptorPipeline<Request, Response> {
21
  /// The `EventLoop` this RPC is being executed on.
22
  @usableFromInline
23
  internal let eventLoop: EventLoop
24
25
  /// The path of the RPC in the format "/Service/Method", e.g. "/echo.Echo/Get".
26
  @usableFromInline
27
  internal let path: String
28
29
  /// The type of the RPC, e.g. "unary".
30
  @usableFromInline
31
  internal let type: GRPCCallType
32
33
  /// The remote peer's address.
34
  @usableFromInline
35
  internal let remoteAddress: SocketAddress?
36
37
  /// A logger.
38
  @usableFromInline
39
  internal let logger: Logger
40
41
  /// A reference to a 'UserInfo'.
42
  @usableFromInline
43
  internal let userInfoRef: Ref<UserInfo>
44
45
  /// A future which completes when the call closes. This may be used to register callbacks which
46
  /// free up resources used by the interceptor.
47
  @usableFromInline
48
  internal let closeFuture: EventLoopFuture<Void>
49
50
  /// Called when a response part has traversed the interceptor pipeline.
51
  @usableFromInline
52
  internal var _onResponsePart:
53
    Optional<
54
      (
55
        GRPCServerResponsePart<Response>,
56
        EventLoopPromise<Void>?
57
      ) -> Void
58
    >
59
60
  /// Called when a request part has traversed the interceptor pipeline.
61
  @usableFromInline
62
  internal var _onRequestPart: Optional<(GRPCServerRequestPart<Request>) -> Void>
63
64
  /// The index before the first user interceptor context index. (always -1).
65
  @usableFromInline
66
  internal let _headIndex: Int
67
68
  /// The index after the last user interceptor context index (i.e. 'userContext.endIndex').
69
  @usableFromInline
70
  internal let _tailIndex: Int
71
72
  /// Contexts for user provided interceptors.
73
  @usableFromInline
74
  internal var _userContexts: [ServerInterceptorContext<Request, Response>]
75
76
  /// Whether the interceptor pipeline is still open. It becomes closed after an 'end' response
77
  /// part has traversed the pipeline.
78
  @usableFromInline
79
1.06M
  internal var _isOpen = true
80
81
  /// The index of the next context on the inbound side of the context at the given index.
82
  @inlinable
83
1.80M
  internal func _nextInboundIndex(after index: Int) -> Int {
84
1.80M
    // Unchecked arithmetic is okay here: our greatest inbound index is '_tailIndex' but we will
85
1.80M
    // never ask for the inbound index after the tail.
86
1.80M
    assert(self._indexIsValid(index))
87
1.80M
    return index &+ 1
88
1.80M
  }
89
90
  /// The index of the next context on the outbound side of the context at the given index.
91
  @inlinable
92
6.42M
  internal func _nextOutboundIndex(after index: Int) -> Int {
93
6.42M
    // Unchecked arithmetic is okay here: our lowest outbound index is '_headIndex' but we will
94
6.42M
    // never ask for the outbound index after the head.
95
6.42M
    assert(self._indexIsValid(index))
96
6.42M
    return index &- 1
97
6.42M
  }
98
99
  /// Returns true of the index is in the range `_headIndex ... _tailIndex`.
100
  @inlinable
101
1.77M
  internal func _indexIsValid(_ index: Int) -> Bool {
102
1.77M
    return self._headIndex <= index && index <= self._tailIndex
103
1.77M
  }
104
105
  @inlinable
106
  internal init(
107
    logger: Logger,
108
    eventLoop: EventLoop,
109
    path: String,
110
    callType: GRPCCallType,
111
    remoteAddress: SocketAddress?,
112
    userInfoRef: Ref<UserInfo>,
113
    closeFuture: EventLoopFuture<Void>,
114
    interceptors: [ServerInterceptor<Request, Response>],
115
    onRequestPart: @escaping (GRPCServerRequestPart<Request>) -> Void,
116
    onResponsePart: @escaping (GRPCServerResponsePart<Response>, EventLoopPromise<Void>?) -> Void
117
498k
  ) {
118
498k
    self.logger = logger
119
498k
    self.eventLoop = eventLoop
120
498k
    self.path = path
121
498k
    self.type = callType
122
498k
    self.remoteAddress = remoteAddress
123
498k
    self.userInfoRef = userInfoRef
124
498k
    self.closeFuture = closeFuture
125
498k
126
498k
    self._onResponsePart = onResponsePart
127
498k
    self._onRequestPart = onRequestPart
128
498k
129
498k
    // Head comes before user interceptors.
130
498k
    self._headIndex = -1
131
498k
    // Tail comes just after.
132
498k
    self._tailIndex = interceptors.endIndex
133
498k
134
498k
    // Make some contexts.
135
498k
    self._userContexts = []
136
498k
    self._userContexts.reserveCapacity(interceptors.count)
137
498k
138
498k
    for index in 0 ..< interceptors.count {
139
0
      let context = ServerInterceptorContext(for: interceptors[index], atIndex: index, in: self)
140
0
      self._userContexts.append(context)
141
498k
    }
142
498k
  }
143
144
  /// Emit a request part message into the interceptor pipeline.
145
  ///
146
  /// - Parameter part: The part to emit into the pipeline.
147
  /// - Important: This *must* to be called from the `eventLoop`.
148
  @inlinable
149
1.80M
  internal func receive(_ part: GRPCServerRequestPart<Request>) {
150
1.80M
    self.invokeReceive(part, fromContextAtIndex: self._headIndex)
151
1.80M
  }
152
153
  /// Invoke receive on the appropriate context when called from the context at the given index.
154
  @inlinable
155
  internal func invokeReceive(
156
    _ part: GRPCServerRequestPart<Request>,
157
    fromContextAtIndex index: Int
158
1.80M
  ) {
159
1.80M
    self._invokeReceive(part, onContextAtIndex: self._nextInboundIndex(after: index))
160
1.80M
  }
161
162
  /// Invoke receive on the context at the given index, if doing so is safe.
163
  @inlinable
164
  internal func _invokeReceive(
165
    _ part: GRPCServerRequestPart<Request>,
166
    onContextAtIndex index: Int
167
1.80M
  ) {
168
1.80M
    self.eventLoop.assertInEventLoop()
169
1.80M
    assert(self._indexIsValid(index))
170
1.80M
    guard self._isOpen else {
171
0
      return
172
1.80M
    }
173
1.80M
174
1.80M
    // We've checked the index.
175
1.80M
    self._invokeReceive(part, onContextAtUncheckedIndex: index)
176
1.80M
  }
177
178
  /// Invoke receive on the context at the given index, assuming that the index is valid and the
179
  /// pipeline is still open.
180
  @inlinable
181
  internal func _invokeReceive(
182
    _ part: GRPCServerRequestPart<Request>,
183
    onContextAtUncheckedIndex index: Int
184
1.80M
  ) {
185
1.80M
    switch index {
186
1.80M
    case self._headIndex:
187
0
      // The next inbound index must exist, either for the tail or a user interceptor.
188
0
      self._invokeReceive(
189
0
        part,
190
0
        onContextAtUncheckedIndex: self._nextInboundIndex(after: self._headIndex)
191
0
      )
192
1.80M
193
1.80M
    case self._tailIndex:
194
1.80M
      self._onRequestPart?(part)
195
1.80M
196
1.80M
    default:
197
0
      self._userContexts[index].invokeReceive(part)
198
1.80M
    }
199
1.80M
  }
200
201
  /// Write a response message into the interceptor pipeline.
202
  ///
203
  /// - Parameters:
204
  ///   - part: The response part to sent.
205
  ///   - promise: A promise to complete when the response part has been successfully written.
206
  /// - Important: This *must* to be called from the `eventLoop`.
207
  @inlinable
208
6.42M
  internal func send(_ part: GRPCServerResponsePart<Response>, promise: EventLoopPromise<Void>?) {
209
6.42M
    self.invokeSend(part, promise: promise, fromContextAtIndex: self._tailIndex)
210
6.42M
  }
211
212
  /// Invoke send on the appropriate context when called from the context at the given index.
213
  @inlinable
214
  internal func invokeSend(
215
    _ part: GRPCServerResponsePart<Response>,
216
    promise: EventLoopPromise<Void>?,
217
    fromContextAtIndex index: Int
218
6.42M
  ) {
219
6.42M
    self._invokeSend(
220
6.42M
      part,
221
6.42M
      promise: promise,
222
6.42M
      onContextAtIndex: self._nextOutboundIndex(after: index)
223
6.42M
    )
224
6.42M
  }
225
226
  /// Invoke send on the context at the given index, if doing so is safe. Fails the `promise` if it
227
  /// is not safe to do so.
228
  @inlinable
229
  internal func _invokeSend(
230
    _ part: GRPCServerResponsePart<Response>,
231
    promise: EventLoopPromise<Void>?,
232
    onContextAtIndex index: Int
233
6.42M
  ) {
234
6.42M
    self.eventLoop.assertInEventLoop()
235
6.42M
    assert(self._indexIsValid(index))
236
6.42M
    guard self._isOpen else {
237
0
      promise?.fail(GRPCError.AlreadyComplete())
238
0
      return
239
6.42M
    }
240
6.42M
241
6.42M
    self._invokeSend(uncheckedIndex: index, part, promise: promise)
242
6.42M
  }
243
244
  /// Invoke send on the context at the given index, assuming that the index is valid and the
245
  /// pipeline is still open.
246
  @inlinable
247
  internal func _invokeSend(
248
    uncheckedIndex index: Int,
249
    _ part: GRPCServerResponsePart<Response>,
250
    promise: EventLoopPromise<Void>?
251
6.42M
  ) {
252
6.42M
    switch index {
253
6.42M
    case self._headIndex:
254
6.42M
      let onResponsePart = self._onResponsePart
255
6.42M
      if part.isEnd {
256
498k
        self.close()
257
498k
      }
258
6.42M
      onResponsePart?(part, promise)
259
6.42M
260
6.42M
    case self._tailIndex:
261
0
      // The next outbound index must exist: it will be the head or a user interceptor.
262
0
      self._invokeSend(
263
0
        uncheckedIndex: self._nextOutboundIndex(after: self._tailIndex),
264
0
        part,
265
0
        promise: promise
266
0
      )
267
6.42M
268
6.42M
    default:
269
0
      self._userContexts[index].invokeSend(part, promise: promise)
270
6.42M
    }
271
6.42M
  }
272
273
  @inlinable
274
498k
  internal func close() {
275
498k
    // We're no longer open.
276
498k
    self._isOpen = false
277
498k
    // Each context hold a ref to the pipeline; break the retain cycle.
278
498k
    self._userContexts.removeAll()
279
498k
    // Drop the refs to the server handler.
280
498k
    self._onRequestPart = nil
281
498k
    self._onResponsePart = nil
282
498k
  }
283
}
284
285
extension ServerInterceptorContext {
286
  @inlinable
287
0
  internal func invokeReceive(_ part: GRPCServerRequestPart<Request>) {
288
0
    self.interceptor.receive(part, context: self)
289
0
  }
290
291
  @inlinable
292
  internal func invokeSend(
293
    _ part: GRPCServerResponsePart<Response>,
294
    promise: EventLoopPromise<Void>?
295
0
  ) {
296
0
    self.interceptor.send(part, promise: promise, context: self)
297
0
  }
298
}