/src/grpc-swift/Sources/GRPC/CoalescingLengthPrefixedMessageWriter.swift
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Copyright 2022, gRPC Authors All rights reserved. |
3 | | * |
4 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | | * you may not use this file except in compliance with the License. |
6 | | * You may obtain a copy of the License at |
7 | | * |
8 | | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | | * |
10 | | * Unless required by applicable law or agreed to in writing, software |
11 | | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | | * See the License for the specific language governing permissions and |
14 | | * limitations under the License. |
15 | | */ |
16 | | import DequeModule |
17 | | import NIOCore |
18 | | |
/// Buffers serialized gRPC messages and emits them as length-prefixed frames, copying small
/// (or compressed) messages into a single shared buffer and emitting large uncompressed
/// messages as a separate header buffer followed by the untouched message buffer.
internal struct CoalescingLengthPrefixedMessageWriter {
  /// Length of the gRPC message header (1 compression byte, 4 bytes for the length).
  static let metadataLength = 5

  /// Message size above which we emit two buffers: one containing the header and one with the
  /// actual message bytes. At or below the limit we copy the message into a new buffer containing
  /// both the header and the message.
  ///
  /// Using two buffers avoids expensive copies of large messages. For smaller messages the copy
  /// is cheaper than the additional allocations and overhead required to send an extra HTTP/2 DATA
  /// frame.
  ///
  /// The value of 16k was chosen empirically. We subtract the length of the message header
  /// as `ByteBuffer` reserves capacity in powers of two and we want to avoid overallocating.
  static let singleBufferSizeLimit = 16384 - Self.metadataLength

  /// The compression algorithm to use, if one should be used.
  private let compression: CompressionAlgorithm?
  /// Any compressor associated with the compression algorithm.
  private let compressor: Zlib.Deflate?

  /// Whether the compression message flag should be set.
  ///
  /// NOTE(review): this is also `true` when the algorithm is `.identity`, for which `init`
  /// creates no compressor. Messages appended with `compress: true` are then always coalesced
  /// (see `Pending.shouldCoalesce`) but encoded uncompressed -- confirm this is intended.
  private var supportsCompression: Bool {
    return self.compression != nil
  }

  /// A scratch buffer that we encode messages into: if the buffer isn't held elsewhere then we
  /// can avoid having to allocate a new one.
  private var scratch: ByteBuffer

  /// Outbound buffers waiting to be written.
  private var pending: OneOrManyQueue<Pending>

  /// A message waiting to be framed, along with its write promise and whether it should be
  /// compressed.
  private struct Pending {
    var buffer: ByteBuffer
    var promise: EventLoopPromise<Void>?
    var compress: Bool

    init(buffer: ByteBuffer, compress: Bool, promise: EventLoopPromise<Void>?) {
      self.buffer = buffer
      self.promise = promise
      self.compress = compress
    }

    /// Whether the message is at or below `singleBufferSizeLimit`.
    var isSmallEnoughToCoalesce: Bool {
      let limit = CoalescingLengthPrefixedMessageWriter.singleBufferSizeLimit
      return self.buffer.readableBytes <= limit
    }

    /// Whether the message should be written into the shared scratch buffer. Messages marked
    /// for compression are always coalesced, regardless of their size.
    var shouldCoalesce: Bool {
      return self.isSmallEnoughToCoalesce || self.compress
    }
  }

  private enum State {
    // Coalescing pending messages.
    case coalescing
    // Emitting a non-coalesced message; the header has been written, the body
    // needs to be written next.
    case emittingLargeFrame(ByteBuffer, EventLoopPromise<Void>?)
  }

  private var state: State

  /// Creates a writer.
  ///
  /// - Parameters:
  ///   - compression: The compression algorithm to use for messages which request compression,
  ///     or `nil` if compression is not supported.
  ///   - allocator: The allocator used to create the scratch buffer.
  init(compression: CompressionAlgorithm? = nil, allocator: ByteBufferAllocator) {
    self.compression = compression
    self.scratch = allocator.buffer(capacity: 0)
    self.state = .coalescing
    self.pending = .init()

    switch self.compression?.algorithm {
    case .none, .some(.identity):
      self.compressor = nil
    case .some(.deflate):
      self.compressor = Zlib.Deflate(format: .deflate)
    case .some(.gzip):
      self.compressor = Zlib.Deflate(format: .gzip)
    }
  }

