Coverage Report

Created: 2025-09-08 07:07

/src/grpc-swift/Sources/GRPC/CoalescingLengthPrefixedMessageWriter.swift
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright 2022, gRPC Authors All rights reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
import DequeModule
17
import NIOCore
18
19
/// Buffers serialized messages and hands them back as length-prefixed gRPC frames, packing
/// consecutive small (or compressed) messages into a single buffer.
internal struct CoalescingLengthPrefixedMessageWriter {
  /// Size of the gRPC message header: one byte for the compression flag followed by a
  /// four byte length.
  static let metadataLength = 5

  /// Largest message which is copied into a buffer shared with its header. Anything bigger is
  /// emitted as two buffers: the header on its own, then the untouched message bytes.
  ///
  /// Two buffers spare us an expensive copy of a large message. For small messages the copy
  /// costs less than the extra allocations and overhead of sending another HTTP/2 DATA frame.
  ///
  /// 16k was chosen empirically. The header length is subtracted because `ByteBuffer` reserves
  /// capacity in powers of two and we want to avoid overallocating.
  static let singleBufferSizeLimit = 16384 - Self.metadataLength

  /// The compression algorithm in use, if any.
  private let compression: CompressionAlgorithm?
  /// The compressor backing `compression`, if the algorithm needs one.
  private let compressor: Zlib.Deflate?

  /// Whether the compression flag may be set on outbound messages.
  private var supportsCompression: Bool {
    self.compression != nil
  }

  /// Scratch space that messages are encoded into. Reusing it avoids allocating a new buffer
  /// when nothing else is holding on to it.
  private var scratch: ByteBuffer

  /// Messages which have been appended but not yet returned from `next()`.
  private var pending: OneOrManyQueue<Pending>

  /// A message awaiting encoding, along with its write promise.
  private struct Pending {
    var buffer: ByteBuffer
    var promise: EventLoopPromise<Void>?
    var compress: Bool

    init(buffer: ByteBuffer, compress: Bool, promise: EventLoopPromise<Void>?) {
      self.buffer = buffer
      self.compress = compress
      self.promise = promise
    }

    /// True when the message is no larger than `singleBufferSizeLimit`.
    var isSmallEnoughToCoalesce: Bool {
      self.buffer.readableBytes <= CoalescingLengthPrefixedMessageWriter.singleBufferSizeLimit
    }

    /// True when the message should be written into the shared scratch buffer: it is either
    /// going to be compressed or it is small enough to be worth copying.
    var shouldCoalesce: Bool {
      self.compress || self.isSmallEnoughToCoalesce
    }
  }

  private enum State {
    // Pending messages are being coalesced.
    case coalescing
    // A non-coalesced message is part way through being emitted: its header has been
    // returned, the body (and its promise) must be returned next.
    case emittingLargeFrame(ByteBuffer, EventLoopPromise<Void>?)
  }

  private var state: State

  /// Creates a writer.
  ///
  /// - Parameters:
  ///   - compression: The compression algorithm to use, or `nil` for none.
  ///   - allocator: Allocator used to create the scratch buffer.
  init(compression: CompressionAlgorithm? = nil, allocator: ByteBufferAllocator) {
    self.compression = compression
    self.pending = OneOrManyQueue<Pending>()
    self.state = .coalescing
    self.scratch = allocator.buffer(capacity: 0)

    switch compression?.algorithm {
    case .gzip?:
      self.compressor = Zlib.Deflate(format: .gzip)
    case .deflate?:
      self.compressor = Zlib.Deflate(format: .deflate)
    case .identity?, nil:
      self.compressor = nil
    }
  }

  /// Enqueues a serialized message.
  ///
  /// - Parameters:
  ///   - buffer: The serialized message bytes.
  ///   - compress: Whether compression was requested; ignored when the writer was created
  ///       without a compression algorithm.
  ///   - promise: Promise to hand back with the buffer that carries this message.
  mutating func append(buffer: ByteBuffer, compress: Bool, promise: EventLoopPromise<Void>?) {
    let shouldCompress = compress && self.supportsCompression
    self.pending.append(Pending(buffer: buffer, compress: shouldCompress, promise: promise))
  }

  /// Returns the next buffer to write paired with its write promise, or `nil` if there is
  /// nothing to write.
  mutating func next() -> (Result<ByteBuffer, Error>, EventLoopPromise<Void>?)? {
    switch self.state {
    case let .emittingLargeFrame(body, bodyPromise):
      // The header went out on the previous call; the body follows with the promise.
      self.state = .coalescing
      return (.success(body), bodyPromise)
    case .coalescing:
      break
    }

    guard !self.pending.isEmpty else {
      return nil
    }

    // Walk the front of the queue to find out how many messages can share a buffer. While
    // doing so, total up the capacity they need and chain their promises together.
    var batchSize = 0
    var capacity = 0
    var batchPromise: EventLoopPromise<Void>?

    for candidate in self.pending {
      guard candidate.shouldCoalesce else {
        break
      }

      batchSize &+= 1
      capacity += candidate.buffer.readableBytes + Self.metadataLength

      guard let first = batchPromise else {
        batchPromise = candidate.promise
        continue
      }
      first.futureResult.cascade(to: candidate.promise)
    }

    guard batchSize > 0 else {
      // The message at the front can't be coalesced, so its header is emitted in a buffer of
      // its own. The force unwrap is safe: we returned early above if the queue was empty.
      let large = self.pending.pop()!

      // The scratch buffer holds only the header; the message bytes are stashed in the state.
      self.scratch.clear(minimumCapacity: Self.metadataLength)
      self.scratch.writeMultipleIntegers(UInt8(0), UInt32(large.buffer.readableBytes))
      self.state = .emittingLargeFrame(large.buffer, large.promise)
      return (.success(self.scratch), nil)
    }

    self.scratch.clear(minimumCapacity: capacity)

    // Pop and encode exactly the messages counted above.
    for _ in 0 ..< batchSize {
      guard let message = self.pending.pop() else {
        break
      }

      do {
        try self.encode(message.buffer, compress: message.compress)
      } catch {
        return (.failure(error), batchPromise)
      }
    }

    return (.success(self.scratch), batchPromise)
  }

