/rust/git/checkouts/micro-http-22be4cdcbef12607/ef43cef/src/server.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. |
2 | | // SPDX-License-Identifier: Apache-2.0 |
3 | | |
4 | | use std::collections::HashMap; |
5 | | use std::io::{Read, Write}; |
6 | | use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; |
7 | | use std::os::unix::net::{UnixListener, UnixStream}; |
8 | | use std::path::Path; |
9 | | |
10 | | use crate::common::{Body, Version}; |
11 | | pub use crate::common::{ConnectionError, ServerError}; |
12 | | use crate::connection::HttpConnection; |
13 | | use crate::request::Request; |
14 | | use crate::response::{Response, StatusCode}; |
15 | | use vmm_sys_util::{epoll, eventfd::EventFd, sock_ctrl_msg::ScmSocket}; |
16 | | |
// Canned 503 response sent verbatim to a client whose connection is accepted
// while the server already holds `MAX_CONNECTIONS` open connections.
static SERVER_FULL_ERROR_MESSAGE: &[u8] = b"HTTP/1.1 503\r\n\
Server: Firecracker API\r\n\
Connection: close\r\n\
Content-Length: 40\r\n\r\n{ \"error\": \"Too many open connections\" }";
// Maximum number of client connections the server tracks at once; exceeding
// it triggers the 503 reply above.
const MAX_CONNECTIONS: usize = 10;
/// Payload max size
pub(crate) const MAX_PAYLOAD_SIZE: usize = 51200;

