Coverage Report

Created: 2025-09-08 07:07

/src/grpc-swift/Sources/GRPC/CallHandlers/UnaryServerHandler.swift
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright 2021, gRPC Authors All rights reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
import NIOCore
17
import NIOHPACK
18
19
public final class UnaryServerHandler<
20
  Serializer: MessageSerializer,
21
  Deserializer: MessageDeserializer
22
>: GRPCServerHandlerProtocol {
23
  public typealias Request = Deserializer.Output
24
  public typealias Response = Serializer.Input
25
26
  /// A response serializer.
27
  @usableFromInline
28
  internal let serializer: Serializer
29
30
  /// A request deserializer.
31
  @usableFromInline
32
  internal let deserializer: Deserializer
33
34
  /// A pipeline of user provided interceptors.
35
  @usableFromInline
36
  internal var interceptors: ServerInterceptorPipeline<Request, Response>!
37
38
  /// The context required in order create the function.
39
  @usableFromInline
40
  internal let context: CallHandlerContext
41
42
  /// A reference to a `UserInfo`.
43
  @usableFromInline
44
  internal let userInfoRef: Ref<UserInfo>
45
46
  /// The user provided function to execute.
47
  @usableFromInline
48
  internal let userFunction: (Request, StatusOnlyCallContext) -> EventLoopFuture<Response>
49
50
  /// The state of the function invocation.
51
  @usableFromInline
52
164k
  internal var state: State = .idle
53
54
  @usableFromInline
55
  internal enum State {
56
    // Initial state. Nothing has happened yet.
57
    case idle
58
    // Headers have been received and now we're holding a context with which to invoke the user
59
    // function when we receive a message.
60
    case createdContext(UnaryResponseCallContext<Response>)
61
    // The user function has been invoked, we're waiting for the response.
62
    case invokedFunction(UnaryResponseCallContext<Response>)
63
    // The function has completed or we are no longer proceeding with execution (because of an error
64
    // or unexpected closure).
65
    case completed
66
  }
67
68
  @inlinable
69
  public init(
70
    context: CallHandlerContext,
71
    requestDeserializer: Deserializer,
72
    responseSerializer: Serializer,
73
    interceptors: [ServerInterceptor<Request, Response>],
74
    userFunction: @escaping (Request, StatusOnlyCallContext) -> EventLoopFuture<Response>
75
81.7k
  ) {
76
81.7k
    self.userFunction = userFunction
77
81.7k
    self.serializer = responseSerializer
78
81.7k
    self.deserializer = requestDeserializer
79
81.7k
    self.context = context
80
81.7k
81
81.7k
    let userInfoRef = Ref(UserInfo())
82
81.7k
    self.userInfoRef = userInfoRef
83
81.7k
    self.interceptors = ServerInterceptorPipeline(
84
81.7k
      logger: context.logger,
85
81.7k
      eventLoop: context.eventLoop,
86
81.7k
      path: context.path,
87
81.7k
      callType: .unary,
88
81.7k
      remoteAddress: context.remoteAddress,
89
81.7k
      userInfoRef: userInfoRef,
90
81.7k
      closeFuture: context.closeFuture,
91
81.7k
      interceptors: interceptors,
92
215k
      onRequestPart: self.receiveInterceptedPart(_:),
$s4GRPC18UnaryServerHandlerC7context19requestDeserializer18responseSerializer12interceptors12userFunctionACyxq_GAA04CallD7ContextV_q_xSayAA0C11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyAQGAO_AA010StatusOnlymN0_ptctcfcyAA21GRPCServerRequestPartOyAOGcAIcfu_
Line
Count
Source
92
81.7k
      onRequestPart: self.receiveInterceptedPart(_:),
$s4GRPC18UnaryServerHandlerC7context19requestDeserializer18responseSerializer12interceptors12userFunctionACyxq_GAA04CallD7ContextV_q_xSayAA0C11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyAQGAO_AA010StatusOnlymN0_ptctcfcyAA21GRPCServerRequestPartOyAOGcAIcfu_yA_cfu0_
Line
Count
Source
92
133k
      onRequestPart: self.receiveInterceptedPart(_:),
93
271k
      onResponsePart: self.sendInterceptedPart(_:promise:)
$s4GRPC18UnaryServerHandlerC7context19requestDeserializer18responseSerializer12interceptors12userFunctionACyxq_GAA04CallD7ContextV_q_xSayAA0C11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyAQGAO_AA010StatusOnlymN0_ptctcfcyAA22GRPCServerResponsePartOyAQG_AT0sT7PromiseVyytGSgtcAIcfu1_
Line
Count
Source
93
81.7k
      onResponsePart: self.sendInterceptedPart(_:promise:)
$s4GRPC18UnaryServerHandlerC7context19requestDeserializer18responseSerializer12interceptors12userFunctionACyxq_GAA04CallD7ContextV_q_xSayAA0C11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyAQGAO_AA010StatusOnlymN0_ptctcfcyAA22GRPCServerResponsePartOyAQG_AT0sT7PromiseVyytGSgtcAIcfu1_yA__A3_tcfu2_
Line
Count
Source
93
189k
      onResponsePart: self.sendInterceptedPart(_:promise:)
94
81.7k
    )
95
81.7k
  }
96
97
  // MARK: - Public API: gRPC to Interceptors
98
99
  @inlinable
100
32.0k
  public func receiveMetadata(_ metadata: HPACKHeaders) {
101
32.0k
    self.interceptors.receive(.metadata(metadata))
102
32.0k
  }
103
104
  @inlinable
105
19.0k
  public func receiveMessage(_ bytes: ByteBuffer) {
106
19.0k
    do {
107
19.0k
      let message = try self.deserializer.deserialize(byteBuffer: bytes)
108
19.0k
      self.interceptors.receive(.message(message))
109
19.0k
    } catch {
110
7.66k
      self.handleError(error)
111
19.0k
    }
112
19.0k
  }
113
114
  @inlinable
115
10.8k
  public func receiveEnd() {
116
10.8k
    self.interceptors.receive(.end)
117
10.8k
  }
118
119
  @inlinable
120
2.94k
  public func receiveError(_ error: Error) {
121
2.94k
    self.handleError(error)
122
2.94k
    self.finish()
123
2.94k
  }
124
125
  @inlinable
126
35.0k
  public func finish() {
127
35.0k
    switch self.state {
128
35.0k
    case .idle:
129
0
      self.interceptors = nil
130
0
      self.state = .completed
131
35.0k
132
35.0k
    case let .createdContext(context),
133
0
      let .invokedFunction(context):
134
0
      context.responsePromise.fail(GRPCStatus(code: .unavailable, message: nil))
135
0
      self.context.eventLoop.execute {
136
0
        self.interceptors = nil
137
0
      }
138
35.0k
139
35.0k
    case .completed:
140
35.0k
      self.interceptors = nil
141
35.0k
    }
142
35.0k
  }
143
144
  // MARK: - Interceptors to User Function
145
146
  @inlinable
147
133k
  internal func receiveInterceptedPart(_ part: GRPCServerRequestPart<Request>) {
148
133k
    switch part {
149
133k
    case let .metadata(headers):
150
81.7k
      self.receiveInterceptedMetadata(headers)
151
133k
    case let .message(message):
152
26.2k
      self.receiveInterceptedMessage(message)
153
133k
    case .end:
154
25.4k
      self.receiveInterceptedEnd()
155
133k
    }
156
133k
  }
157
158
  @inlinable
159
81.7k
  internal func receiveInterceptedMetadata(_ headers: HPACKHeaders) {
160
81.7k
    switch self.state {
161
81.7k
    case .idle:
162
81.7k
      // Make a context to invoke the user function with.
163
81.7k
      let context = UnaryResponseCallContext<Response>(
164
81.7k
        eventLoop: self.context.eventLoop,
165
81.7k
        headers: headers,
166
81.7k
        logger: self.context.logger,
167
81.7k
        userInfoRef: self.userInfoRef,
168
81.7k
        closeFuture: self.context.closeFuture
169
81.7k
      )
170
81.7k
171
81.7k
      // Move to the next state.
172
81.7k
      self.state = .createdContext(context)
173
81.7k
174
81.7k
      // Register a callback on the response future. The user function will complete this promise.
175
163k
      context.responsePromise.futureResult.whenComplete(self.userFunctionCompletedWithResult(_:))
$s4GRPC18UnaryServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFys6ResultOy5InputQzs5Error_pGcACyxq_Gcfu_
Line
Count
Source
175
81.7k
      context.responsePromise.futureResult.whenComplete(self.userFunctionCompletedWithResult(_:))
$s4GRPC18UnaryServerHandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFys6ResultOy5InputQzs5Error_pGcACyxq_Gcfu_yAMcfu0_
Line
Count
Source
175
81.7k
      context.responsePromise.futureResult.whenComplete(self.userFunctionCompletedWithResult(_:))
176
81.7k
177
81.7k
      // Send back response headers.
178
81.7k
      self.interceptors.send(.metadata([:]), promise: nil)
179
81.7k
180
81.7k
    case .createdContext, .invokedFunction:
181
0
      self.handleError(GRPCError.ProtocolViolation("Multiple header blocks received on RPC"))
182
81.7k
183
81.7k
    case .completed:
184
0
      // We may receive headers from the interceptor pipeline if we have already finished (i.e. due
185
0
      // to an error or otherwise) and an interceptor doing some async work later emitting headers.
186
0
      // Dropping them is fine.
187
0
      ()
188
81.7k
    }
189
81.7k
  }
190
191
  @inlinable
192
26.2k
  internal func receiveInterceptedMessage(_ request: Request) {
193
26.2k
    switch self.state {
194
26.2k
    case .idle:
195
0
      self.handleError(GRPCError.ProtocolViolation("Message received before headers"))
196
26.2k
197
26.2k
    case let .createdContext(context):
198
26.2k
      // Happy path: execute the function; complete the promise with the result.
199
26.2k
      self.state = .invokedFunction(context)
200
26.2k
      context.responsePromise.completeWith(self.userFunction(request, context))
201
26.2k
202
26.2k
    case .invokedFunction:
203
0
      // The function's already been invoked with a message.
204
0
      self.handleError(GRPCError.ProtocolViolation("Multiple messages received on unary RPC"))
205
26.2k
206
26.2k
    case .completed:
207
0
      // We received a message but we're already done: this may happen if we terminate the RPC
208
0
      // due to a channel error, for example.
209
0
      ()
210
26.2k
    }
211
26.2k
  }
212
213
  @inlinable
214
25.4k
  internal func receiveInterceptedEnd() {
215
25.4k
    switch self.state {
216
25.4k
    case .idle:
217
0
      self.handleError(GRPCError.ProtocolViolation("End received before headers"))
218
25.4k
219
25.4k
    case .createdContext:
220
25.4k
      self.handleError(GRPCError.ProtocolViolation("End received before message"))
221
25.4k
222
25.4k
    case .invokedFunction, .completed:
223
0
      ()
224
25.4k
    }
225
25.4k
  }
226
227
  // MARK: - User Function To Interceptors
228
229
  @inlinable
230
81.7k
  internal func userFunctionCompletedWithResult(_ result: Result<Response, Error>) {
231
81.7k
    switch self.state {
232
81.7k
    case .idle:
233
0
      // Invalid state: the user function can only complete if it was executed.
234
0
      preconditionFailure()
235
81.7k
236
81.7k
    // 'created' is allowed here: we may have to (and tear down) after receiving headers
237
81.7k
    // but before receiving a message.
238
81.7k
    case let .createdContext(context),
239
26.2k
      let .invokedFunction(context):
240
26.2k
241
26.2k
      switch result {
242
26.2k
      case let .success(response):
243
26.2k
        // Complete, as we're sending 'end'.
244
26.2k
        self.state = .completed
245
26.2k
246
26.2k
        // Compression depends on whether it's enabled on the server and the setting in the caller
247
26.2k
        // context.
248
26.2k
        let compress = self.context.encoding.isEnabled && context.compressionEnabled
249
26.2k
        let metadata = MessageMetadata(compress: compress, flush: false)
250
26.2k
        self.interceptors.send(.message(response, metadata), promise: nil)
251
26.2k
        self.interceptors.send(.end(context.responseStatus, context.trailers), promise: nil)
252
26.2k
253
26.2k
      case let .failure(error):
254
0
        self.handleError(error, thrownFromHandler: true)
255
81.7k
      }
256
81.7k
257
81.7k
    case .completed:
258
55.5k
      // We've already failed. Ignore this.
259
55.5k
      ()
260
81.7k
    }
261
81.7k
  }
262
263
  @inlinable
264
  internal func sendInterceptedPart(
265
    _ part: GRPCServerResponsePart<Response>,
266
    promise: EventLoopPromise<Void>?
267
189k
  ) {
268
189k
    switch part {
269
189k
    case let .metadata(headers):
270
81.7k
      // We can delay this flush until the end of the RPC.
271
81.7k
      self.context.responseWriter.sendMetadata(headers, flush: false, promise: promise)
272
189k
273
189k
    case let .message(message, metadata):
274
26.2k
      do {
275
26.2k
        let bytes = try self.serializer.serialize(message, allocator: self.context.allocator)
276
26.2k
        self.context.responseWriter.sendMessage(bytes, metadata: metadata, promise: promise)
277
26.2k
      } catch {
278
0
        // Serialization failed: fail the promise and send end.
279
0
        promise?.fail(error)
280
0
        let (status, trailers) = ServerErrorProcessor.processLibraryError(
281
0
          error,
282
0
          delegate: self.context.errorDelegate
283
0
        )
284
0
        // Loop back via the interceptors.
285
0
        self.interceptors.send(.end(status, trailers), promise: nil)
286
189k
      }
287
189k
288
189k
    case let .end(status, trailers):
289
81.7k
      self.context.responseWriter.sendEnd(status: status, trailers: trailers, promise: promise)
290
189k
    }
291
189k
  }
292
293
  @inlinable
294
58.0k
  internal func handleError(_ error: Error, thrownFromHandler isHandlerError: Bool = false) {
295
58.0k
    switch self.state {
296
58.0k
    case .idle:
297
0
      assert(!isHandlerError)
298
0
      self.state = .completed
299
0
      // We don't have a promise to fail. Just send back end.
300
0
      let (status, trailers) = ServerErrorProcessor.processLibraryError(
301
0
        error,
302
0
        delegate: self.context.errorDelegate
303
0
      )
304
0
      self.interceptors.send(.end(status, trailers), promise: nil)
305
58.0k
306
58.0k
    case let .createdContext(context),
307
55.5k
      let .invokedFunction(context):
308
55.5k
      // We don't have a promise to fail. Just send back end.
309
55.5k
      self.state = .completed
310
55.5k
311
55.5k
      let status: GRPCStatus
312
55.5k
      let trailers: HPACKHeaders
313
55.5k
314
55.5k
      if isHandlerError {
315
0
        (status, trailers) = ServerErrorProcessor.processObserverError(
316
0
          error,
317
0
          headers: context.headers,
318
0
          trailers: context.trailers,
319
0
          delegate: self.context.errorDelegate
320
0
        )
321
55.5k
      } else {
322
55.5k
        (status, trailers) = ServerErrorProcessor.processLibraryError(
323
55.5k
          error,
324
55.5k
          delegate: self.context.errorDelegate
325
55.5k
        )
326
55.5k
      }
327
55.5k
328
55.5k
      self.interceptors.send(.end(status, trailers), promise: nil)
329
55.5k
      // We're already in the 'completed' state so failing the promise will be a no-op in the
330
55.5k
      // callback to 'userFunctionCompletedWithResult' (but we also need to avoid leaking the
331
55.5k
      // promise.)
332
55.5k
      context.responsePromise.fail(error)
333
58.0k
334
58.0k
    case .completed:
335
2.50k
      ()
336
58.0k
    }
337
58.0k
  }
338
}