/src/grpc-swift/Sources/GRPC/HTTP2ToRawGRPCServerCodec.swift
Line | Count | Source |
1 | | /* |
2 | | * Copyright 2020, gRPC Authors All rights reserved. |
3 | | * |
4 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | | * you may not use this file except in compliance with the License. |
6 | | * You may obtain a copy of the License at |
7 | | * |
8 | | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | | * |
10 | | * Unless required by applicable law or agreed to in writing, software |
11 | | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | | * See the License for the specific language governing permissions and |
14 | | * limitations under the License. |
15 | | */ |
16 | | import Logging |
17 | | import NIOCore |
18 | | import NIOHPACK |
19 | | import NIOHTTP2 |
20 | | |
/// A channel handler which sits between HTTP/2 frame payloads and a `GRPCServerHandlerProtocol`.
///
/// Inbound `HEADERS` and `DATA` payloads are fed through an `HTTP2ToRawGRPCStateMachine`; the
/// actions it returns are forwarded to the configured server handler. The handler writes its
/// response back through the `GRPCServerResponseWriter` methods implemented here
/// (`sendMetadata`, `sendMessage` and `sendEnd`).
internal final class HTTP2ToRawGRPCServerCodec: ChannelInboundHandler, GRPCServerResponseWriter {
  typealias InboundIn = HTTP2Frame.FramePayload
  typealias OutboundOut = HTTP2Frame.FramePayload

  private var logger: Logger
  private var state: HTTP2ToRawGRPCStateMachine
  private let errorDelegate: ServerErrorDelegate?

  /// The context of this handler; set in `handlerAdded(context:)` and cleared in
  /// `handlerRemoved(context:)`.
  private var context: ChannelHandlerContext!

  private let servicesByName: [Substring: CallHandlerProvider]
  private let encoding: ServerMessageEncoding
  private let normalizeHeaders: Bool
  private let maxReceiveMessageLength: Int

  /// Whether a server handler is currently installed, and which one.
  private var configurationState: Configuration = .notConfigured

  /// `true` from the first `channelRead` of a burst until the matching `channelReadComplete`.
  private var isReading = false

  /// `true` when a flush was requested during a read burst and is being held back until
  /// `channelReadComplete` so that several flushes can be collapsed into one.
  private var flushPending = false

  private enum Configuration {
    case notConfigured
    case configured(GRPCServerHandlerProtocol)

    /// `true` only in the `.configured` case.
    var isConfigured: Bool {
      if case .configured = self {
        return true
      }
      return false
    }

    /// Resets to `.notConfigured`, handing back the handler that was installed (if any).
    mutating func tearDown() -> GRPCServerHandlerProtocol? {
      guard case let .configured(installed) = self else {
        return nil
      }
      self = .notConfigured
      return installed
    }
  }

  init(
    servicesByName: [Substring: CallHandlerProvider],
    encoding: ServerMessageEncoding,
    errorDelegate: ServerErrorDelegate?,
    normalizeHeaders: Bool,
    maximumReceiveMessageLength: Int,
    logger: Logger
  ) {
    self.logger = logger
    self.errorDelegate = errorDelegate
    self.servicesByName = servicesByName
    self.encoding = encoding
    self.normalizeHeaders = normalizeHeaders
    self.maxReceiveMessageLength = maximumReceiveMessageLength
    self.state = HTTP2ToRawGRPCStateMachine()
  }

  // MARK: - Handler lifecycle

  internal func handlerAdded(context: ChannelHandlerContext) {
    self.context = context
  }

  internal func handlerRemoved(context: ChannelHandlerContext) {
    self.context = nil
    self.configurationState = .notConfigured
  }

  /// Passes the error to the configured handler; closes the channel if there is no handler.
  internal func errorCaught(context: ChannelHandlerContext, error: Error) {
    if case let .configured(handler) = self.configurationState {
      handler.receiveError(error)
    } else {
      context.close(mode: .all, promise: nil)
    }
  }

  /// Finishes the configured handler if there is one, otherwise forwards the event.
  internal func channelInactive(context: ChannelHandlerContext) {
    guard let handler = self.configurationState.tearDown() else {
      context.fireChannelInactive()
      return
    }
    handler.finish()
  }

  // MARK: - Inbound

  internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
    self.isReading = true

