Coverage Report

Created: 2025-01-24 06:22

/src/crow/include/crow/http_server.h
Line
Count
Source (jump to first uncovered line)
1
#pragma once
2
3
#ifdef CROW_USE_BOOST
4
#include <boost/asio.hpp>
5
#ifdef CROW_ENABLE_SSL
6
#include <boost/asio/ssl.hpp>
7
#endif
8
#else
9
#ifndef ASIO_STANDALONE
10
#define ASIO_STANDALONE
11
#endif
12
#include <asio.hpp>
13
#ifdef CROW_ENABLE_SSL
14
#include <asio/ssl.hpp>
15
#endif
16
#endif
17
18
#include <atomic>
19
#include <chrono>
20
#include <cstdint>
21
#include <future>
22
#include <memory>
23
#include <vector>
24
25
#include "crow/version.h"
26
#include "crow/http_connection.h"
27
#include "crow/logging.h"
28
#include "crow/task_timer.h"
29
30
31
namespace crow // NOTE: Already documented in "crow/app.h"
32
{
33
#ifdef CROW_USE_BOOST
34
    namespace asio = boost::asio;
35
    using error_code = boost::system::error_code;
36
#else
37
    using error_code = asio::error_code;
38
#endif
39
    using tcp = asio::ip::tcp;
40
41
    template<typename Handler, typename Adaptor = SocketAdaptor, typename... Middlewares>
42
    class Server
43
    {
44
    public:
45
      Server(Handler* handler,
46
             const tcp::endpoint& endpoint,
47
             std::string server_name = std::string("Crow/") + VERSION,
48
             std::tuple<Middlewares...>* middlewares = nullptr,
49
             uint16_t concurrency = 1,
50
             uint8_t timeout = 5,
51
             typename Adaptor::context* adaptor_ctx = nullptr):
52
0
          acceptor_(io_context_,endpoint),
53
0
          signals_(io_context_),
54
0
          tick_timer_(io_context_),
55
0
          handler_(handler),
56
0
          concurrency_(concurrency),
57
0
          timeout_(timeout),
58
0
          server_name_(server_name),
59
0
          task_queue_length_pool_(concurrency_ - 1),
60
0
          middlewares_(middlewares),
61
0
          adaptor_ctx_(adaptor_ctx)
62
0
        {}
63
64
        void set_tick_function(std::chrono::milliseconds d, std::function<void()> f)
65
0
        {
66
0
            tick_interval_ = d;
67
0
            tick_function_ = f;
68
0
        }
69
70
        void on_tick()
71
0
        {
72
0
            tick_function_();
73
0
            tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
74
0
            tick_timer_.async_wait([this](const error_code& ec) {
75
0
                if (ec)
76
0
                    return;
77
0
                on_tick();
78
0
            });
79
0
        }
80
81
        void run()
82
0
        {
83
0
            uint16_t worker_thread_count = concurrency_ - 1;
84
0
            for (int i = 0; i < worker_thread_count; i++)
85
0
                io_context_pool_.emplace_back(new asio::io_context());
86
0
            get_cached_date_str_pool_.resize(worker_thread_count);
87
0
            task_timer_pool_.resize(worker_thread_count);
88
89
0
            std::vector<std::future<void>> v;
90
0
            std::atomic<int> init_count(0);
91
0
            for (uint16_t i = 0; i < worker_thread_count; i++)
92
0
                v.push_back(
93
0
                  std::async(
94
0
                    std::launch::async, [this, i, &init_count] {
95
                        // thread local date string get function
96
0
                        auto last = std::chrono::steady_clock::now();
97
98
0
                        std::string date_str;
99
0
                        auto update_date_str = [&] {
100
0
                            auto last_time_t = time(0);
101
0
                            tm my_tm;
102
103
#if defined(_MSC_VER) || defined(__MINGW32__)
104
                            gmtime_s(&my_tm, &last_time_t);
105
#else
106
0
                            gmtime_r(&last_time_t, &my_tm);
107
0
#endif
108
0
                            date_str.resize(100);
109
0
                            size_t date_str_sz = strftime(&date_str[0], 99, "%a, %d %b %Y %H:%M:%S GMT", &my_tm);
110
0
                            date_str.resize(date_str_sz);
111
0
                        };
112
0
                        update_date_str();
113
0
                        get_cached_date_str_pool_[i] = [&]() -> std::string {
114
0
                            if (std::chrono::steady_clock::now() - last >= std::chrono::seconds(1))
115
0
                            {
116
0
                                last = std::chrono::steady_clock::now();
117
0
                                update_date_str();
118
0
                            }
119
0
                            return date_str;
120
0
                        };
121
122
                        // initializing task timers
123
0
                        detail::task_timer task_timer(*io_context_pool_[i]);
124
0
                        task_timer.set_default_timeout(timeout_);
125
0
                        task_timer_pool_[i] = &task_timer;
126
0
                        task_queue_length_pool_[i] = 0;
127
128
0
                        init_count++;
129
0
                        while (1)
130
0
                        {
131
0
                            try
132
0
                            {
133
0
                                if (io_context_pool_[i]->run() == 0)
134
0
                                {
135
                                    // when io_service.run returns 0, there are no more works to do.
136
0
                                    break;
137
0
                                }
138
0
                            }
139
0
                            catch (std::exception& e)
140
0
                            {
141
0
                                CROW_LOG_ERROR << "Worker Crash: An uncaught exception occurred: " << e.what();
142
0
                            }
143
0
                        }
144
0
                    }));
145
146
0
            if (tick_function_ && tick_interval_.count() > 0)
147
0
            {
148
0
                tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
149
0
                tick_timer_.async_wait(
150
0
                  [this](const error_code& ec) {
151
0
                      if (ec)
152
0
                          return;
153
0
                      on_tick();
154
0
                  });
155
0
            }
156
157
0
            handler_->port(acceptor_.local_endpoint().port());
158
159
160
0
            CROW_LOG_INFO << server_name_
161
0
                          << " server is running at " << (handler_->ssl_used() ? "https://" : "http://")
162
0
                          << acceptor_.local_endpoint().address() << ":" << acceptor_.local_endpoint().port() << " using " << concurrency_ << " threads";
163
0
            CROW_LOG_INFO << "Call `app.loglevel(crow::LogLevel::Warning)` to hide Info level logs.";
164
165
0
            signals_.async_wait(
166
0
              [&](const error_code& /*error*/, int /*signal_number*/) {
167
0
                  stop();
168
0
              });
169
170
0
            while (worker_thread_count != init_count)
171
0
                std::this_thread::yield();
172
173
0
            do_accept();
174
175
0
            std::thread(
176
0
              [this] {
177
0
                  notify_start();
178
0
                  io_context_.run();
179
0
                  CROW_LOG_INFO << "Exiting.";
180
0
              })
181
0
              .join();
182
0
        }
183
184
        void stop()
185
0
        {
186
0
            shutting_down_ = true; // Prevent the acceptor from taking new connections
187
0
            for (auto& io_context : io_context_pool_)
188
0
            {
189
0
                if (io_context != nullptr)
190
0
                {
191
0
                    CROW_LOG_INFO << "Closing IO service " << &io_context;
192
0
                    io_context->stop(); // Close all io_services (and HTTP connections)
193
0
                }
194
0
            }
195
196
0
            CROW_LOG_INFO << "Closing main IO service (" << &io_context_ << ')';
197
0
            io_context_.stop(); // Close main io_service
198
0
        }
199
200
        uint16_t port() const {
201
            return acceptor_.local_endpoint().port();
202
        }
203
204
        /// Wait until the server has properly started or until timeout
205
        std::cv_status wait_for_start(std::chrono::steady_clock::time_point wait_until)
206
        {
207
            std::unique_lock<std::mutex> lock(start_mutex_);
208
            
209
            std::cv_status status = std::cv_status::no_timeout;
210
            while (!server_started_ && ( status==std::cv_status::no_timeout ))
211
                status = cv_started_.wait_until(lock,wait_until);
212
            return status;
213
        }
214
215
        void signal_clear()
216
        {
217
            signals_.clear();
218
        }
219
220
        void signal_add(int signal_number)
221
0
        {
222
0
            signals_.add(signal_number);
223
0
        }
224
225
    private:
226
        uint16_t pick_io_context_idx()
227
0
        {
228
0
            uint16_t min_queue_idx = 0;
229
230
            // TODO improve load balancing
231
            // size_t is used here to avoid the security issue https://codeql.github.com/codeql-query-help/cpp/cpp-comparison-with-wider-type/
232
            // even though the max value of this can be only uint16_t as concurrency is uint16_t.
233
0
            for (size_t i = 1; i < task_queue_length_pool_.size() && task_queue_length_pool_[min_queue_idx] > 0; i++)
234
            // No need to check other io_services if the current one has no tasks
235
0
            {
236
0
                if (task_queue_length_pool_[i] < task_queue_length_pool_[min_queue_idx])
237
0
                    min_queue_idx = i;
238
0
            }
239
0
            return min_queue_idx;
240
0
        }
241
242
        void do_accept()
243
0
        {
244
0
            if (!shutting_down_)
245
0
            {
246
0
                uint16_t context_idx = pick_io_context_idx();
247
0
                asio::io_context& ic = *io_context_pool_[context_idx];
248
0
                task_queue_length_pool_[context_idx]++;
249
0
                CROW_LOG_DEBUG << &ic << " {" << context_idx << "} queue length: " << task_queue_length_pool_[context_idx];
250
251
0
                auto p = std::make_shared<Connection<Adaptor, Handler, Middlewares...>>(
252
0
                  ic, handler_, server_name_, middlewares_,
253
0
                  get_cached_date_str_pool_[context_idx], *task_timer_pool_[context_idx], adaptor_ctx_, task_queue_length_pool_[context_idx]);
254
255
0
                acceptor_.async_accept(
256
0
                  p->socket(),
257
0
                  [this, p, &ic, context_idx](error_code ec) {
258
0
                      if (!ec)
259
0
                      {
260
0
                          asio::post(ic,
261
0
                            [p] {
262
0
                                p->start();
263
0
                            });
264
0
                      }
265
0
                      else
266
0
                      {
267
0
                          task_queue_length_pool_[context_idx]--;
268
0
                          CROW_LOG_DEBUG << &ic << " {" << context_idx << "} queue length: " << task_queue_length_pool_[context_idx];
269
0
                      }
270
0
                      do_accept();
271
0
                  });
272
0
            }
273
0
        }
274
275
        /// Notify anything using `wait_for_start()` to proceed
276
        void notify_start()
277
0
        {
278
0
            std::unique_lock<std::mutex> lock(start_mutex_);
279
0
            server_started_ = true;
280
0
            cv_started_.notify_all();
281
0
        }
282
283
    private:
284
        std::vector<std::unique_ptr<asio::io_context>> io_context_pool_;
285
        asio::io_context io_context_;
286
        std::vector<detail::task_timer*> task_timer_pool_;
287
        std::vector<std::function<std::string()>> get_cached_date_str_pool_;
288
        tcp::acceptor acceptor_;
289
        bool shutting_down_ = false;
290
        bool server_started_{false};
291
        std::condition_variable cv_started_;
292
        std::mutex start_mutex_;
293
        asio::signal_set signals_;
294
295
        asio::basic_waitable_timer<std::chrono::high_resolution_clock> tick_timer_;
296
297
        Handler* handler_;
298
        uint16_t concurrency_{2};
299
        std::uint8_t timeout_;
300
        std::string server_name_;
301
        std::vector<std::atomic<unsigned int>> task_queue_length_pool_;
302
303
        std::chrono::milliseconds tick_interval_;
304
        std::function<void()> tick_function_;
305
306
        std::tuple<Middlewares...>* middlewares_;
307
308
        typename Adaptor::context* adaptor_ctx_;
309
    };
310
} // namespace crow