  /// Append a serialized message buffer to the writer.
  ///
  /// - Parameters:
  ///   - buffer: The serialized message.
  ///   - compress: Whether the message should be compressed. This is ignored if the writer was
  ///     created without a compression algorithm.
  ///   - promise: A promise associated with the write of this message, if any.
  mutating func append(buffer: ByteBuffer, compress: Bool, promise: EventLoopPromise<Void>?) {
    let pending = Pending(
      buffer: buffer,
      compress: compress && self.supportsCompression,
      promise: promise
    )

    self.pending.append(pending)
  }

  /// Return a tuple of the next buffer to write and its associated write promise.
  ///
  /// Consecutive messages which should be coalesced are encoded into a single buffer; their
  /// promises are chained so that completing the returned promise completes all of them. A
  /// message which should not be coalesced is emitted over two calls: the first returns its
  /// header (with no promise), the second returns the message bytes and the message's promise.
  ///
  /// - Returns: `nil` if there is nothing to write. Otherwise the buffer to write, or the error
  ///   thrown while encoding, paired with the promise to complete for it.
  mutating func next() -> (Result<ByteBuffer, Error>, EventLoopPromise<Void>?)? {
    switch self.state {
    case .coalescing:
      // Nothing pending: exit early.
      if self.pending.isEmpty {
        return nil
      }

      // First up we need to work out how many elements we're going to pop off the front and
      // coalesce, and how much capacity they'll need in the buffer.
      var messagesToCoalesce = 0
      var requiredCapacity = 0

      for element in self.pending {
        guard element.shouldCoalesce else {
          break
        }

        messagesToCoalesce &+= 1
        requiredCapacity += element.buffer.readableBytes + Self.metadataLength
      }

      guard messagesToCoalesce > 0 else {
        // Nothing to coalesce; this means the first element should be emitted with its header in
        // a separate buffer. The queue is known to be non-empty here so the `else` branch is
        // not expected to be taken.
        guard let large = self.pending.pop() else {
          return nil
        }

        // Set the scratch buffer to just be a message header then store the message bytes.
        self.scratch.clear(minimumCapacity: Self.metadataLength)
        self.scratch.writeMultipleIntegers(UInt8(0), UInt32(large.buffer.readableBytes))
        self.state = .emittingLargeFrame(large.buffer, large.promise)
        return (.success(self.scratch), nil)
      }

      self.scratch.clear(minimumCapacity: requiredCapacity)
      var promise: EventLoopPromise<Void>?

      // Drop and encode the messages. Promises are chained as each message is removed from the
      // queue (rather than up front) so that if encoding fails, only the promises of messages
      // which have actually been removed are tied to the failure; messages left in the queue
      // keep their own, untouched promises.
      while messagesToCoalesce > 0, let next = self.pending.pop() {
        messagesToCoalesce &-= 1

        if let existing = promise {
          existing.futureResult.cascade(to: next.promise)
        } else {
          promise = next.promise
        }

        do {
          try self.encode(next.buffer, compress: next.compress)
        } catch {
          return (.failure(error), promise)
        }
      }

      return (.success(self.scratch), promise)

    case let .emittingLargeFrame(buffer, promise):
      // We just emitted the header, now emit the body.
      self.state = .coalescing
      return (.success(buffer), promise)
    }
  }

  /// Encode a message and its header into the scratch buffer, compressing it if requested and
  /// a compressor is available.
  private mutating func encode(_ buffer: ByteBuffer, compress: Bool) throws {
    if let compressor = self.compressor, compress {
      try self.encode(buffer, compressor: compressor)
    } else {
      self.encode(buffer)
    }
  }

