Coverage Report

Created: 2026-02-11 07:03

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/grpc-swift/Sources/GRPC/GRPCClientStateMachine.swift
Line
Count
Source
1
/*
2
 * Copyright 2019, gRPC Authors All rights reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
import Foundation
17
import Logging
18
import NIOCore
19
import NIOHPACK
20
import NIOHTTP1
21
import SwiftProtobuf
22
23
/// Errors produced when handling the response headers sent by the server.
enum ReceiveResponseHeadError: Error, Equatable {
24
  /// The 'content-type' header was missing or the value is not supported by this implementation.
  /// The associated value is the 'content-type' header value that was received, if there was one.
25
  case invalidContentType(String?)
26
27
  /// The HTTP response status from the server was not 200 OK. This includes a ':status' header
  /// which is missing or is not an integer. The associated value is the ':status' header value
  /// that was received, if there was one.
28
  case invalidHTTPStatus(String?)
29
30
  /// The encoding used by the server is not supported. The associated value is the value of the
  /// message encoding header sent by the server.
31
  case unsupportedMessageEncoding(String)
32
33
  /// An invalid state was encountered. This is a serious implementation error.
34
  case invalidState
35
}
36
37
/// Errors produced when handling the end of the response stream ("Trailers" or "Trailers-Only").
enum ReceiveEndOfResponseStreamError: Error, Equatable {
38
  /// A 'content-type' header was present but its value is not supported by this implementation.
  /// A missing 'content-type' header is tolerated. The associated value is the header value.
39
  case invalidContentType(String?)
40
41
  /// The ':status' header was missing or its value was not an integer. The associated value is
  /// the ':status' header value that was received, if there was one.
42
  case invalidHTTPStatus(String?)
43
44
  /// An invalid state was encountered. This is a serious implementation error.
45
  case invalidState
46
}
47
48
/// Errors produced when sending the request headers which initiate an RPC.
enum SendRequestHeadersError: Error {
49
  /// An invalid state was encountered. This is a serious implementation error. It is returned
  /// when request headers are sent from any state other than `.clientIdleServerIdle`.
50
  case invalidState
51
}
52
53
/// Errors produced when closing the request stream.
enum SendEndOfRequestStreamError: Error {
54
  /// The request stream has already been closed. This may happen if the RPC was cancelled, timed
55
  /// out, the server terminated the RPC, or the user explicitly closed the stream multiple times.
56
  case alreadyClosed
57
58
  /// An invalid state was encountered. This is a serious implementation error. It is returned
  /// when the request stream is closed before the RPC has been initiated.
59
  case invalidState
60
}
61
62
/// A state machine for a single gRPC call from the perspective of a client.
63
///
64
/// See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
65
struct GRPCClientStateMachine {
66
  /// The combined state of the request (client) and response (server) streams for an RPC call.
67
  ///
68
  /// The following states are not possible:
69
  /// - `.clientIdleServerActive`: The client must initiate the call before the server moves
70
  ///   from the idle state.
71
  /// - `.clientIdleServerClosed`: The client must initiate the call before the server moves from
72
  ///   the idle state.
73
  /// - `.clientActiveServerClosed`: The client may not stream if the server is closed.
74
  ///
75
  /// Note: when a peer (client or server) state is "active" it means that messages _may_ be sent or
76
  /// received. That is, the headers for the stream have been processed by the state machine and
77
  /// end-of-stream has not yet been processed. A stream may expect any number of messages (i.e. up
78
  /// to one for a unary call and many for a streaming call).
79
  enum State {
80
    /// Initial state. Neither request stream nor response stream have been initiated. Holds the
81
    /// pending write state for the request stream and arity for the response stream, respectively.
82
    ///
83
    /// Valid transitions:
84
    /// - `clientActiveServerIdle`: if the client initiates the RPC,
85
    /// - `clientClosedServerClosed`: if the client terminates the RPC.
86
    case clientIdleServerIdle(pendingWriteState: PendingWriteState, readArity: MessageArity)
87
88
    /// The client has initiated an RPC and has not received initial metadata from the server. Holds
89
    /// the write state for the request stream and the pending read state for the response stream.
90
    ///
91
    /// Valid transitions:
92
    /// - `clientActiveServerActive`: if the server acknowledges the RPC initiation,
93
    /// - `clientClosedServerIdle`: if the client closes the request stream,
94
    /// - `clientClosedServerClosed`: if the client terminates the RPC or the server terminates the
95
    ///      RPC with a "trailers-only" response.
96
    case clientActiveServerIdle(writeState: WriteState, pendingReadState: PendingReadState)
97
98
    /// The client has indicated to the server that it has finished sending requests. The server
99
    /// has not yet sent response headers for the RPC. Holds the pending read state for the response stream.
100
    ///
101
    /// Valid transitions:
102
    /// - `clientClosedServerActive`: if the server acknowledges the RPC initiation,
103
    /// - `clientClosedServerClosed`: if the client terminates the RPC or the server terminates the
104
    ///      RPC with a "trailers-only" response.
105
    case clientClosedServerIdle(pendingReadState: PendingReadState)
106
107
    /// The client has initiated the RPC and the server has acknowledged it. Messages may have been
108
    /// sent and/or received. Holds the request stream write state and response stream read state.
109
    ///
110
    /// Valid transitions:
111
    /// - `clientClosedServerActive`: if the client closes the request stream,
112
    /// - `clientClosedServerClosed`: if the client or server terminates the RPC.
113
    case clientActiveServerActive(writeState: WriteState, readState: ReadState)
114
115
    /// The client has indicated to the server that it has finished sending requests. The server
116
    /// has acknowledged the RPC. Holds the response stream read state.
117
    ///
118
    /// Valid transitions:
119
    /// - `clientClosedServerClosed`: if the client or server terminates the RPC.
120
    case clientClosedServerActive(readState: ReadState)
121
122
    /// The RPC has terminated. There are no valid transitions from this state.
123
    case clientClosedServerClosed
124
125
    /// This isn't a real state: it is a placeholder held while the real state is being mutated.
    /// Observing it from any state transition is treated as a programmer error. See
    /// `withStateAvoidingCoWs`.
126
    case modifying
127
  }
128
129
  /// The current state of the state machine. It may be read from within the module but is only
  /// modified by the state machine itself.
130
  internal private(set) var state: State
131
132
  /// The default user-agent string. It is added to the request headers when the custom metadata
  /// does not already contain a "user-agent" header.
133
  private static let userAgent = "grpc-swift-nio/\(Version.versionString)"
134
135
  /// Creates a state machine for an RPC which has not been initiated yet.
  ///
  /// The state machine starts in `.clientIdleServerIdle`, holding a pending write state which
  /// uses the `.protobuf` content type.
  ///
  /// - Parameter requestArity: The expected number of messages on the request stream.
  /// - Parameter responseArity: The expected number of messages on the response stream.
  init(requestArity: MessageArity, responseArity: MessageArity) {
    let pendingWrites = PendingWriteState(arity: requestArity, contentType: .protobuf)
    self.state = .clientIdleServerIdle(pendingWriteState: pendingWrites, readArity: responseArity)
  }
145
146
  /// Creates a state machine which starts in the given state.
147
  ///
  /// The state is stored as provided; no validation is performed on it.
  ///
148
  /// - Parameter state: The initial state of the state machine.
149
0
  init(state: State) {
150
0
    self.state = state
151
0
  }
152
153
  /// Initiates an RPC by producing the request headers to send to the server.
  ///
  /// The only valid state transition is:
  /// - `.clientIdleServerIdle` → `.clientActiveServerIdle`
  ///
  /// From every other state an `.invalidState` error is returned and the state is left as it was.
  ///
  /// - Parameter requestHead: The client request head for the RPC.
  /// - Parameter allocator: A `ByteBufferAllocator` handed to the pending write state when it
  ///     creates the write state for the request stream.
  /// - Returns: The request headers on success, `.invalidState` otherwise.
  mutating func sendRequestHeaders(
    requestHead: _GRPCRequestHead,
    allocator: ByteBufferAllocator
  ) -> Result<HPACKHeaders, SendRequestHeadersError> {
    return self.withStateAvoidingCoWs {
      $0.sendRequestHeaders(requestHead: requestHead, allocator: allocator)
    }
  }
171
172
  /// Hands a request message to the write state of the request stream.
  ///
  /// The client must be streaming for this to succeed, so the valid state transitions are:
  /// - `.clientActiveServerIdle` → `.clientActiveServerIdle`
  /// - `.clientActiveServerActive` → `.clientActiveServerActive`
  ///
  /// Requests must not be sent once the request stream has been closed. Attempting to do so from
  /// any of the following states results in a `.cardinalityViolation`:
  /// - `.clientClosedServerIdle`
  /// - `.clientClosedServerActive`
  /// - `.clientClosedServerClosed`
  ///
  /// Sending a message before the RPC has been initiated (in the `.clientIdleServerIdle` state)
  /// results in an `.invalidState` error.
  ///
  /// - Parameter message: The serialized request to send to the server.
  /// - Parameter compressed: Whether the request should be compressed.
  /// - Parameter promise: An optional promise which is passed to the write state together with
  ///     the message. Defaults to `nil`.
  mutating func sendRequest(
    _ message: ByteBuffer,
    compressed: Bool,
    promise: EventLoopPromise<Void>? = nil
  ) -> Result<Void, MessageWriteError> {
    return self.withStateAvoidingCoWs {
      $0.sendRequest(message, compressed: compressed, promise: promise)
    }
  }
202
203
0
  /// Asks the current state for the next request and the promise associated with it.
  ///
  /// This forwards to `State.nextRequest()`, which only consults the write state while the
  /// request stream is active (`.clientActiveServerIdle` or `.clientActiveServerActive`) and
  /// returns `nil` from every other state.
  mutating func nextRequest() -> (Result<ByteBuffer, MessageWriteError>, EventLoopPromise<Void>?)? {
204
0
    return self.state.nextRequest()
205
0
  }
206
207
  /// Closes the request stream.
  ///
  /// The request stream can only be closed while the client is streaming requests. The valid
  /// state transitions are:
  /// - `.clientActiveServerIdle` → `.clientClosedServerIdle`
  /// - `.clientActiveServerActive` → `.clientClosedServerActive`
  ///
  /// Closing a request stream which is already closed results in an `.alreadyClosed` error. That
  /// is the case in each of the following states:
  /// - `.clientClosedServerIdle`
  /// - `.clientClosedServerActive`
  /// - `.clientClosedServerClosed`
  ///
  /// Closing the request stream before the RPC has been initiated (in the
  /// `.clientIdleServerIdle` state) results in an `.invalidState` error.
  mutating func sendEndOfRequestStream() -> Result<Void, SendEndOfRequestStreamError> {
    return self.withStateAvoidingCoWs { $0.sendEndOfRequestStream() }
  }
228
229
  /// Receives the server's acknowledgement of the RPC. This **must not** be a "Trailers-Only"
  /// response.
  ///
  /// Response headers can only be received while the server is idle. The valid state transitions
  /// are:
  /// - `.clientActiveServerIdle` → `.clientActiveServerActive`
  /// - `.clientClosedServerIdle` → `.clientClosedServerActive`
  ///
  /// The headers are validated before the transition is made; if validation fails the state is
  /// not changed and one of the following errors is returned:
  /// - `.invalidHTTPStatus` if the ":status" header is missing, not an integer, or not 200,
  /// - `.invalidContentType` if the "content-type" header is missing or not supported,
  /// - `.unsupportedMessageEncoding` if the message encoding header names an unsupported
  ///   compression algorithm.
  ///
  /// Receiving response headers in any of the following states results in an `.invalidState`
  /// error:
  /// - `.clientIdleServerIdle`
  /// - `.clientActiveServerActive`
  /// - `.clientClosedServerActive`
  /// - `.clientClosedServerClosed`
  ///
  /// - Parameter headers: The headers received from the server.
  mutating func receiveResponseHeaders(
    _ headers: HPACKHeaders
  ) -> Result<Void, ReceiveResponseHeadError> {
    return self.withStateAvoidingCoWs { $0.receiveResponseHeaders(headers) }
  }
257
258
  /// Reads a buffer of response bytes from the server and returns any messages read from it.
  ///
  /// The server must be streaming for a response buffer to be received. The valid states are:
  /// - `.clientClosedServerActive` → `.clientClosedServerActive`
  /// - `.clientActiveServerActive` → `.clientActiveServerActive`
  ///
  /// In those states the buffer is passed to the read state of the response stream, and any
  /// `MessageReadError` it produces is returned to the caller.
  ///
  /// It is not possible to receive a response buffer in any of the following states:
  /// - `.clientIdleServerIdle`
  /// - `.clientActiveServerIdle`
  /// - `.clientClosedServerIdle`
  /// - `.clientClosedServerClosed`
  /// Doing so will result in an `.invalidState` error.
  ///
  /// - Parameter buffer: A buffer of bytes received from the server.
  /// - Parameter maxMessageLength: The maximum message length, passed on to the read state.
  mutating func receiveResponseBuffer(
    _ buffer: inout ByteBuffer,
    maxMessageLength: Int
  ) -> Result<[ByteBuffer], MessageReadError> {
    return self.withStateAvoidingCoWs {
      $0.receiveResponseBuffer(&buffer, maxMessageLength: maxMessageLength)
    }
  }
291
292
  /// Receives the end of the response stream from the server and parses the trailers into a
  /// `GRPCStatus`.
  ///
  /// The server must be streaming or idle (the server may choose to 'fast fail' the RPC) for the
  /// response stream to be closed. The valid state transitions are:
  /// - `.clientActiveServerIdle` → `.clientClosedServerClosed`
  /// - `.clientActiveServerActive` → `.clientClosedServerClosed`
  /// - `.clientClosedServerIdle` → `.clientClosedServerClosed`
  /// - `.clientClosedServerActive` → `.clientClosedServerClosed`
  ///
  /// When the server was idle the trailers are treated as a "Trailers-Only" response, which is
  /// validated first; if that validation fails the error is returned and the state is not
  /// changed.
  ///
  /// An end-of-stream cannot be received if the RPC has not been initiated or has already been
  /// terminated, that is, in one of the following states:
  /// - `.clientIdleServerIdle`
  /// - `.clientClosedServerClosed`
  /// Doing so will result in an `.invalidState` error.
  ///
  /// - Parameter trailers: The trailers to parse.
  mutating func receiveEndOfResponseStream(
    _ trailers: HPACKHeaders
  ) -> Result<GRPCStatus, ReceiveEndOfResponseStreamError> {
    return self.withStateAvoidingCoWs { $0.receiveEndOfResponseStream(trailers) }
  }
316
317
  /// Receives a DATA frame which has the end stream flag set and decides whether the caller may
  /// ignore the flag or must forward a synthesised status.
  ///
  /// End stream on a DATA frame is unexpected: the specification dictates that the server ends an
  /// RPC by sending a HEADERS frame with end stream set. It is tolerated when the RPC has already
  /// completed (the `.clientClosedServerClosed` state), in which case `nil` is returned. In every
  /// other state in which the RPC has been initiated the RPC is terminated and a status
  /// describing the protocol violation is returned.
  ///
  /// - Returns: A status to forward, or `nil` if the end stream flag can be ignored.
  mutating func receiveEndOfResponseStream() -> GRPCStatus? {
    return self.withStateAvoidingCoWs { $0.receiveEndOfResponseStream() }
  }
331
332
  /// Temporarily sets `self.state` to `.modifying` before calling the provided block and setting
333
  /// `self.state` to the `State` modified by the block.
334
  ///
335
  /// Since we hold state as associated data on our `State` enum, any modification to that state
336
  /// will trigger a copy on write for its heap allocated data. Temporarily setting the `self.state`
337
  /// to `.modifying` allows us to avoid an extra reference to any heap allocated data and therefore
338
  /// avoid a copy on write.
  ///
  /// - Parameter body: A block which is given the current state to inspect and modify.
  /// - Returns: Whatever `body` returns.
339
  @inline(__always)
340
  private mutating func withStateAvoidingCoWs<ResultType>(
341
    _ body: (inout State) -> ResultType
342
0
  ) -> ResultType {
    // Move the real state into a local, leaving the `.modifying` placeholder in `self.state`.
343
0
    var state = State.modifying
344
0
    swap(&self.state, &state)
    // Put the (possibly modified) state back once `body` has returned.
345
0
    defer {
346
0
      swap(&self.state, &state)
347
0
    }
348
0
    return body(&state)
349
0
  }
350
}
351
352
extension GRPCClientStateMachine.State {
353
  /// See `GRPCClientStateMachine.sendRequestHeaders(requestHead:allocator:)`.
  ///
  /// Builds the request headers and moves from `.clientIdleServerIdle` to
  /// `.clientActiveServerIdle`. Every other state yields `.invalidState` and is left unchanged.
  mutating func sendRequestHeaders(
    requestHead: _GRPCRequestHead,
    allocator: ByteBufferAllocator
  ) -> Result<HPACKHeaders, SendRequestHeadersError> {
    switch self {
    case .clientIdleServerIdle(let pendingWrites, let readArity):
      let requestHeaders = self.makeRequestHeaders(
        method: requestHead.method,
        scheme: requestHead.scheme,
        host: requestHead.host,
        path: requestHead.path,
        timeout: GRPCTimeout(deadline: requestHead.deadline),
        customMetadata: requestHead.customMetadata,
        compression: requestHead.encoding
      )

      // The request stream is now open: create its write state and remember how the response
      // stream should be read once the server responds.
      let writeState = pendingWrites.makeWriteState(
        messageEncoding: requestHead.encoding,
        allocator: allocator
      )
      let pendingReads = PendingReadState(
        arity: readArity,
        messageEncoding: requestHead.encoding
      )
      self = .clientActiveServerIdle(writeState: writeState, pendingReadState: pendingReads)
      return .success(requestHeaders)

    case .clientActiveServerIdle,
      .clientClosedServerIdle,
      .clientClosedServerActive,
      .clientActiveServerActive,
      .clientClosedServerClosed:
      // The RPC has already been initiated.
      return .failure(.invalidState)

    case .modifying:
      preconditionFailure("State left as 'modifying'")
    }
  }
394
395
  /// See `GRPCClientStateMachine.sendRequest(_:compressed:promise:)`.
  ///
  /// Writes the message to the write state when the request stream is active. The state itself
  /// is unchanged by a write: only the write state held by it is updated.
  ///
  /// - Parameter message: The serialized request to send to the server.
  /// - Parameter compressed: Whether the request should be compressed.
  /// - Parameter promise: An optional promise passed to the write state with the message.
  /// - Returns: The result of the write, `.cardinalityViolation` if the request stream has been
  ///     closed, or `.invalidState` if the RPC has not been initiated.
  mutating func sendRequest(
    _ message: ByteBuffer,
    compressed: Bool,
    promise: EventLoopPromise<Void>?
  ) -> Result<Void, MessageWriteError> {
    let result: Result<Void, MessageWriteError>

    switch self {
    case .clientActiveServerIdle(var writeState, let pendingReadState):
      // Drop our own reference to the associated data while the write state is mutated so that
      // the mutation doesn't have to copy it (as in `nextRequest()`).
      self = .modifying
      result = writeState.write(message, compressed: compressed, promise: promise)
      self = .clientActiveServerIdle(writeState: writeState, pendingReadState: pendingReadState)

    case .clientActiveServerActive(var writeState, let readState):
      // As above: avoid holding a second reference during the mutation.
      self = .modifying
      result = writeState.write(message, compressed: compressed, promise: promise)
      self = .clientActiveServerActive(writeState: writeState, readState: readState)

    case .clientClosedServerIdle,
      .clientClosedServerActive,
      .clientClosedServerClosed:
      // The request stream is closed: no more messages may be sent.
      result = .failure(.cardinalityViolation)

    case .clientIdleServerIdle:
      // The RPC hasn't been initiated yet.
      result = .failure(.invalidState)

    case .modifying:
      preconditionFailure("State left as 'modifying'")
    }

    return result
  }
428
429
0
  /// See `GRPCClientStateMachine.nextRequest()`.
  ///
  /// Asks the write state for its next request while the request stream is active. `nil` is
  /// returned from every state in which the request stream is idle or closed.
  mutating func nextRequest() -> (Result<ByteBuffer, MessageWriteError>, EventLoopPromise<Void>?)? {
    switch self {
    case .clientActiveServerIdle(var writer, let pendingReader):
      // Release our reference to the associated data while the write state is mutated; the state
      // is restored (with the updated write state) before returning.
      self = .modifying
      defer {
        self = .clientActiveServerIdle(writeState: writer, pendingReadState: pendingReader)
      }
      return writer.next()

    case .clientActiveServerActive(var writer, let reader):
      self = .modifying
      defer {
        self = .clientActiveServerActive(writeState: writer, readState: reader)
      }
      return writer.next()

    case .clientIdleServerIdle,
      .clientClosedServerIdle,
      .clientClosedServerActive,
      .clientClosedServerClosed:
      return nil

    case .modifying:
      preconditionFailure("State left as 'modifying'")
    }
  }
453
454
  /// See `GRPCClientStateMachine.sendEndOfRequestStream()`.
  ///
  /// Moves the client from "active" to "closed", discarding the write state. The server half of
  /// the state is carried over unchanged.
  mutating func sendEndOfRequestStream() -> Result<Void, SendEndOfRequestStreamError> {
    switch self {
    case .clientActiveServerIdle(_, let pendingReader):
      self = .clientClosedServerIdle(pendingReadState: pendingReader)
      return .success(())

    case .clientActiveServerActive(_, let reader):
      self = .clientClosedServerActive(readState: reader)
      return .success(())

    case .clientClosedServerIdle,
      .clientClosedServerActive,
      .clientClosedServerClosed:
      // The request stream has been closed already.
      return .failure(.alreadyClosed)

    case .clientIdleServerIdle:
      // There is no request stream to close: the RPC hasn't been initiated.
      return .failure(.invalidState)

    case .modifying:
      preconditionFailure("State left as 'modifying'")
    }
  }
481
482
  /// See `GRPCClientStateMachine.receiveResponseHeaders(_:)`.
  ///
  /// Validates the headers and, only if they are valid, moves the server from "idle" to
  /// "active". The state is not changed when validation fails.
  mutating func receiveResponseHeaders(
    _ headers: HPACKHeaders
  ) -> Result<Void, ReceiveResponseHeadError> {
    switch self {
    case .clientActiveServerIdle(let writer, let pendingReader):
      switch self.parseResponseHeaders(headers, pendingReadState: pendingReader) {
      case .success(let reader):
        self = .clientActiveServerActive(writeState: writer, readState: reader)
        return .success(())
      case .failure(let error):
        return .failure(error)
      }

    case .clientClosedServerIdle(let pendingReader):
      switch self.parseResponseHeaders(headers, pendingReadState: pendingReader) {
      case .success(let reader):
        self = .clientClosedServerActive(readState: reader)
        return .success(())
      case .failure(let error):
        return .failure(error)
      }

    case .clientIdleServerIdle,
      .clientClosedServerActive,
      .clientActiveServerActive,
      .clientClosedServerClosed:
      // Headers are only expected while the server is idle and the RPC has been initiated.
      return .failure(.invalidState)

    case .modifying:
      preconditionFailure("State left as 'modifying'")
    }
  }
513
514
  /// See `GRPCClientStateMachine.receiveResponseBuffer(_:maxMessageLength:)`.
  ///
  /// Passes the buffer to the read state when the response stream is active. The state itself is
  /// unchanged by a read: only the read state held by it is updated.
  ///
  /// - Parameter buffer: A buffer of bytes received from the server.
  /// - Parameter maxMessageLength: The maximum message length, passed on to the read state.
  /// - Returns: The result of reading messages from the buffer, or `.invalidState` if the
  ///     response stream is not active.
  mutating func receiveResponseBuffer(
    _ buffer: inout ByteBuffer,
    maxMessageLength: Int
  ) -> Result<[ByteBuffer], MessageReadError> {
    let result: Result<[ByteBuffer], MessageReadError>

    switch self {
    case var .clientClosedServerActive(readState):
      // Drop our own reference to the associated data while the read state is mutated so that
      // the mutation doesn't have to copy it (as in `nextRequest()`).
      self = .modifying
      result = readState.readMessages(&buffer, maxLength: maxMessageLength)
      self = .clientClosedServerActive(readState: readState)

    case .clientActiveServerActive(let writeState, var readState):
      // As above: avoid holding a second reference during the mutation.
      self = .modifying
      result = readState.readMessages(&buffer, maxLength: maxMessageLength)
      self = .clientActiveServerActive(writeState: writeState, readState: readState)

    case .clientIdleServerIdle,
      .clientActiveServerIdle,
      .clientClosedServerIdle,
      .clientClosedServerClosed:
      // The response stream is either not open yet or already closed.
      result = .failure(.invalidState)

    case .modifying:
      preconditionFailure("State left as 'modifying'")
    }

    return result
  }
542
543
  /// See `GRPCClientStateMachine.receiveEndOfResponseStream(_:)`.
  ///
  /// When the server is idle the trailers are a "Trailers-Only" response and are validated before
  /// the RPC is terminated. When the server is active the trailers are parsed directly and the
  /// RPC is always terminated.
  mutating func receiveEndOfResponseStream(
    _ trailers: HPACKHeaders
  ) -> Result<GRPCStatus, ReceiveEndOfResponseStreamError> {
    switch self {
    case .clientActiveServerIdle,
      .clientClosedServerIdle:
      let parsed = self.parseTrailersOnly(trailers)
      // Only terminate the RPC if the "Trailers-Only" response was valid.
      if case .success = parsed {
        self = .clientClosedServerClosed
      }
      return parsed

    case .clientActiveServerActive,
      .clientClosedServerActive:
      let status = self.parseTrailers(trailers)
      self = .clientClosedServerClosed
      return .success(status)

    case .clientIdleServerIdle,
      .clientClosedServerClosed:
      // The RPC has either not been initiated or has already terminated.
      return .failure(.invalidState)

    case .modifying:
      preconditionFailure("State left as 'modifying'")
    }
  }
572
573
  /// See `GRPCClientStateMachine.receiveEndOfResponseStream()`.
  ///
  /// Handles end stream being set on a DATA frame. If the RPC has been initiated and has not yet
  /// terminated, it is terminated and a status describing the protocol violation is returned. If
  /// the RPC has already terminated the flag is ignored.
  ///
  /// - Returns: A status to forward, or `nil` if the end stream flag can be ignored.
  mutating func receiveEndOfResponseStream() -> GRPCStatus? {
    let status: GRPCStatus?

    switch self {
    case .clientIdleServerIdle:
      // Can't see end stream before writing on it.
      preconditionFailure("Received end stream before the RPC was initiated")

    case .clientActiveServerIdle,
      .clientActiveServerActive,
      .clientClosedServerIdle,
      .clientClosedServerActive:
      self = .clientClosedServerClosed
      status = .init(
        code: .internalError,
        message: "Protocol violation: received DATA frame with end stream set"
      )

    case .clientClosedServerClosed:
      // We've already closed. Ignore this.
      status = nil

    case .modifying:
      preconditionFailure("State left as 'modifying'")
    }

    return status
  }
602
603
  /// Makes the request headers (`Request-Headers` in the specification) used to initiate an RPC
  /// call.
  ///
  /// See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
  ///
  /// The required headers are added first, followed by the compression and timeout headers (when
  /// applicable), the custom metadata (with lowercased names) and finally a default "user-agent"
  /// if the custom metadata didn't provide one.
  ///
  /// - Parameter method: The value of the ":method" header.
  /// - Parameter scheme: The value of the ":scheme" header.
  /// - Parameter host: The host serving the RPC, used as the value of the ":authority" header.
  /// - Parameter path: The value of the ":path" header.
  /// - Parameter timeout: The timeout for the RPC; no header is added if it is `.infinite`.
  /// - Parameter customMetadata: User-provided metadata to send with the request.
  /// - Parameter compression: Whether compression is enabled and, if so, its configuration.
  /// - Returns: The headers to send to the server.
  private func makeRequestHeaders(
    method: String,
    scheme: String,
    host: String,
    path: String,
    timeout: GRPCTimeout,
    customMetadata: HPACKHeaders,
    compression: ClientMessageEncoding
  ) -> HPACKHeaders {
    // The headers which every request carries, in the order in which they are added.
    let requiredHeaders: [(String, String)] = [
      (":method", method),
      (":path", path),
      (":authority", host),
      (":scheme", scheme),
      ("content-type", "application/grpc"),
      // Used to detect incompatible proxies, part of the gRPC specification.
      ("te", "trailers"),
    ]

    var headers = HPACKHeaders()
    // Reserve space for the required headers, the (up to) 4 headers which are added depending on
    // conditions, and the custom metadata.
    headers.reserveCapacity(requiredHeaders.count + 4 + customMetadata.count)

    for (name, value) in requiredHeaders {
      headers.add(name: name, value: value)
    }

    switch compression {
    case .disabled:
      break

    case .enabled(let configuration):
      // Request encoding.
      if let outbound = configuration.outbound {
        headers.add(name: GRPCHeaderName.encoding, value: outbound.name)
      }

      // Response encoding.
      if !configuration.inbound.isEmpty {
        headers.add(name: GRPCHeaderName.acceptEncoding, value: configuration.acceptEncodingHeader)
      }
    }

    // Add the timeout header, if a timeout was specified.
    if timeout != .infinite {
      headers.add(name: GRPCHeaderName.timeout, value: String(describing: timeout))
    }

    // Add user-defined custom metadata: this should come after the call definition headers.
    // TODO: make header normalization user-configurable.
    let normalizedMetadata = customMetadata.lazy.map { key, value, indexing in
      (key.lowercased(), value, indexing)
    }
    headers.add(contentsOf: normalizedMetadata)

    // Add default user-agent value, if `customMetadata` didn't contain user-agent
    let hasCustomUserAgent = customMetadata.contains(name: "user-agent")
    if !hasCustomUserAgent {
      headers.add(name: "user-agent", value: GRPCClientStateMachine.userAgent)
    }

    return headers
  }
672
673
  /// Parses the response headers ("Response-Headers" in the specification) from the server into
  /// a `ReadState`.
  ///
  /// See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#responses
  ///
  /// The headers are checked in this order: the ":status" must be 200, the "content-type" must be
  /// supported, and the message encoding (if any was sent) must be a supported algorithm.
  ///
  /// - Parameter headers: The headers to parse.
  /// - Parameter pendingReadState: The pending read state from which the `ReadState` is made.
  /// - Returns: The `ReadState` for the response stream, or the first validation error.
  private func parseResponseHeaders(
    _ headers: HPACKHeaders,
    pendingReadState: PendingReadState
  ) -> Result<ReadState, ReceiveResponseHeadError> {
    // From: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#responses
    //
    // "Implementations should expect broken deployments to send non-200 HTTP status codes in
    // responses as well as a variety of non-GRPC content-types and to omit Status & Status-Message.
    // Implementations must synthesize a Status & Status-Message to propagate to the application
    // layer when this occurs."
    let rawStatus = headers.first(name: ":status")
    // A missing or non-integer status is treated in the same way as a non-200 status.
    guard let statusCode = rawStatus.flatMap({ Int($0) }),
      HTTPResponseStatus(statusCode: statusCode) == .ok
    else {
      return .failure(.invalidHTTPStatus(rawStatus))
    }

    let rawContentType = headers.first(name: "content-type")
    let contentType = rawContentType.flatMap(ContentType.init)
    if contentType == nil {
      return .failure(.invalidContentType(rawContentType))
    }

    // What compression mechanism is the server using, if any?
    guard let encodingName = headers.first(name: GRPCHeaderName.encoding) else {
      // No compression was specified, this is fine.
      return .success(pendingReadState.makeReadState(compression: nil))
    }

    // Note: the server is allowed to encode messages using an algorithm which wasn't included in
    // the 'grpc-accept-encoding' header. If the client still supports that algorithm (despite not
    // permitting the server to use it) then it must still decode that message. Ideally we should
    // log a message here if that was the case but we don't hold that information.
    guard let algorithm = CompressionAlgorithm(rawValue: encodingName) else {
      // The algorithm isn't one we support.
      return .failure(.unsupportedMessageEncoding(encodingName))
    }

    return .success(pendingReadState.makeReadState(compression: algorithm))
  }
727
728
  /// Parses the response trailers ("Trailers" in the specification) from the server into
  /// a `GRPCStatus`.
  ///
  /// See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#responses
  ///
  /// The status code falls back to `.unknown` if the trailers don't contain a valid one.
  ///
  /// - Parameter trailers: Trailers to parse.
  /// - Returns: The status built from the "Status" and "Status-Message" found in the trailers.
  private func parseTrailers(_ trailers: HPACKHeaders) -> GRPCStatus {
    return GRPCStatus(
      code: self.readStatusCode(from: trailers) ?? .unknown,
      message: self.readStatusMessage(from: trailers)
    )
  }
740
741
0
  /// Reads the gRPC status code from the trailers.
  ///
  /// - Parameter trailers: The trailers to read the status code from.
  /// - Returns: The status code, or `nil` if the status code header is missing, is not an integer
  ///     or is not the raw value of a `GRPCStatus.Code`.
  private func readStatusCode(from trailers: HPACKHeaders) -> GRPCStatus.Code? {
    guard let rawCode = trailers.first(name: GRPCHeaderName.statusCode),
      let numericCode = Int(rawCode)
    else {
      return nil
    }
    return GRPCStatus.Code(rawValue: numericCode)
  }
746
747
0
  /// Reads the status message from the trailers and unmarshalls it.
  ///
  /// - Parameter trailers: The trailers to read the status message from.
  /// - Returns: The unmarshalled message, or `nil` if the status message header is missing.
  private func readStatusMessage(from trailers: HPACKHeaders) -> String? {
    guard let marshalled = trailers.first(name: GRPCHeaderName.statusMessage) else {
      return nil
    }
    return GRPCStatusMessageMarshaller.unmarshall(marshalled)
  }
751
752
  /// Parses a "Trailers-Only" response from the server into a `GRPCStatus`.
  ///
  /// See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#responses
  ///
  /// - Parameter trailers: Trailers to parse.
  /// - Returns: The status for the RPC, `.invalidHTTPStatus` if the ":status" header is missing
  ///     or not an integer, or `.invalidContentType` if a "content-type" header is present but
  ///     not supported.
  private func parseTrailersOnly(
    _ trailers: HPACKHeaders
  ) -> Result<GRPCStatus, ReceiveEndOfResponseStreamError> {
    // We need to check whether we have a valid HTTP status in the headers, if we don't then we also
    // need to check whether we have a gRPC status as it should take preference over synthesising
    // one from the ":status".
    //
    // See: https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md
    let rawStatus = trailers.first(name: ":status")
    guard let statusCode = rawStatus.flatMap({ Int($0) }) else {
      return .failure(.invalidHTTPStatus(rawStatus))
    }

    let httpStatus = HTTPResponseStatus(statusCode: statusCode)
    if httpStatus != .ok {
      // Non-200 response. If there's a 'grpc-status' message we should use that otherwise try
      // to create one from the HTTP status code.
      let code: GRPCStatus.Code =
        self.readStatusCode(from: trailers)
        ?? GRPCStatus.Code(httpStatus: Int(httpStatus.code))
        ?? .unknown
      let status = GRPCStatus(code: code, message: self.readStatusMessage(from: trailers))
      return .success(status)
    }

    // Only validate the content-type header if it's present. This is a small deviation from the
    // spec as the content-type is meant to be sent in "Trailers-Only" responses. However, if it's
    // missing then we should avoid the error and propagate the status code and message sent by
    // the server instead.
    let rawContentType = trailers.first(name: "content-type")
    if let presentContentType = rawContentType, ContentType(value: presentContentType) == nil {
      return .failure(.invalidContentType(presentContentType))
    }

    // We've verified the status and content type are okay: parse the trailers.
    return .success(self.parseTrailers(trailers))
  }
798
}