Coverage Report

Created: 2025-09-08 07:07

/src/grpc-swift/Sources/GRPC/CallHandlers/ServerStreamingServerHandler.swift
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright 2021, gRPC Authors All rights reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
import NIOCore
17
import NIOHPACK
18
19
public final class ServerStreamingServerHandler<
20
  Serializer: MessageSerializer,
21
  Deserializer: MessageDeserializer
22
>: GRPCServerHandlerProtocol {
23
  public typealias Request = Deserializer.Output
24
  public typealias Response = Serializer.Input
25
26
  /// A response serializer.
27
  @usableFromInline
28
  internal let serializer: Serializer
29
30
  /// A request deserializer.
31
  @usableFromInline
32
  internal let deserializer: Deserializer
33
34
  /// A pipeline of user provided interceptors.
35
  @usableFromInline
36
  internal var interceptors: ServerInterceptorPipeline<Request, Response>!
37
38
  /// The context required in order to create the function.
39
  @usableFromInline
40
  internal let context: CallHandlerContext
41
42
  /// A reference to a `UserInfo`.
43
  @usableFromInline
44
  internal let userInfoRef: Ref<UserInfo>
45
46
  /// The user provided function to execute.
47
  @usableFromInline
48
  internal let userFunction:
49
    (Request, StreamingResponseCallContext<Response>)
50
      -> EventLoopFuture<GRPCStatus>
51
52
  /// The state of the handler.
53
  @usableFromInline
54
89.2k
  internal var state: State = .idle
55
56
  @usableFromInline
57
  internal enum State {
58
    // Initial state. Nothing has happened yet.
59
    case idle
60
    // Headers have been received and now we're holding a context with which to invoke the user
61
    // function when we receive a message.
62
    case createdContext(_StreamingResponseCallContext<Request, Response>)
63
    // The user function has been invoked, we're waiting for the status promise to be completed.
64
    case invokedFunction(_StreamingResponseCallContext<Request, Response>)
65
    // The function has completed or we are no longer proceeding with execution (because of an error
66
    // or unexpected closure).
67
    case completed
68
  }
69
70
  @inlinable
71
  public init(
72
    context: CallHandlerContext,
73
    requestDeserializer: Deserializer,
74
    responseSerializer: Serializer,
75
    interceptors: [ServerInterceptor<Request, Response>],
76
    userFunction: @escaping (Request, StreamingResponseCallContext<Response>)
77
      -> EventLoopFuture<GRPCStatus>
78
42.0k
  ) {
79
42.0k
    self.serializer = responseSerializer
80
42.0k
    self.deserializer = requestDeserializer
81
42.0k
    self.context = context
82
42.0k
    self.userFunction = userFunction
83
42.0k
84
42.0k
    let userInfoRef = Ref(UserInfo())
85
42.0k
    self.userInfoRef = userInfoRef
86
42.0k
    self.interceptors = ServerInterceptorPipeline(
87
42.0k
      logger: context.logger,
88
42.0k
      eventLoop: context.eventLoop,
89
42.0k
      path: context.path,
90
42.0k
      callType: .serverStreaming,
91
42.0k
      remoteAddress: context.remoteAddress,
92
42.0k
      userInfoRef: userInfoRef,
93
42.0k
      closeFuture: context.closeFuture,
94
42.0k
      interceptors: interceptors,
95
113k
      onRequestPart: self.receiveInterceptedPart(_:),
$s4GRPC015ServerStreamingB7HandlerC7context19requestDeserializer18responseSerializer12interceptors12userFunctionACyxq_GAA04CallD7ContextV_q_xSayAA0B11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyAA10GRPCStatusVGAO_AA0c8ResponsemN0CyAQGtctcfcyAA21GRPCServerRequestPartOyAOGcAIcfu_
Line
Count
Source
95
42.0k
      onRequestPart: self.receiveInterceptedPart(_:),
$s4GRPC015ServerStreamingB7HandlerC7context19requestDeserializer18responseSerializer12interceptors12userFunctionACyxq_GAA04CallD7ContextV_q_xSayAA0B11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyAA10GRPCStatusVGAO_AA0c8ResponsemN0CyAQGtctcfcyAA21GRPCServerRequestPartOyAOGcAIcfu_yA3_cfu0_
Line
Count
Source
95
71.1k
      onRequestPart: self.receiveInterceptedPart(_:),
96
374k
      onResponsePart: self.sendInterceptedPart(_:promise:)
$s4GRPC015ServerStreamingB7HandlerC7context19requestDeserializer18responseSerializer12interceptors12userFunctionACyxq_GAA04CallD7ContextV_q_xSayAA0B11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyAA10GRPCStatusVGAO_AA0c8ResponsemN0CyAQGtctcfcyAA010GRPCServerW4PartOyAQG_AT0sT7PromiseVyytGSgtcAIcfu1_
Line
Count
Source
96
42.0k
      onResponsePart: self.sendInterceptedPart(_:promise:)
$s4GRPC015ServerStreamingB7HandlerC7context19requestDeserializer18responseSerializer12interceptors12userFunctionACyxq_GAA04CallD7ContextV_q_xSayAA0B11InterceptorCy6OutputQy_5InputQzGG7NIOCore15EventLoopFutureCyAA10GRPCStatusVGAO_AA0c8ResponsemN0CyAQGtctcfcyAA010GRPCServerW4PartOyAQG_AT0sT7PromiseVyytGSgtcAIcfu1_yA3__A7_tcfu2_
Line
Count
Source
96
332k
      onResponsePart: self.sendInterceptedPart(_:promise:)
97
42.0k
    )
98
42.0k
  }
99
100
  // MARK: Public API; gRPC to Handler
101
102
  @inlinable
103
13.7k
  public func receiveMetadata(_ headers: HPACKHeaders) {
104
13.7k
    self.interceptors.receive(.metadata(headers))
105
13.7k
  }
106
107
  @inlinable
108
8.83k
  public func receiveMessage(_ bytes: ByteBuffer) {
109
8.83k
    do {
110
8.83k
      let message = try self.deserializer.deserialize(byteBuffer: bytes)
111
8.83k
      self.interceptors.receive(.message(message))
112
8.83k
    } catch {
113
2.66k
      self.handleError(error)
114
8.83k
    }
115
8.83k
  }
116
117
  @inlinable
118
3.61k
  public func receiveEnd() {
119
3.61k
    self.interceptors.receive(.end)
120
3.61k
  }
121
122
  @inlinable
123
1.68k
  public func receiveError(_ error: Error) {
124
1.68k
    self.handleError(error)
125
1.68k
    self.finish()
126
1.68k
  }
127
128
  @inlinable
129
15.4k
  public func finish() {
130
15.4k
    switch self.state {
131
15.4k
    case .idle:
132
0
      self.interceptors = nil
133
0
      self.state = .completed
134
15.4k
135
15.4k
    case let .createdContext(context),
136
0
      let .invokedFunction(context):
137
0
      context.statusPromise.fail(GRPCStatus(code: .unavailable, message: nil))
138
0
      self.context.eventLoop.execute {
139
0
        self.interceptors = nil
140
0
      }
141
15.4k
142
15.4k
    case .completed:
143
15.4k
      self.interceptors = nil
144
15.4k
    }
145
15.4k
  }
146
147
  // MARK: - Interceptors to User Function
148
149
  @inlinable
150
71.1k
  internal func receiveInterceptedPart(_ part: GRPCServerRequestPart<Request>) {
151
71.1k
    switch part {
152
71.1k
    case let .metadata(headers):
153
42.0k
      self.receiveInterceptedMetadata(headers)
154
71.1k
    case let .message(message):
155
17.4k
      self.receiveInterceptedMessage(message)
156
71.1k
    case .end:
157
11.5k
      self.receiveInterceptedEnd()
158
71.1k
    }
159
71.1k
  }
160
161
  @inlinable
162
42.0k
  internal func receiveInterceptedMetadata(_ headers: HPACKHeaders) {
163
42.0k
    switch self.state {
164
42.0k
    case .idle:
165
42.0k
      // Make a context to invoke the user function with.
166
42.0k
      let context = _StreamingResponseCallContext<Request, Response>(
167
42.0k
        eventLoop: self.context.eventLoop,
168
42.0k
        headers: headers,
169
42.0k
        logger: self.context.logger,
170
42.0k
        userInfoRef: self.userInfoRef,
171
42.0k
        compressionIsEnabled: self.context.encoding.isEnabled,
172
42.0k
        closeFuture: self.context.closeFuture,
173
289k
        sendResponse: self.interceptResponse(_:metadata:promise:)
$s4GRPC015ServerStreamingB7HandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFy5InputQz_AA07MessageG0V7NIOCore16EventLoopPromiseVyytGSgtcACyxq_Gcfu_
Line
Count
Source
173
42.0k
        sendResponse: self.interceptResponse(_:metadata:promise:)
$s4GRPC015ServerStreamingB7HandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFy5InputQz_AA07MessageG0V7NIOCore16EventLoopPromiseVyytGSgtcACyxq_Gcfu_yAI_AkPtcfu0_
Line
Count
Source
173
247k
        sendResponse: self.interceptResponse(_:metadata:promise:)
174
42.0k
      )
175
42.0k
176
42.0k
      // Move to the next state.
177
42.0k
      self.state = .createdContext(context)
178
42.0k
179
42.0k
      // Register a callback on the status future.
180
84.1k
      context.statusPromise.futureResult.whenComplete(self.userFunctionCompletedWithResult(_:))
$s4GRPC015ServerStreamingB7HandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFys6ResultOyAA10GRPCStatusVs5Error_pGcACyxq_Gcfu1_
Line
Count
Source
180
42.0k
      context.statusPromise.futureResult.whenComplete(self.userFunctionCompletedWithResult(_:))
$s4GRPC015ServerStreamingB7HandlerC26receiveInterceptedMetadatayy8NIOHPACK12HPACKHeadersVFys6ResultOyAA10GRPCStatusVs5Error_pGcACyxq_Gcfu1_yAMcfu2_
Line
Count
Source
180
42.0k
      context.statusPromise.futureResult.whenComplete(self.userFunctionCompletedWithResult(_:))
181
42.0k
182
42.0k
      // Send response headers back via the interceptors.
183
42.0k
      self.interceptors.send(.metadata([:]), promise: nil)
184
42.0k
185
42.0k
    case .createdContext, .invokedFunction:
186
0
      self.handleError(GRPCError.InvalidState("Protocol violation: already received headers"))
187
42.0k
188
42.0k
    case .completed:
189
0
      // We may receive headers from the interceptor pipeline if we have already finished (i.e. due
190
0
      // to an error or otherwise) and an interceptor doing some async work later emits headers.
191
0
      // Dropping them is fine.
192
0
      ()
193
42.0k
    }
194
42.0k
  }
195
196
  @inlinable
197
17.4k
  internal func receiveInterceptedMessage(_ request: Request) {
198
17.4k
    switch self.state {
199
17.4k
    case .idle:
200
0
      self.handleError(GRPCError.ProtocolViolation("Message received before headers"))
201
17.4k
202
17.4k
    case let .createdContext(context):
203
17.4k
      self.state = .invokedFunction(context)
204
17.4k
      // Complete the status promise with the function outcome.
205
17.4k
      context.statusPromise.completeWith(self.userFunction(request, context))
206
17.4k
207
17.4k
    case .invokedFunction:
208
0
      let error = GRPCError.ProtocolViolation("Multiple messages received on server streaming RPC")
209
0
      self.handleError(error)
210
17.4k
211
17.4k
    case .completed:
212
0
      // We received a message but we're already done: this may happen if we terminate the RPC
213
0
      // due to a channel error, for example.
214
0
      ()
215
17.4k
    }
216
17.4k
  }
217
218
  @inlinable
219
11.5k
  internal func receiveInterceptedEnd() {
220
11.5k
    switch self.state {
221
11.5k
    case .idle:
222
0
      self.handleError(GRPCError.ProtocolViolation("End received before headers"))
223
11.5k
224
11.5k
    case .createdContext:
225
11.5k
      self.handleError(GRPCError.ProtocolViolation("End received before message"))
226
11.5k
227
11.5k
    case .invokedFunction, .completed:
228
0
      ()
229
11.5k
    }
230
11.5k
  }
231
232
  // MARK: - User Function To Interceptors
233
234
  @inlinable
235
  internal func interceptResponse(
236
    _ response: Response,
237
    metadata: MessageMetadata,
238
    promise: EventLoopPromise<Void>?
239
247k
  ) {
240
247k
    switch self.state {
241
247k
    case .idle:
242
0
      // The user function can't send responses before its call context has been created.
243
0
      preconditionFailure()
244
247k
245
247k
    case .createdContext, .invokedFunction:
246
247k
      // The user has access to the response context before returning a future observer,
247
247k
      // so 'createdContext' is valid here (if a little strange).
248
247k
      self.interceptors.send(.message(response, metadata), promise: promise)
249
247k
250
247k
    case .completed:
251
0
      promise?.fail(GRPCError.AlreadyComplete())
252
247k
    }
253
247k
  }
254
255
  @inlinable
256
42.0k
  internal func userFunctionCompletedWithResult(_ result: Result<GRPCStatus, Error>) {
257
42.0k
    switch self.state {
258
42.0k
    case .idle:
259
0
      // Invalid state: the user function can only complete if it was created.
260
0
      preconditionFailure()
261
42.0k
262
42.0k
    case let .createdContext(context),
263
17.4k
      let .invokedFunction(context):
264
17.4k
265
17.4k
      switch result {
266
17.4k
      case let .success(status):
267
17.4k
        // We're sending end back, we're done.
268
17.4k
        self.state = .completed
269
17.4k
        self.interceptors.send(.end(status, context.trailers), promise: nil)
270
17.4k
271
17.4k
      case let .failure(error):
272
0
        self.handleError(error, thrownFromHandler: true)
273
42.0k
      }
274
42.0k
275
42.0k
    case .completed:
276
24.5k
      // We've already completed. Ignore this.
277
24.5k
      ()
278
42.0k
    }
279
42.0k
  }
280
281
  @inlinable
282
  internal func sendInterceptedPart(
283
    _ part: GRPCServerResponsePart<Response>,
284
    promise: EventLoopPromise<Void>?
285
332k
  ) {
286
332k
    switch part {
287
332k
    case let .metadata(headers):
288
42.0k
      self.context.responseWriter.sendMetadata(headers, flush: true, promise: promise)
289
332k
290
332k
    case let .message(message, metadata):
291
247k
      do {
292
247k
        let bytes = try self.serializer.serialize(message, allocator: self.context.allocator)
293
247k
        self.context.responseWriter.sendMessage(bytes, metadata: metadata, promise: promise)
294
247k
      } catch {
295
0
        // Serialization failed: fail the promise and send end.
296
0
        promise?.fail(error)
297
0
        let (status, trailers) = ServerErrorProcessor.processLibraryError(
298
0
          error,
299
0
          delegate: self.context.errorDelegate
300
0
        )
301
0
        // Loop back via the interceptors.
302
0
        self.interceptors.send(.end(status, trailers), promise: nil)
303
332k
      }
304
332k
305
332k
    case let .end(status, trailers):
306
42.0k
      self.context.responseWriter.sendEnd(status: status, trailers: trailers, promise: promise)
307
332k
    }
308
332k
  }
309
310
  @inlinable
311
25.4k
  internal func handleError(_ error: Error, thrownFromHandler isHandlerError: Bool = false) {
312
25.4k
    switch self.state {
313
25.4k
    case .idle:
314
0
      assert(!isHandlerError)
315
0
      self.state = .completed
316
0
      // We don't have a promise to fail. Just send back end.
317
0
      let (status, trailers) = ServerErrorProcessor.processLibraryError(
318
0
        error,
319
0
        delegate: self.context.errorDelegate
320
0
      )
321
0
      self.interceptors.send(.end(status, trailers), promise: nil)
322
25.4k
323
25.4k
    case let .createdContext(context),
324
24.5k
      let .invokedFunction(context):
325
24.5k
      // Move to 'completed' first, then send back end and fail the status promise below.
326
24.5k
      self.state = .completed
327
24.5k
328
24.5k
      let status: GRPCStatus
329
24.5k
      let trailers: HPACKHeaders
330
24.5k
331
24.5k
      if isHandlerError {
332
0
        (status, trailers) = ServerErrorProcessor.processObserverError(
333
0
          error,
334
0
          headers: context.headers,
335
0
          trailers: context.trailers,
336
0
          delegate: self.context.errorDelegate
337
0
        )
338
24.5k
      } else {
339
24.5k
        (status, trailers) = ServerErrorProcessor.processLibraryError(
340
24.5k
          error,
341
24.5k
          delegate: self.context.errorDelegate
342
24.5k
        )
343
24.5k
      }
344
24.5k
345
24.5k
      self.interceptors.send(.end(status, trailers), promise: nil)
346
24.5k
      // We're already in the 'completed' state so failing the promise will be a no-op in the
347
24.5k
      // callback to 'userFunctionCompletedWithResult' (but we also need to avoid leaking the
348
24.5k
      // promise.)
349
24.5k
      context.statusPromise.fail(error)
350
25.4k
351
25.4k
    case .completed:
352
897
      ()
353
25.4k
    }
354
25.4k
  }
355
}