  /// Compress a message into the scratch buffer, prefixed with a header whose compression flag
  /// is set and whose length is the number of compressed bytes.
  ///
  /// - Throws: Any error thrown by the compressor.
  private mutating func encode(_ buffer: ByteBuffer, compressor: Zlib.Deflate) throws {
    // The compression context should be reset between messages. Doing so in a `defer` means
    // a message which fails to compress doesn't leave stale state behind for the next one.
    defer {
      compressor.reset()
    }

    // Set the compression byte.
    self.scratch.writeInteger(UInt8(1))
    // Set the length to zero; we'll write the actual value in a moment.
    let payloadSizeIndex = self.scratch.writerIndex
    self.scratch.writeInteger(UInt32(0))

    var input = buffer
    let bytesWritten = try compressor.deflate(&input, into: &self.scratch)

    self.scratch.setInteger(UInt32(bytesWritten), at: payloadSizeIndex)
  }

  /// Copy an uncompressed message into the scratch buffer, prefixed with its header.
  private mutating func encode(_ buffer: ByteBuffer) {
    self.scratch.writeMultipleIntegers(UInt8(0), UInt32(buffer.readableBytes))
    self.scratch.writeImmutableBuffer(buffer)
  }
}
210 | | |
/// A FIFO queue which holds a single element directly and only defers to a `Deque` once a
/// second element is appended.
///
/// This is useful when optimising for unary streams where avoiding the cost of a heap
/// allocation is desirable.
internal struct OneOrManyQueue<Element>: Collection {
  private var backing: Backing

  /// Storage for the queue: empty, exactly one element, or a deque of elements.
  private enum Backing: Collection {
    case none
    case one(Element)
    case many(Deque<Element>)

    var startIndex: Int {
      if case let .many(deque) = self {
        return deque.startIndex
      }
      return 0
    }

    var endIndex: Int {
      switch self {
      case let .many(deque):
        return deque.endIndex
      case .one:
        return 1
      case .none:
        return 0
      }
    }

    subscript(index: Int) -> Element {
      switch self {
      case let .many(deque):
        return deque[index]
      case let .one(only):
        assert(index == 0)
        return only
      case .none:
        fatalError("Invalid index")
      }
    }

    func index(after index: Int) -> Int {
      switch self {
      case let .many(deque):
        return deque.index(after: index)
      case .one:
        return 1
      case .none:
        return 0
      }
    }

    var count: Int {
      switch self {
      case let .many(deque):
        return deque.count
      case .one:
        return 1
      case .none:
        return 0
      }
    }

    var isEmpty: Bool {
      switch self {
      case let .many(deque):
        return deque.isEmpty
      case .one:
        return false
      case .none:
        return true
      }
    }

    /// Adds `element` to the back of the queue, switching to a deque when a second element
    /// is added.
    mutating func append(_ element: Element) {
      switch self {
      case .none:
        self = .one(element)

      case let .one(first):
        var deque = Deque<Element>()
        deque.reserveCapacity(16)
        deque.append(first)
        deque.append(element)
        self = .many(deque)

      case var .many(deque):
        // Clear `self` before mutating the extracted deque and storing it back; presumably
        // this leaves the local as the only reference so the append doesn't copy.
        self = .none
        deque.append(element)
        self = .many(deque)
      }
    }

    /// Removes and returns the element at the front of the queue, or `nil` if there isn't one.
    /// Once the deque is in use it is kept, even if popping empties it.
    mutating func pop() -> Element? {
      switch self {
      case .none:
        return nil

      case let .one(only):
        self = .none
        return only

      case var .many(deque):
        // As in `append`: clear `self` while the extracted deque is mutated, then store the
        // deque back once the front element has been removed.
        self = .none
        defer {
          self = .many(deque)
        }
        return deque.popFirst()
      }
    }
  }

  init() {
    self.backing = .none
  }

  var isEmpty: Bool {
    self.backing.isEmpty
  }

  var count: Int {
    self.backing.count
  }

  var startIndex: Int {
    self.backing.startIndex
  }

  var endIndex: Int {
    self.backing.endIndex
  }

  subscript(index: Int) -> Element {
    self.backing[index]
  }

  func index(after index: Int) -> Int {
    self.backing.index(after: index)
  }

  /// Adds `element` to the back of the queue.
  mutating func append(_ element: Element) {
    self.backing.append(element)
  }

  /// Removes and returns the element at the front of the queue, or `nil` if the queue is empty.
  mutating func pop() -> Element? {
    self.backing.pop()
  }
}