Coverage Report

Created: 2026-03-11 06:25

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/grpc-swift/Sources/GRPC/CoalescingLengthPrefixedMessageWriter.swift
Line
Count
Source
1
/*
2
 * Copyright 2022, gRPC Authors All rights reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
import DequeModule
17
import NIOCore
18
19
internal struct CoalescingLengthPrefixedMessageWriter {
20
  /// Length of the gRPC message header (1 compression byte, 4 bytes for the length).
21
  static let metadataLength = 5
22
23
  /// Message size above which we emit two buffers: one containing the header and one with the
24
  /// actual message bytes. At or below the limit we copy the message into a new buffer containing
25
  /// both the header and the message.
26
  ///
27
  /// Using two buffers avoids expensive copies of large messages. For smaller messages the copy
28
  /// is cheaper than the additional allocations and overhead required to send an extra HTTP/2 DATA
29
  /// frame.
30
  ///
31
  /// The value of 16k was chosen empirically. We subtract the length of the message header
32
  /// because `ByteBuffer` reserves capacity in powers of two and we want to avoid overallocating.
33
  static let singleBufferSizeLimit = 16384 - Self.metadataLength
34
35
  /// The compression algorithm to use, if one should be used.
36
  private let compression: CompressionAlgorithm?
37
  /// Any compressor associated with the compression algorithm.
38
  private let compressor: Zlib.Deflate?
39
40
  /// Whether the compression message flag should be set.
41
0
  private var supportsCompression: Bool {
42
0
    return self.compression != nil
43
0
  }
44
45
  /// A scratch buffer that we encode messages into: if the buffer isn't held elsewhere then we
46
  /// can avoid having to allocate a new one.
47
  private var scratch: ByteBuffer
48
49
  /// Outbound buffers waiting to be written.
50
  private var pending: OneOrManyQueue<Pending>
51
52
  private struct Pending {
53
    var buffer: ByteBuffer
54
    var promise: EventLoopPromise<Void>?
55
    var compress: Bool
56
57
3.61M
    init(buffer: ByteBuffer, compress: Bool, promise: EventLoopPromise<Void>?) {
58
3.61M
      self.buffer = buffer
59
3.61M
      self.promise = promise
60
3.61M
      self.compress = compress
61
3.61M
    }
62
63
1.31M
    var isSmallEnoughToCoalesce: Bool {
64
1.31M
      let limit = CoalescingLengthPrefixedMessageWriter.singleBufferSizeLimit
65
1.31M
      return self.buffer.readableBytes <= limit
66
1.31M
    }
67
68
1.31M
    var shouldCoalesce: Bool {
69
1.31M
      return self.isSmallEnoughToCoalesce || self.compress
70
1.31M
    }
71
  }
72
73
  private enum State {
74
    // Coalescing pending messages.
75
    case coalescing
76
    // Emitting a non-coalesced message; the header has been written, the body
77
    // needs to be written next.
78
    case emittingLargeFrame(ByteBuffer, EventLoopPromise<Void>?)
79
  }
80
81
  private var state: State
82
83
417k
  init(compression: CompressionAlgorithm? = nil, allocator: ByteBufferAllocator) {
84
417k
    self.compression = compression
85
417k
    self.scratch = allocator.buffer(capacity: 0)
86
417k
    self.state = .coalescing
87
417k
    self.pending = .init()
88
417k
89
417k
    switch self.compression?.algorithm {
90
417k
    case .none, .some(.identity):
91
417k
      self.compressor = nil
92
417k
    case .some(.deflate):
93
0
      self.compressor = Zlib.Deflate(format: .deflate)
94
417k
    case .some(.gzip):
95
0
      self.compressor = Zlib.Deflate(format: .gzip)
96
417k
    }
97
417k
  }
98
99
  /// Append a serialized message buffer to the writer.
100
3.06M
  mutating func append(buffer: ByteBuffer, compress: Bool, promise: EventLoopPromise<Void>?) {
101
3.06M
    let pending = Pending(
102
3.06M
      buffer: buffer,
103
3.06M
      compress: compress && self.supportsCompression,
104
3.06M
      promise: promise
105
3.06M
    )
106
3.06M
107
3.06M
    self.pending.append(pending)
108
3.06M
  }
109
110
  /// Return a tuple of the next buffer to write and its associated write promise.
111
154k
  mutating func next() -> (Result<ByteBuffer, Error>, EventLoopPromise<Void>?)? {
112
154k
    switch self.state {
113
154k
    case .coalescing:
114
154k
      // Nothing pending: exit early.
115
154k
      if self.pending.isEmpty {
116
94.6k
        return nil
117
94.6k
      }
118
59.9k
119
59.9k
      // First up we need to work out how many elements we're going to pop off the front
120
59.9k
      // and coalesce.
121
59.9k
      //
122
59.9k
      // At the same time we'll compute how much capacity we'll need in the buffer and cascade
123
59.9k
      // their promises.
124
59.9k
      var messagesToCoalesce = 0
125
59.9k
      var requiredCapacity = 0
126
59.9k
      var promise: EventLoopPromise<Void>?
127
59.9k
128
765k
      for element in self.pending {
129
765k
        if !element.shouldCoalesce {
130
0
          break
131
765k
        }
132
765k
133
765k
        messagesToCoalesce &+= 1
134
765k
        requiredCapacity += element.buffer.readableBytes + Self.metadataLength
135
765k
        if let existing = promise {
136
0
          existing.futureResult.cascade(to: element.promise)
137
765k
        } else {
138
765k
          promise = element.promise
139
765k
        }
140
765k
      }
141
59.9k
142
59.9k
      if messagesToCoalesce == 0 {
143
0
        // Nothing to coalesce; this means the first element should be emitted with its header in
144
0
        // a separate buffer. Note: the force unwrap is okay here: we early exit if `self.pending`
145
0
        // is empty.
146
0
        let pending = self.pending.pop()!
147
0
148
0
        // Set the scratch buffer to just be a message header then store the message bytes.
149
0
        self.scratch.clear(minimumCapacity: Self.metadataLength)
150
0
        self.scratch.writeMultipleIntegers(UInt8(0), UInt32(pending.buffer.readableBytes))
151
0
        self.state = .emittingLargeFrame(pending.buffer, pending.promise)
152
0
        return (.success(self.scratch), nil)
153
59.9k
      } else {
154
59.9k
        self.scratch.clear(minimumCapacity: requiredCapacity)
155
59.9k
156
59.9k
        // Drop and encode the messages.
157
825k
        while messagesToCoalesce > 0, let next = self.pending.pop() {
158
765k
          messagesToCoalesce &-= 1
159
765k
          do {
160
765k
            try self.encode(next.buffer, compress: next.compress)
161
765k
          } catch {
162
0
            return (.failure(error), promise)
163
765k
          }
164
765k
        }
165
59.9k
166
59.9k
        return (.success(self.scratch), promise)
167
59.9k
      }
168
154k
169
154k
    case let .emittingLargeFrame(buffer, promise):
170
0
      // We just emitted the header, now emit the body.
171
0
      self.state = .coalescing
172
0
      return (.success(buffer), promise)
173
154k
    }
174
154k
  }
175
176
1.31M
  private mutating func encode(_ buffer: ByteBuffer, compress: Bool) throws {
177
1.31M
    if let compressor = self.compressor, compress {
178
0
      try self.encode(buffer, compressor: compressor)
179
1.31M
    } else {
180
1.31M
      try self.encode(buffer)
181
1.31M
    }
182
1.31M
  }
183
184
0
  private mutating func encode(_ buffer: ByteBuffer, compressor: Zlib.Deflate) throws {
185
0
    // Set the compression byte.
186
0
    self.scratch.writeInteger(UInt8(1))
187
0
    // Set the length to zero; we'll write the actual value in a moment.
188
0
    let payloadSizeIndex = self.scratch.writerIndex
189
0
    self.scratch.writeInteger(UInt32(0))
190
0
191
0
    let bytesWritten: Int
192
0
    do {
193
0
      var buffer = buffer
194
0
      bytesWritten = try compressor.deflate(&buffer, into: &self.scratch)
195
0
    } catch {
196
0
      throw error
197
0
    }
198
0
199
0
    self.scratch.setInteger(UInt32(bytesWritten), at: payloadSizeIndex)
200
0
201
0
    // Finally, the compression context should be reset between messages.
202
0
    compressor.reset()
203
0
  }
204
205
1.31M
  private mutating func encode(_ buffer: ByteBuffer) throws {
206
1.31M
    self.scratch.writeMultipleIntegers(UInt8(0), UInt32(buffer.readableBytes))
207
1.31M
    self.scratch.writeImmutableBuffer(buffer)
208
1.31M
  }
209
}
210
211
/// A FIFO queue which allows for a single element to be stored on the stack and defers to a
212
/// heap-based implementation if further elements are added.
213
///
214
/// This is useful when optimising for unary streams where avoiding the cost of a heap
215
/// allocation is desirable.
216
internal struct OneOrManyQueue<Element>: Collection {
217
  private var backing: Backing
218
219
  private enum Backing: Collection {
220
    case none
221
    case one(Element)
222
    case many(Deque<Element>)
223
224
77.9k
    var startIndex: Int {
225
77.9k
      switch self {
226
77.9k
      case .none, .one:
227
68.8k
        return 0
228
77.9k
      case let .many(elements):
229
9.09k
        return elements.startIndex
230
77.9k
      }
231
77.9k
    }
232
233
1.39M
    var endIndex: Int {
234
1.39M
      switch self {
235
1.39M
      case .none:
236
0
        return 0
237
1.39M
      case .one:
238
137k
        return 1
239
1.39M
      case let .many(elements):
240
1.25M
        return elements.endIndex
241
1.39M
      }
242
1.39M
    }
243
244
1.31M
    subscript(index: Int) -> Element {
245
1.31M
      switch self {
246
1.31M
      case .none:
247
0
        fatalError("Invalid index")
248
1.31M
      case let .one(element):
249
68.8k
        assert(index == 0)
250
68.8k
        return element
251
1.31M
      case let .many(elements):
252
1.24M
        return elements[index]
253
1.31M
      }
254
0
    }
255
256
1.31M
    func index(after index: Int) -> Int {
257
1.31M
      switch self {
258
1.31M
      case .none:
259
0
        return 0
260
1.31M
      case .one:
261
68.8k
        return 1
262
1.31M
      case let .many(elements):
263
1.24M
        return elements.index(after: index)
264
1.31M
      }
265
1.31M
    }
266
267
0
    var count: Int {
268
0
      switch self {
269
0
      case .none:
270
0
        return 0
271
0
      case .one:
272
0
        return 1
273
0
      case let .many(elements):
274
0
        return elements.count
275
0
      }
276
0
    }
277
278
215k
    var isEmpty: Bool {
279
215k
      switch self {
280
215k
      case .none:
281
128k
        return true
282
215k
      case .one:
283
68.8k
        return false
284
215k
      case let .many(elements):
285
18.4k
        return elements.isEmpty
286
215k
      }
287
215k
    }
288
289
1.31M
    mutating func append(_ element: Element) {
290
1.31M
      switch self {
291
1.31M
      case .none:
292
77.9k
        self = .one(element)
293
1.31M
      case let .one(one):
294
9.09k
        var elements = Deque<Element>()
295
9.09k
        elements.reserveCapacity(16)
296
9.09k
        elements.append(one)
297
9.09k
        elements.append(element)
298
9.09k
        self = .many(elements)
299
1.31M
      case var .many(elements):
300
1.22M
        self = .none
301
1.22M
        elements.append(element)
302
1.22M
        self = .many(elements)
303
1.31M
      }
304
1.31M
    }
305
306
1.31M
    mutating func pop() -> Element? {
307
1.31M
      switch self {
308
1.31M
      case .none:
309
0
        return nil
310
1.31M
      case let .one(element):
311
68.8k
        self = .none
312
68.8k
        return element
313
1.31M
      case var .many(many):
314
1.24M
        self = .none
315
1.24M
        let element = many.popFirst()
316
1.24M
        self = .many(many)
317
1.24M
        return element
318
1.31M
      }
319
1.31M
    }
320
  }
321
322
417k
  init() {
323
417k
    self.backing = .none
324
417k
  }
325
326
154k
  var isEmpty: Bool {
327
154k
    return self.backing.isEmpty
328
154k
  }
329
330
0
  var count: Int {
331
0
    return self.backing.count
332
0
  }
333
334
59.9k
  var startIndex: Int {
335
59.9k
    return self.backing.startIndex
336
59.9k
  }
337
338
825k
  var endIndex: Int {
339
825k
    return self.backing.endIndex
340
825k
  }
341
342
765k
  subscript(index: Int) -> Element {
343
765k
    return self.backing[index]
344
765k
  }
345
346
765k
  func index(after index: Int) -> Int {
347
765k
    return self.backing.index(after: index)
348
765k
  }
349
350
3.06M
  mutating func append(_ element: Element) {
351
3.06M
    self.backing.append(element)
352
3.06M
  }
353
354
765k
  mutating func pop() -> Element? {
355
765k
    return self.backing.pop()
356
765k
  }
357
}