// Shorthand for results produced by server operations.
type Result<T> = std::result::Result<T, ServerError>;
26 | | |
/// Wrapper over `Request` which adds an identification token.
#[derive(Debug)]
pub struct ServerRequest {
    /// Inner request.
    pub request: Request,
    /// Identification token. In practice this is the raw fd of the client
    /// stream (see `HttpServer::requests`), and `HttpServer::respond` uses
    /// it to route the response back to the originating connection.
    id: u64,
}
35 | | |
36 | | impl ServerRequest { |
37 | | /// Creates a new `ServerRequest` object from an existing `Request`, |
38 | | /// adding an identification token. |
39 | 0 | pub fn new(request: Request, id: u64) -> Self { |
40 | 0 | Self { request, id } |
41 | 0 | } |
42 | | |
43 | | /// Returns a reference to the inner request. |
44 | 0 | pub fn inner(&self) -> &Request { |
45 | 0 | &self.request |
46 | 0 | } |
47 | | |
48 | | /// Calls the function provided on the inner request to obtain the response. |
49 | | /// The response is then wrapped in a `ServerResponse`. |
50 | | /// |
51 | | /// Returns a `ServerResponse` ready for yielding to the server |
52 | 0 | pub fn process<F>(&self, mut callable: F) -> ServerResponse |
53 | 0 | where |
54 | 0 | F: FnMut(&Request) -> Response, |
55 | 0 | { |
56 | 0 | let http_response = callable(self.inner()); |
57 | 0 | ServerResponse::new(http_response, self.id) |
58 | 0 | } Unexecuted instantiation: <micro_http::server::ServerRequest>::process::<vmm::api::http::start_http_thread::{closure#0}::{closure#1}::{closure#0}> Unexecuted instantiation: <micro_http::server::ServerRequest>::process::<_> |
59 | | } |
60 | | |
/// Wrapper over `Response` which adds an identification token.
pub struct ServerResponse {
    /// Inner response.
    response: Response,
    /// Identification token copied from the `ServerRequest` this response
    /// answers; `HttpServer::respond` uses it to locate the destination
    /// connection.
    id: u64,
}
68 | | |
69 | | impl ServerResponse { |
70 | 0 | fn new(response: Response, id: u64) -> Self { |
71 | 0 | Self { response, id } |
72 | 0 | } |
73 | | } |
74 | | |
/// Describes the state of the connection as far as data exchange
/// on the stream is concerned.
#[derive(PartialOrd, PartialEq)]
enum ClientConnectionState {
    /// Waiting for request bytes from the client (epoll set to `EPOLLIN`).
    AwaitingIncoming,
    /// Has queued response bytes to flush (epoll set to `EPOLLOUT`).
    AwaitingOutgoing,
    /// Peer hung up, errored, or timed out; connection is awaiting removal
    /// once it becomes safe to drop (see `ClientConnection::is_done`).
    Closed,
}
83 | | |
/// Wrapper over `HttpConnection` which keeps track of yielded
/// requests and absorbed responses.
struct ClientConnection<T> {
    /// The `HttpConnection` object which handles data exchange.
    connection: HttpConnection<T>,
    /// The state of the connection in the `epoll` structure.
    state: ClientConnectionState,
    /// Represents the difference between yielded requests and
    /// absorbed responses.
    /// This has to be `0` if we want to drop the connection.
    in_flight_response_count: u32,
}
96 | | |
impl<T: Read + Write + ScmSocket> ClientConnection<T> {
    /// Wraps `connection`, starting in the `AwaitingIncoming` state with
    /// zero outstanding responses.
    fn new(connection: HttpConnection<T>) -> Self {
        Self {
            connection,
            state: ClientConnectionState::AwaitingIncoming,
            in_flight_response_count: 0,
        }
    }

    /// Drains readable bytes from the stream and returns the complete,
    /// valid requests parsed so far.
    ///
    /// Read/parse failures are answered in-line by queueing an error
    /// response on this same connection rather than being propagated;
    /// the only `Err` this returns is `ServerError::Overflow` from the
    /// in-flight counter.
    fn read(&mut self) -> Result<Vec<Request>> {
        // Data came into the connection.
        let mut parsed_requests = vec![];
        match self.connection.try_read() {
            Err(ConnectionError::ConnectionClosed) => {
                // Connection timeout.
                self.state = ClientConnectionState::Closed;
                // We don't want to propagate this to the server and we will
                // return no requests and wait for the connection to become
                // safe to drop.
                return Ok(vec![]);
            }
            Err(ConnectionError::StreamReadError(inner)) => {
                // Reading from the connection failed.
                // We should try to write an error message regardless.
                let mut internal_error_response =
                    Response::new(Version::Http11, StatusCode::InternalServerError);
                internal_error_response.set_body(Body::new(inner.to_string()));
                self.connection.enqueue_response(internal_error_response);
            }
            Err(ConnectionError::ParseError(inner)) => {
                // An error occurred while parsing the read bytes.
                // Check if there are any valid parsed requests in the queue.
                // They are discarded: the 400 below warns the client that all
                // previous unanswered requests are dropped.
                while let Some(_discarded_request) = self.connection.pop_parsed_request() {}

                // Send an error response for the request that gave us the error.
                let mut error_response = Response::new(Version::Http11, StatusCode::BadRequest);
                error_response.set_body(Body::new(format!(
                    "{{ \"error\": \"{}\nAll previous unanswered requests will be dropped.\" }}",
                    inner
                )));
                self.connection.enqueue_response(error_response);
            }
            Err(ConnectionError::InvalidWrite) | Err(ConnectionError::StreamWriteError(_)) => {
                // This is unreachable because `HttpConnection::try_read()` cannot return this error variant.
                unreachable!();
            }
            Ok(()) => {
                while let Some(request) = self.connection.pop_parsed_request() {
                    // Add all valid requests to `parsed_requests`.
                    parsed_requests.push(request);
                }
            }
        }
        // Every yielded request obligates us to absorb a response later;
        // a checked add guards the (pathological) u32 overflow case.
        self.in_flight_response_count = self
            .in_flight_response_count
            .checked_add(parsed_requests.len() as u32)
            .ok_or(ServerError::Overflow)?;
        // If the state of the connection has changed, we need to update
        // the event set in the `epoll` structure.
        if self.connection.pending_write() {
            self.state = ClientConnectionState::AwaitingOutgoing;
        }

        Ok(parsed_requests)
    }

    /// Attempts to flush queued response bytes to the stream, updating
    /// `state` to reflect whether more writes are pending.
    fn write(&mut self) -> Result<()> {
        // The stream is available for writing.
        match self.connection.try_write() {
            Err(ConnectionError::ConnectionClosed) | Err(ConnectionError::StreamWriteError(_)) => {
                // Writing to the stream failed so it will be removed.
                self.state = ClientConnectionState::Closed;
            }
            Err(ConnectionError::InvalidWrite) => {
                // A `try_write` call was performed on a connection that has nothing
                // to write.
                return Err(ServerError::ConnectionError(ConnectionError::InvalidWrite));
            }
            _ => {
                // Check if we still have bytes to write for this connection.
                if !self.connection.pending_write() {
                    self.state = ClientConnectionState::AwaitingIncoming;
                }
            }
        }
        Ok(())
    }

    /// Absorbs one response for a previously yielded request, decrementing
    /// the in-flight counter (`ServerError::Underflow` if none is owed).
    /// The response is silently discarded when the connection is closed,
    /// but the counter is still decremented so the connection can drain.
    fn enqueue_response(&mut self, response: Response) -> Result<()> {
        if self.state != ClientConnectionState::Closed {
            self.connection.enqueue_response(response);
        }
        self.in_flight_response_count = self
            .in_flight_response_count
            .checked_sub(1)
            .ok_or(ServerError::Underflow)?;
        Ok(())
    }

    /// Discards all pending writes from the inner connection.
    fn clear_write_buffer(&mut self) {
        self.connection.clear_write_buffer();
    }

    // Returns `true` if the connection is closed and safe to drop.
    fn is_done(&self) -> bool {
        self.state == ClientConnectionState::Closed
            && !self.connection.pending_write()
            && self.in_flight_response_count == 0
    }
}
208 | | |
/// HTTP Server implementation using Unix Domain Sockets and `EPOLL` to
/// handle multiple connections on the same thread.
///
/// The function that handles incoming connections, parses incoming
/// requests and sends responses for awaiting requests is `requests`.
/// It can be called in a loop, which will render the thread that the
/// server runs on incapable of performing other operations, or it can
/// be used in another `EPOLL` structure, as it provides its `epoll`,
/// which is a wrapper over the file descriptor of the epoll structure
/// used within the server, and it can be added to another one using
/// the `EPOLLIN` flag. Whenever there is a notification on that fd,
/// `requests` should be called once.
///
/// # Example
///
/// ## Starting and running the server
///
/// ```
/// use micro_http::{HttpServer, Response, StatusCode};
///
/// let path_to_socket = "/tmp/example.sock";
/// std::fs::remove_file(path_to_socket).unwrap_or_default();
///
/// // Start the server.
/// let mut server = HttpServer::new(path_to_socket).unwrap();
/// server.start_server().unwrap();
///
/// // Connect a client to the server so it doesn't block in our example.
/// let mut socket = std::os::unix::net::UnixStream::connect(path_to_socket).unwrap();
///
/// // Server loop processing requests.
/// loop {
///     for request in server.requests().unwrap() {
///         let response = request.process(|request| {
///             // Your code here.
///             Response::new(request.http_version(), StatusCode::NoContent)
///         });
///         server.respond(response);
///     }
///     // Break this example loop.
///     break;
/// }
/// ```
pub struct HttpServer {
    /// Socket on which we listen for new connections.
    socket: UnixListener,
    /// Server's epoll instance.
    epoll: epoll::Epoll,
    /// Event requesting micro-http shutdown.
    /// Used to break out of inner `epoll_wait` and reports shutdown event.
    kill_switch: Option<EventFd>,
    /// Holds the token-connection pairs of the server.
    /// Each connection has an associated identification token, which is
    /// the file descriptor of the underlying stream.
    /// We use the file descriptor of the stream as the key for mapping
    /// connections because the 1-to-1 relation is guaranteed by the OS.
    connections: HashMap<RawFd, ClientConnection<UnixStream>>,
    /// Payload max size, applied to each new connection as it is accepted.
    payload_max_size: usize,
}
269 | | |
270 | | impl HttpServer { |
271 | | /// Constructor for `HttpServer`. |
272 | | /// |
273 | | /// Returns the newly formed `HttpServer`. |
274 | | /// |
275 | | /// # Errors |
276 | | /// Returns an `IOError` when binding or `epoll::create` fails. |
277 | 0 | pub fn new<P: AsRef<Path>>(path_to_socket: P) -> Result<Self> { |
278 | 0 | let socket = UnixListener::bind(path_to_socket).map_err(ServerError::IOError)?; |
279 | 0 | let epoll = epoll::Epoll::new().map_err(ServerError::IOError)?; |
280 | 0 | Ok(Self { |
281 | 0 | socket, |
282 | 0 | epoll, |
283 | 0 | kill_switch: None, |
284 | 0 | connections: HashMap::new(), |
285 | 0 | payload_max_size: MAX_PAYLOAD_SIZE, |
286 | 0 | }) |
287 | 0 | } |
288 | | |
289 | | /// Constructor for `HttpServer`. |
290 | | /// |
291 | | /// Returns the newly formed `HttpServer`. |
292 | | /// |
293 | | /// # Safety |
294 | | /// This function requires the socket_fd to be solely owned |
295 | | /// and not be associated with another File in the caller as it uses |
296 | | /// the unsafe `UnixListener::from_raw_fd method`. |
297 | | /// |
298 | | /// # Errors |
299 | | /// Returns an `IOError` when `epoll::create` fails. |
300 | 0 | pub unsafe fn new_from_fd(socket_fd: RawFd) -> Result<Self> { |
301 | 0 | let socket = UnixListener::from_raw_fd(socket_fd); |
302 | 0 | let epoll = epoll::Epoll::new().map_err(ServerError::IOError)?; |
303 | 0 | Ok(HttpServer { |
304 | 0 | socket, |
305 | 0 | epoll, |
306 | 0 | kill_switch: None, |
307 | 0 | connections: HashMap::new(), |
308 | 0 | payload_max_size: MAX_PAYLOAD_SIZE, |
309 | 0 | }) |
310 | 0 | } |
311 | | |
312 | | /// Adds a `kill_switch` event fd used to break out of inner `epoll_wait` |
313 | | /// and report a shutdown event. |
314 | 0 | pub fn add_kill_switch(&mut self, kill_switch: EventFd) -> Result<()> { |
315 | 0 | // Add the kill switch to the `epoll` structure. |
316 | 0 | let ret = Self::epoll_add(&self.epoll, kill_switch.as_raw_fd()); |
317 | 0 | self.kill_switch = Some(kill_switch); |
318 | 0 | ret |
319 | 0 | } |
320 | | |
321 | | /// This function sets the limit for PUT/PATCH requests. It overwrites the |
322 | | /// default limit of 0.05MiB with the one allowed by server. |
323 | 0 | pub fn set_payload_max_size(&mut self, request_payload_max_size: usize) { |
324 | 0 | self.payload_max_size = request_payload_max_size; |
325 | 0 | } |
326 | | |
327 | | /// Starts the HTTP Server. |
328 | 0 | pub fn start_server(&mut self) -> Result<()> { |
329 | 0 | // Add the socket on which we listen for new connections to the |
330 | 0 | // `epoll` structure. |
331 | 0 | Self::epoll_add(&self.epoll, self.socket.as_raw_fd()) |
332 | 0 | } |
333 | | |
334 | | /// This function is responsible for the data exchange with the clients and should |
335 | | /// be called when we are either notified through `epoll` that we need to exchange |
336 | | /// data with at least a client or when we don't need to perform any other operations |
337 | | /// on this thread and we can afford to call it in a loop. |
338 | | /// |
339 | | /// Note that this function will block the current thread if there are no notifications |
340 | | /// to be handled by the server. |
341 | | /// |
342 | | /// Returns a collection of complete and valid requests to be processed by the user |
343 | | /// of the server. Once processed, responses should be sent using `enqueue_responses()`. |
344 | | /// |
345 | | /// # Errors |
346 | | /// `IOError` is returned when `read`, `write` or `epoll::ctl` operations fail. |
347 | | /// `ServerFull` is returned when a client is trying to connect to the server, but |
348 | | /// full capacity has already been reached. |
349 | | /// `InvalidWrite` is returned when the server attempted to perform a write operation |
350 | | /// on a connection on which it is not possible. |
351 | 0 | pub fn requests(&mut self) -> Result<Vec<ServerRequest>> { |
352 | 0 | let mut parsed_requests: Vec<ServerRequest> = vec![]; |
353 | 0 | // Possible events coming from FDs: 1 + 1 + MAX_CONNECTIONS: |
354 | 0 | // exit-eventfd, sock-listen-fd, active-connections-fds. |
355 | 0 | let mut events = [epoll::EpollEvent::default(); MAX_CONNECTIONS + 2]; |
356 | | // This is a wrapper over the syscall `epoll_wait` and it will block the |
357 | | // current thread until at least one event is received. |
358 | | // The received notifications will then populate the `events` array with |
359 | | // `event_count` elements, where 1 <= event_count <= MAX_CONNECTIONS + 2. |
360 | 0 | let event_count = match self.epoll.wait(-1, &mut events[..]) { |
361 | 0 | Ok(event_count) => event_count, |
362 | 0 | Err(e) if e.raw_os_error() == Some(libc::EINTR) => 0, |
363 | 0 | Err(e) => return Err(ServerError::IOError(e)), |
364 | | }; |
365 | | |
366 | | // Getting the file descriptor for kill switch. |
367 | | // If there is no kill switch fd, we use value -1 as an invalid fd. |
368 | 0 | let kill_fd = self.kill_switch.as_ref().map_or(-1, |ks| ks.as_raw_fd()); |
369 | | |
370 | | // We only iterate over first `event_count` events and discard empty elements |
371 | | // at the end of the array. |
372 | 0 | for e in events[..event_count].iter() { |
373 | | // Check the file descriptor which produced the notification `e`. |
374 | | // It could be that we need to shutdown, or have a new connection, or |
375 | | // one of our open connections is ready to exchange data with a client. |
376 | 0 | if e.fd() == kill_fd { |
377 | | // Report that the kill switch was triggered. |
378 | 0 | return Err(ServerError::ShutdownEvent); |
379 | 0 | } else if e.fd() == self.socket.as_raw_fd() { |
380 | | // We have received a notification on the listener socket, which |
381 | | // means we have a new connection to accept. |
382 | 0 | match self.handle_new_connection() { |
383 | | // If the server is full, we send a message to the client |
384 | | // notifying them that we will close the connection, then |
385 | | // we discard it. |
386 | | Err(ServerError::ServerFull) => { |
387 | 0 | self.socket |
388 | 0 | .accept() |
389 | 0 | .map_err(ServerError::IOError) |
390 | 0 | .and_then(move |(mut stream, _)| { |
391 | 0 | stream |
392 | 0 | .write(SERVER_FULL_ERROR_MESSAGE) |
393 | 0 | .map_err(ServerError::IOError) |
394 | 0 | })?; |
395 | | } |
396 | | // An internal error will compromise any in-flight requests. |
397 | 0 | Err(error) => return Err(error), |
398 | 0 | Ok(()) => {} |
399 | | }; |
400 | | } else { |
401 | | // We have a notification on one of our open connections. |
402 | 0 | let fd = e.fd(); |
403 | 0 | let client_connection = self.connections.get_mut(&fd).unwrap(); |
404 | 0 |
|
405 | 0 | // If we receive a hang up on a connection, we clear the write buffer and set |
406 | 0 | // the connection state to closed to mark it ready for removal from the |
407 | 0 | // connections map, which will gracefully close the socket. |
408 | 0 | // The connection is also marked for removal when encountering `EPOLLERR`, |
409 | 0 | // since this is an "error condition happened on the associated file |
410 | 0 | // descriptor", according to the `epoll_ctl` man page. |
411 | 0 | if e.event_set().contains(epoll::EventSet::ERROR) |
412 | 0 | || e.event_set().contains(epoll::EventSet::HANG_UP) |
413 | 0 | || e.event_set().contains(epoll::EventSet::READ_HANG_UP) |
414 | | { |
415 | 0 | client_connection.clear_write_buffer(); |
416 | 0 | client_connection.state = ClientConnectionState::Closed; |
417 | 0 | continue; |
418 | 0 | } |
419 | 0 |
|
420 | 0 | if e.event_set().contains(epoll::EventSet::IN) { |
421 | | // We have bytes to read from this connection. |
422 | | // If our `read` yields `Request` objects, we wrap them with an ID before |
423 | | // handing them to the user. |
424 | 0 | parsed_requests.append( |
425 | 0 | &mut client_connection |
426 | 0 | .read()? |
427 | 0 | .into_iter() |
428 | 0 | .map(|request| ServerRequest::new(request, e.data())) |
429 | 0 | .collect(), |
430 | 0 | ); |
431 | 0 | // If the connection was incoming before we read and we now have to write |
432 | 0 | // either an error message or an `expect` response, we change its `epoll` |
433 | 0 | // event set to notify us when the stream is ready for writing. |
434 | 0 | if client_connection.state == ClientConnectionState::AwaitingOutgoing { |
435 | 0 | Self::epoll_mod( |
436 | 0 | &self.epoll, |
437 | 0 | fd, |
438 | 0 | epoll::EventSet::OUT | epoll::EventSet::READ_HANG_UP, |
439 | 0 | )?; |
440 | 0 | } |
441 | 0 | } else if e.event_set().contains(epoll::EventSet::OUT) { |
442 | | // We have bytes to write on this connection. |
443 | 0 | client_connection.write()?; |
444 | | // If the connection was outgoing before we tried to write the responses |
445 | | // and we don't have any more responses to write, we change the `epoll` |
446 | | // event set to notify us when we have bytes to read from the stream. |
447 | 0 | if client_connection.state == ClientConnectionState::AwaitingIncoming { |
448 | 0 | Self::epoll_mod( |
449 | 0 | &self.epoll, |
450 | 0 | fd, |
451 | 0 | epoll::EventSet::IN | epoll::EventSet::READ_HANG_UP, |
452 | 0 | )?; |
453 | 0 | } |
454 | 0 | } |
455 | | } |
456 | | } |
457 | | |
458 | | // Remove dead connections. |
459 | 0 | let epoll = &self.epoll; |
460 | 0 | self.connections.retain(|rawfd, client_connection| { |
461 | 0 | if client_connection.is_done() { |
462 | | // The rawfd should have been registered to the epoll fd. |
463 | 0 | Self::epoll_del(epoll, *rawfd).unwrap(); |
464 | 0 | false |
465 | | } else { |
466 | 0 | true |
467 | | } |
468 | 0 | }); |
469 | 0 |
|
470 | 0 | Ok(parsed_requests) |
471 | 0 | } |
472 | | |
473 | | /// This function is responsible with flushing any remaining outgoing |
474 | | /// requests on the server. |
475 | | /// |
476 | | /// Note that this function can block the thread on write, since the |
477 | | /// operation is blocking. |
478 | 0 | pub fn flush_outgoing_writes(&mut self) { |
479 | 0 | for (_, connection) in self.connections.iter_mut() { |
480 | 0 | while connection.state == ClientConnectionState::AwaitingOutgoing { |
481 | 0 | if let Err(e) = connection.write() { |
482 | 0 | if let ServerError::ConnectionError(ConnectionError::InvalidWrite) = e { |
483 | 0 | // Nothing is logged since an InvalidWrite means we have successfully |
484 | 0 | // flushed the connection |
485 | 0 | } |
486 | 0 | break; |
487 | 0 | } |
488 | | } |
489 | | } |
490 | 0 | } |
491 | | |
492 | | /// The file descriptor of the `epoll` structure can enable the server to become |
493 | | /// a non-blocking structure in an application. |
494 | | /// |
495 | | /// Returns a reference to the instance of the server's internal `epoll` structure. |
496 | | /// |
497 | | /// # Example |
498 | | /// |
499 | | /// ## Non-blocking server |
500 | | /// ``` |
501 | | /// use std::os::unix::io::AsRawFd; |
502 | | /// |
503 | | /// use micro_http::{HttpServer, Response, StatusCode}; |
504 | | /// use vmm_sys_util::epoll; |
505 | | /// |
506 | | /// // Create our epoll manager. |
507 | | /// let epoll = epoll::Epoll::new().unwrap(); |
508 | | /// |
509 | | /// let path_to_socket = "/tmp/epoll_example.sock"; |
510 | | /// std::fs::remove_file(path_to_socket).unwrap_or_default(); |
511 | | /// |
512 | | /// // Start the server. |
513 | | /// let mut server = HttpServer::new(path_to_socket).unwrap(); |
514 | | /// server.start_server().unwrap(); |
515 | | /// |
516 | | /// // Add our server to the `epoll` manager. |
517 | | /// epoll |
518 | | /// .ctl( |
519 | | /// epoll::ControlOperation::Add, |
520 | | /// server.epoll().as_raw_fd(), |
521 | | /// epoll::EpollEvent::new(epoll::EventSet::IN, 1234u64), |
522 | | /// ) |
523 | | /// .unwrap(); |
524 | | /// |
525 | | /// // Connect a client to the server so it doesn't block in our example. |
526 | | /// let mut socket = std::os::unix::net::UnixStream::connect(path_to_socket).unwrap(); |
527 | | /// |
528 | | /// // Control loop of the application. |
529 | | /// let mut events = Vec::with_capacity(10); |
530 | | /// loop { |
531 | | /// let num_ev = epoll.wait(-1, events.as_mut_slice()); |
532 | | /// for event in events { |
533 | | /// match event.data() { |
534 | | /// // The server notification. |
535 | | /// 1234 => { |
536 | | /// let request = server.requests(); |
537 | | /// // Process... |
538 | | /// } |
539 | | /// // Other `epoll` notifications. |
540 | | /// _ => { |
541 | | /// // Do other computation. |
542 | | /// } |
543 | | /// } |
544 | | /// } |
545 | | /// // Break this example loop. |
546 | | /// break; |
547 | | /// } |
548 | | /// ``` |
549 | 0 | pub fn epoll(&self) -> &epoll::Epoll { |
550 | 0 | &self.epoll |
551 | 0 | } |
552 | | |
553 | | /// Enqueues the provided responses in the outgoing connection. |
554 | | /// |
555 | | /// # Errors |
556 | | /// `IOError` is returned when an `epoll::ctl` operation fails. |
557 | 0 | pub fn enqueue_responses(&mut self, responses: Vec<ServerResponse>) -> Result<()> { |
558 | 0 | for response in responses { |
559 | 0 | self.respond(response)?; |
560 | | } |
561 | | |
562 | 0 | Ok(()) |
563 | 0 | } |
564 | | |
565 | | /// Adds the provided response to the outgoing buffer in the corresponding connection. |
566 | | /// |
567 | | /// # Errors |
568 | | /// `IOError` is returned when an `epoll::ctl` operation fails. |
569 | | /// `Underflow` is returned when `enqueue_response` fails. |
570 | 0 | pub fn respond(&mut self, response: ServerResponse) -> Result<()> { |
571 | 0 | if let Some(client_connection) = self.connections.get_mut(&(response.id as i32)) { |
572 | | // If the connection was incoming before we enqueue the response, we change its |
573 | | // `epoll` event set to notify us when the stream is ready for writing. |
574 | 0 | if let ClientConnectionState::AwaitingIncoming = client_connection.state { |
575 | 0 | client_connection.state = ClientConnectionState::AwaitingOutgoing; |
576 | 0 | Self::epoll_mod( |
577 | 0 | &self.epoll, |
578 | 0 | response.id as RawFd, |
579 | 0 | epoll::EventSet::OUT | epoll::EventSet::READ_HANG_UP, |
580 | 0 | )?; |
581 | 0 | } |
582 | 0 | client_connection.enqueue_response(response.response)?; |
583 | 0 | } |
584 | 0 | Ok(()) |
585 | 0 | } |
586 | | |
587 | | /// Accepts a new incoming connection and adds it to the `epoll` notification structure. |
588 | | /// |
589 | | /// # Errors |
590 | | /// `IOError` is returned when socket or epoll operations fail. |
591 | | /// `ServerFull` is returned if server full capacity has been reached. |
592 | 0 | fn handle_new_connection(&mut self) -> Result<()> { |
593 | 0 | if self.connections.len() == MAX_CONNECTIONS { |
594 | | // If we want a replacement policy for connections |
595 | | // this is where we will have it. |
596 | 0 | return Err(ServerError::ServerFull); |
597 | 0 | } |
598 | 0 |
|
599 | 0 | self.socket |
600 | 0 | .accept() |
601 | 0 | .map_err(ServerError::IOError) |
602 | 0 | .and_then(|(stream, _)| { |
603 | 0 | // `HttpConnection` is supposed to work with non-blocking streams. |
604 | 0 | stream |
605 | 0 | .set_nonblocking(true) |
606 | 0 | .map(|_| stream) |
607 | 0 | .map_err(ServerError::IOError) |
608 | 0 | }) |
609 | 0 | .and_then(|stream| { |
610 | 0 | // Add the stream to the `epoll` structure and listen for bytes to be read. |
611 | 0 | let raw_fd = stream.as_raw_fd(); |
612 | 0 | Self::epoll_add(&self.epoll, raw_fd)?; |
613 | 0 | let mut conn = HttpConnection::new(stream); |
614 | 0 | conn.set_payload_max_size(self.payload_max_size); |
615 | 0 | // Then add it to our open connections. |
616 | 0 | self.connections.insert(raw_fd, ClientConnection::new(conn)); |
617 | 0 | Ok(()) |
618 | 0 | }) |
619 | 0 | } |
620 | | |
621 | | /// Changes the event type for a connection to either listen for incoming bytes |
622 | | /// or for when the stream is ready for writing. |
623 | | /// |
624 | | /// # Errors |
625 | | /// `IOError` is returned when an `EPOLL_CTL_MOD` control operation fails. |
626 | 0 | fn epoll_mod(epoll: &epoll::Epoll, stream_fd: RawFd, evset: epoll::EventSet) -> Result<()> { |
627 | 0 | let event = epoll::EpollEvent::new(evset, stream_fd as u64); |
628 | 0 | epoll |
629 | 0 | .ctl(epoll::ControlOperation::Modify, stream_fd, event) |
630 | 0 | .map_err(ServerError::IOError) |
631 | 0 | } |
632 | | |
633 | | /// Adds a stream to the `epoll` notification structure with the `EPOLLIN` event set. |
634 | | /// |
635 | | /// # Errors |
636 | | /// `IOError` is returned when an `EPOLL_CTL_ADD` control operation fails. |
637 | 0 | fn epoll_add(epoll: &epoll::Epoll, stream_fd: RawFd) -> Result<()> { |
638 | 0 | epoll |
639 | 0 | .ctl( |
640 | 0 | epoll::ControlOperation::Add, |
641 | 0 | stream_fd, |
642 | 0 | epoll::EpollEvent::new( |
643 | 0 | epoll::EventSet::IN | epoll::EventSet::READ_HANG_UP, |
644 | 0 | stream_fd as u64, |
645 | 0 | ), |
646 | 0 | ) |
647 | 0 | .map_err(ServerError::IOError) |
648 | 0 | } |
649 | | |
650 | | /// Removes a stream to the `epoll` notification structure. |
651 | 0 | fn epoll_del(epoll: &epoll::Epoll, stream_fd: RawFd) -> Result<()> { |
652 | 0 | epoll |
653 | 0 | .ctl( |
654 | 0 | epoll::ControlOperation::Delete, |
655 | 0 | stream_fd, |
656 | 0 | epoll::EpollEvent::new(epoll::EventSet::IN, stream_fd as u64), |
657 | 0 | ) |
658 | 0 | .map_err(ServerError::IOError) |
659 | 0 | } |
660 | | } |
661 | | |
662 | | #[cfg(test)] |
663 | | mod tests { |
664 | | #![allow(clippy::undocumented_unsafe_blocks)] |
665 | | |
666 | | use super::*; |
667 | | use std::io::{Read, Write}; |
668 | | use std::net::Shutdown; |
669 | | use std::os::unix::net::UnixStream; |
670 | | use std::sync::{Arc, Mutex}; |
671 | | |
672 | | use crate::common::Body; |
673 | | use vmm_sys_util::{eventfd::EFD_NONBLOCK, tempfile::TempFile}; |
674 | | |
675 | | fn get_temp_socket_file() -> TempFile { |
676 | | let mut path_to_socket = TempFile::new().unwrap(); |
677 | | path_to_socket.remove().unwrap(); |
678 | | path_to_socket |
679 | | } |
680 | | |
681 | | #[test] |
682 | | fn test_wait_one_connection() { |
683 | | let path_to_socket = get_temp_socket_file(); |
684 | | |
685 | | let mut server = HttpServer::new(path_to_socket.as_path()).unwrap(); |
686 | | server.start_server().unwrap(); |
687 | | |
688 | | // Test one incoming connection. |
689 | | let mut socket = UnixStream::connect(path_to_socket.as_path()).unwrap(); |
690 | | assert!(server.requests().unwrap().is_empty()); |
691 | | |
692 | | socket |
693 | | .write_all( |
694 | | b"PATCH /machine-config HTTP/1.1\r\n\ |
695 | | Content-Length: 13\r\n\ |
696 | | Content-Type: application/json\r\n\r\nwhatever body", |
697 | | ) |
698 | | .unwrap(); |
699 | | |
700 | | let mut req_vec = server.requests().unwrap(); |
701 | | let server_request = req_vec.remove(0); |
702 | | |
703 | | server |
704 | | .respond(server_request.process(|_request| { |
705 | | let mut response = Response::new(Version::Http11, StatusCode::OK); |
706 | | let response_body = b"response body"; |
707 | | response.set_body(Body::new(response_body.to_vec())); |
708 | | response |
709 | | })) |
710 | | .unwrap(); |
711 | | assert!(server.requests().unwrap().is_empty()); |
712 | | |
713 | | let mut buf: [u8; 1024] = [0; 1024]; |
714 | | assert!(socket.read(&mut buf[..]).unwrap() > 0); |
715 | | } |
716 | | |
717 | | #[test] |
718 | | fn test_connection_size_limit_exceeded() { |
719 | | let path_to_socket = get_temp_socket_file(); |
720 | | |
721 | | let mut server = HttpServer::new(path_to_socket.as_path()).unwrap(); |
722 | | server.start_server().unwrap(); |
723 | | |
724 | | // Test one incoming connection. |
725 | | let mut socket = UnixStream::connect(path_to_socket.as_path()).unwrap(); |
726 | | assert!(server.requests().unwrap().is_empty()); |
727 | | |
728 | | socket |
729 | | .write_all( |
730 | | b"PATCH /machine-config HTTP/1.1\r\n\ |
731 | | Content-Length: 51201\r\n\ |
732 | | Content-Type: application/json\r\n\r\naaaaa", |
733 | | ) |
734 | | .unwrap(); |
735 | | assert!(server.requests().unwrap().is_empty()); |
736 | | assert!(server.requests().unwrap().is_empty()); |
737 | | let mut buf: [u8; 265] = [0; 265]; |
738 | | assert!(socket.read(&mut buf[..]).unwrap() > 0); |
739 | | let error_message = b"HTTP/1.1 400 \r\n\ |
740 | | Server: Firecracker API\r\n\ |
741 | | Connection: keep-alive\r\n\ |
742 | | Content-Type: application/json\r\n\ |
743 | | Content-Length: 149\r\n\r\n{ \"error\": \"\ |
744 | | Request payload with size 51201 is larger than \ |
745 | | the limit of 51200 allowed by server.\nAll \ |
746 | | previous unanswered requests will be dropped."; |
747 | | assert_eq!(&buf[..], &error_message[..]); |
748 | | } |
749 | | |
750 | | #[test] |
751 | | fn test_set_payload_size() { |
752 | | let path_to_socket = get_temp_socket_file(); |
753 | | |
754 | | let mut server = HttpServer::new(path_to_socket.as_path()).unwrap(); |
755 | | server.start_server().unwrap(); |
756 | | server.set_payload_max_size(4); |
757 | | |
758 | | // Test one incoming connection. |
759 | | let mut socket = UnixStream::connect(path_to_socket.as_path()).unwrap(); |
760 | | assert!(server.requests().unwrap().is_empty()); |
761 | | |
762 | | socket |
763 | | .write_all( |
764 | | b"PATCH /machine-config HTTP/1.1\r\n\ |
765 | | Content-Length: 5\r\n\ |
766 | | Content-Type: application/json\r\n\r\naaaaa", |
767 | | ) |
768 | | .unwrap(); |
769 | | assert!(server.requests().unwrap().is_empty()); |
770 | | assert!(server.requests().unwrap().is_empty()); |
771 | | let mut buf: [u8; 260] = [0; 260]; |
772 | | assert!(socket.read(&mut buf[..]).unwrap() > 0); |
773 | | let error_message = b"HTTP/1.1 400 \r\n\ |
774 | | Server: Firecracker API\r\n\ |
775 | | Connection: keep-alive\r\n\ |
776 | | Content-Type: application/json\r\n\ |
777 | | Content-Length: 141\r\n\r\n{ \"error\": \"\ |
778 | | Request payload with size 5 is larger than the \ |
779 | | limit of 4 allowed by server.\nAll previous \ |
780 | | unanswered requests will be dropped.\" }"; |
781 | | assert_eq!(&buf[..], &error_message[..]); |
782 | | } |
783 | | |
784 | | #[test] |
785 | | fn test_wait_one_fd_connection() { |
786 | | use std::os::unix::io::IntoRawFd; |
787 | | let path_to_socket = get_temp_socket_file(); |
788 | | |
789 | | let socket_listener = UnixListener::bind(path_to_socket.as_path()).unwrap(); |
790 | | let socket_fd = socket_listener.into_raw_fd(); |
791 | | |
792 | | let mut server = unsafe { HttpServer::new_from_fd(socket_fd).unwrap() }; |
793 | | server.start_server().unwrap(); |
794 | | |
795 | | // Test one incoming connection. |
796 | | let mut socket = UnixStream::connect(path_to_socket.as_path()).unwrap(); |
797 | | assert!(server.requests().unwrap().is_empty()); |
798 | | |
799 | | socket |
800 | | .write_all( |
801 | | b"PATCH /machine-config HTTP/1.1\r\n\ |
802 | | Content-Length: 13\r\n\ |
803 | | Content-Type: application/json\r\n\r\nwhatever body", |
804 | | ) |
805 | | .unwrap(); |
806 | | |
807 | | let mut req_vec = server.requests().unwrap(); |
808 | | let server_request = req_vec.remove(0); |
809 | | |
810 | | server |
811 | | .respond(server_request.process(|request| { |
812 | | assert_eq!( |
813 | | std::str::from_utf8(&request.body.as_ref().unwrap().body).unwrap(), |
814 | | "whatever body" |
815 | | ); |
816 | | let mut response = Response::new(Version::Http11, StatusCode::OK); |
817 | | let response_body = b"response body"; |
818 | | response.set_body(Body::new(response_body.to_vec())); |
819 | | response |
820 | | })) |
821 | | .unwrap(); |
822 | | assert!(server.requests().unwrap().is_empty()); |
823 | | |
824 | | let mut buf: [u8; 1024] = [0; 1024]; |
825 | | assert!(socket.read(&mut buf[..]).unwrap() > 0); |
826 | | assert!(String::from_utf8_lossy(&buf).contains("response body")); |
827 | | } |
828 | | |
829 | | #[test] |
830 | | fn test_wait_concurrent_connections() { |
831 | | let path_to_socket = get_temp_socket_file(); |
832 | | |
833 | | let mut server = HttpServer::new(path_to_socket.as_path()).unwrap(); |
834 | | server.start_server().unwrap(); |
835 | | |
836 | | // Test two concurrent connections. |
837 | | let mut first_socket = UnixStream::connect(path_to_socket.as_path()).unwrap(); |
838 | | assert!(server.requests().unwrap().is_empty()); |
839 | | |
840 | | first_socket |
841 | | .write_all( |
842 | | b"PATCH /machine-config HTTP/1.1\r\n\ |
843 | | Content-Length: 13\r\n\ |
844 | | Content-Type: application/json\r\n\r\nwhatever body", |
845 | | ) |
846 | | .unwrap(); |
847 | | let mut second_socket = UnixStream::connect(path_to_socket.as_path()).unwrap(); |
848 | | |
849 | | let mut req_vec = server.requests().unwrap(); |
850 | | let server_request = req_vec.remove(0); |
851 | | |
852 | | server |
853 | | .respond(server_request.process(|_request| { |
854 | | let mut response = Response::new(Version::Http11, StatusCode::OK); |
855 | | let response_body = b"response body"; |
856 | | response.set_body(Body::new(response_body.to_vec())); |
857 | | response |
858 | | })) |
859 | | .unwrap(); |
860 | | second_socket |
861 | | .write_all( |
862 | | b"GET /machine-config HTTP/1.1\r\n\ |
863 | | Content-Type: application/json\r\n\r\n", |
864 | | ) |
865 | | .unwrap(); |
866 | | |
867 | | let mut req_vec = server.requests().unwrap(); |
868 | | let second_server_request = req_vec.remove(0); |
869 | | |
870 | | assert_eq!( |
871 | | second_server_request.request, |
872 | | Request::try_from( |
873 | | b"GET /machine-config HTTP/1.1\r\n\ |
874 | | Content-Type: application/json\r\n\r\n", |
875 | | None |
876 | | ) |
877 | | .unwrap() |
878 | | ); |
879 | | |
880 | | let mut buf: [u8; 1024] = [0; 1024]; |
881 | | assert!(first_socket.read(&mut buf[..]).unwrap() > 0); |
882 | | first_socket.shutdown(std::net::Shutdown::Both).unwrap(); |
883 | | |
884 | | server |
885 | | .respond(second_server_request.process(|_request| { |
886 | | let mut response = Response::new(Version::Http11, StatusCode::OK); |
887 | | let response_body = b"response second body"; |
888 | | response.set_body(Body::new(response_body.to_vec())); |
889 | | response |
890 | | })) |
891 | | .unwrap(); |
892 | | |
893 | | assert!(server.requests().unwrap().is_empty()); |
894 | | let mut buf: [u8; 1024] = [0; 1024]; |
895 | | assert!(second_socket.read(&mut buf[..]).unwrap() > 0); |
896 | | second_socket.shutdown(std::net::Shutdown::Both).unwrap(); |
897 | | assert!(server.requests().unwrap().is_empty()); |
898 | | } |
899 | | |
900 | | #[test] |
901 | | fn test_wait_expect_connection() { |
902 | | let path_to_socket = get_temp_socket_file(); |
903 | | |
904 | | let mut server = HttpServer::new(path_to_socket.as_path()).unwrap(); |
905 | | server.start_server().unwrap(); |
906 | | |
907 | | // Test one incoming connection with `Expect: 100-continue`. |
908 | | let mut socket = UnixStream::connect(path_to_socket.as_path()).unwrap(); |
909 | | assert!(server.requests().unwrap().is_empty()); |
910 | | |
911 | | socket |
912 | | .write_all( |
913 | | b"PATCH /machine-config HTTP/1.1\r\n\ |
914 | | Content-Length: 13\r\n\ |
915 | | Expect: 100-continue\r\n\r\n", |
916 | | ) |
917 | | .unwrap(); |
918 | | // `wait` on server to receive what the client set on the socket. |
919 | | // This will set the stream direction to `Outgoing`, as we need to send a `100 CONTINUE` response. |
920 | | let req_vec = server.requests().unwrap(); |
921 | | assert!(req_vec.is_empty()); |
922 | | // Another `wait`, this time to send the response. |
923 | | // Will be called because of an `EPOLLOUT` notification. |
924 | | let req_vec = server.requests().unwrap(); |
925 | | assert!(req_vec.is_empty()); |
926 | | let mut buf: [u8; 1024] = [0; 1024]; |
927 | | assert!(socket.read(&mut buf[..]).unwrap() > 0); |
928 | | |
929 | | socket.write_all(b"whatever body").unwrap(); |
930 | | let mut req_vec = server.requests().unwrap(); |
931 | | let server_request = req_vec.remove(0); |
932 | | |
933 | | server |
934 | | .respond(server_request.process(|_request| { |
935 | | let mut response = Response::new(Version::Http11, StatusCode::OK); |
936 | | let response_body = b"response body"; |
937 | | response.set_body(Body::new(response_body.to_vec())); |
938 | | response |
939 | | })) |
940 | | .unwrap(); |
941 | | |
942 | | let req_vec = server.requests().unwrap(); |
943 | | assert!(req_vec.is_empty()); |
944 | | |
945 | | let mut buf: [u8; 1024] = [0; 1024]; |
946 | | assert!(socket.read(&mut buf[..]).unwrap() > 0); |
947 | | } |
948 | | |
949 | | #[test] |
950 | | fn test_wait_many_connections() { |
951 | | let path_to_socket = get_temp_socket_file(); |
952 | | |
953 | | let mut server = HttpServer::new(path_to_socket.as_path()).unwrap(); |
954 | | server.start_server().unwrap(); |
955 | | |
956 | | let mut sockets: Vec<UnixStream> = Vec::with_capacity(MAX_CONNECTIONS + 1); |
957 | | for _ in 0..MAX_CONNECTIONS { |
958 | | sockets.push(UnixStream::connect(path_to_socket.as_path()).unwrap()); |
959 | | assert!(server.requests().unwrap().is_empty()); |
960 | | } |
961 | | |
962 | | sockets.push(UnixStream::connect(path_to_socket.as_path()).unwrap()); |
963 | | assert!(server.requests().unwrap().is_empty()); |
964 | | let mut buf: [u8; 120] = [0; 120]; |
965 | | sockets[MAX_CONNECTIONS].read_exact(&mut buf).unwrap(); |
966 | | assert_eq!(&buf[..], SERVER_FULL_ERROR_MESSAGE); |
967 | | assert_eq!(server.connections.len(), 10); |
968 | | { |
969 | | // Drop this stream. |
970 | | let _refused_stream = sockets.pop().unwrap(); |
971 | | } |
972 | | assert_eq!(server.connections.len(), 10); |
973 | | |
974 | | // Check that the server detects a connection shutdown. |
975 | | let sock: &UnixStream = sockets.first().unwrap(); |
976 | | sock.shutdown(Shutdown::Both).unwrap(); |
977 | | assert!(server.requests().unwrap().is_empty()); |
978 | | // Server should drop a closed connection. |
979 | | assert_eq!(server.connections.len(), 9); |
980 | | |
981 | | // Close the backing FD of this connection by dropping |
982 | | // it out of scope. |
983 | | { |
984 | | // Enforce the drop call on the stream |
985 | | let _sock = sockets.pop().unwrap(); |
986 | | } |
987 | | assert!(server.requests().unwrap().is_empty()); |
988 | | // Server should drop a closed connection. |
989 | | assert_eq!(server.connections.len(), 8); |
990 | | |
991 | | let sock: &UnixStream = sockets.get(1).unwrap(); |
992 | | // Close both the read and write sides of the socket |
993 | | // separately and check that the server detects it. |
994 | | sock.shutdown(Shutdown::Read).unwrap(); |
995 | | sock.shutdown(Shutdown::Write).unwrap(); |
996 | | assert!(server.requests().unwrap().is_empty()); |
997 | | // Server should drop a closed connection. |
998 | | assert_eq!(server.connections.len(), 7); |
999 | | } |
1000 | | |
1001 | | #[test] |
1002 | | fn test_wait_parse_error() { |
1003 | | let path_to_socket = get_temp_socket_file(); |
1004 | | |
1005 | | let mut server = HttpServer::new(path_to_socket.as_path()).unwrap(); |
1006 | | server.start_server().unwrap(); |
1007 | | |
1008 | | // Test one incoming connection. |
1009 | | let mut socket = UnixStream::connect(path_to_socket.as_path()).unwrap(); |
1010 | | socket.set_nonblocking(true).unwrap(); |
1011 | | assert!(server.requests().unwrap().is_empty()); |
1012 | | |
1013 | | socket |
1014 | | .write_all( |
1015 | | b"PATCH /machine-config HTTP/1.1\r\n\ |
1016 | | Content-Length: alpha\r\n\ |
1017 | | Content-Type: application/json\r\n\r\nwhatever body", |
1018 | | ) |
1019 | | .unwrap(); |
1020 | | |
1021 | | assert!(server.requests().unwrap().is_empty()); |
1022 | | assert!(server.requests().unwrap().is_empty()); |
1023 | | let mut buf: [u8; 255] = [0; 255]; |
1024 | | assert!(socket.read(&mut buf[..]).unwrap() > 0); |
1025 | | let error_message = b"HTTP/1.1 400 \r\n\ |
1026 | | Server: Firecracker API\r\n\ |
1027 | | Connection: keep-alive\r\n\ |
1028 | | Content-Type: application/json\r\n\ |
1029 | | Content-Length: 136\r\n\r\n{ \"error\": \"Invalid header. \ |
1030 | | Reason: Invalid value. Key:Content-Length; Value: alpha\nAll previous unanswered requests will be dropped.\" }"; |
1031 | | assert_eq!(&buf[..], &error_message[..]); |
1032 | | |
1033 | | socket |
1034 | | .write_all( |
1035 | | b"PATCH /machine-config HTTP/1.1\r\n\ |
1036 | | Content-Length: alpha\r\n\ |
1037 | | Content-Type: application/json\r\n\r\nwhatever body", |
1038 | | ) |
1039 | | .unwrap(); |
1040 | | } |
1041 | | |
1042 | | #[test] |
1043 | | fn test_wait_in_flight_responses() { |
1044 | | let path_to_socket = get_temp_socket_file(); |
1045 | | |
1046 | | let mut server = HttpServer::new(path_to_socket.as_path()).unwrap(); |
1047 | | server.start_server().unwrap(); |
1048 | | |
1049 | | // Test a connection dropped and then a new one appearing |
1050 | | // before the user had a chance to send the response to the |
1051 | | // first one. |
1052 | | let mut first_socket = UnixStream::connect(path_to_socket.as_path()).unwrap(); |
1053 | | assert!(server.requests().unwrap().is_empty()); |
1054 | | |
1055 | | first_socket |
1056 | | .write_all( |
1057 | | b"PATCH /machine-config HTTP/1.1\r\n\ |
1058 | | Content-Length: 13\r\n\ |
1059 | | Content-Type: application/json\r\n\r\nwhatever body", |
1060 | | ) |
1061 | | .unwrap(); |
1062 | | |
1063 | | let mut req_vec = server.requests().unwrap(); |
1064 | | let server_request = req_vec.remove(0); |
1065 | | |
1066 | | first_socket.shutdown(std::net::Shutdown::Both).unwrap(); |
1067 | | assert!(server.requests().unwrap().is_empty()); |
1068 | | let mut second_socket = UnixStream::connect(path_to_socket.as_path()).unwrap(); |
1069 | | second_socket.set_nonblocking(true).unwrap(); |
1070 | | assert!(server.requests().unwrap().is_empty()); |
1071 | | |
1072 | | server |
1073 | | .enqueue_responses(vec![server_request.process(|_request| { |
1074 | | let mut response = Response::new(Version::Http11, StatusCode::OK); |
1075 | | let response_body = b"response body"; |
1076 | | response.set_body(Body::new(response_body.to_vec())); |
1077 | | response |
1078 | | })]) |
1079 | | .unwrap(); |
1080 | | assert!(server.requests().unwrap().is_empty()); |
1081 | | assert_eq!(server.connections.len(), 1); |
1082 | | let mut buf: [u8; 1024] = [0; 1024]; |
1083 | | assert!(second_socket.read(&mut buf[..]).is_err()); |
1084 | | |
1085 | | second_socket |
1086 | | .write_all( |
1087 | | b"GET /machine-config HTTP/1.1\r\n\ |
1088 | | Content-Type: application/json\r\n\r\n", |
1089 | | ) |
1090 | | .unwrap(); |
1091 | | |
1092 | | let mut req_vec = server.requests().unwrap(); |
1093 | | let second_server_request = req_vec.remove(0); |
1094 | | |
1095 | | assert_eq!( |
1096 | | second_server_request.request, |
1097 | | Request::try_from( |
1098 | | b"GET /machine-config HTTP/1.1\r\n\ |
1099 | | Content-Type: application/json\r\n\r\n", |
1100 | | None |
1101 | | ) |
1102 | | .unwrap() |
1103 | | ); |
1104 | | |
1105 | | server |
1106 | | .respond(second_server_request.process(|_request| { |
1107 | | let mut response = Response::new(Version::Http11, StatusCode::OK); |
1108 | | let response_body = b"response second body"; |
1109 | | response.set_body(Body::new(response_body.to_vec())); |
1110 | | response |
1111 | | })) |
1112 | | .unwrap(); |
1113 | | |
1114 | | assert!(server.requests().unwrap().is_empty()); |
1115 | | let mut buf: [u8; 1024] = [0; 1024]; |
1116 | | assert!(second_socket.read(&mut buf[..]).unwrap() > 0); |
1117 | | second_socket.shutdown(std::net::Shutdown::Both).unwrap(); |
1118 | | assert!(server.requests().is_ok()); |
1119 | | } |
1120 | | |
1121 | | #[test] |
1122 | | fn test_kill_switch() { |
1123 | | let path_to_socket = get_temp_socket_file(); |
1124 | | |
1125 | | let mut server = HttpServer::new(path_to_socket.as_path()).unwrap(); |
1126 | | let kill_switch = EventFd::new(EFD_NONBLOCK).unwrap(); |
1127 | | server |
1128 | | .add_kill_switch(kill_switch.try_clone().unwrap()) |
1129 | | .unwrap(); |
1130 | | server.start_server().unwrap(); |
1131 | | |
1132 | | let request_result = Arc::new(Mutex::new(Ok(vec![]))); |
1133 | | let res_clone = request_result.clone(); |
1134 | | // Start a thread running the server, expect it to report shutdown event. |
1135 | | let handler = std::thread::spawn(move || { |
1136 | | *res_clone.lock().unwrap() = server.requests(); |
1137 | | }); |
1138 | | |
1139 | | // Trigger kill switch. |
1140 | | kill_switch.write(1).unwrap(); |
1141 | | // Then send request. |
1142 | | let mut socket = UnixStream::connect(path_to_socket.as_path()).unwrap(); |
1143 | | socket |
1144 | | .write_all( |
1145 | | b"PATCH /machine-config HTTP/1.1\r\n\ |
1146 | | Content-Length: 13\r\n\ |
1147 | | Content-Type: application/json\r\n\r\nwhatever body", |
1148 | | ) |
1149 | | .unwrap(); |
1150 | | // Wait for server thread to handle events. |
1151 | | handler.join().unwrap(); |
1152 | | |
1153 | | // Expect shutdown event instead of http request event. |
1154 | | let res = request_result.lock().unwrap(); |
1155 | | assert_eq!( |
1156 | | res.as_ref().unwrap_err(), |
1157 | | &ServerError::ShutdownEvent, |
1158 | | "Expected shutdown event, instead got {res:?}" |
1159 | | ); |
1160 | | } |
1161 | | } |