Coverage Report

Created: 2024-09-11 06:20

/src/crow/include/crow/http_server.h
Line
Count
Source (jump to first uncovered line)
1
#pragma once
2
3
#ifdef CROW_USE_BOOST
4
#include <boost/asio.hpp>
5
#ifdef CROW_ENABLE_SSL
6
#include <boost/asio/ssl.hpp>
7
#endif
8
#else
9
#ifndef ASIO_STANDALONE
10
#define ASIO_STANDALONE
11
#endif
12
#include <asio.hpp>
13
#ifdef CROW_ENABLE_SSL
14
#include <asio/ssl.hpp>
15
#endif
16
#endif
17
18
#include <atomic>
19
#include <chrono>
20
#include <cstdint>
21
#include <future>
22
#include <memory>
23
#include <vector>
24
25
#include "crow/version.h"
26
#include "crow/http_connection.h"
27
#include "crow/logging.h"
28
#include "crow/task_timer.h"
29
30
31
namespace crow // NOTE: Already documented in "crow/app.h"
32
{
33
#ifdef CROW_USE_BOOST
34
    // Boost build: alias `boost::asio` as `asio` inside `crow` so the code below can spell
    // `asio::` regardless of which backend was selected.
    namespace asio = boost::asio;
35
    // Error type reported to asynchronous completion handlers (Boost flavour).
    using error_code = boost::system::error_code;
36
#else
37
    // Error type reported to asynchronous completion handlers (standalone Asio flavour).
    using error_code = asio::error_code;
38
#endif
39
    // Shorthand for the TCP protocol types (`tcp::endpoint`, `tcp::acceptor`) used by Server.
    using tcp = asio::ip::tcp;
40
41
    template<typename Handler, typename Adaptor = SocketAdaptor, typename... Middlewares>
42
    class Server
43
    {
44
    public:
45
        /// Construct a server whose acceptor is bound to `bindaddr`:`port`.
        ///
        /// @param handler      Handler that receives the bound port in run() and is handed to every Connection.
        /// @param bindaddr     Textual IP address to bind to (parsed with `asio::ip::address::from_string`).
        /// @param port         TCP port to bind to.
        /// @param server_name  Name reported in the start-up log line and handed to every Connection.
        /// @param middlewares  Middleware tuple forwarded to every Connection (may be null).
        /// @param concurrency  Requested thread count; values below 2 are raised to 2 (see below).
        /// @param timeout      Default timeout given to each worker's task timer.
        /// @param adaptor_ctx  Adaptor context forwarded to every Connection (may be null).
        Server(Handler* handler, std::string bindaddr, uint16_t port, std::string server_name = std::string("Crow/") + VERSION, std::tuple<Middlewares...>* middlewares = nullptr, uint16_t concurrency = 1, uint8_t timeout = 5, typename Adaptor::context* adaptor_ctx = nullptr):
          acceptor_(io_service_, tcp::endpoint(asio::ip::address::from_string(bindaddr), port)),
          signals_(io_service_),
          tick_timer_(io_service_),
          handler_(handler),
          // run() spawns `concurrency_ - 1` workers and do_accept() always dispatches to
          // io_service_pool_[idx], so at least one worker must exist. Without the clamp a
          // value of 1 leaves the pool empty (out-of-bounds access in do_accept()) and a
          // value of 0 makes `concurrency_ - 1` negative when sizing the queue-length pool.
          concurrency_(concurrency < 2 ? static_cast<uint16_t>(2) : concurrency),
          timeout_(timeout),
          server_name_(server_name),
          port_(port),
          bindaddr_(bindaddr),
          // One queue-length counter per worker; relies on concurrency_ being initialized first.
          task_queue_length_pool_(concurrency_ - 1),
          middlewares_(middlewares),
          adaptor_ctx_(adaptor_ctx)
        {}
59
60
        /// Register a callback to be invoked periodically while the server runs.
        ///
        /// run() arms the tick timer only if a callback is set and the interval is positive;
        /// on_tick() then calls `f` and re-arms the timer with the same interval.
        ///
        /// @param d  Interval between two invocations.
        /// @param f  Callback to invoke on every tick.
        void set_tick_function(std::chrono::milliseconds d, std::function<void()> f)
61
0
        {
62
0
            tick_interval_ = d;
63
0
            tick_function_ = f;
64
0
        }
65
66
        void on_tick()
67
0
        {
68
0
            tick_function_();
69
0
            tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
70
0
            tick_timer_.async_wait([this](const error_code& ec) {
71
0
                if (ec)
72
0
                    return;
73
0
                on_tick();
74
0
            });
75
0
        }
76
77
        void run()
78
0
        {
79
0
            uint16_t worker_thread_count = concurrency_ - 1;
80
0
            for (int i = 0; i < worker_thread_count; i++)
81
0
                io_service_pool_.emplace_back(new asio::io_service());
82
0
            get_cached_date_str_pool_.resize(worker_thread_count);
83
0
            task_timer_pool_.resize(worker_thread_count);
84
85
0
            std::vector<std::future<void>> v;
86
0
            std::atomic<int> init_count(0);
87
0
            for (uint16_t i = 0; i < worker_thread_count; i++)
88
0
                v.push_back(
89
0
                  std::async(
90
0
                    std::launch::async, [this, i, &init_count] {
91
                        // thread local date string get function
92
0
                        auto last = std::chrono::steady_clock::now();
93
94
0
                        std::string date_str;
95
0
                        auto update_date_str = [&] {
96
0
                            auto last_time_t = time(0);
97
0
                            tm my_tm;
98
99
#if defined(_MSC_VER) || defined(__MINGW32__)
100
                            gmtime_s(&my_tm, &last_time_t);
101
#else
102
0
                            gmtime_r(&last_time_t, &my_tm);
103
0
#endif
104
0
                            date_str.resize(100);
105
0
                            size_t date_str_sz = strftime(&date_str[0], 99, "%a, %d %b %Y %H:%M:%S GMT", &my_tm);
106
0
                            date_str.resize(date_str_sz);
107
0
                        };
108
0
                        update_date_str();
109
0
                        get_cached_date_str_pool_[i] = [&]() -> std::string {
110
0
                            if (std::chrono::steady_clock::now() - last >= std::chrono::seconds(1))
111
0
                            {
112
0
                                last = std::chrono::steady_clock::now();
113
0
                                update_date_str();
114
0
                            }
115
0
                            return date_str;
116
0
                        };
117
118
                        // initializing task timers
119
0
                        detail::task_timer task_timer(*io_service_pool_[i]);
120
0
                        task_timer.set_default_timeout(timeout_);
121
0
                        task_timer_pool_[i] = &task_timer;
122
0
                        task_queue_length_pool_[i] = 0;
123
124
0
                        init_count++;
125
0
                        while (1)
126
0
                        {
127
0
                            try
128
0
                            {
129
0
                                if (io_service_pool_[i]->run() == 0)
130
0
                                {
131
                                    // when io_service.run returns 0, there are no more works to do.
132
0
                                    break;
133
0
                                }
134
0
                            }
135
0
                            catch (std::exception& e)
136
0
                            {
137
0
                                CROW_LOG_ERROR << "Worker Crash: An uncaught exception occurred: " << e.what();
138
0
                            }
139
0
                        }
140
0
                    }));
141
142
0
            if (tick_function_ && tick_interval_.count() > 0)
143
0
            {
144
0
                tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
145
0
                tick_timer_.async_wait(
146
0
                  [this](const error_code& ec) {
147
0
                      if (ec)
148
0
                          return;
149
0
                      on_tick();
150
0
                  });
151
0
            }
152
153
0
            port_ = acceptor_.local_endpoint().port();
154
0
            handler_->port(port_);
155
156
157
0
            CROW_LOG_INFO << server_name_ << " server is running at " << (handler_->ssl_used() ? "https://" : "http://") << bindaddr_ << ":" << acceptor_.local_endpoint().port() << " using " << concurrency_ << " threads";
158
0
            CROW_LOG_INFO << "Call `app.loglevel(crow::LogLevel::Warning)` to hide Info level logs.";
159
160
0
            signals_.async_wait(
161
0
              [&](const error_code& /*error*/, int /*signal_number*/) {
162
0
                  stop();
163
0
              });
164
165
0
            while (worker_thread_count != init_count)
166
0
                std::this_thread::yield();
167
168
0
            do_accept();
169
170
0
            std::thread(
171
0
              [this] {
172
0
                  notify_start();
173
0
                  io_service_.run();
174
0
                  CROW_LOG_INFO << "Exiting.";
175
0
              })
176
0
              .join();
177
0
        }
178
179
        void stop()
180
0
        {
181
0
            shutting_down_ = true; // Prevent the acceptor from taking new connections
182
0
            for (auto& io_service : io_service_pool_)
183
0
            {
184
0
                if (io_service != nullptr)
185
0
                {
186
0
                    CROW_LOG_INFO << "Closing IO service " << &io_service;
187
0
                    io_service->stop(); // Close all io_services (and HTTP connections)
188
0
                }
189
0
            }
190
191
0
            CROW_LOG_INFO << "Closing main IO service (" << &io_service_ << ')';
192
0
            io_service_.stop(); // Close main io_service
193
0
        }
194
195
        /// Wait until the server has properly started
196
        void wait_for_start()
197
        {
198
            std::unique_lock<std::mutex> lock(start_mutex_);
199
            while (!server_started_)
200
                cv_started_.wait(lock);
201
        }
202
203
        /// Remove every signal from the signal set that run() waits on (forwards to `signals_.clear()`).
        void signal_clear()
204
        {
205
            signals_.clear();
206
        }
207
208
        /// Add `signal_number` to the signal set that run() waits on; run() calls stop() when
        /// that wait completes.
        ///
        /// @param signal_number  Signal to register (forwarded to `signals_.add()`).
        void signal_add(int signal_number)
209
0
        {
210
0
            signals_.add(signal_number);
211
0
        }
212
213
    private:
214
        uint16_t pick_io_service_idx()
215
0
        {
216
0
            uint16_t min_queue_idx = 0;
217
218
            // TODO improve load balancing
219
            // size_t is used here to avoid the security issue https://codeql.github.com/codeql-query-help/cpp/cpp-comparison-with-wider-type/
220
            // even though the max value of this can be only uint16_t as concurrency is uint16_t.
221
0
            for (size_t i = 1; i < task_queue_length_pool_.size() && task_queue_length_pool_[min_queue_idx] > 0; i++)
222
            // No need to check other io_services if the current one has no tasks
223
0
            {
224
0
                if (task_queue_length_pool_[i] < task_queue_length_pool_[min_queue_idx])
225
0
                    min_queue_idx = i;
226
0
            }
227
0
            return min_queue_idx;
228
0
        }
229
230
        void do_accept()
231
0
        {
232
0
            if (!shutting_down_)
233
0
            {
234
0
                uint16_t service_idx = pick_io_service_idx();
235
0
                asio::io_service& is = *io_service_pool_[service_idx];
236
0
                task_queue_length_pool_[service_idx]++;
237
0
                CROW_LOG_DEBUG << &is << " {" << service_idx << "} queue length: " << task_queue_length_pool_[service_idx];
238
239
0
                auto p = std::make_shared<Connection<Adaptor, Handler, Middlewares...>>(
240
0
                  is, handler_, server_name_, middlewares_,
241
0
                  get_cached_date_str_pool_[service_idx], *task_timer_pool_[service_idx], adaptor_ctx_, task_queue_length_pool_[service_idx]);
242
243
0
                acceptor_.async_accept(
244
0
                  p->socket(),
245
0
                  [this, p, &is, service_idx](error_code ec) {
246
0
                      if (!ec)
247
0
                      {
248
0
                          is.post(
249
0
                            [p] {
250
0
                                p->start();
251
0
                            });
252
0
                      }
253
0
                      else
254
0
                      {
255
0
                          task_queue_length_pool_[service_idx]--;
256
0
                          CROW_LOG_DEBUG << &is << " {" << service_idx << "} queue length: " << task_queue_length_pool_[service_idx];
257
0
                      }
258
0
                      do_accept();
259
0
                  });
260
0
            }
261
0
        }
262
263
        /// Notify anything using `wait_for_start()` to proceed
264
        void notify_start()
265
0
        {
266
0
            std::unique_lock<std::mutex> lock(start_mutex_);
267
0
            server_started_ = true;
268
0
            cv_started_.notify_all();
269
0
        }
270
271
    private:
272
        // NOTE: declaration order is initialization order. The constructor builds acceptor_,
        // signals_ and tick_timer_ from io_service_ and sizes task_queue_length_pool_ from
        // concurrency_, so those members must stay declared after what they depend on.
        std::vector<std::unique_ptr<asio::io_service>> io_service_pool_; ///< One io_service per worker thread; filled by run().
273
        asio::io_service io_service_; ///< Main io_service: drives the acceptor, the signal set and the tick timer.
274
        std::vector<detail::task_timer*> task_timer_pool_; ///< Pointer to each worker's task timer; each worker stores its own entry in run().
275
        std::vector<std::function<std::string()>> get_cached_date_str_pool_; ///< Per-worker getter for the cached date string; set in run().
276
        tcp::acceptor acceptor_; ///< Listening socket, bound to bindaddr/port in the constructor.
277
        bool shutting_down_ = false; ///< Set by stop(); do_accept() starts no new accept once it is true.
278
        bool server_started_{false}; ///< Set by notify_start() under start_mutex_; awaited by wait_for_start().
279
        std::condition_variable cv_started_; ///< Signalled by notify_start() to release wait_for_start().
280
        std::mutex start_mutex_; ///< Held while server_started_ is read or written.
281
        asio::signal_set signals_; ///< Signals registered via signal_add(); run() calls stop() when the wait on them completes.
282
283
        asio::basic_waitable_timer<std::chrono::high_resolution_clock> tick_timer_; ///< Timer that drives on_tick().
284
285
        Handler* handler_; ///< Handler passed to the constructor; receives the bound port and is given to every Connection.
286
        uint16_t concurrency_{2}; ///< Thread count; run() spawns `concurrency_ - 1` workers.
287
        std::uint8_t timeout_; ///< Default timeout handed to each worker's task timer.
288
        std::string server_name_; ///< Name used in the start-up log line and given to every Connection.
289
        uint16_t port_; ///< Requested port; overwritten in run() with the acceptor's actual local port.
290
        std::string bindaddr_; ///< Bind address as passed to the constructor; used in the start-up log line.
291
        std::vector<std::atomic<unsigned int>> task_queue_length_pool_; ///< Per-worker counter: incremented in do_accept(), decremented there if the accept fails, and passed by reference to the Connection.
292
293
        std::chrono::milliseconds tick_interval_; ///< Interval between ticks; assigned only by set_tick_function().
294
        std::function<void()> tick_function_; ///< Callback invoked on every tick; empty unless set_tick_function() was called.
295
296
        std::tuple<Middlewares...>* middlewares_; ///< Middleware tuple forwarded to every Connection.
297
298
        typename Adaptor::context* adaptor_ctx_; ///< Adaptor context forwarded to every Connection.
299
    };
300
} // namespace crow