Coverage Report

Created: 2024-05-21 06:19

/rust/git/checkouts/micro-http-22be4cdcbef12607/ef43cef/src/server.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
// SPDX-License-Identifier: Apache-2.0
3
4
use std::collections::HashMap;
5
use std::io::{Read, Write};
6
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
7
use std::os::unix::net::{UnixListener, UnixStream};
8
use std::path::Path;
9
10
use crate::common::{Body, Version};
11
pub use crate::common::{ConnectionError, ServerError};
12
use crate::connection::HttpConnection;
13
use crate::request::Request;
14
use crate::response::{Response, StatusCode};
15
use vmm_sys_util::{epoll, eventfd::EventFd, sock_ctrl_msg::ScmSocket};
16
17
static SERVER_FULL_ERROR_MESSAGE: &[u8] = b"HTTP/1.1 503\r\n\
18
                                            Server: Firecracker API\r\n\
19
                                            Connection: close\r\n\
20
                                            Content-Length: 40\r\n\r\n{ \"error\": \"Too many open connections\" }";
21
const MAX_CONNECTIONS: usize = 10;
22
/// Payload max size
23
pub(crate) const MAX_PAYLOAD_SIZE: usize = 51200;
24
25
type Result<T> = std::result::Result<T, ServerError>;
26
27
/// Wrapper over `Request` which adds an identification token.
28
0
#[derive(Debug)]
29
pub struct ServerRequest {
30
    /// Inner request.
31
    pub request: Request,
32
    /// Identification token.
33
    id: u64,
34
}
35
36
impl ServerRequest {
37
    /// Creates a new `ServerRequest` object from an existing `Request`,
38
    /// adding an identification token.
39
0
    pub fn new(request: Request, id: u64) -> Self {
40
0
        Self { request, id }
41
0
    }
42
43
    /// Returns a reference to the inner request.
44
0
    pub fn inner(&self) -> &Request {
45
0
        &self.request
46
0
    }
47
48
    /// Calls the function provided on the inner request to obtain the response.
49
    /// The response is then wrapped in a `ServerResponse`.
50
    ///
51
    /// Returns a `ServerResponse` ready for yielding to the server
52
0
    pub fn process<F>(&self, mut callable: F) -> ServerResponse
53
0
    where
54
0
        F: FnMut(&Request) -> Response,
55
0
    {
56
0
        let http_response = callable(self.inner());
57
0
        ServerResponse::new(http_response, self.id)
58
0
    }
Unexecuted instantiation: <micro_http::server::ServerRequest>::process::<vmm::api::http::start_http_thread::{closure#0}::{closure#1}::{closure#0}>
Unexecuted instantiation: <micro_http::server::ServerRequest>::process::<_>
59
}
60
61
/// Wrapper over `Response` which adds an identification token.
62
pub struct ServerResponse {
63
    /// Inner response.
64
    response: Response,
65
    /// Identification token.
66
    id: u64,
67
}
68
69
impl ServerResponse {
70
0
    fn new(response: Response, id: u64) -> Self {
71
0
        Self { response, id }
72
0
    }
73
}
74
75
/// Describes the state of the connection as far as data exchange
76
/// on the stream is concerned.
77
0
#[derive(PartialOrd, PartialEq)]
78
enum ClientConnectionState {
79
    AwaitingIncoming,
80
    AwaitingOutgoing,
81
    Closed,
82
}
83
84
/// Wrapper over `HttpConnection` which keeps track of yielded
85
/// requests and absorbed responses.
86
struct ClientConnection<T> {
87
    /// The `HttpConnection` object which handles data exchange.
88
    connection: HttpConnection<T>,
89
    /// The state of the connection in the `epoll` structure.
90
    state: ClientConnectionState,
91
    /// Represents the difference between yielded requests and
92
    /// absorbed responses.
93
    /// This has to be `0` if we want to drop the connection.
94
    in_flight_response_count: u32,
95
}
96
97
impl<T: Read + Write + ScmSocket> ClientConnection<T> {
98
0
    fn new(connection: HttpConnection<T>) -> Self {
99
0
        Self {
100
0
            connection,
101
0
            state: ClientConnectionState::AwaitingIncoming,
102
0
            in_flight_response_count: 0,
103
0
        }
104
0
    }
105
106
0
    fn read(&mut self) -> Result<Vec<Request>> {
107
0
        // Data came into the connection.
108
0
        let mut parsed_requests = vec![];
109
0
        match self.connection.try_read() {
110
            Err(ConnectionError::ConnectionClosed) => {
111
                // Connection timeout.
112
0
                self.state = ClientConnectionState::Closed;
113
0
                // We don't want to propagate this to the server and we will
114
0
                // return no requests and wait for the connection to become
115
0
                // safe to drop.
116
0
                return Ok(vec![]);
117
            }
118
0
            Err(ConnectionError::StreamReadError(inner)) => {
119
0
                // Reading from the connection failed.
120
0
                // We should try to write an error message regardless.
121
0
                let mut internal_error_response =
122
0
                    Response::new(Version::Http11, StatusCode::InternalServerError);
123
0
                internal_error_response.set_body(Body::new(inner.to_string()));
124
0
                self.connection.enqueue_response(internal_error_response);
125
0
            }
126
0
            Err(ConnectionError::ParseError(inner)) => {
127
                // An error occurred while parsing the read bytes.
128
                // Check if there are any valid parsed requests in the queue.
129
0
                while let Some(_discarded_request) = self.connection.pop_parsed_request() {}
130
131
                // Send an error response for the request that gave us the error.
132
0
                let mut error_response = Response::new(Version::Http11, StatusCode::BadRequest);
133
0
                error_response.set_body(Body::new(format!(
134
0
                    "{{ \"error\": \"{}\nAll previous unanswered requests will be dropped.\" }}",
135
0
                    inner
136
0
                )));
137
0
                self.connection.enqueue_response(error_response);
138
            }
139
            Err(ConnectionError::InvalidWrite) | Err(ConnectionError::StreamWriteError(_)) => {
140
                // This is unreachable because `HttpConnection::try_read()` cannot return this error variant.
141
0
                unreachable!();
142
            }
143
            Ok(()) => {
144
0
                while let Some(request) = self.connection.pop_parsed_request() {
145
0
                    // Add all valid requests to `parsed_requests`.
146
0
                    parsed_requests.push(request);
147
0
                }
148
            }
149
        }
150
0
        self.in_flight_response_count = self
151
0
            .in_flight_response_count
152
0
            .checked_add(parsed_requests.len() as u32)
153
0
            .ok_or(ServerError::Overflow)?;
154
        // If the state of the connection has changed, we need to update
155
        // the event set in the `epoll` structure.
156
0
        if self.connection.pending_write() {
157
0
            self.state = ClientConnectionState::AwaitingOutgoing;
158
0
        }
159
160
0
        Ok(parsed_requests)
161
0
    }
162
163
0
    fn write(&mut self) -> Result<()> {
164
0
        // The stream is available for writing.
165
0
        match self.connection.try_write() {
166
0
            Err(ConnectionError::ConnectionClosed) | Err(ConnectionError::StreamWriteError(_)) => {
167
0
                // Writing to the stream failed so it will be removed.
168
0
                self.state = ClientConnectionState::Closed;
169
0
            }
170
            Err(ConnectionError::InvalidWrite) => {
171
                // A `try_write` call was performed on a connection that has nothing
172
                // to write.
173
0
                return Err(ServerError::ConnectionError(ConnectionError::InvalidWrite));
174
            }
175
            _ => {
176
                // Check if we still have bytes to write for this connection.
177
0
                if !self.connection.pending_write() {
178
0
                    self.state = ClientConnectionState::AwaitingIncoming;
179
0
                }
180
            }
181
        }
182
0
        Ok(())
183
0
    }
184
185
0
    fn enqueue_response(&mut self, response: Response) -> Result<()> {
186
0
        if self.state != ClientConnectionState::Closed {
187
0
            self.connection.enqueue_response(response);
188
0
        }
189
0
        self.in_flight_response_count = self
190
0
            .in_flight_response_count
191
0
            .checked_sub(1)
192
0
            .ok_or(ServerError::Underflow)?;
193
0
        Ok(())
194
0
    }
195
196
    /// Discards all pending writes from the inner connection.
197
0
    fn clear_write_buffer(&mut self) {
198
0
        self.connection.clear_write_buffer();
199
0
    }
200
201
    // Returns `true` if the connection is closed and safe to drop.
202
0
    fn is_done(&self) -> bool {
203
0
        self.state == ClientConnectionState::Closed
204
0
            && !self.connection.pending_write()
205
0
            && self.in_flight_response_count == 0
206
0
    }
207
}
208
209
/// HTTP Server implementation using Unix Domain Sockets and `EPOLL` to
210
/// handle multiple connections on the same thread.
211
///
212
/// The function that handles incoming connections, parses incoming
213
/// requests and sends responses for awaiting requests is `requests`.
214
/// It can be called in a loop, which will render the thread that the
215
/// server runs on incapable of performing other operations, or it can
216
/// be used in another `EPOLL` structure, as it provides its `epoll`,
217
/// which is a wrapper over the file descriptor of the epoll structure
218
/// used within the server, and it can be added to another one using
219
/// the `EPOLLIN` flag. Whenever there is a notification on that fd,
220
/// `requests` should be called once.
221
///
222
/// # Example
223
///
224
/// ## Starting and running the server
225
///
226
/// ```
227
/// use micro_http::{HttpServer, Response, StatusCode};
228
///
229
/// let path_to_socket = "/tmp/example.sock";
230
/// std::fs::remove_file(path_to_socket).unwrap_or_default();
231
///
232
/// // Start the server.
233
/// let mut server = HttpServer::new(path_to_socket).unwrap();
234
/// server.start_server().unwrap();
235
///
236
/// // Connect a client to the server so it doesn't block in our example.
237
/// let mut socket = std::os::unix::net::UnixStream::connect(path_to_socket).unwrap();
238
///
239
/// // Server loop processing requests.
240
/// loop {
241
///     for request in server.requests().unwrap() {
242
///         let response = request.process(|request| {
243
///             // Your code here.
244
///             Response::new(request.http_version(), StatusCode::NoContent)
245
///         });
246
///         server.respond(response);
247
///     }
248
///     // Break this example loop.
249
///     break;
250
/// }
251
/// ```
252
pub struct HttpServer {
253
    /// Socket on which we listen for new connections.
254
    socket: UnixListener,
255
    /// Server's epoll instance.
256
    epoll: epoll::Epoll,
257
    /// Event requesting micro-http shutdown.
258
    /// Used to break out of inner `epoll_wait` and reports shutdown event.
259
    kill_switch: Option<EventFd>,
260
    /// Holds the token-connection pairs of the server.
261
    /// Each connection has an associated identification token, which is
262
    /// the file descriptor of the underlying stream.
263
    /// We use the file descriptor of the stream as the key for mapping
264
    /// connections because the 1-to-1 relation is guaranteed by the OS.
265
    connections: HashMap<RawFd, ClientConnection<UnixStream>>,
266
    /// Payload max size
267
    payload_max_size: usize,
268
}
269
270
impl HttpServer {
271
    /// Constructor for `HttpServer`.
272
    ///
273
    /// Returns the newly formed `HttpServer`.
274
    ///
275
    /// # Errors
276
    /// Returns an `IOError` when binding or `epoll::create` fails.
277
0
    pub fn new<P: AsRef<Path>>(path_to_socket: P) -> Result<Self> {
278
0
        let socket = UnixListener::bind(path_to_socket).map_err(ServerError::IOError)?;
279
0
        let epoll = epoll::Epoll::new().map_err(ServerError::IOError)?;
280
0
        Ok(Self {
281
0
            socket,
282
0
            epoll,
283
0
            kill_switch: None,
284
0
            connections: HashMap::new(),
285
0
            payload_max_size: MAX_PAYLOAD_SIZE,
286
0
        })
287
0
    }
288
289
    /// Constructor for `HttpServer`.
290
    ///
291
    /// Returns the newly formed `HttpServer`.
292
    ///
293
    /// # Safety
294
    /// This function requires the socket_fd to be solely owned
295
    /// and not be associated with another File in the caller as it uses
296
    /// the unsafe `UnixListener::from_raw_fd method`.
297
    ///
298
    /// # Errors
299
    /// Returns an `IOError` when `epoll::create` fails.
300
0
    pub unsafe fn new_from_fd(socket_fd: RawFd) -> Result<Self> {
301
0
        let socket = UnixListener::from_raw_fd(socket_fd);
302
0
        let epoll = epoll::Epoll::new().map_err(ServerError::IOError)?;
303
0
        Ok(HttpServer {
304
0
            socket,
305
0
            epoll,
306
0
            kill_switch: None,
307
0
            connections: HashMap::new(),
308
0
            payload_max_size: MAX_PAYLOAD_SIZE,
309
0
        })
310
0
    }
311
312
    /// Adds a `kill_switch` event fd used to break out of inner `epoll_wait`
313
    /// and report a shutdown event.
314
0
    pub fn add_kill_switch(&mut self, kill_switch: EventFd) -> Result<()> {
315
0
        // Add the kill switch to the `epoll` structure.
316
0
        let ret = Self::epoll_add(&self.epoll, kill_switch.as_raw_fd());
317
0
        self.kill_switch = Some(kill_switch);
318
0
        ret
319
0
    }
320
321
    /// This function sets the limit for PUT/PATCH requests. It overwrites the
322
    /// default limit of 0.05MiB with the one allowed by server.
323
0
    pub fn set_payload_max_size(&mut self, request_payload_max_size: usize) {
324
0
        self.payload_max_size = request_payload_max_size;
325
0
    }
326
327
    /// Starts the HTTP Server.
328
0
    pub fn start_server(&mut self) -> Result<()> {
329
0
        // Add the socket on which we listen for new connections to the
330
0
        // `epoll` structure.
331
0
        Self::epoll_add(&self.epoll, self.socket.as_raw_fd())
332
0
    }
333
334
    /// This function is responsible for the data exchange with the clients and should
335
    /// be called when we are either notified through `epoll` that we need to exchange
336
    /// data with at least a client or when we don't need to perform any other operations
337
    /// on this thread and we can afford to call it in a loop.
338
    ///
339
    /// Note that this function will block the current thread if there are no notifications
340
    /// to be handled by the server.
341
    ///
342
    /// Returns a collection of complete and valid requests to be processed by the user
343
    /// of the server. Once processed, responses should be sent using `enqueue_responses()`.
344
    ///
345
    /// # Errors
346
    /// `IOError` is returned when `read`, `write` or `epoll::ctl` operations fail.
347
    /// `ServerFull` is returned when a client is trying to connect to the server, but
348
    /// full capacity has already been reached.
349
    /// `InvalidWrite` is returned when the server attempted to perform a write operation
350
    /// on a connection on which it is not possible.
351
0
    pub fn requests(&mut self) -> Result<Vec<ServerRequest>> {
352
0
        let mut parsed_requests: Vec<ServerRequest> = vec![];
353
0
        // Possible events coming from FDs: 1 + 1 + MAX_CONNECTIONS:
354
0
        // exit-eventfd, sock-listen-fd, active-connections-fds.
355
0
        let mut events = [epoll::EpollEvent::default(); MAX_CONNECTIONS + 2];
356
        // This is a wrapper over the syscall `epoll_wait` and it will block the
357
        // current thread until at least one event is received.
358
        // The received notifications will then populate the `events` array with
359
        // `event_count` elements, where 1 <= event_count <= MAX_CONNECTIONS + 2.
360
0
        let event_count = match self.epoll.wait(-1, &mut events[..]) {
361
0
            Ok(event_count) => event_count,
362
0
            Err(e) if e.raw_os_error() == Some(libc::EINTR) => 0,
363
0
            Err(e) => return Err(ServerError::IOError(e)),
364
        };
365
366
        // Getting the file descriptor for kill switch.
367
        // If there is no kill switch fd, we use value -1 as an invalid fd.
368
0
        let kill_fd = self.kill_switch.as_ref().map_or(-1, |ks| ks.as_raw_fd());
369
370
        // We only iterate over first `event_count` events and discard empty elements
371
        // at the end of the array.
372
0
        for e in events[..event_count].iter() {
373
            // Check the file descriptor which produced the notification `e`.
374
            // It could be that we need to shutdown, or have a new connection, or
375
            // one of our open connections is ready to exchange data with a client.
376
0
            if e.fd() == kill_fd {
377
                // Report that the kill switch was triggered.
378
0
                return Err(ServerError::ShutdownEvent);
379
0
            } else if e.fd() == self.socket.as_raw_fd() {
380
                // We have received a notification on the listener socket, which
381
                // means we have a new connection to accept.
382
0
                match self.handle_new_connection() {
383
                    // If the server is full, we send a message to the client
384
                    // notifying them that we will close the connection, then
385
                    // we discard it.
386
                    Err(ServerError::ServerFull) => {
387
0
                        self.socket
388
0
                            .accept()
389
0
                            .map_err(ServerError::IOError)
390
0
                            .and_then(move |(mut stream, _)| {
391
0
                                stream
392
0
                                    .write(SERVER_FULL_ERROR_MESSAGE)
393
0
                                    .map_err(ServerError::IOError)
394
0
                            })?;
395
                    }
396
                    // An internal error will compromise any in-flight requests.
397
0
                    Err(error) => return Err(error),
398
0
                    Ok(()) => {}
399
                };
400
            } else {
401
                // We have a notification on one of our open connections.
402
0
                let fd = e.fd();
403
0
                let client_connection = self.connections.get_mut(&fd).unwrap();
404
0
405
0
                // If we receive a hang up on a connection, we clear the write buffer and set
406
0
                // the connection state to closed to mark it ready for removal from the
407
0
                // connections map, which will gracefully close the socket.
408
0
                // The connection is also marked for removal when encountering `EPOLLERR`,
409
0
                // since this is an "error condition happened on the associated file
410
0
                // descriptor", according to the `epoll_ctl` man page.
411
0
                if e.event_set().contains(epoll::EventSet::ERROR)
412
0
                    || e.event_set().contains(epoll::EventSet::HANG_UP)
413
0
                    || e.event_set().contains(epoll::EventSet::READ_HANG_UP)
414
                {
415
0
                    client_connection.clear_write_buffer();
416
0
                    client_connection.state = ClientConnectionState::Closed;
417
0
                    continue;
418
0
                }
419
0
420
0
                if e.event_set().contains(epoll::EventSet::IN) {
421
                    // We have bytes to read from this connection.
422
                    // If our `read` yields `Request` objects, we wrap them with an ID before
423
                    // handing them to the user.
424
0
                    parsed_requests.append(
425
0
                        &mut client_connection
426
0
                            .read()?
427
0
                            .into_iter()
428
0
                            .map(|request| ServerRequest::new(request, e.data()))
429
0
                            .collect(),
430
0
                    );
431
0
                    // If the connection was incoming before we read and we now have to write
432
0
                    // either an error message or an `expect` response, we change its `epoll`
433
0
                    // event set to notify us when the stream is ready for writing.
434
0
                    if client_connection.state == ClientConnectionState::AwaitingOutgoing {
435
0
                        Self::epoll_mod(
436
0
                            &self.epoll,
437
0
                            fd,
438
0
                            epoll::EventSet::OUT | epoll::EventSet::READ_HANG_UP,
439
0
                        )?;
440
0
                    }
441
0
                } else if e.event_set().contains(epoll::EventSet::OUT) {
442
                    // We have bytes to write on this connection.
443
0
                    client_connection.write()?;
444
                    // If the connection was outgoing before we tried to write the responses
445
                    // and we don't have any more responses to write, we change the `epoll`
446
                    // event set to notify us when we have bytes to read from the stream.
447
0
                    if client_connection.state == ClientConnectionState::AwaitingIncoming {
448
0
                        Self::epoll_mod(
449
0
                            &self.epoll,
450
0
                            fd,
451
0
                            epoll::EventSet::IN | epoll::EventSet::READ_HANG_UP,
452
0
                        )?;
453
0
                    }
454
0
                }
455
            }
456
        }
457
458
        // Remove dead connections.
459
0
        let epoll = &self.epoll;
460
0
        self.connections.retain(|rawfd, client_connection| {
461
0
            if client_connection.is_done() {
462
                // The rawfd should have been registered to the epoll fd.
463
0
                Self::epoll_del(epoll, *rawfd).unwrap();
464
0
                false
465
            } else {
466
0
                true
467
            }
468
0
        });
469
0
470
0
        Ok(parsed_requests)
471
0
    }
472
473
    /// This function is responsible with flushing any remaining outgoing
474
    /// requests on the server.
475
    ///
476
    /// Note that this function can block the thread on write, since the
477
    /// operation is blocking.
478
0
    pub fn flush_outgoing_writes(&mut self) {
479
0
        for (_, connection) in self.connections.iter_mut() {
480
0
            while connection.state == ClientConnectionState::AwaitingOutgoing {
481
0
                if let Err(e) = connection.write() {
482
0
                    if let ServerError::ConnectionError(ConnectionError::InvalidWrite) = e {
483
0
                        // Nothing is logged since an InvalidWrite means we have successfully
484
0
                        // flushed the connection
485
0
                    }
486
0
                    break;
487
0
                }
488
            }
489
        }
490
0
    }
491
492
    /// The file descriptor of the `epoll` structure can enable the server to become
493
    /// a non-blocking structure in an application.
494
    ///
495
    /// Returns a reference to the instance of the server's internal `epoll` structure.
496
    ///
497
    /// # Example
498
    ///
499
    /// ## Non-blocking server
500
    /// ```
501
    /// use std::os::unix::io::AsRawFd;
502
    ///
503
    /// use micro_http::{HttpServer, Response, StatusCode};
504
    /// use vmm_sys_util::epoll;
505
    ///
506
    /// // Create our epoll manager.
507
    /// let epoll = epoll::Epoll::new().unwrap();
508
    ///
509
    /// let path_to_socket = "/tmp/epoll_example.sock";
510
    /// std::fs::remove_file(path_to_socket).unwrap_or_default();
511
    ///
512
    /// // Start the server.
513
    /// let mut server = HttpServer::new(path_to_socket).unwrap();
514
    /// server.start_server().unwrap();
515
    ///
516
    /// // Add our server to the `epoll` manager.
517
    /// epoll
518
    ///     .ctl(
519
    ///         epoll::ControlOperation::Add,
520
    ///         server.epoll().as_raw_fd(),
521
    ///         epoll::EpollEvent::new(epoll::EventSet::IN, 1234u64),
522
    ///     )
523
    ///     .unwrap();
524
    ///
525
    /// // Connect a client to the server so it doesn't block in our example.
526
    /// let mut socket = std::os::unix::net::UnixStream::connect(path_to_socket).unwrap();
527
    ///
528
    /// // Control loop of the application.
529
    /// let mut events = Vec::with_capacity(10);
530
    /// loop {
531
    ///     let num_ev = epoll.wait(-1, events.as_mut_slice());
532
    ///     for event in events {
533
    ///         match event.data() {
534
    ///             // The server notification.
535
    ///             1234 => {
536
    ///                 let request = server.requests();
537
    ///                 // Process...
538
    ///             }
539
    ///             // Other `epoll` notifications.
540
    ///             _ => {
541
    ///                 // Do other computation.
542
    ///             }
543
    ///         }
544
    ///     }
545
    ///     // Break this example loop.
546
    ///     break;
547
    /// }
548
    /// ```
549
0
    pub fn epoll(&self) -> &epoll::Epoll {
550
0
        &self.epoll
551
0
    }
552
553
    /// Enqueues the provided responses in the outgoing connection.
554
    ///
555
    /// # Errors
556
    /// `IOError` is returned when an `epoll::ctl` operation fails.
557
0
    pub fn enqueue_responses(&mut self, responses: Vec<ServerResponse>) -> Result<()> {
558
0
        for response in responses {
559
0
            self.respond(response)?;
560
        }
561
562
0
        Ok(())
563
0
    }
564
565
    /// Adds the provided response to the outgoing buffer in the corresponding connection.
566
    ///
567
    /// # Errors
568
    /// `IOError` is returned when an `epoll::ctl` operation fails.
569
    /// `Underflow` is returned when `enqueue_response` fails.
570
0
    pub fn respond(&mut self, response: ServerResponse) -> Result<()> {
571
0
        if let Some(client_connection) = self.connections.get_mut(&(response.id as i32)) {
572
            // If the connection was incoming before we enqueue the response, we change its
573
            // `epoll` event set to notify us when the stream is ready for writing.
574
0
            if let ClientConnectionState::AwaitingIncoming = client_connection.state {
575
0
                client_connection.state = ClientConnectionState::AwaitingOutgoing;
576
0
                Self::epoll_mod(
577
0
                    &self.epoll,
578
0
                    response.id as RawFd,
579
0
                    epoll::EventSet::OUT | epoll::EventSet::READ_HANG_UP,
580
0
                )?;
581
0
            }
582
0
            client_connection.enqueue_response(response.response)?;
583
0
        }
584
0
        Ok(())
585
0
    }
586
587
    /// Accepts a new incoming connection and adds it to the `epoll` notification structure.
588
    ///
589
    /// # Errors
590
    /// `IOError` is returned when socket or epoll operations fail.
591
    /// `ServerFull` is returned if server full capacity has been reached.
592
0
    fn handle_new_connection(&mut self) -> Result<()> {
593
0
        if self.connections.len() == MAX_CONNECTIONS {
594
            // If we want a replacement policy for connections
595
            // this is where we will have it.
596
0
            return Err(ServerError::ServerFull);
597
0
        }
598
0
599
0
        self.socket
600
0
            .accept()
601
0
            .map_err(ServerError::IOError)
602
0
            .and_then(|(stream, _)| {
603
0
                // `HttpConnection` is supposed to work with non-blocking streams.
604
0
                stream
605
0
                    .set_nonblocking(true)
606
0
                    .map(|_| stream)
607
0
                    .map_err(ServerError::IOError)
608
0
            })
609
0
            .and_then(|stream| {
610
0
                // Add the stream to the `epoll` structure and listen for bytes to be read.
611
0
                let raw_fd = stream.as_raw_fd();
612
0
                Self::epoll_add(&self.epoll, raw_fd)?;
613
0
                let mut conn = HttpConnection::new(stream);
614
0
                conn.set_payload_max_size(self.payload_max_size);
615
0
                // Then add it to our open connections.
616
0
                self.connections.insert(raw_fd, ClientConnection::new(conn));
617
0
                Ok(())
618
0
            })
619
0
    }
620
621
    /// Changes the event type for a connection to either listen for incoming bytes
622
    /// or for when the stream is ready for writing.
623
    ///
624
    /// # Errors
625
    /// `IOError` is returned when an `EPOLL_CTL_MOD` control operation fails.
626
0
    fn epoll_mod(epoll: &epoll::Epoll, stream_fd: RawFd, evset: epoll::EventSet) -> Result<()> {
627
0
        let event = epoll::EpollEvent::new(evset, stream_fd as u64);
628
0
        epoll
629
0
            .ctl(epoll::ControlOperation::Modify, stream_fd, event)
630
0
            .map_err(ServerError::IOError)
631
0
    }
632
633
    /// Adds a stream to the `epoll` notification structure with the `EPOLLIN` event set.
634
    ///
635
    /// # Errors
636
    /// `IOError` is returned when an `EPOLL_CTL_ADD` control operation fails.
637
0
    fn epoll_add(epoll: &epoll::Epoll, stream_fd: RawFd) -> Result<()> {
638
0
        epoll
639
0
            .ctl(
640
0
                epoll::ControlOperation::Add,
641
0
                stream_fd,
642
0
                epoll::EpollEvent::new(
643
0
                    epoll::EventSet::IN | epoll::EventSet::READ_HANG_UP,
644
0
                    stream_fd as u64,
645
0
                ),
646
0
            )
647
0
            .map_err(ServerError::IOError)
648
0
    }
649
650
    /// Removes a stream to the `epoll` notification structure.
651
0
    fn epoll_del(epoll: &epoll::Epoll, stream_fd: RawFd) -> Result<()> {
652
0
        epoll
653
0
            .ctl(
654
0
                epoll::ControlOperation::Delete,
655
0
                stream_fd,
656
0
                epoll::EpollEvent::new(epoll::EventSet::IN, stream_fd as u64),
657
0
            )
658
0
            .map_err(ServerError::IOError)
659
0
    }
660
}
661
662
#[cfg(test)]
663
mod tests {
664
    #![allow(clippy::undocumented_unsafe_blocks)]
665
666
    use super::*;
667
    use std::io::{Read, Write};
668
    use std::net::Shutdown;
669
    use std::os::unix::net::UnixStream;
670
    use std::sync::{Arc, Mutex};
671
672
    use crate::common::Body;
673
    use vmm_sys_util::{eventfd::EFD_NONBLOCK, tempfile::TempFile};
674
675
    fn get_temp_socket_file() -> TempFile {
676
        let mut path_to_socket = TempFile::new().unwrap();
677
        path_to_socket.remove().unwrap();
678
        path_to_socket
679
    }
680
681
    #[test]
682
    fn test_wait_one_connection() {
683
        let path_to_socket = get_temp_socket_file();
684
685
        let mut server = HttpServer::new(path_to_socket.as_path()).unwrap();
686
        server.start_server().unwrap();
687
688
        // Test one incoming connection.
689
        let mut socket = UnixStream::connect(path_to_socket.as_path()).unwrap();
690
        assert!(server.requests().unwrap().is_empty());
691
692
        socket
693
            .write_all(
694
                b"PATCH /machine-config HTTP/1.1\r\n\
695
                         Content-Length: 13\r\n\
696
                         Content-Type: application/json\r\n\r\nwhatever body",
697
            )
698
            .unwrap();
699
700
        let mut req_vec = server.requests().unwrap();
701
        let server_request = req_vec.remove(0);
702
703
        server
704
            .respond(server_request.process(|_request| {
705
                let mut response = Response::new(Version::Http11, StatusCode::OK);
706
                let response_body = b"response body";
707
                response.set_body(Body::new(response_body.to_vec()));
708
                response
709
            }))
710
            .unwrap();
711
        assert!(server.requests().unwrap().is_empty());
712
713
        let mut buf: [u8; 1024] = [0; 1024];
714
        assert!(socket.read(&mut buf[..]).unwrap() > 0);
715
    }
716
717
    #[test]
718
    fn test_connection_size_limit_exceeded() {
719
        let path_to_socket = get_temp_socket_file();
720
721
        let mut server = HttpServer::new(path_to_socket.as_path()).unwrap();
722
        server.start_server().unwrap();
723
724
        // Test one incoming connection.
725
        let mut socket = UnixStream::connect(path_to_socket.as_path()).unwrap();
726
        assert!(server.requests().unwrap().is_empty());
727
728
        socket
729
            .write_all(
730
                b"PATCH /machine-config HTTP/1.1\r\n\
731
                         Content-Length: 51201\r\n\
732
                         Content-Type: application/json\r\n\r\naaaaa",
733
            )
734
            .unwrap();
735
        assert!(server.requests().unwrap().is_empty());
736
        assert!(server.requests().unwrap().is_empty());
737
        let mut buf: [u8; 265] = [0; 265];
738
        assert!(socket.read(&mut buf[..]).unwrap() > 0);
739
        let error_message = b"HTTP/1.1 400 \r\n\
740
                              Server: Firecracker API\r\n\
741
                              Connection: keep-alive\r\n\
742
                              Content-Type: application/json\r\n\
743
                              Content-Length: 149\r\n\r\n{ \"error\": \"\
744
                              Request payload with size 51201 is larger than \
745
                              the limit of 51200 allowed by server.\nAll \
746
                              previous unanswered requests will be dropped.";
747
        assert_eq!(&buf[..], &error_message[..]);
748
    }
749
750
    #[test]
751
    fn test_set_payload_size() {
752
        let path_to_socket = get_temp_socket_file();
753
754
        let mut server = HttpServer::new(path_to_socket.as_path()).unwrap();
755
        server.start_server().unwrap();
756
        server.set_payload_max_size(4);
757
758
        // Test one incoming connection.
759
        let mut socket = UnixStream::connect(path_to_socket.as_path()).unwrap();
760
        assert!(server.requests().unwrap().is_empty());
761
762
        socket
763
            .write_all(
764
                b"PATCH /machine-config HTTP/1.1\r\n\
765
                         Content-Length: 5\r\n\
766
                         Content-Type: application/json\r\n\r\naaaaa",
767
            )
768
            .unwrap();
769
        assert!(server.requests().unwrap().is_empty());
770
        assert!(server.requests().unwrap().is_empty());
771
        let mut buf: [u8; 260] = [0; 260];
772
        assert!(socket.read(&mut buf[..]).unwrap() > 0);
773
        let error_message = b"HTTP/1.1 400 \r\n\
774
                              Server: Firecracker API\r\n\
775
                              Connection: keep-alive\r\n\
776
                              Content-Type: application/json\r\n\
777
                              Content-Length: 141\r\n\r\n{ \"error\": \"\
778
                              Request payload with size 5 is larger than the \
779
                              limit of 4 allowed by server.\nAll previous \
780
                              unanswered requests will be dropped.\" }";
781
        assert_eq!(&buf[..], &error_message[..]);
782
    }
783
784
    #[test]
785
    fn test_wait_one_fd_connection() {
786
        use std::os::unix::io::IntoRawFd;
787
        let path_to_socket = get_temp_socket_file();
788
789
        let socket_listener = UnixListener::bind(path_to_socket.as_path()).unwrap();
790
        let socket_fd = socket_listener.into_raw_fd();
791
792
        let mut server = unsafe { HttpServer::new_from_fd(socket_fd).unwrap() };
793
        server.start_server().unwrap();
794
795
        // Test one incoming connection.
796
        let mut socket = UnixStream::connect(path_to_socket.as_path()).unwrap();
797
        assert!(server.requests().unwrap().is_empty());
798
799
        socket
800
            .write_all(
801
                b"PATCH /machine-config HTTP/1.1\r\n\
802
                         Content-Length: 13\r\n\
803
                         Content-Type: application/json\r\n\r\nwhatever body",
804
            )
805
            .unwrap();
806
807
        let mut req_vec = server.requests().unwrap();
808
        let server_request = req_vec.remove(0);
809
810
        server
811
            .respond(server_request.process(|request| {
812
                assert_eq!(
813
                    std::str::from_utf8(&request.body.as_ref().unwrap().body).unwrap(),
814
                    "whatever body"
815
                );
816
                let mut response = Response::new(Version::Http11, StatusCode::OK);
817
                let response_body = b"response body";
818
                response.set_body(Body::new(response_body.to_vec()));
819
                response
820
            }))
821
            .unwrap();
822
        assert!(server.requests().unwrap().is_empty());
823
824
        let mut buf: [u8; 1024] = [0; 1024];
825
        assert!(socket.read(&mut buf[..]).unwrap() > 0);
826
        assert!(String::from_utf8_lossy(&buf).contains("response body"));
827
    }
828
829
    #[test]
830
    fn test_wait_concurrent_connections() {
831
        let path_to_socket = get_temp_socket_file();
832
833
        let mut server = HttpServer::new(path_to_socket.as_path()).unwrap();
834
        server.start_server().unwrap();
835
836
        // Test two concurrent connections.
837
        let mut first_socket = UnixStream::connect(path_to_socket.as_path()).unwrap();
838
        assert!(server.requests().unwrap().is_empty());
839
840
        first_socket
841
            .write_all(
842
                b"PATCH /machine-config HTTP/1.1\r\n\
843
                               Content-Length: 13\r\n\
844
                               Content-Type: application/json\r\n\r\nwhatever body",
845
            )
846
            .unwrap();
847
        let mut second_socket = UnixStream::connect(path_to_socket.as_path()).unwrap();
848
849
        let mut req_vec = server.requests().unwrap();
850
        let server_request = req_vec.remove(0);
851
852
        server
853
            .respond(server_request.process(|_request| {
854
                let mut response = Response::new(Version::Http11, StatusCode::OK);
855
                let response_body = b"response body";
856
                response.set_body(Body::new(response_body.to_vec()));
857
                response
858
            }))
859
            .unwrap();
860
        second_socket
861
            .write_all(
862
                b"GET /machine-config HTTP/1.1\r\n\
863
                                Content-Type: application/json\r\n\r\n",
864
            )
865
            .unwrap();
866
867
        let mut req_vec = server.requests().unwrap();
868
        let second_server_request = req_vec.remove(0);
869
870
        assert_eq!(
871
            second_server_request.request,
872
            Request::try_from(
873
                b"GET /machine-config HTTP/1.1\r\n\
874
            Content-Type: application/json\r\n\r\n",
875
                None
876
            )
877
            .unwrap()
878
        );
879
880
        let mut buf: [u8; 1024] = [0; 1024];
881
        assert!(first_socket.read(&mut buf[..]).unwrap() > 0);
882
        first_socket.shutdown(std::net::Shutdown::Both).unwrap();
883
884
        server
885
            .respond(second_server_request.process(|_request| {
886
                let mut response = Response::new(Version::Http11, StatusCode::OK);
887
                let response_body = b"response second body";
888
                response.set_body(Body::new(response_body.to_vec()));
889
                response
890
            }))
891
            .unwrap();
892
893
        assert!(server.requests().unwrap().is_empty());
894
        let mut buf: [u8; 1024] = [0; 1024];
895
        assert!(second_socket.read(&mut buf[..]).unwrap() > 0);
896
        second_socket.shutdown(std::net::Shutdown::Both).unwrap();
897
        assert!(server.requests().unwrap().is_empty());
898
    }
899
900
    #[test]
901
    fn test_wait_expect_connection() {
902
        let path_to_socket = get_temp_socket_file();
903
904
        let mut server = HttpServer::new(path_to_socket.as_path()).unwrap();
905
        server.start_server().unwrap();
906
907
        // Test one incoming connection with `Expect: 100-continue`.
908
        let mut socket = UnixStream::connect(path_to_socket.as_path()).unwrap();
909
        assert!(server.requests().unwrap().is_empty());
910
911
        socket
912
            .write_all(
913
                b"PATCH /machine-config HTTP/1.1\r\n\
914
                         Content-Length: 13\r\n\
915
                         Expect: 100-continue\r\n\r\n",
916
            )
917
            .unwrap();
918
        // `wait` on server to receive what the client set on the socket.
919
        // This will set the stream direction to `Outgoing`, as we need to send a `100 CONTINUE` response.
920
        let req_vec = server.requests().unwrap();
921
        assert!(req_vec.is_empty());
922
        // Another `wait`, this time to send the response.
923
        // Will be called because of an `EPOLLOUT` notification.
924
        let req_vec = server.requests().unwrap();
925
        assert!(req_vec.is_empty());
926
        let mut buf: [u8; 1024] = [0; 1024];
927
        assert!(socket.read(&mut buf[..]).unwrap() > 0);
928
929
        socket.write_all(b"whatever body").unwrap();
930
        let mut req_vec = server.requests().unwrap();
931
        let server_request = req_vec.remove(0);
932
933
        server
934
            .respond(server_request.process(|_request| {
935
                let mut response = Response::new(Version::Http11, StatusCode::OK);
936
                let response_body = b"response body";
937
                response.set_body(Body::new(response_body.to_vec()));
938
                response
939
            }))
940
            .unwrap();
941
942
        let req_vec = server.requests().unwrap();
943
        assert!(req_vec.is_empty());
944
945
        let mut buf: [u8; 1024] = [0; 1024];
946
        assert!(socket.read(&mut buf[..]).unwrap() > 0);
947
    }
948
949
    #[test]
950
    fn test_wait_many_connections() {
951
        let path_to_socket = get_temp_socket_file();
952
953
        let mut server = HttpServer::new(path_to_socket.as_path()).unwrap();
954
        server.start_server().unwrap();
955
956
        let mut sockets: Vec<UnixStream> = Vec::with_capacity(MAX_CONNECTIONS + 1);
957
        for _ in 0..MAX_CONNECTIONS {
958
            sockets.push(UnixStream::connect(path_to_socket.as_path()).unwrap());
959
            assert!(server.requests().unwrap().is_empty());
960
        }
961
962
        sockets.push(UnixStream::connect(path_to_socket.as_path()).unwrap());
963
        assert!(server.requests().unwrap().is_empty());
964
        let mut buf: [u8; 120] = [0; 120];
965
        sockets[MAX_CONNECTIONS].read_exact(&mut buf).unwrap();
966
        assert_eq!(&buf[..], SERVER_FULL_ERROR_MESSAGE);
967
        assert_eq!(server.connections.len(), 10);
968
        {
969
            // Drop this stream.
970
            let _refused_stream = sockets.pop().unwrap();
971
        }
972
        assert_eq!(server.connections.len(), 10);
973
974
        // Check that the server detects a connection shutdown.
975
        let sock: &UnixStream = sockets.first().unwrap();
976
        sock.shutdown(Shutdown::Both).unwrap();
977
        assert!(server.requests().unwrap().is_empty());
978
        // Server should drop a closed connection.
979
        assert_eq!(server.connections.len(), 9);
980
981
        // Close the backing FD of this connection by dropping
982
        // it out of scope.
983
        {
984
            // Enforce the drop call on the stream
985
            let _sock = sockets.pop().unwrap();
986
        }
987
        assert!(server.requests().unwrap().is_empty());
988
        // Server should drop a closed connection.
989
        assert_eq!(server.connections.len(), 8);
990
991
        let sock: &UnixStream = sockets.get(1).unwrap();
992
        // Close both the read and write sides of the socket
993
        // separately and check that the server detects it.
994
        sock.shutdown(Shutdown::Read).unwrap();
995
        sock.shutdown(Shutdown::Write).unwrap();
996
        assert!(server.requests().unwrap().is_empty());
997
        // Server should drop a closed connection.
998
        assert_eq!(server.connections.len(), 7);
999
    }
1000
1001
    #[test]
1002
    fn test_wait_parse_error() {
1003
        let path_to_socket = get_temp_socket_file();
1004
1005
        let mut server = HttpServer::new(path_to_socket.as_path()).unwrap();
1006
        server.start_server().unwrap();
1007
1008
        // Test one incoming connection.
1009
        let mut socket = UnixStream::connect(path_to_socket.as_path()).unwrap();
1010
        socket.set_nonblocking(true).unwrap();
1011
        assert!(server.requests().unwrap().is_empty());
1012
1013
        socket
1014
            .write_all(
1015
                b"PATCH /machine-config HTTP/1.1\r\n\
1016
                         Content-Length: alpha\r\n\
1017
                         Content-Type: application/json\r\n\r\nwhatever body",
1018
            )
1019
            .unwrap();
1020
1021
        assert!(server.requests().unwrap().is_empty());
1022
        assert!(server.requests().unwrap().is_empty());
1023
        let mut buf: [u8; 255] = [0; 255];
1024
        assert!(socket.read(&mut buf[..]).unwrap() > 0);
1025
        let error_message = b"HTTP/1.1 400 \r\n\
1026
                              Server: Firecracker API\r\n\
1027
                              Connection: keep-alive\r\n\
1028
                              Content-Type: application/json\r\n\
1029
                              Content-Length: 136\r\n\r\n{ \"error\": \"Invalid header. \
1030
                              Reason: Invalid value. Key:Content-Length; Value: alpha\nAll previous unanswered requests will be dropped.\" }";
1031
        assert_eq!(&buf[..], &error_message[..]);
1032
1033
        socket
1034
            .write_all(
1035
                b"PATCH /machine-config HTTP/1.1\r\n\
1036
                         Content-Length: alpha\r\n\
1037
                         Content-Type: application/json\r\n\r\nwhatever body",
1038
            )
1039
            .unwrap();
1040
    }
1041
1042
    #[test]
1043
    fn test_wait_in_flight_responses() {
1044
        let path_to_socket = get_temp_socket_file();
1045
1046
        let mut server = HttpServer::new(path_to_socket.as_path()).unwrap();
1047
        server.start_server().unwrap();
1048
1049
        // Test a connection dropped and then a new one appearing
1050
        // before the user had a chance to send the response to the
1051
        // first one.
1052
        let mut first_socket = UnixStream::connect(path_to_socket.as_path()).unwrap();
1053
        assert!(server.requests().unwrap().is_empty());
1054
1055
        first_socket
1056
            .write_all(
1057
                b"PATCH /machine-config HTTP/1.1\r\n\
1058
                               Content-Length: 13\r\n\
1059
                               Content-Type: application/json\r\n\r\nwhatever body",
1060
            )
1061
            .unwrap();
1062
1063
        let mut req_vec = server.requests().unwrap();
1064
        let server_request = req_vec.remove(0);
1065
1066
        first_socket.shutdown(std::net::Shutdown::Both).unwrap();
1067
        assert!(server.requests().unwrap().is_empty());
1068
        let mut second_socket = UnixStream::connect(path_to_socket.as_path()).unwrap();
1069
        second_socket.set_nonblocking(true).unwrap();
1070
        assert!(server.requests().unwrap().is_empty());
1071
1072
        server
1073
            .enqueue_responses(vec![server_request.process(|_request| {
1074
                let mut response = Response::new(Version::Http11, StatusCode::OK);
1075
                let response_body = b"response body";
1076
                response.set_body(Body::new(response_body.to_vec()));
1077
                response
1078
            })])
1079
            .unwrap();
1080
        assert!(server.requests().unwrap().is_empty());
1081
        assert_eq!(server.connections.len(), 1);
1082
        let mut buf: [u8; 1024] = [0; 1024];
1083
        assert!(second_socket.read(&mut buf[..]).is_err());
1084
1085
        second_socket
1086
            .write_all(
1087
                b"GET /machine-config HTTP/1.1\r\n\
1088
                                Content-Type: application/json\r\n\r\n",
1089
            )
1090
            .unwrap();
1091
1092
        let mut req_vec = server.requests().unwrap();
1093
        let second_server_request = req_vec.remove(0);
1094
1095
        assert_eq!(
1096
            second_server_request.request,
1097
            Request::try_from(
1098
                b"GET /machine-config HTTP/1.1\r\n\
1099
            Content-Type: application/json\r\n\r\n",
1100
                None
1101
            )
1102
            .unwrap()
1103
        );
1104
1105
        server
1106
            .respond(second_server_request.process(|_request| {
1107
                let mut response = Response::new(Version::Http11, StatusCode::OK);
1108
                let response_body = b"response second body";
1109
                response.set_body(Body::new(response_body.to_vec()));
1110
                response
1111
            }))
1112
            .unwrap();
1113
1114
        assert!(server.requests().unwrap().is_empty());
1115
        let mut buf: [u8; 1024] = [0; 1024];
1116
        assert!(second_socket.read(&mut buf[..]).unwrap() > 0);
1117
        second_socket.shutdown(std::net::Shutdown::Both).unwrap();
1118
        assert!(server.requests().is_ok());
1119
    }
1120
1121
    #[test]
1122
    fn test_kill_switch() {
1123
        let path_to_socket = get_temp_socket_file();
1124
1125
        let mut server = HttpServer::new(path_to_socket.as_path()).unwrap();
1126
        let kill_switch = EventFd::new(EFD_NONBLOCK).unwrap();
1127
        server
1128
            .add_kill_switch(kill_switch.try_clone().unwrap())
1129
            .unwrap();
1130
        server.start_server().unwrap();
1131
1132
        let request_result = Arc::new(Mutex::new(Ok(vec![])));
1133
        let res_clone = request_result.clone();
1134
        // Start a thread running the server, expect it to report shutdown event.
1135
        let handler = std::thread::spawn(move || {
1136
            *res_clone.lock().unwrap() = server.requests();
1137
        });
1138
1139
        // Trigger kill switch.
1140
        kill_switch.write(1).unwrap();
1141
        // Then send request.
1142
        let mut socket = UnixStream::connect(path_to_socket.as_path()).unwrap();
1143
        socket
1144
            .write_all(
1145
                b"PATCH /machine-config HTTP/1.1\r\n\
1146
                         Content-Length: 13\r\n\
1147
                         Content-Type: application/json\r\n\r\nwhatever body",
1148
            )
1149
            .unwrap();
1150
        // Wait for server thread to handle events.
1151
        handler.join().unwrap();
1152
1153
        // Expect shutdown event instead of http request event.
1154
        let res = request_result.lock().unwrap();
1155
        assert_eq!(
1156
            res.as_ref().unwrap_err(),
1157
            &ServerError::ShutdownEvent,
1158
            "Expected shutdown event, instead got {res:?}"
1159
        );
1160
    }
1161
}