/src/grpc-swift/Sources/GRPC/HTTP2ToRawGRPCServerCodec.swift
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Copyright 2020, gRPC Authors All rights reserved. |
3 | | * |
4 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | | * you may not use this file except in compliance with the License. |
6 | | * You may obtain a copy of the License at |
7 | | * |
8 | | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | | * |
10 | | * Unless required by applicable law or agreed to in writing, software |
11 | | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | | * See the License for the specific language governing permissions and |
14 | | * limitations under the License. |
15 | | */ |
16 | | import Logging |
17 | | import NIOCore |
18 | | import NIOHPACK |
19 | | import NIOHTTP2 |
20 | | |
/// A channel handler which sits between HTTP/2 frame payloads and a `GRPCServerHandlerProtocol`.
///
/// Inbound frame payloads are fed through an `HTTP2ToRawGRPCStateMachine`; the actions it returns
/// are forwarded to the configured handler. The codec is also passed to the state machine as the
/// `GRPCServerResponseWriter`, so response parts are written to the channel via the `send*`
/// methods below.
internal final class HTTP2ToRawGRPCServerCodec: ChannelInboundHandler, GRPCServerResponseWriter {
  typealias InboundIn = HTTP2Frame.FramePayload
  typealias OutboundOut = HTTP2Frame.FramePayload

  private var logger: Logger
  private var state: HTTP2ToRawGRPCStateMachine
  private let errorDelegate: ServerErrorDelegate?
  /// Set in `handlerAdded(context:)` and cleared in `handlerRemoved(context:)`.
  private var context: ChannelHandlerContext!

  private let servicesByName: [Substring: CallHandlerProvider]
  private let encoding: ServerMessageEncoding
  private let normalizeHeaders: Bool
  private let maxReceiveMessageLength: Int

  /// Tracks whether a handler has been installed for the RPC on this stream.
  private var configurationState: Configuration = .notConfigured

  /// `true` from the first `channelRead` of a burst until the matching `channelReadComplete`.
  private var isReading = false

  /// `true` when a flush was requested while `isReading` was `true`. The flush is deferred until
  /// the read burst completes so that several flushes can be collapsed into one.
  private var flushPending = false

  private enum Configuration {
    case notConfigured
    case configured(GRPCServerHandlerProtocol)

    /// Whether a handler is currently installed.
    var isConfigured: Bool {
      if case .configured = self {
        return true
      }
      return false
    }

    /// Resets to `.notConfigured`, returning the handler which was installed, if there was one.
    mutating func tearDown() -> GRPCServerHandlerProtocol? {
      guard case let .configured(installed) = self else {
        return nil
      }
      self = .notConfigured
      return installed
    }
  }

  init(
    servicesByName: [Substring: CallHandlerProvider],
    encoding: ServerMessageEncoding,
    errorDelegate: ServerErrorDelegate?,
    normalizeHeaders: Bool,
    maximumReceiveMessageLength: Int,
    logger: Logger
  ) {
    self.state = HTTP2ToRawGRPCStateMachine()
    self.servicesByName = servicesByName
    self.encoding = encoding
    self.normalizeHeaders = normalizeHeaders
    self.maxReceiveMessageLength = maximumReceiveMessageLength
    self.errorDelegate = errorDelegate
    self.logger = logger
  }

  // MARK: - Handler lifecycle

  internal func handlerAdded(context: ChannelHandlerContext) {
    self.context = context
  }

  internal func handlerRemoved(context: ChannelHandlerContext) {
    self.context = nil
    self.configurationState = .notConfigured
  }

  // MARK: - Inbound events

  /// Passes the error to the configured handler; closes the channel if there is no handler.
  internal func errorCaught(context: ChannelHandlerContext, error: Error) {
    guard case let .configured(handler) = self.configurationState else {
      context.close(mode: .all, promise: nil)
      return
    }
    handler.receiveError(error)
  }