  /// Encodes `buffer` into the scratch buffer, compressing it only if requested and a
  /// compressor is available.
  private mutating func encode(_ buffer: ByteBuffer, compress: Bool) throws {
    guard compress, let compressor = self.compressor else {
      try self.encode(buffer)
      return
    }
    try self.encode(buffer, compressor: compressor)
  }

  /// Writes a header with the compression flag set followed by the deflated bytes of `buffer`.
  private mutating func encode(_ buffer: ByteBuffer, compressor: Zlib.Deflate) throws {
    // Compression flag.
    self.scratch.writeInteger(UInt8(1))

    // Reserve the length field with a placeholder; it's patched once the size is known.
    let lengthIndex = self.scratch.writerIndex
    self.scratch.writeInteger(UInt32(0))

    var uncompressed = buffer
    let compressedSize = try compressor.deflate(&uncompressed, into: &self.scratch)
    self.scratch.setInteger(UInt32(compressedSize), at: lengthIndex)

    // The compression context must be reset between messages.
    compressor.reset()
  }

  /// Writes an uncompressed header followed by a copy of the bytes of `buffer`.
  private mutating func encode(_ buffer: ByteBuffer) throws {
    let length = UInt32(buffer.readableBytes)
    self.scratch.writeMultipleIntegers(UInt8(0), length)
    self.scratch.writeImmutableBuffer(buffer)
  }
}
210
211
/// A FIFO queue which keeps a single element inline and only switches to a heap-backed
/// `Deque` once a second element is appended.
///
/// This suits unary streams, where avoiding the cost of a heap allocation is desirable.
internal struct OneOrManyQueue<Element>: Collection {
  private var backing: Backing

  /// Storage for the queue: empty, exactly one inline element, or a `Deque`.
  private enum Backing: Collection {
    case none
    case one(Element)
    case many(Deque<Element>)

    var startIndex: Int {
      guard case let .many(deque) = self else {
        return 0
      }
      return deque.startIndex
    }

    var endIndex: Int {
      switch self {
      case let .many(deque):
        return deque.endIndex
      case .one:
        return 1
      case .none:
        return 0
      }
    }

    subscript(index: Int) -> Element {
      switch self {
      case let .many(deque):
        return deque[index]
      case let .one(only):
        assert(index == 0)
        return only
      case .none:
        fatalError("Invalid index")
      }
    }

    func index(after index: Int) -> Int {
      switch self {
      case let .many(deque):
        return deque.index(after: index)
      case .one:
        return 1
      case .none:
        return 0
      }
    }

    var count: Int {
      switch self {
      case let .many(deque):
        return deque.count
      case .one:
        return 1
      case .none:
        return 0
      }
    }

    var isEmpty: Bool {
      switch self {
      case let .many(deque):
        return deque.isEmpty
      case .one:
        return false
      case .none:
        return true
      }
    }

    /// Adds `element` to the back, promoting inline storage to a `Deque` when a second
    /// element arrives.
    mutating func append(_ element: Element) {
      switch self {
      case var .many(deque):
        // Drop the enum's reference to the deque before mutating the local copy.
        self = .none
        deque.append(element)
        self = .many(deque)

      case let .one(first):
        var deque = Deque<Element>()
        deque.reserveCapacity(16)
        deque.append(first)
        deque.append(element)
        self = .many(deque)

      case .none:
        self = .one(element)
      }
    }

    /// Removes and returns the element at the front, or `nil` if there isn't one.
    mutating func pop() -> Element? {
      switch self {
      case var .many(deque):
        // Drop the enum's reference to the deque before mutating the local copy.
        self = .none
        let head = deque.popFirst()
        self = .many(deque)
        return head

      case let .one(only):
        self = .none
        return only

      case .none:
        return nil
      }
    }
  }

  /// Creates an empty queue.
  init() {
    self.backing = .none
  }

  var isEmpty: Bool {
    self.backing.isEmpty
  }

  var count: Int {
    self.backing.count
  }

  var startIndex: Int {
    self.backing.startIndex
  }

  var endIndex: Int {
    self.backing.endIndex
  }

  subscript(index: Int) -> Element {
    self.backing[index]
  }

  func index(after index: Int) -> Int {
    self.backing.index(after: index)
  }

  /// Adds `element` to the back of the queue.
  mutating func append(_ element: Element) {
    self.backing.append(element)
  }

  /// Removes and returns the element at the front of the queue, or `nil` if it is empty.
  mutating func pop() -> Element? {
    self.backing.pop()
  }
}