    switch self.unwrapInboundIn(data) {
    case let .headers(headersPayload):
      self.receiveHeaders(headersPayload.headers, context: context)

    case let .data(dataPayload):
      self.receiveData(dataPayload.data, endStream: dataPayload.endStream)

    // All other frame types are ignored.
    case .alternativeService,
         .goAway,
         .origin,
         .ping,
         .priority,
         .pushPromise,
         .rstStream,
         .settings,
         .windowUpdate:
      break
    }
  }

  /// Feeds request headers to the state machine and either installs the handler it returns or
  /// rejects the RPC by writing the returned trailers with end-stream set.
  private func receiveHeaders(_ headers: HPACKHeaders, context: ChannelHandlerContext) {
    let outcome = self.state.receive(
      headers: headers,
      eventLoop: context.eventLoop,
      errorDelegate: self.errorDelegate,
      remoteAddress: context.channel.remoteAddress,
      logger: self.logger,
      allocator: context.channel.allocator,
      responseWriter: self,
      closeFuture: context.channel.closeFuture,
      services: self.servicesByName,
      encoding: self.encoding,
      normalizeHeaders: self.normalizeHeaders
    )

    // Whichever way the state machine decided, no handler should be installed yet.
    assert(!self.configurationState.isConfigured)

    switch outcome {
    case let .configure(handler):
      self.configurationState = .configured(handler)
      self.configured()

    case let .rejectRPC(trailers):
      // We're not handling this request: write headers and end stream.
      let frame = HTTP2Frame.FramePayload.headers(.init(headers: trailers, endStream: true))
      context.writeAndFlush(self.wrapOutboundOut(frame), promise: nil)
    }
  }

  /// Feeds request bytes to the state machine and performs the action it asks for.
  private func receiveData(_ data: IOData, endStream: Bool) {
    guard case .byteBuffer(var buffer) = data else {
      preconditionFailure("Unexpected IOData.fileRegion")
    }

    switch self.state.receive(buffer: &buffer, endStream: endStream) {
    case .tryReading:
      self.tryReadingMessage()

    case .finishHandler:
      self.configurationState.tearDown()?.finish()

    case .nothing:
      break
    }
  }

  internal func channelReadComplete(context: ChannelHandlerContext) {
    // The read burst is over: anything which asks for a flush from now on flushes immediately.
    self.isReading = false

    if self.flushPending {
      // Emit the flush which was held back while reading.
      self.deliverPendingResponses()
      self.flushPending = false
      context.flush()
    }

    context.fireChannelReadComplete()
  }

  /// Drains every response the state machine has ready: successful results are written as
  /// `DATA` payloads, failed results fail their promise.
  private func deliverPendingResponses() {
    while let (outcome, writePromise) = self.state.nextResponse() {
      switch outcome {
      case let .failure(error):
        writePromise?.fail(error)

      case let .success(bytes):
        let frame = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(bytes)))
        self.context.write(self.wrapOutboundOut(frame), promise: writePromise)
      }
    }
  }

  /// Returns the installed handler. It is a programmer error to call this when no handler is
  /// configured.
  private func requireHandler() -> GRPCServerHandlerProtocol {
    guard case let .configured(handler) = self.configurationState else {
      preconditionFailure()
    }
    return handler
  }

  /// Called when the pipeline has finished configuring.
  private func configured() {
    switch self.state.pipelineConfigured() {
    case let .forwardHeaders(metadata):
      self.requireHandler().receiveMetadata(metadata)

    case let .forwardHeadersAndRead(metadata):
      self.requireHandler().receiveMetadata(metadata)
      self.tryReadingMessage()
    }
  }

  /// Try to read a request message from the buffer.
  private func tryReadingMessage() {
    // Iterating (rather than recursing) handles `.forwardMessageThenReadNextMessage`: it is the
    // only action which does not return out of the loop.
    while true {
      switch self.state.readNextRequest(maxLength: self.maxReceiveMessageLength) {
      case .none:
        return

      case let .forwardMessage(message):
        self.requireHandler().receiveMessage(message)
        return

      case let .forwardMessageThenReadNextMessage(message):
        // No return: go around again for the next message.
        self.requireHandler().receiveMessage(message)

      case .forwardEnd:
        self.requireHandler().receiveEnd()
        return

      case let .errorCaught(error):
        self.requireHandler().receiveError(error)
        return
      }
    }
  }

  // MARK: - GRPCServerResponseWriter

  /// Writes the response headers produced by the state machine, optionally marking a flush
  /// point. If the state machine refuses the headers, `promise` is failed instead.
  internal func sendMetadata(
    _ headers: HPACKHeaders,
    flush: Bool,
    promise: EventLoopPromise<Void>?
  ) {
    switch self.state.send(headers: headers) {
    case let .failure(error):
      promise?.fail(error)

    case let .success(responseHeaders):
      let frame = HTTP2Frame.FramePayload.headers(.init(headers: responseHeaders))
      self.context.write(self.wrapOutboundOut(frame), promise: promise)
      if flush {
        self.markFlushPoint()
      }
    }
  }

  /// Hands a response message to the state machine; nothing is written to the channel here.
  /// Marks a flush point if `metadata.flush` is set, or fails `promise` if the state machine
  /// refuses the message.
  internal func sendMessage(
    _ buffer: ByteBuffer,
    metadata: MessageMetadata,
    promise: EventLoopPromise<Void>?
  ) {
    switch self.state.send(buffer: buffer, compress: metadata.compress, promise: promise) {
    case let .failure(error):
      promise?.fail(error)

    case .success:
      if metadata.flush {
        self.markFlushPoint()
      }
    }
  }

  /// Ends the response: writes out pending responses, then the trailers, and finishes the
  /// handler if the state machine says so.
  internal func sendEnd(
    status: GRPCStatus,
    trailers: HPACKHeaders,
    promise: EventLoopPromise<Void>?
  ) {
    // About to end the stream: send any pending responses.
    self.deliverPendingResponses()

    switch self.state.send(status: status, trailers: trailers) {
    case let .failure(error):
      promise?.fail(error)

    case let .sendTrailers(responseTrailers):
      self.sendTrailers(responseTrailers, promise: promise)

    case let .sendTrailersAndFinish(responseTrailers):
      self.sendTrailers(responseTrailers, promise: promise)
      // 'finish' the handler.
      self.configurationState.tearDown()?.finish()
    }
  }

  private func sendTrailers(_ trailers: HPACKHeaders, promise: EventLoopPromise<Void>?) {
    // Status and trailers always carry end-stream, and are always followed by a flush point.
    let frame = HTTP2Frame.FramePayload.headers(.init(headers: trailers, endStream: true))
    self.context.write(self.wrapOutboundOut(frame), promise: promise)
    self.markFlushPoint()
  }

  /// Mark a flush as pending - to be emitted once the read completes - if we're currently reading,
  /// or emit a flush now if we are not.
  private func markFlushPoint() {
    guard !self.isReading else {
      self.flushPending = true
      return
    }

    // About to flush: send any pending responses.
    self.deliverPendingResponses()
    self.flushPending = false
    self.context.flush()
  }
}