  /// Tears down and finishes the configured handler. The event is only forwarded along the
  /// pipeline when no handler was configured.
  internal func channelInactive(context: ChannelHandlerContext) {
    guard let handler = self.configurationState.tearDown() else {
      context.fireChannelInactive()
      return
    }
    handler.finish()
  }

  /// Dispatches headers and data payloads to the state machine; every other payload type is
  /// ignored.
  internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
    self.isReading = true

    switch self.unwrapInboundIn(data) {
    case let .headers(headersPayload):
      self.receiveRequestHeaders(headersPayload.headers, context: context)

    case let .data(dataPayload):
      self.receiveRequestData(dataPayload.data, endStream: dataPayload.endStream)

    // Ignored.
    case .alternativeService,
         .goAway,
         .origin,
         .ping,
         .priority,
         .pushPromise,
         .rstStream,
         .settings,
         .windowUpdate:
      ()
    }
  }

  /// Ends the current read burst, emitting any flush which was deferred while reading, then
  /// forwards the event.
  internal func channelReadComplete(context: ChannelHandlerContext) {
    defer {
      context.fireChannelReadComplete()
    }

    self.isReading = false

    guard self.flushPending else {
      return
    }

    self.deliverPendingResponses()
    self.flushPending = false
    context.flush()
  }

  /// Hands request headers to the state machine, then either installs the handler it returns or
  /// rejects the RPC by writing the returned trailers with end-stream set.
  private func receiveRequestHeaders(_ headers: HPACKHeaders, context: ChannelHandlerContext) {
    let outcome = self.state.receive(
      headers: headers,
      eventLoop: context.eventLoop,
      errorDelegate: self.errorDelegate,
      remoteAddress: context.channel.remoteAddress,
      logger: self.logger,
      allocator: context.channel.allocator,
      responseWriter: self,
      closeFuture: context.channel.closeFuture,
      services: self.servicesByName,
      encoding: self.encoding,
      normalizeHeaders: self.normalizeHeaders
    )

    switch outcome {
    case let .configure(handler):
      assert(!self.configurationState.isConfigured)
      self.configurationState = .configured(handler)
      self.configured()

    case let .rejectRPC(trailers):
      assert(!self.configurationState.isConfigured)
      // We're not handling this request: write headers and end stream.
      let rejection = HTTP2Frame.FramePayload.headers(.init(headers: trailers, endStream: true))
      context.writeAndFlush(self.wrapOutboundOut(rejection), promise: nil)
    }
  }

  /// Hands request bytes to the state machine and performs the action it returns.
  private func receiveRequestData(_ data: IOData, endStream: Bool) {
    guard case var .byteBuffer(bytes) = data else {
      preconditionFailure("Unexpected IOData.fileRegion")
    }

    switch self.state.receive(buffer: &bytes, endStream: endStream) {
    case .tryReading:
      self.tryReadingMessage()

    case .finishHandler:
      self.configurationState.tearDown()?.finish()

    case .nothing:
      ()
    }
  }

  // MARK: - Helpers

  /// Returns the configured handler. Not having one is a precondition failure, reported at the
  /// caller's location.
  private func requireConfiguredHandler(
    file: StaticString = #file,
    line: UInt = #line
  ) -> GRPCServerHandlerProtocol {
    guard case let .configured(handler) = self.configurationState else {
      preconditionFailure(file: file, line: line)
    }
    return handler
  }

  /// Drains the responses queued in the state machine: successful ones are written to the channel
  /// (without flushing), failed ones fail their promise.
  private func deliverPendingResponses() {
    while let pending = self.state.nextResponse() {
      let (outcome, writePromise) = pending

      switch outcome {
      case let .success(bytes):
        let frame = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(bytes)))
        self.context.write(self.wrapOutboundOut(frame), promise: writePromise)

      case let .failure(error):
        writePromise?.fail(error)
      }
    }
  }

  /// Called when the pipeline has finished configuring: forwards the request headers to the
  /// handler and, if the state machine asks for it, starts reading messages.
  private func configured() {
    let action = self.state.pipelineConfigured()

    switch action {
    case let .forwardHeaders(metadata):
      self.requireConfiguredHandler().receiveMetadata(metadata)

    case let .forwardHeadersAndRead(metadata):
      self.requireConfiguredHandler().receiveMetadata(metadata)
      self.tryReadingMessage()
    }
  }

  /// Try to read a request message from the buffer.
  private func tryReadingMessage() {
    // Iterating (rather than recursing) handles `.forwardMessageThenReadNextMessage`; every other
    // action ends the loop after it has been handled.
    var readAgain = true

    while readAgain {
      readAgain = false

      let action = self.state.readNextRequest(maxLength: self.maxReceiveMessageLength)

      switch action {
      case .none:
        break

      case let .forwardMessage(message):
        self.requireConfiguredHandler().receiveMessage(message)

      case let .forwardMessageThenReadNextMessage(message):
        self.requireConfiguredHandler().receiveMessage(message)
        readAgain = true

      case .forwardEnd:
        self.requireConfiguredHandler().receiveEnd()

      case let .errorCaught(error):
        self.requireConfiguredHandler().receiveError(error)
      }
    }
  }

  // MARK: - GRPCServerResponseWriter

  /// Writes response headers if the state machine accepts them, optionally marking a flush point;
  /// otherwise fails `promise` with the state machine's error.
  internal func sendMetadata(
    _ headers: HPACKHeaders,
    flush: Bool,
    promise: EventLoopPromise<Void>?
  ) {
    let outcome = self.state.send(headers: headers)

    switch outcome {
    case let .failure(error):
      promise?.fail(error)

    case let .success(responseHeaders):
      let frame = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders))
      self.context.write(self.wrapOutboundOut(frame), promise: promise)
      if flush {
        self.markFlushPoint()
      }
    }
  }

  /// Passes a response message (and its promise) to the state machine; nothing is written to the
  /// channel here. Marks a flush point when `metadata.flush` is set, and fails `promise` if the
  /// state machine returns a failure.
  internal func sendMessage(
    _ buffer: ByteBuffer,
    metadata: MessageMetadata,
    promise: EventLoopPromise<Void>?
  ) {
    switch self.state.send(buffer: buffer, compress: metadata.compress, promise: promise) {
    case let .failure(error):
      promise?.fail(error)

    case .success:
      if metadata.flush {
        self.markFlushPoint()
      }
    }
  }

  /// Delivers any pending responses and then ends the response stream with the trailers produced
  /// by the state machine, finishing the handler when the state machine asks for it.
  internal func sendEnd(
    status: GRPCStatus,
    trailers: HPACKHeaders,
    promise: EventLoopPromise<Void>?
  ) {
    // Pending responses have to be written before the stream is ended.
    self.deliverPendingResponses()

    let action = self.state.send(status: status, trailers: trailers)

    switch action {
    case let .sendTrailers(responseTrailers):
      self.sendTrailers(responseTrailers, promise: promise)

    case let .sendTrailersAndFinish(responseTrailers):
      self.sendTrailers(responseTrailers, promise: promise)
      // 'finish' the handler.
      self.configurationState.tearDown()?.finish()

    case let .failure(error):
      promise?.fail(error)
    }
  }

  /// Writes `trailers` as a headers payload with end-stream set and marks a flush point.
  private func sendTrailers(_ trailers: HPACKHeaders, promise: EventLoopPromise<Void>?) {
    // Status and trailers always end the stream.
    let frame = HTTP2Frame.FramePayload.headers(.init(headers: trailers, endStream: true))
    self.context.write(self.wrapOutboundOut(frame), promise: promise)
    // The end of the stream is always flushed.
    self.markFlushPoint()
  }

  /// Mark a flush as pending - to be emitted once the read completes - if we're currently reading,
  /// or emit a flush now if we are not.
  private func markFlushPoint() {
    guard !self.isReading else {
      self.flushPending = true
      return
    }

    // About to flush: send any pending responses.
    self.deliverPendingResponses()
    self.flushPending = false
    self.context.flush()
  }
}